[ server ] [ executor ] - polish code -2

This commit is contained in:
IceDerce
2022-12-14 20:56:12 +08:00
parent 3934bf17f8
commit bf5dde57c2
5 changed files with 29 additions and 3 deletions

View File

@@ -29,9 +29,12 @@ public class RedisStreamReaderConfig {
@Bean @Bean
public org.springframework.data.redis.stream.Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException { public org.springframework.data.redis.stream.Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
streamKey = "manual-command"; streamKey = "streamKey_lbzb7";
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(1)).build(); StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(2))
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);

View File

@@ -13,6 +13,12 @@ import javax.annotation.Resource;
@Slf4j @Slf4j
public class CommandResultReader implements StreamListener<String, MapRecord<String,String, String >> { public class CommandResultReader implements StreamListener<String, MapRecord<String,String, String >> {
// https://medium.com/nerd-for-tech/event-driven-architecture-with-redis-streams-using-spring-boot-a81a1c9a4cde
//https://segmentfault.com/a/1190000040946712
//https://docs.spring.io/spring-data/redis/docs/2.5.5/reference/html/#redis.streams.receive.containers
@Resource @Resource
RedisStreamReaderConfig redisStreamReaderConfig; RedisStreamReaderConfig redisStreamReaderConfig;

View File

@@ -80,7 +80,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
String TimeString = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); String TimeString = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
return topicName + TimeString; return topicName + "-" + TimeString;
} }
} }

View File

@@ -1,6 +1,8 @@
package io.wdd.rpc.execute.web; package io.wdd.rpc.execute.web;
import io.wdd.common.beans.response.R; import io.wdd.common.beans.response.R;
import io.wdd.rpc.execute.config.RedisStreamReaderConfig;
import io.wdd.rpc.execute.result.CommandResultReader;
import io.wdd.rpc.execute.service.CoreExecutionService; import io.wdd.rpc.execute.service.CoreExecutionService;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
@@ -19,6 +21,9 @@ public class ExecutionController {
@Resource @Resource
CoreExecutionService coreExecutionService; CoreExecutionService coreExecutionService;
@Resource
CommandResultReader commandResultReader;
@PostMapping("command") @PostMapping("command")
public R<String> patchCommandToAgent( public R<String> patchCommandToAgent(
@@ -38,6 +43,15 @@ public class ExecutionController {
return R.ok(streamKey); return R.ok(streamKey);
} }
@PostMapping("/stream")
public void GetCommandLog(
@RequestParam(value = "streamKey") String streamKey
){
commandResultReader.readFromStreamKey(streamKey);
}

View File

@@ -53,6 +53,9 @@ public class ToAgentMessageSender {
public void send(OctopusMessage octopusMessage) { public void send(OctopusMessage octopusMessage) {
log.info("OctopusMessage {} send to agent {}",octopusMessage, octopusMessage.getUuid());
} }