diff --git a/server/src/main/java/io/wdd/rpc/execute/config/RedisStreamReaderConfig.java b/server/src/main/java/io/wdd/rpc/execute/config/RedisStreamReaderConfig.java deleted file mode 100644 index 2b2695e..0000000 --- a/server/src/main/java/io/wdd/rpc/execute/config/RedisStreamReaderConfig.java +++ /dev/null @@ -1,55 +0,0 @@ -package io.wdd.rpc.execute.config; - - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.connection.Subscription; -import org.springframework.data.redis.connection.stream.Consumer; -import org.springframework.data.redis.connection.stream.MapRecord; -import org.springframework.data.redis.connection.stream.ReadOffset; -import org.springframework.data.redis.connection.stream.StreamOffset; -import org.springframework.data.redis.stream.StreamListener; -import org.springframework.data.redis.stream.StreamMessageListenerContainer; - -import javax.annotation.Resource; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.time.Duration; - -@Configuration -public class RedisStreamReaderConfig { - - public String streamKey; - - @Resource - private StreamListener> streamListener; - - - @Bean - public org.springframework.data.redis.stream.Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException { - - streamKey = "streamKey_lbzb7"; - - StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions - .builder() - .pollTimeout(Duration.ofSeconds(2)) - .build(); - - StreamMessageListenerContainer> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); - - - org.springframework.data.redis.stream.Subscription subscription = listenerContainer.receive( - - Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()), - - StreamOffset.create(streamKey, ReadOffset.lastConsumed()), - - streamListener); - - listenerContainer.start(); - - return subscription; - } - -} diff --git a/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java b/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java index 3f11db4..11c8014 100644 --- a/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java +++ b/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java @@ -1,15 +1,13 @@ package io.wdd.rpc.execute.result; -import io.wdd.rpc.execute.config.RedisStreamReaderConfig; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.stream.StreamListener; -import org.springframework.stereotype.Component; -import org.springframework.stereotype.Service; -import javax.annotation.Resource; - -@Service +@Getter +@Setter @Slf4j public class CommandResultReader implements StreamListener> { @@ -19,9 +17,24 @@ public class CommandResultReader implements StreamListener message) { @@ -35,12 +48,4 @@ public class CommandResultReader implements StreamListener {}",formerKey, streamKey); - - redisStreamReaderConfig.streamKey = streamKey; - } - } diff --git a/server/src/main/java/io/wdd/rpc/execute/result/CreateStreamReader.java b/server/src/main/java/io/wdd/rpc/execute/result/CreateStreamReader.java new file mode 100644 index 0000000..8789393 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/execute/result/CreateStreamReader.java @@ -0,0 +1,63 @@ +package io.wdd.rpc.execute.result; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.data.redis.stream.StreamMessageListenerContainer; +import org.springframework.stereotype.Component; + +import java.lang.reflect.Field; + + +@Component +@Slf4j +public class CreateStreamReader implements ApplicationContextAware { + + private ApplicationContext applicationContext; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + + + public void registerStreamReader(String streamKey) { + + + + Field declaredField = null; + try { + + log.debug("start to create the redis stream listener container"); + // create the lazy bean + RedisStreamReaderConfig.MyStreamMessageListenerContainer redisStreamListenerContainer = applicationContext.getBean("redisStreamListenerContainer", RedisStreamReaderConfig.MyStreamMessageListenerContainer.class); + + declaredField = redisStreamListenerContainer.getClass().getDeclaredField("streamKey"); + + log.debug("Change Redis Stream Reader from [ {} ] to [ {} ]",declaredField.get(redisStreamListenerContainer), streamKey); + + + log.debug("start to set the Redis Stream Reader key"); + declaredField.set(redisStreamListenerContainer, streamKey); + + + log.debug("current stream key is [ {} ]",declaredField.get(redisStreamListenerContainer)); + + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + + } + + private void createStreamReader(String streamKey) { + + } + + + private void destroyStreamReader(String streamKey) { + + } +} diff --git a/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java b/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java new file mode 100644 index 0000000..dbb7b09 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java @@ -0,0 +1,76 @@ +package io.wdd.rpc.execute.result; + + +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Lazy; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.stream.MapRecord; +import org.springframework.data.redis.connection.stream.ReadOffset; +import org.springframework.data.redis.connection.stream.StreamOffset; +import org.springframework.data.redis.stream.StreamMessageListenerContainer; + +import javax.annotation.Resource; +import java.time.Duration; + +@Configuration +@Slf4j +@Lazy +public class RedisStreamReaderConfig { + + @Resource + private RedisConnectionFactory redisConnectionFactory; + + + @Bean(initMethod = "start", destroyMethod = "stop") +// @Scope(value = "prototype") + @Lazy + public MyStreamMessageListenerContainer redisStreamListenerContainer() { + + return new MyStreamMessageListenerContainer(); + } + + class MyStreamMessageListenerContainer { + + public String streamKey = "cccc"; + + public void start() { + log.debug("Redis Stream Reader stream key is [ {} ]", this.streamKey); + + } + + public void stop() { + log.debug("Redis Stream Reader destroyed ! stream key is [ {} ]", this.streamKey); + } + + public String get() { + return this.streamKey; + } + + + public StreamMessageListenerContainer> MyStreamMessageListenerContainer() { + + StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions + .builder() + .pollTimeout(Duration.ofSeconds(2)) + .build(); + + StreamMessageListenerContainer> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); + + listenerContainer.receive( + StreamOffset.create(streamKey, ReadOffset.lastConsumed()), + + new CommandResultReader( + "Octopus-Server", + "Octopus-Group", + "OctopusServer") + + ); + + return listenerContainer; + } + + } + +} diff --git a/server/src/main/java/io/wdd/rpc/execute/web/ExecutionController.java b/server/src/main/java/io/wdd/rpc/execute/web/ExecutionController.java index edc7be9..db8b67b 100644 --- a/server/src/main/java/io/wdd/rpc/execute/web/ExecutionController.java +++ b/server/src/main/java/io/wdd/rpc/execute/web/ExecutionController.java @@ -1,8 +1,7 @@ package io.wdd.rpc.execute.web; import io.wdd.common.beans.response.R; -import io.wdd.rpc.execute.config.RedisStreamReaderConfig; -import io.wdd.rpc.execute.result.CommandResultReader; +import io.wdd.rpc.execute.result.CreateStreamReader; import io.wdd.rpc.execute.service.CoreExecutionService; import org.apache.commons.lang3.StringUtils; import org.springframework.web.bind.annotation.PostMapping; @@ -22,7 +21,7 @@ public class ExecutionController { CoreExecutionService coreExecutionService; @Resource - CommandResultReader commandResultReader; + CreateStreamReader createStreamReader; @PostMapping("command") @@ -30,14 +29,14 @@ public class ExecutionController { @RequestParam(value = "topicName") String topicName, @RequestParam(value = "commandList", required = false) @Nullable List commandList, @RequestParam(value = "type", required = false) @Nullable String type - ) { + ) { String streamKey = ""; if (StringUtils.isEmpty(type)) { streamKey = coreExecutionService.SendCommandToAgent(topicName, commandList); } else { - streamKey = coreExecutionService.SendCommandToAgent(topicName, type,commandList); + streamKey = coreExecutionService.SendCommandToAgent(topicName, type, commandList); } return R.ok(streamKey); @@ -46,13 +45,11 @@ public class ExecutionController { @PostMapping("/stream") public void GetCommandLog( @RequestParam(value = "streamKey") String streamKey - ){ + ) { - commandResultReader.readFromStreamKey(streamKey); + createStreamReader.registerStreamReader(streamKey); } - - } diff --git a/server/src/main/resources/bootstrap.yml b/server/src/main/resources/bootstrap.yml index 605cba7..ef43407 100644 --- a/server/src/main/resources/bootstrap.yml +++ b/server/src/main/resources/bootstrap.yml @@ -2,11 +2,11 @@ spring: application: name: octopus-server profiles: - active: k3s + active: local cloud: nacos: config: - group: k3s + group: local config-retry-time: 3000 file-extension: yaml max-retry: 3 @@ -16,5 +16,5 @@ spring: timeout: 5000 config-long-poll-timeout: 5000 extension-configs: - - group: k3s - data-id: common-k3s.yaml \ No newline at end of file + - group: local + data-id: common-local.yaml \ No newline at end of file