diff --git a/common/src/main/java/io/wdd/common/beans/executor/ExecutionMessage.java b/common/src/main/java/io/wdd/common/beans/executor/ExecutionMessage.java index 3b3a277..2b949df 100644 --- a/common/src/main/java/io/wdd/common/beans/executor/ExecutionMessage.java +++ b/common/src/main/java/io/wdd/common/beans/executor/ExecutionMessage.java @@ -22,7 +22,7 @@ public class ExecutionMessage { public static String GetResultKey(String topicName) { - return topicName + "Execution:" + TimeUtils.currentTimeStringFullSplit(); + return topicName + "-Execution:" + TimeUtils.currentTimeStringFullSplit(); } } diff --git a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java index 14a722e..bba4b7d 100644 --- a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java +++ b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java @@ -1,7 +1,7 @@ package io.wdd.rpc.controller; import io.wdd.common.beans.response.R; -import io.wdd.rpc.execute.result.CreateStreamReader; +import io.wdd.rpc.execute.result.BuildStreamReader; import io.wdd.rpc.execute.service.CoreExecutionService; import org.apache.commons.lang3.StringUtils; import org.springframework.web.bind.annotation.PostMapping; @@ -24,7 +24,7 @@ public class ExecutionController { CoreExecutionService coreExecutionService; @Resource - CreateStreamReader createStreamReader; + BuildStreamReader buildStreamReader; @PostMapping("command") @@ -50,7 +50,7 @@ public class ExecutionController { @RequestParam(value = "streamKey") String streamKey ) { - createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER ,streamKey); + buildStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER , streamKey); } @@ -59,7 +59,7 @@ public class ExecutionController { @RequestParam(value = "streamKey") String streamKey ) { - createStreamReader.registerStreamReader(AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER ,streamKey); + buildStreamReader.registerStreamReader(AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER , streamKey); } diff --git a/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfig.java b/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfig.java index ba2abc6..5cce874 100644 --- a/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfig.java +++ b/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfig.java @@ -26,9 +26,11 @@ public class CommandReaderConfig { */ private String consumerName; + private String streamKey; + /** * 执行的结果对象,保存在此处 */ - private String ExecutionResult; + private ArrayList ExecutionResult; } diff --git a/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfigBean.java b/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfigBean.java new file mode 100644 index 0000000..7b36c6a --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfigBean.java @@ -0,0 +1,24 @@ +package io.wdd.rpc.execute.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.REDIS_STREAM_LISTENER_CONSUMER_NAME; + +@Configuration +public class CommandReaderConfigBean { + + @Bean + public CommandReaderConfig commandReaderConfig() { + + return CommandReaderConfig + .builder() + .consumerName(REDIS_STREAM_LISTENER_CONSUMER_NAME) + .streamKey("ccc") + .consumerType(REDIS_STREAM_LISTENER_CONSUMER_NAME) + .ExecutionResult(null) + .build(); + } + + +} diff --git a/server/src/main/java/io/wdd/rpc/execute/config/ExecutionLogBean.java b/server/src/main/java/io/wdd/rpc/execute/config/ExecutionLogBean.java index 1278435..2a88515 100644 --- a/server/src/main/java/io/wdd/rpc/execute/config/ExecutionLogBean.java +++ b/server/src/main/java/io/wdd/rpc/execute/config/ExecutionLogBean.java @@ -1,6 +1,5 @@ package io.wdd.rpc.execute.config; -import io.swagger.annotations.Api; import io.swagger.annotations.ApiModel; import lombok.AllArgsConstructor; import lombok.Data; @@ -13,4 +12,7 @@ import lombok.experimental.SuperBuilder; @SuperBuilder(toBuilder = true) @ApiModel("Execution模快持久化Bean对象") public class ExecutionLogBean { + + private String name; + } diff --git a/server/src/main/java/io/wdd/rpc/execute/result/CreateStreamReader.java b/server/src/main/java/io/wdd/rpc/execute/result/BuildStreamReader.java similarity index 62% rename from server/src/main/java/io/wdd/rpc/execute/result/CreateStreamReader.java rename to server/src/main/java/io/wdd/rpc/execute/result/BuildStreamReader.java index 42bebb6..17ff4cf 100644 --- a/server/src/main/java/io/wdd/rpc/execute/result/CreateStreamReader.java +++ b/server/src/main/java/io/wdd/rpc/execute/result/BuildStreamReader.java @@ -1,27 +1,90 @@ package io.wdd.rpc.execute.result; +import io.wdd.rpc.execute.config.CommandReaderConfig; import io.wdd.server.utils.SpringUtils; import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.connection.stream.ReadOffset; +import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.stereotype.Component; +import java.util.ArrayList; import java.util.HashMap; +import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER; + @Component @Slf4j -public class CreateStreamReader { +public class BuildStreamReader { private final HashMap REDIS_STREAM_LISTENER_CONTAINER_CACHE = new HashMap<>(16); private RedisStreamReaderConfig redisStreamReaderConfig; + private StreamMessageListenerContainer streamMessageListenerContainer; + + private CommandReaderConfig commandReaderConfig; + + public void buildStreamReader(CommandReaderConfig commandReaderConfig) { + + // prepare the environment + prepareExecutionEnv(); + + + // just modify the redis listener container and it's ok + modifyExecutionStreamReader(commandReaderConfig); + + } + + private void modifyExecutionStreamReader(CommandReaderConfig commandReaderConfig) { + + // stop the old stream listener container + this.streamMessageListenerContainer.stop(); + + // modify container + this.streamMessageListenerContainer.receive( + StreamOffset.create( + commandReaderConfig.getStreamKey(), + ReadOffset.lastConsumed()), + + new CommandResultReader( + commandReaderConfig + ) + ); + + // very important + this.streamMessageListenerContainer.start(); + } + + private void prepareExecutionEnv() { + + getRedisStreamListenerContainer(); + + getRedisStreamReaderConfig(); + + } + + private void getRedisStreamReaderConfig() { + + this.commandReaderConfig = SpringUtils.getBean("commandReaderConfig", + CommandReaderConfig.class); + } + + private void getRedisStreamListenerContainer() { + + this.streamMessageListenerContainer = SpringUtils.getBean( + EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER, + StreamMessageListenerContainer.class + ); + } + public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey) { registerStreamReader(redisStreamListenerContainerBeanName, streamKey, null); } - public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey, String ExecutionResult) { + public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey, ArrayList ExecutionResult) { // prepare the environment prepareEnv(); @@ -37,7 +100,8 @@ public class CreateStreamReader { destroyStreamReader(streamKey); // modify the configuration ==> streamKey - modifyStreamReader(streamKey, ExecutionResult); + modifyStreamReader(streamKey, + ExecutionResult); // re-create the REDIS_STREAM_LISTENER_CONTAINER createStreamReader(redisStreamListenerContainerBeanName, @@ -76,7 +140,7 @@ public class CreateStreamReader { } - private void modifyStreamReader(String streamKey, String executionResult) { + private void modifyStreamReader(String streamKey, ArrayList executionResult) { log.debug("start to modify the redis stream listener container stream key"); String oldStreamKey = redisStreamReaderConfig.getStreamKey(); diff --git a/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java b/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java index 013968a..e6a9569 100644 --- a/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java +++ b/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java @@ -5,7 +5,6 @@ import io.wdd.rpc.execute.config.ExecutionResultStringDeserializer; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.stream.StreamListener; @@ -33,14 +32,14 @@ public class CommandResultReader implements StreamListener executionResult) { + this.commandReaderConfig = CommandReaderConfig + .builder() + .consumerName(consumerName) + .group(group) + .consumerType(consumerType) + .ExecutionResult(executionResult) + .build(); } @@ -62,31 +61,24 @@ public class CommandResultReader implements StreamListener executionResultFormat = ExecutionResultStringDeserializer.format(value); + // 赋值给外部的结果,是的执行的结果可以被拿到 - String executionResult = this.commandReaderConfig.getExecutionResult(); - if (StringUtils.isNotEmpty(executionResult)) { - executionResult = value; - } + this.commandReaderConfig.setExecutionResult(executionResultFormat); log.info("Octopus Agent [ {} ] execution of [ {} ] Time is [ {} ] stream recordId is [{}]", streamKey, key, key, messageId); - // print to console - printPrettyDeserializedCommandResult(value); + executionResultFormat + .stream() + .forEach( + System.out::println + ); - } - - private void printPrettyDeserializedCommandResult(String valueString) { - - ArrayList executionResult = ExecutionResultStringDeserializer.format(valueString); - - executionResult.stream() - .forEach( - System.out::println - ); } diff --git a/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java b/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java index 786c5b9..e65c906 100644 --- a/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java +++ b/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java @@ -1,6 +1,7 @@ package io.wdd.rpc.execute.result; +import io.wdd.rpc.execute.config.CommandReaderConfig; import io.wdd.rpc.status.AgentStatusStreamReader; import lombok.Getter; import lombok.Setter; @@ -17,6 +18,7 @@ import org.springframework.data.redis.stream.StreamMessageListenerContainer; import javax.annotation.Resource; import java.time.Duration; +import java.util.ArrayList; @Configuration @Slf4j @@ -28,13 +30,39 @@ public class RedisStreamReaderConfig { private RedisConnectionFactory redisConnectionFactory; public static final String COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER = "commandResultRedisStreamListenerContainer"; + + public static final String EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER = "executionResultRedisStreamListenerContainer"; public static final String AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER = "agentStatusRedisStreamListenerContainer"; + + public static final String REDIS_STREAM_LISTENER_CONSUMER_NAME = "OctopusServer"; + /** + * used in old model + */ private String streamKey = "cccc"; - private String executionResult = null; + /** + * no use + */ + private ArrayList executionResult = null; + + @Bean(value = EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER) + @Lazy + public StreamMessageListenerContainer> executionResultRedisStreamListenerContainer(){ + + StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions + .builder() + .pollTimeout(Duration.ofSeconds(2)) + .build(); + + StreamMessageListenerContainer> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); + + return listenerContainer; + } + + @Bean(value = COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER) @Scope("prototype") @Lazy @@ -48,15 +76,14 @@ public class RedisStreamReaderConfig { StreamMessageListenerContainer> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); // todo 此部分可以被移出到另外的位置,会更加方便,就不需要对此Bean进行创建和销毁了 - listenerContainer.receive( StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new CommandResultReader( - "OctopusServer", + REDIS_STREAM_LISTENER_CONSUMER_NAME, streamKey, - "OctopusServer", + REDIS_STREAM_LISTENER_CONSUMER_NAME, executionResult ) @@ -82,9 +109,9 @@ public class RedisStreamReaderConfig { StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AgentStatusStreamReader( - "OctopusServer", - "OctopusServer", - "OctopusServer") + REDIS_STREAM_LISTENER_CONSUMER_NAME, + REDIS_STREAM_LISTENER_CONSUMER_NAME, + REDIS_STREAM_LISTENER_CONSUMER_NAME) ); diff --git a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java index 3d32543..1147c65 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java @@ -6,7 +6,7 @@ import io.wdd.common.beans.executor.ExecutionMessage; import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessageType; import io.wdd.rpc.execute.config.ExecutionLogBean; -import io.wdd.rpc.execute.result.CreateStreamReader; +import io.wdd.rpc.execute.result.BuildStreamReader; import io.wdd.rpc.message.sender.ToAgentMessageSender; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; @@ -14,11 +14,9 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; import java.util.List; import java.util.stream.Collectors; -import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER; import static io.wdd.rpc.execute.service.ExecutionResultDaemonHandler.WAIT_EXECUTION_RESULT_LIST; @Service @@ -35,7 +33,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { RedisTemplate redisTemplate; @Resource - CreateStreamReader createStreamReader; + BuildStreamReader buildStreamReader; @Override public String SendCommandToAgent(String topicName, String command) { @@ -70,7 +68,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { // set up the stream read group String group = redisTemplate.opsForStream().createGroup(resultKey, resultKey); - log.debug("set consumer group for the stream key with => [ {} ]", resultKey); + log.info("set consumer group [{}] for the stream key with => [ {} ]", group, resultKey); // change the redis stream listener container // createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey); @@ -85,7 +83,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { private ExecutionLogBean buildPersistentLogBeanFromOctopusMessage(OctopusMessage octopusMessage) { - return null; + return new ExecutionLogBean(); } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java b/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java index 6bdc0b7..59f4c43 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java @@ -1,19 +1,19 @@ package io.wdd.rpc.execute.service; +import io.wdd.rpc.execute.config.CommandReaderConfig; import io.wdd.rpc.execute.config.ExecutionLogBean; -import io.wdd.rpc.execute.config.ExecutionResultStringDeserializer; -import io.wdd.rpc.execute.result.CreateStreamReader; +import io.wdd.rpc.execute.result.BuildStreamReader; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; +import org.apache.commons.collections.CollectionUtils; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.*; -import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER; - /** * 1. [waiting strategy ] * 2. [build the redis stream listener] @@ -29,105 +29,140 @@ public class ExecutionResultDaemonHandler { * which means there are execution running , waiting for their result to handle */ public static final ConcurrentHashMap WAIT_EXECUTION_RESULT_LIST = new ConcurrentHashMap<>(32); + private final int MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT = 70; @Resource - CreateStreamReader createStreamReader; + BuildStreamReader buildStreamReader; + @Resource + CommandReaderConfig commandReaderConfig; @PostConstruct public void startExecutionDaemonHandler() { + // 启动一个异步线程,运行 Execution结果处理守护进程 + CompletableFuture.runAsync( + () -> realStartExecutionDaemonHandler() + ); + + } + + private void realStartExecutionDaemonHandler() { + while (true) { - if (WAIT_EXECUTION_RESULT_LIST.size() == 0) { + while (WAIT_EXECUTION_RESULT_LIST.size() == 0) { try { // no execution result need to handle + // wait for 5 seconds + log.debug("realStartExecutionDaemonHandler start to sleep waiting for result !"); TimeUnit.SECONDS.sleep(5); - // restart the forever loop - continue; } catch (InterruptedException e) { throw new RuntimeException(e); } } // has result to handle , just handle one result at one time - String resultKey = WAIT_EXECUTION_RESULT_LIST.keys() - .nextElement(); + String resultKey = WAIT_EXECUTION_RESULT_LIST + .keys() + .nextElement(); + + log.info("current result key is [{}]", + resultKey); - CompletableFuture executionResultFuture = CompletableFuture.supplyAsync( - () -> { - String executionResult = ""; + CompletableFuture> executionResultFuture = + CompletableFuture + .supplyAsync( + () -> { + // 修改相应的参数 + commandReaderConfig.setStreamKey(resultKey); + // listener container 实际上是根据这个绑定的 + commandReaderConfig.setGroup(resultKey); + // 必须归零 + commandReaderConfig.setExecutionResult(null); - // 构造 resultKey对应的 Redis Stream Listener Container - createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, - resultKey, - executionResult); - // 获得结果 - String s = "no no no"; - try { - s = CompletableFuture.supplyAsync( - () -> { - while (true) { - if (StringUtils.isNotEmpty(executionResult)) { - return executionResult; - } + // 构造 resultKey对应的 Redis Stream Listener Container + buildStreamReader + .buildStreamReader(commandReaderConfig); + // 获得结果 + ArrayList s = new ArrayList<>( + List.of("no no no") + ); - try { - TimeUnit.SECONDS.sleep(3); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + try { + s = CompletableFuture + .supplyAsync( + () -> { + while (true) { + + if (CollectionUtils.isNotEmpty(commandReaderConfig.getExecutionResult())) { + return commandReaderConfig.getExecutionResult(); + } + + try { + TimeUnit.SECONDS.sleep(3); + } catch (InterruptedException e) { + throw new RuntimeException(e); } } - ) - .get(80, - TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } catch (TimeoutException e) { - throw new RuntimeException(e); - } + } + ) + .get(MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT, + TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } - return s; - } - ); - CompletableFuture falloutTimeFuture = CompletableFuture.supplyAsync( + return s; + } + ); + + CompletableFuture> falloutTimeFuture = CompletableFuture.supplyAsync( () -> { try { - TimeUnit.SECONDS.sleep(70); + TimeUnit.SECONDS.sleep(MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT); } catch (InterruptedException e) { throw new RuntimeException(e); } - return "[ Failed ] - execution has failed !"; + return new ArrayList<>( + List.of("[ Failed ] - execution has failed !") + ); } ); // 获取结果,然后销毁Stream Listener Container - CompletableFuture.anyOf(falloutTimeFuture, - executionResultFuture) - .whenCompleteAsync( - (resultString, e) -> { + CompletableFuture complete = CompletableFuture + .anyOf(falloutTimeFuture, + executionResultFuture); - log.info("execution result are => {}", - resultString); + complete.whenComplete( + (resultString, e) -> { - // 持久化存储对应的结果 - ExecutionResultStringDeserializer.format(String.valueOf(resultString)); + log.info("execution result are => {}", + resultString); + + // 持久化存储对应的结果 - } - ); + // 清除此次任务的内容 + WAIT_EXECUTION_RESULT_LIST.remove(resultKey); + + log.info("[Execution] - whole process are complete !"); + } + ); + } - } diff --git a/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java b/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java index b492a8f..a1aa7b2 100644 --- a/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java +++ b/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.Channel; import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessageType; +import io.wdd.common.beans.status.AgentStatus; import io.wdd.common.handler.MyRuntimeException; import io.wdd.rpc.message.sender.ToAgentMessageSender; import io.wdd.server.beans.vo.ServerInfoVO; @@ -115,7 +116,7 @@ public class AcceptAgentInitInfo { } // 4. generate the Octopus Agent Status Redis Stream Key & Consumer-Group - generateAgentStatusRedisStreamConsumerGroup(serverInfoVO.getServerName()); + generateAgentStatusRedisStreamConsumerGroup(serverInfoVO.getTopicName()); // 5. send InitMessage to agent sendInitMessageToAgent(serverInfoVO); @@ -162,23 +163,30 @@ public class AcceptAgentInitInfo { channel.basicAck(deliveryTag, false); } - private void generateAgentStatusRedisStreamConsumerGroup(String serverName) { + private void generateAgentStatusRedisStreamConsumerGroup(String agentTopicName) { - String statusStreamKey = serverName + "-status"; + String statusStreamKey = AgentStatus.getRedisStatusKey(agentTopicName); + + if (!redisTemplate.hasKey(statusStreamKey)) { + log.debug(" not find the group, recreate"); + + // not find the group, recreate + redisTemplate.opsForStream().createGroup(statusStreamKey, "OctopusServer"); + } // check for octopus-server consumer group - if (redisTemplate.opsForStream().groups(statusStreamKey) + /*if (redisTemplate.opsForStream().groups(statusStreamKey) .stream() .filter( group -> group.groupName().startsWith("Octopus") ).collect(Collectors.toSet()).contains(Boolean.FALSE)) { - log.debug(" not find the group, recreate"); - // not find the group, recreate - redisTemplate.opsForStream().createGroup(statusStreamKey, "OctopusServer"); - } - log.debug("octopus agent [ {} ] status report stream key [ {} ] has been created !", serverName, statusStreamKey); + + redisTemplate.opsForStream().createGroup(statusStreamKey, "OctopusServer"); + }*/ + + log.debug("octopus agent [ {} ] status report stream key [ {} ] has been created !", agentTopicName, statusStreamKey); } private boolean checkAgentAlreadyRegister(String agentQueueTopic) {