From bf5dde57c2e963ece1f6a8eedeccf57a59dff3da Mon Sep 17 00:00:00 2001 From: IceDerce Date: Wed, 14 Dec 2022 20:56:12 +0800 Subject: [PATCH] [ server ] [ executor ] - polish code -2 --- .../execute/config/RedisStreamReaderConfig.java | 7 +++++-- .../rpc/execute/result/CommandResultReader.java | 6 ++++++ .../execute/service/CoreExecutionServiceImpl.java | 2 +- .../wdd/rpc/execute/web/ExecutionController.java | 14 ++++++++++++++ .../rpc/message/sender/ToAgentMessageSender.java | 3 +++ 5 files changed, 29 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/io/wdd/rpc/execute/config/RedisStreamReaderConfig.java b/server/src/main/java/io/wdd/rpc/execute/config/RedisStreamReaderConfig.java index 9d550d8..2b2695e 100644 --- a/server/src/main/java/io/wdd/rpc/execute/config/RedisStreamReaderConfig.java +++ b/server/src/main/java/io/wdd/rpc/execute/config/RedisStreamReaderConfig.java @@ -29,9 +29,12 @@ public class RedisStreamReaderConfig { @Bean public org.springframework.data.redis.stream.Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException { - streamKey = "manual-command"; + streamKey = "streamKey_lbzb7"; - StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(1)).build(); + StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions + .builder() + .pollTimeout(Duration.ofSeconds(2)) + .build(); StreamMessageListenerContainer> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); diff --git a/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java b/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java index b043a26..3f11db4 100644 --- a/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java +++ b/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java @@ -13,6 +13,12 @@ import javax.annotation.Resource; @Slf4j public class CommandResultReader implements StreamListener> { + // https://medium.com/nerd-for-tech/event-driven-architecture-with-redis-streams-using-spring-boot-a81a1c9a4cde + + //https://segmentfault.com/a/1190000040946712 + + //https://docs.spring.io/spring-data/redis/docs/2.5.5/reference/html/#redis.streams.receive.containers + @Resource RedisStreamReaderConfig redisStreamReaderConfig; diff --git a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java index df49c6c..26e231d 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java @@ -80,7 +80,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { String TimeString = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); - return topicName + TimeString; + return topicName + "-" + TimeString; } } diff --git a/server/src/main/java/io/wdd/rpc/execute/web/ExecutionController.java b/server/src/main/java/io/wdd/rpc/execute/web/ExecutionController.java index 1b8bf75..edc7be9 100644 --- a/server/src/main/java/io/wdd/rpc/execute/web/ExecutionController.java +++ b/server/src/main/java/io/wdd/rpc/execute/web/ExecutionController.java @@ -1,6 +1,8 @@ package io.wdd.rpc.execute.web; import io.wdd.common.beans.response.R; +import io.wdd.rpc.execute.config.RedisStreamReaderConfig; +import io.wdd.rpc.execute.result.CommandResultReader; import io.wdd.rpc.execute.service.CoreExecutionService; import org.apache.commons.lang3.StringUtils; import org.springframework.web.bind.annotation.PostMapping; @@ -19,6 +21,9 @@ public class ExecutionController { @Resource CoreExecutionService coreExecutionService; + @Resource + CommandResultReader commandResultReader; + @PostMapping("command") public R patchCommandToAgent( @@ -38,6 +43,15 @@ public class ExecutionController { return R.ok(streamKey); } + @PostMapping("/stream") + public void GetCommandLog( + @RequestParam(value = "streamKey") String streamKey + ){ + + commandResultReader.readFromStreamKey(streamKey); + + } + diff --git a/server/src/main/java/io/wdd/rpc/message/sender/ToAgentMessageSender.java b/server/src/main/java/io/wdd/rpc/message/sender/ToAgentMessageSender.java index 7127988..ddf5e8f 100644 --- a/server/src/main/java/io/wdd/rpc/message/sender/ToAgentMessageSender.java +++ b/server/src/main/java/io/wdd/rpc/message/sender/ToAgentMessageSender.java @@ -53,6 +53,9 @@ public class ToAgentMessageSender { public void send(OctopusMessage octopusMessage) { + log.info("OctopusMessage {} send to agent {}",octopusMessage, octopusMessage.getUuid()); + + }