diff --git a/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java b/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java index 27d59fa..da2d4f0 100644 --- a/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java +++ b/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java @@ -1,6 +1,8 @@ package io.wdd.agent.executor.redis; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import io.wdd.agent.config.beans.executor.CommandLog; import io.wdd.agent.config.beans.executor.StreamSenderEntity; import io.wdd.agent.executor.thread.LogToArrayListCache; @@ -37,6 +39,9 @@ public class StreamSender { @Resource RedisTemplate redisTemplate; + @Resource + ObjectMapper objectMapper; + @Resource LogToArrayListCache logToArrayListCache; @@ -134,7 +139,16 @@ public class StreamSender { private boolean send(String streamKey, List content) { - return this.send(streamKey, content.toString()); + + try { + + String resultContent = objectMapper.writeValueAsString(content); + return this.send(streamKey, resultContent); + + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } } diff --git a/server/src/main/java/io/wdd/rpc/execute/config/RedisConfiguration.java b/server/src/main/java/io/wdd/rpc/execute/config/RedisConfiguration.java new file mode 100644 index 0000000..985b36a --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/execute/config/RedisConfiguration.java @@ -0,0 +1,29 @@ +package io.wdd.rpc.execute.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.RedisSerializer; + +@Configuration +public class RedisConfiguration { + + @Bean + public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) { + + RedisTemplate redisTemplate = new RedisTemplate<>(); + + redisTemplate.setConnectionFactory(redisConnectionFactory); + + GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer(); + + redisTemplate.setKeySerializer(RedisSerializer.string()); + redisTemplate.setHashKeySerializer(RedisSerializer.string()); + redisTemplate.setValueSerializer(jsonRedisSerializer); + redisTemplate.setHashValueSerializer(jsonRedisSerializer); + + return redisTemplate; + } +} diff --git a/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java b/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java index 11c8014..fc0e696 100644 --- a/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java +++ b/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java @@ -1,10 +1,17 @@ package io.wdd.rpc.execute.result; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.stream.MapRecord; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; @Getter @Setter @@ -30,6 +37,7 @@ public class CommandResultReader implements StreamListener message) { - String commandLog = message.getValue().values().iterator().next(); + String streamKey = message.getStream(); + + RecordId messageId = message.getId(); + + String key = (String) message.getValue().keySet().toArray()[0]; + + String value = (String) message.getValue().values().toArray()[0]; + + + ObjectMapper objectMapper = new ObjectMapper(); + try { + + System.out.println("streamKey = " + streamKey); + System.out.println("messageId = " + messageId); + System.out.println("key = " + key); + System.out.println("value = " + value); + + ArrayListcommandResultList = objectMapper.readValue(value, ArrayList.class); + commandResultList.stream().forEach( + System.out::println + ); + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } - System.out.println("commandLog = " + commandLog); log.info("intend to be handled already !"); diff --git a/server/src/main/java/io/wdd/rpc/execute/result/CreateStreamReader.java b/server/src/main/java/io/wdd/rpc/execute/result/CreateStreamReader.java index 8789393..4532196 100644 --- a/server/src/main/java/io/wdd/rpc/execute/result/CreateStreamReader.java +++ b/server/src/main/java/io/wdd/rpc/execute/result/CreateStreamReader.java @@ -2,12 +2,16 @@ package io.wdd.rpc.execute.result; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.AutowireCapableBeanFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.stereotype.Component; import java.lang.reflect.Field; +import java.util.HashMap; + +import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.REDIS_STREAM_LISTENER_CONTAINER; @Component @@ -16,48 +20,100 @@ public class CreateStreamReader implements ApplicationContextAware { private ApplicationContext applicationContext; + private AutowireCapableBeanFactory beanFactory; + + private RedisStreamReaderConfig redisStreamReaderConfig; + + + private HashMap REDIS_STREAM_LISTENER_CONTAINER_CACHE = new HashMap<>(16); + + @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } - public void registerStreamReader(String streamKey) { + // prepare the environment + prepareEnv(); + // destroy the REDIS_STREAM_LISTENER_CONTAINER + destroyStreamReader(streamKey); - Field declaredField = null; - try { + // modify the configuration ==> streamKey + modifyStreamReader(streamKey); - log.debug("start to create the redis stream listener container"); - // create the lazy bean - RedisStreamReaderConfig.MyStreamMessageListenerContainer redisStreamListenerContainer = applicationContext.getBean("redisStreamListenerContainer", RedisStreamReaderConfig.MyStreamMessageListenerContainer.class); - - declaredField = redisStreamListenerContainer.getClass().getDeclaredField("streamKey"); - - log.debug("Change Redis Stream Reader from [ {} ] to [ {} ]",declaredField.get(redisStreamListenerContainer), streamKey); - - - log.debug("start to set the Redis Stream Reader key"); - declaredField.set(redisStreamListenerContainer, streamKey); - - - log.debug("current stream key is [ {} ]",declaredField.get(redisStreamListenerContainer)); - - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } + // re-create the REDIS_STREAM_LISTENER_CONTAINER + createStreamReader(streamKey); } + private void prepareEnv(){ + + getBeanFactory(); + + getRedisStreamConfig(); + + } + + private void getRedisStreamConfig() { + this.redisStreamReaderConfig = applicationContext.getBean("redisStreamReaderConfig", RedisStreamReaderConfig.class); + } + + + private void getBeanFactory(){ + this.beanFactory = applicationContext.getAutowireCapableBeanFactory(); + } + private void createStreamReader(String streamKey) { + log.debug("start to create the redis stream listener container"); + // create the lazy bean + + StreamMessageListenerContainer streamMessageListenerContainer = applicationContext.getBean(REDIS_STREAM_LISTENER_CONTAINER, StreamMessageListenerContainer.class); + + REDIS_STREAM_LISTENER_CONTAINER_CACHE.put(streamKey, streamMessageListenerContainer); + + // very important + log.debug("start the listener container"); + streamMessageListenerContainer.start(); + + } + + private void modifyStreamReader(String streamKey) { + + log.debug("start to modify the redis stream listener container stream key"); + String oldStreamKey = redisStreamReaderConfig.getStreamKey(); + + log.debug("change stream key from [{}] to [{}]", oldStreamKey, streamKey); + + log.debug("start to set the Redis Stream Reader key"); + redisStreamReaderConfig.setStreamKey(streamKey); + } private void destroyStreamReader(String streamKey) { + log.debug("start to destroy {}", REDIS_STREAM_LISTENER_CONTAINER); + + String oldStreamKey = redisStreamReaderConfig.getStreamKey(); + + if (REDIS_STREAM_LISTENER_CONTAINER_CACHE.containsKey(oldStreamKey)) { + + StreamMessageListenerContainer streamMessageListenerContainer = REDIS_STREAM_LISTENER_CONTAINER_CACHE.get(oldStreamKey); + + log.debug("destroyed old redis stream listener container is [ {} ]", streamMessageListenerContainer); + + + // double destroy + beanFactory.destroyBean(streamMessageListenerContainer); + streamMessageListenerContainer.stop(); + // help gc + streamMessageListenerContainer = null; + } + + } } diff --git a/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java b/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java index dbb7b09..81c2f38 100644 --- a/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java +++ b/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java @@ -5,6 +5,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Lazy; +import org.springframework.context.annotation.Scope; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.ReadOffset; @@ -16,61 +17,49 @@ import java.time.Duration; @Configuration @Slf4j -@Lazy public class RedisStreamReaderConfig { @Resource private RedisConnectionFactory redisConnectionFactory; + public static final String REDIS_STREAM_LISTENER_CONTAINER = "redisStreamListenerContainer"; - @Bean(initMethod = "start", destroyMethod = "stop") -// @Scope(value = "prototype") + private String streamKey = "cccc"; + + public void setStreamKey(String streamKey) { + this.streamKey = streamKey; + } + + public String getStreamKey() { + return streamKey; + } + + @Bean(value = REDIS_STREAM_LISTENER_CONTAINER) + @Scope("prototype") @Lazy - public MyStreamMessageListenerContainer redisStreamListenerContainer() { + public StreamMessageListenerContainer> redisStreamListenerContainer(){ - return new MyStreamMessageListenerContainer(); + StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions + .builder() + .pollTimeout(Duration.ofSeconds(2)) + .build(); + + StreamMessageListenerContainer> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); + + listenerContainer.receive( + + StreamOffset.create(streamKey, ReadOffset.lastConsumed()), + + new CommandResultReader( + "OctopusServer", + streamKey, + "OctopusServer") + + ); + + return listenerContainer; } - class MyStreamMessageListenerContainer { - public String streamKey = "cccc"; - - public void start() { - log.debug("Redis Stream Reader stream key is [ {} ]", this.streamKey); - - } - - public void stop() { - log.debug("Redis Stream Reader destroyed ! stream key is [ {} ]", this.streamKey); - } - - public String get() { - return this.streamKey; - } - - - public StreamMessageListenerContainer> MyStreamMessageListenerContainer() { - - StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions - .builder() - .pollTimeout(Duration.ofSeconds(2)) - .build(); - - StreamMessageListenerContainer> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); - - listenerContainer.receive( - StreamOffset.create(streamKey, ReadOffset.lastConsumed()), - - new CommandResultReader( - "Octopus-Server", - "Octopus-Group", - "OctopusServer") - - ); - - return listenerContainer; - } - - } } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java index 80b67c3..5810faf 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java @@ -6,6 +6,8 @@ import io.wdd.common.beans.executor.ExecutionMessage; import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessageType; import io.wdd.rpc.message.sender.ToAgentMessageSender; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; @@ -15,6 +17,7 @@ import java.util.List; import java.util.stream.Collectors; @Service +@Slf4j public class CoreExecutionServiceImpl implements CoreExecutionService { @Resource @@ -23,6 +26,9 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { @Resource ObjectMapper objectMapper; + @Resource + RedisTemplate redisTemplate; + @Override public String SendCommandToAgent(String topicName, String command) { return this.SendCommandToAgent(topicName, List.of(command)); @@ -51,9 +57,17 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { throw new RuntimeException(e); } + String resultKey = executionMessage.getResultKey(); + // set up the stream read group + String group = redisTemplate.opsForStream().createGroup(resultKey, resultKey); + System.out.println("group = " + group); + log.debug("set consumer group for the stream key with => [ {} ]", resultKey); + + + // send the message messageSender.send(octopusMessage); - return executionMessage.getResultKey(); + return resultKey; } diff --git a/server/src/main/resources/application-local.yml b/server/src/main/resources/application-local.yml index ef43407..77c7df1 100644 --- a/server/src/main/resources/application-local.yml +++ b/server/src/main/resources/application-local.yml @@ -17,4 +17,10 @@ spring: config-long-poll-timeout: 5000 extension-configs: - group: local - data-id: common-local.yaml \ No newline at end of file + data-id: common-local.yaml + +debug: true +logging: + level: + io.wdd.server: + debug \ No newline at end of file diff --git a/server/src/main/resources/bootstrap.yml b/server/src/main/resources/bootstrap.yml index ef43407..77c7df1 100644 --- a/server/src/main/resources/bootstrap.yml +++ b/server/src/main/resources/bootstrap.yml @@ -17,4 +17,10 @@ spring: config-long-poll-timeout: 5000 extension-configs: - group: local - data-id: common-local.yaml \ No newline at end of file + data-id: common-local.yaml + +debug: true +logging: + level: + io.wdd.server: + debug \ No newline at end of file