diff --git a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java index 7d29abf..b768099 100644 --- a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java +++ b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java @@ -4,7 +4,6 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import io.wdd.common.response.R; -import io.wdd.rpc.execute.result.BuildStreamReader; import io.wdd.rpc.execute.service.AsyncExecutionService; import io.wdd.rpc.execute.service.SyncExecutionService; import org.springframework.web.bind.annotation.PostMapping; @@ -18,7 +17,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER; import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.init.AgentStatusCacheService.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; @@ -30,8 +28,6 @@ public class ExecutionController { @Resource SyncExecutionService syncExecutionService; @Resource - BuildStreamReader buildStreamReader; - @Resource AsyncExecutionService asyncExecutionService; @PostMapping("/command/one") @@ -198,11 +194,6 @@ public class ExecutionController { @RequestParam(value = "streamKey") @ApiParam(value = "status的Stream Key") String streamKey ) { - buildStreamReader.registerStreamReader( - AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER, - streamKey - ); - return R.ok("请到控制台查看,已经切换至 => " + streamKey); } diff --git a/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfigBean.java b/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfigBean.java index f4da32e..ac3ae3f 100644 --- a/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfigBean.java +++ b/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfigBean.java @@ -1,27 +1,26 @@ -package io.wdd.rpc.execute.config; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.REDIS_STREAM_LISTENER_CONSUMER_NAME; - -@Configuration -public class CommandReaderConfigBean { - - // todo must support for multi thread - // its not thread safe now - @Bean - public CommandReaderConfig commandReaderConfig() { - - return CommandReaderConfig - .builder() - .consumerName(REDIS_STREAM_LISTENER_CONSUMER_NAME) - .streamKey("ccc") - .consumerType(REDIS_STREAM_LISTENER_CONSUMER_NAME) - .group("ccc") - .ExecutionResult(null) - .build(); - } - - -} +//package io.wdd.rpc.execute.config; +// +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +// +// +//@Configuration +//public class CommandReaderConfigBean { +// +// // todo must support for multi thread +// // its not thread safe now +// @Bean +// public CommandReaderConfig commandReaderConfig() { +// +// return CommandReaderConfig +// .builder() +// .consumerName(REDIS_STREAM_LISTENER_CONSUMER_NAME) +// .streamKey("ccc") +// .consumerType(REDIS_STREAM_LISTENER_CONSUMER_NAME) +// .group("ccc") +// .ExecutionResult(null) +// .build(); +// } +// +// +//} diff --git a/server/src/main/java/io/wdd/rpc/execute/result/BuildStreamReader.java b/server/src/main/java/io/wdd/rpc/execute/result/BuildStreamReader.java index 5775012..c6168a7 100644 --- a/server/src/main/java/io/wdd/rpc/execute/result/BuildStreamReader.java +++ b/server/src/main/java/io/wdd/rpc/execute/result/BuildStreamReader.java @@ -1,189 +1,189 @@ -package io.wdd.rpc.execute.result; - -import io.wdd.rpc.execute.config.CommandReaderConfig; -import io.wdd.server.utils.SpringUtils; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; -import org.springframework.data.redis.connection.stream.ReadOffset; -import org.springframework.data.redis.connection.stream.StreamOffset; -import org.springframework.data.redis.stream.StreamMessageListenerContainer; -import org.springframework.stereotype.Component; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.concurrent.TimeUnit; - -import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER; - - -@Component -@Slf4j -public class BuildStreamReader { - - private final HashMap REDIS_STREAM_LISTENER_CONTAINER_CACHE = new HashMap<>(16); - private RedisStreamReaderConfig redisStreamReaderConfig; - - private StreamMessageListenerContainer streamMessageListenerContainer; - - private CommandReaderConfig commandReaderConfig; - - public void buildStreamReader(CommandReaderConfig commandReaderConfig) { - - // prepare the environment - prepareExecutionEnv(); - - - // just modify the redis listener container and it's ok - modifyExecutionStreamReader(commandReaderConfig); - - } - - @SneakyThrows - private void modifyExecutionStreamReader(CommandReaderConfig commandReaderConfig) { - - // stop the old stream listener container - if (this.streamMessageListenerContainer.isRunning()) { - this.streamMessageListenerContainer.stop(); - } - - // modify container - this.streamMessageListenerContainer.receive( - StreamOffset.create( - commandReaderConfig.getStreamKey(), - ReadOffset.lastConsumed()), - - new CommandResultReader( - commandReaderConfig - ) - ); - - - // very important - TimeUnit.MILLISECONDS.sleep(500); - this.streamMessageListenerContainer.start(); - } - - private void prepareExecutionEnv() { - - getRedisStreamListenerContainer(); - - getRedisStreamReaderConfig(); - - } - - private void getRedisStreamReaderConfig() { - - this.commandReaderConfig = SpringUtils.getBean("commandReaderConfig", - CommandReaderConfig.class); - } - - private void getRedisStreamListenerContainer() { - - this.streamMessageListenerContainer = SpringUtils.getBean( - EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER, - StreamMessageListenerContainer.class - ); - } - - public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey) { - registerStreamReader(redisStreamListenerContainerBeanName, - streamKey, - null); - } - - public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey, ArrayList ExecutionResult) { - - // prepare the environment - prepareEnv(); - - // oldStreamKey equals streamKey don't need to do anything , just return - if (redisStreamReaderConfig.getStreamKey() - .equals(streamKey)) { - log.debug("redis listener container not change !"); - return; - } - - // destroy the old REDIS_STREAM_LISTENER_CONTAINER - destroyStreamReader(streamKey); - - // modify the configuration ==> streamKey - modifyStreamReader(streamKey, - ExecutionResult); - - // re-create the REDIS_STREAM_LISTENER_CONTAINER - createStreamReader(redisStreamListenerContainerBeanName, - streamKey); - - } - - private void prepareEnv() { - - getRedisStreamConfig(); - - } - - private void getRedisStreamConfig() { - - this.redisStreamReaderConfig = SpringUtils.getBean("redisStreamReaderConfig", - RedisStreamReaderConfig.class); - } - - - private void createStreamReader(String redisStreamListenerContainerBeanName, String streamKey) { - - log.debug("start to create the redis stream listener container"); - // create the lazy bean - - StreamMessageListenerContainer streamMessageListenerContainer = SpringUtils.getBean(redisStreamListenerContainerBeanName, - StreamMessageListenerContainer.class); - - REDIS_STREAM_LISTENER_CONTAINER_CACHE.put(streamKey, - streamMessageListenerContainer); - - // very important - log.debug("start the listener container"); - streamMessageListenerContainer.start(); - - - } - - private void modifyStreamReader(String streamKey, ArrayList executionResult) { - - log.debug("start to modify the redis stream listener container stream key"); - String oldStreamKey = redisStreamReaderConfig.getStreamKey(); - - log.debug("change stream key from [{}] to [{}]", - oldStreamKey, - streamKey); - - log.debug("start to set the Redis Stream Reader key"); - redisStreamReaderConfig.setStreamKey(streamKey); - - log.debug("start to set the Redis Stream Execution Result Container"); - redisStreamReaderConfig.setExecutionResult(executionResult); - - } - - - private void destroyStreamReader(String streamKey) { - - String oldStreamKey = redisStreamReaderConfig.getStreamKey(); - - if (REDIS_STREAM_LISTENER_CONTAINER_CACHE.containsKey(oldStreamKey)) { - - StreamMessageListenerContainer streamMessageListenerContainer = REDIS_STREAM_LISTENER_CONTAINER_CACHE.get(oldStreamKey); - - log.debug("destroyed old redis stream listener container is [ {} ]", - streamMessageListenerContainer); - - - // double destroy - SpringUtils.destroyBean(streamMessageListenerContainer); - streamMessageListenerContainer.stop(); - // help gc - streamMessageListenerContainer = null; - } - - - } -} +//package io.wdd.rpc.execute.result; +// +//import io.wdd.rpc.execute.config.CommandReaderConfig; +//import io.wdd.server.utils.SpringUtils; +//import lombok.SneakyThrows; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.data.redis.connection.stream.ReadOffset; +//import org.springframework.data.redis.connection.stream.StreamOffset; +//import org.springframework.data.redis.stream.StreamMessageListenerContainer; +//import org.springframework.stereotype.Component; +// +//import java.util.ArrayList; +//import java.util.HashMap; +//import java.util.concurrent.TimeUnit; +// +//import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER; +// +// +//@Component +//@Slf4j +//public class BuildStreamReader { +// +// private final HashMap REDIS_STREAM_LISTENER_CONTAINER_CACHE = new HashMap<>(16); +// private RedisStreamReaderConfig redisStreamReaderConfig; +// +// private StreamMessageListenerContainer streamMessageListenerContainer; +// +// private CommandReaderConfig commandReaderConfig; +// +// public void buildStreamReader(CommandReaderConfig commandReaderConfig) { +// +// // prepare the environment +// prepareExecutionEnv(); +// +// +// // just modify the redis listener container and it's ok +// modifyExecutionStreamReader(commandReaderConfig); +// +// } +// +// @SneakyThrows +// private void modifyExecutionStreamReader(CommandReaderConfig commandReaderConfig) { +// +// // stop the old stream listener container +// if (this.streamMessageListenerContainer.isRunning()) { +// this.streamMessageListenerContainer.stop(); +// } +// +// // modify container +// this.streamMessageListenerContainer.receive( +// StreamOffset.create( +// commandReaderConfig.getStreamKey(), +// ReadOffset.lastConsumed()), +// +// new CommandResultReader( +// commandReaderConfig +// ) +// ); +// +// +// // very important +// TimeUnit.MILLISECONDS.sleep(500); +// this.streamMessageListenerContainer.start(); +// } +// +// private void prepareExecutionEnv() { +// +// getRedisStreamListenerContainer(); +// +// getRedisStreamReaderConfig(); +// +// } +// +// private void getRedisStreamReaderConfig() { +// +// this.commandReaderConfig = SpringUtils.getBean("commandReaderConfig", +// CommandReaderConfig.class); +// } +// +// private void getRedisStreamListenerContainer() { +// +// this.streamMessageListenerContainer = SpringUtils.getBean( +// EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER, +// StreamMessageListenerContainer.class +// ); +// } +// +// public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey) { +// registerStreamReader(redisStreamListenerContainerBeanName, +// streamKey, +// null); +// } +// +// public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey, ArrayList ExecutionResult) { +// +// // prepare the environment +// prepareEnv(); +// +// // oldStreamKey equals streamKey don't need to do anything , just return +// if (redisStreamReaderConfig.getStreamKey() +// .equals(streamKey)) { +// log.debug("redis listener container not change !"); +// return; +// } +// +// // destroy the old REDIS_STREAM_LISTENER_CONTAINER +// destroyStreamReader(streamKey); +// +// // modify the configuration ==> streamKey +// modifyStreamReader(streamKey, +// ExecutionResult); +// +// // re-create the REDIS_STREAM_LISTENER_CONTAINER +// createStreamReader(redisStreamListenerContainerBeanName, +// streamKey); +// +// } +// +// private void prepareEnv() { +// +// getRedisStreamConfig(); +// +// } +// +// private void getRedisStreamConfig() { +// +// this.redisStreamReaderConfig = SpringUtils.getBean("redisStreamReaderConfig", +// RedisStreamReaderConfig.class); +// } +// +// +// private void createStreamReader(String redisStreamListenerContainerBeanName, String streamKey) { +// +// log.debug("start to create the redis stream listener container"); +// // create the lazy bean +// +// StreamMessageListenerContainer streamMessageListenerContainer = SpringUtils.getBean(redisStreamListenerContainerBeanName, +// StreamMessageListenerContainer.class); +// +// REDIS_STREAM_LISTENER_CONTAINER_CACHE.put(streamKey, +// streamMessageListenerContainer); +// +// // very important +// log.debug("start the listener container"); +// streamMessageListenerContainer.start(); +// +// +// } +// +// private void modifyStreamReader(String streamKey, ArrayList executionResult) { +// +// log.debug("start to modify the redis stream listener container stream key"); +// String oldStreamKey = redisStreamReaderConfig.getStreamKey(); +// +// log.debug("change stream key from [{}] to [{}]", +// oldStreamKey, +// streamKey); +// +// log.debug("start to set the Redis Stream Reader key"); +// redisStreamReaderConfig.setStreamKey(streamKey); +// +// log.debug("start to set the Redis Stream Execution Result Container"); +// redisStreamReaderConfig.setExecutionResult(executionResult); +// +// } +// +// +// private void destroyStreamReader(String streamKey) { +// +// String oldStreamKey = redisStreamReaderConfig.getStreamKey(); +// +// if (REDIS_STREAM_LISTENER_CONTAINER_CACHE.containsKey(oldStreamKey)) { +// +// StreamMessageListenerContainer streamMessageListenerContainer = REDIS_STREAM_LISTENER_CONTAINER_CACHE.get(oldStreamKey); +// +// log.debug("destroyed old redis stream listener container is [ {} ]", +// streamMessageListenerContainer); +// +// +// // double destroy +// SpringUtils.destroyBean(streamMessageListenerContainer); +// streamMessageListenerContainer.stop(); +// // help gc +// streamMessageListenerContainer = null; +// } +// +// +// } +//} diff --git a/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java b/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java index a57da80..8d51e9c 100644 --- a/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java +++ b/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java @@ -1,121 +1,121 @@ -package io.wdd.rpc.execute.result; - - -import io.wdd.rpc.scheduler.service.status.AgentStatusStreamReader; -import lombok.Getter; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Lazy; -import org.springframework.context.annotation.Scope; -import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.connection.stream.MapRecord; -import org.springframework.data.redis.connection.stream.ReadOffset; -import org.springframework.data.redis.connection.stream.StreamOffset; -import org.springframework.data.redis.stream.StreamMessageListenerContainer; - -import javax.annotation.Resource; -import java.time.Duration; -import java.util.ArrayList; - -@Configuration -@Slf4j -@Getter -@Setter -public class RedisStreamReaderConfig { - - @Resource - private RedisConnectionFactory redisConnectionFactory; - - public static final String COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER = "commandResultRedisStreamListenerContainer"; - - public static final String EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER = "executionResultRedisStreamListenerContainer"; - - public static final String AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER = "agentStatusRedisStreamListenerContainer"; - - public static final String REDIS_STREAM_LISTENER_CONSUMER_NAME = "OctopusServer"; - - /** - * used in old model - */ - private String streamKey = "cccc"; - - /** - * no use - */ - private ArrayList executionResult = null; - - - @Bean(value = EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER) - @Lazy - public StreamMessageListenerContainer> executionResultRedisStreamListenerContainer(){ - - StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions - .builder() - .pollTimeout(Duration.ofSeconds(2)) - .build(); - - StreamMessageListenerContainer> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); - - return listenerContainer; - } - - - @Bean(value = COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER) - @Scope("prototype") - @Lazy - public StreamMessageListenerContainer> commandResultRedisStreamListenerContainer(){ - - StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions - .builder() - .pollTimeout(Duration.ofSeconds(2)) - .build(); - - StreamMessageListenerContainer> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); - - // todo 此部分可以被移出到另外的位置,会更加方便,就不需要对此Bean进行创建和销毁了 - listenerContainer.receive( - - StreamOffset.create(streamKey, ReadOffset.lastConsumed()), - - new CommandResultReader( - REDIS_STREAM_LISTENER_CONSUMER_NAME, - streamKey, - REDIS_STREAM_LISTENER_CONSUMER_NAME, - executionResult - ) - - ); - - return listenerContainer; - } - - @Bean(value = AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER) - @Scope("prototype") - @Lazy - public StreamMessageListenerContainer> agentStatusRedisStreamListenerContainer(){ - - StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions - .builder() - .pollTimeout(Duration.ofSeconds(2)) - .build(); - - StreamMessageListenerContainer> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); - - listenerContainer.receive( - - StreamOffset.create(streamKey, ReadOffset.lastConsumed()), - - new AgentStatusStreamReader( - REDIS_STREAM_LISTENER_CONSUMER_NAME, - REDIS_STREAM_LISTENER_CONSUMER_NAME, - REDIS_STREAM_LISTENER_CONSUMER_NAME) - - ); - - return listenerContainer; - } - - -} +//package io.wdd.rpc.execute.result; +// +// +//import io.wdd.rpc.scheduler.service.status.AgentStatusStreamReader; +//import lombok.Getter; +//import lombok.Setter; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.context.annotation.Lazy; +//import org.springframework.context.annotation.Scope; +//import org.springframework.data.redis.connection.RedisConnectionFactory; +//import org.springframework.data.redis.connection.stream.MapRecord; +//import org.springframework.data.redis.connection.stream.ReadOffset; +//import org.springframework.data.redis.connection.stream.StreamOffset; +//import org.springframework.data.redis.stream.StreamMessageListenerContainer; +// +//import javax.annotation.Resource; +//import java.time.Duration; +//import java.util.ArrayList; +// +//@Configuration +//@Slf4j +//@Getter +//@Setter +//public class RedisStreamReaderConfig { +// +// @Resource +// private RedisConnectionFactory redisConnectionFactory; +// +// public static final String COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER = "commandResultRedisStreamListenerContainer"; +// +// public static final String EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER = "executionResultRedisStreamListenerContainer"; +// +// public static final String AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER = "agentStatusRedisStreamListenerContainer"; +// +// public static final String REDIS_STREAM_LISTENER_CONSUMER_NAME = "OctopusServer"; +// +// /** +// * used in old model +// */ +// private String streamKey = "cccc"; +// +// /** +// * no use +// */ +// private ArrayList executionResult = null; +// +// +// @Bean(value = EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER) +// @Lazy +// public StreamMessageListenerContainer> executionResultRedisStreamListenerContainer(){ +// +// StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions +// .builder() +// .pollTimeout(Duration.ofSeconds(2)) +// .build(); +// +// StreamMessageListenerContainer> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); +// +// return listenerContainer; +// } +// +// +// @Bean(value = COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER) +// @Scope("prototype") +// @Lazy +// public StreamMessageListenerContainer> commandResultRedisStreamListenerContainer(){ +// +// StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions +// .builder() +// .pollTimeout(Duration.ofSeconds(2)) +// .build(); +// +// StreamMessageListenerContainer> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); +// +// // todo 此部分可以被移出到另外的位置,会更加方便,就不需要对此Bean进行创建和销毁了 +// listenerContainer.receive( +// +// StreamOffset.create(streamKey, ReadOffset.lastConsumed()), +// +// new CommandResultReader( +// REDIS_STREAM_LISTENER_CONSUMER_NAME, +// streamKey, +// REDIS_STREAM_LISTENER_CONSUMER_NAME, +// executionResult +// ) +// +// ); +// +// return listenerContainer; +// } +// +// @Bean(value = AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER) +// @Scope("prototype") +// @Lazy +// public StreamMessageListenerContainer> agentStatusRedisStreamListenerContainer(){ +// +// StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions +// .builder() +// .pollTimeout(Duration.ofSeconds(2)) +// .build(); +// +// StreamMessageListenerContainer> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); +// +// listenerContainer.receive( +// +// StreamOffset.create(streamKey, ReadOffset.lastConsumed()), +// +// new AgentStatusStreamReader( +// REDIS_STREAM_LISTENER_CONSUMER_NAME, +// REDIS_STREAM_LISTENER_CONSUMER_NAME, +// REDIS_STREAM_LISTENER_CONSUMER_NAME) +// +// ); +// +// return listenerContainer; +// } +// +// +//} diff --git a/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java b/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java index b0cba48..3ff7089 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java @@ -1,203 +1,203 @@ -package io.wdd.rpc.execute.service; - - -import io.wdd.common.utils.TimeUtils; -import io.wdd.rpc.execute.config.CommandReaderConfig; -import io.wdd.rpc.execute.config.ExecutionLog; -import io.wdd.rpc.execute.result.BuildStreamReader; -import io.wdd.server.service.ExecutionLogService; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections.CollectionUtils; -import org.springframework.context.annotation.Lazy; - -import javax.annotation.PostConstruct; -import javax.annotation.Resource; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.*; - -/** - * 1. [waiting strategy ] - * 2. [build the redis stream listener] - * 3. [call persistence] - */ -//@Service -@Slf4j -@Lazy -@Deprecated -public class ExecutionResultDaemonHandler { - - /** - * store all execution result key - *

- * which means there are execution running , waiting for their result to handle - */ - public static final ConcurrentHashMap WAIT_EXECUTION_RESULT_LIST = new ConcurrentHashMap<>(32); - private final int MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT = 70; - - @Resource - BuildStreamReader buildStreamReader; - - @Resource - CommandReaderConfig commandReaderConfig; - - @Resource - ExecutionLogService executionLogService; - - @PostConstruct - public void startExecutionDaemonHandler() { - - // 启动一个异步线程,运行 Execution结果处理守护进程 - CompletableFuture.runAsync( - () -> realStartExecutionDaemonHandler() - ); - - } - - private void realStartExecutionDaemonHandler() { - - while (true) { - - while (WAIT_EXECUTION_RESULT_LIST.size() == 0) { - try { - // no execution result need to handle - - // wait for 5 seconds - log.debug("realStartExecutionDaemonHandler start to sleep waiting for result !"); - TimeUnit.SECONDS.sleep(5); - - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - // has result to handle , just handle one result at one time - String resultKey = WAIT_EXECUTION_RESULT_LIST - .keys() - .nextElement(); - - log.debug( - "current result key is [{}]", - resultKey - ); - - - CompletableFuture> executionResultFuture = - CompletableFuture - .supplyAsync( - () -> { - // 修改相应的参数 - commandReaderConfig.setStreamKey(resultKey); - // listener container 实际上是根据这个绑定的 - commandReaderConfig.setGroup(resultKey); - // 必须归零 - commandReaderConfig.setExecutionResult(null); - - // 构造 resultKey对应的 Redis Stream Listener Container - buildStreamReader - .buildStreamReader(commandReaderConfig); - - // 获得结果 - ArrayList s = new ArrayList<>( - List.of("no no no") - ); - - try { - s = CompletableFuture - .supplyAsync( - () -> { - while (true) { - // todo 多条命令时,这里只能获取到一个结果 - if (CollectionUtils.isNotEmpty(commandReaderConfig.getExecutionResult())) { - return commandReaderConfig.getExecutionResult(); - } - - try { - TimeUnit.SECONDS.sleep(3); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } - ) - // 获取相应的结果 - .get( - MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT, - TimeUnit.SECONDS - ); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } catch (TimeoutException e) { - throw new RuntimeException(e); - } - - - return s; - } - ); - - CompletableFuture> falloutTimeFuture = CompletableFuture.supplyAsync( - () -> { - try { - TimeUnit.SECONDS.sleep(MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - return null; - } - ); - - // 获取结果,然后销毁Stream Listener Container - CompletableFuture complete = CompletableFuture - .anyOf( - falloutTimeFuture, - executionResultFuture - ); - - complete - .whenComplete( - (result, e) -> { - - log.debug( - "execution result are => {}", - result - ); - - // 持久化存储对应的结果 - ExecutionLog executionLog = WAIT_EXECUTION_RESULT_LIST.get(resultKey); - executionLog.setAcTime(TimeUtils.currentTime()); - executionLog.setResultContent(String.valueOf(commandReaderConfig.getExecutionResult())); - executionLog.setResultCode( - CollectionUtils.isEmpty((Collection) result) ? 1 : 0 - ); - executionLog.setRecordId(commandReaderConfig.getRecordId()); - - - // 保存操作 - executionLogService.save(executionLog); - - // 清除此次任务的内容 - WAIT_EXECUTION_RESULT_LIST.remove(resultKey); - log.info( - "[Execution] - command {} result are {} result code is {} ,whole process are complete !", - executionLog.getCommandList(), - executionLog.getResultContent(), - executionLog.getResultCode() - ); - } - ); - - // very important - // stuck the main thread , otherwise it will create a dead loop - complete.join(); - - } - - } - - -} +//package io.wdd.rpc.execute.service; +// +// +//import io.wdd.common.utils.TimeUtils; +//import io.wdd.rpc.execute.config.CommandReaderConfig; +//import io.wdd.rpc.execute.config.ExecutionLog; +//import io.wdd.rpc.execute.result.BuildStreamReader; +//import io.wdd.server.service.ExecutionLogService; +//import lombok.extern.slf4j.Slf4j; +//import org.apache.commons.collections.CollectionUtils; +//import org.springframework.context.annotation.Lazy; +// +//import javax.annotation.PostConstruct; +//import javax.annotation.Resource; +//import java.util.ArrayList; +//import java.util.Collection; +//import java.util.List; +//import java.util.concurrent.*; +// +///** +// * 1. [waiting strategy ] +// * 2. [build the redis stream listener] +// * 3. [call persistence] +// */ +////@Service +//@Slf4j +//@Lazy +//@Deprecated +//public class ExecutionResultDaemonHandler { +// +// /** +// * store all execution result key +// *

+// * which means there are execution running , waiting for their result to handle +// */ +// public static final ConcurrentHashMap WAIT_EXECUTION_RESULT_LIST = new ConcurrentHashMap<>(32); +// private final int MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT = 70; +// +// @Resource +// BuildStreamReader buildStreamReader; +// +// @Resource +// CommandReaderConfig commandReaderConfig; +// +// @Resource +// ExecutionLogService executionLogService; +// +// @PostConstruct +// public void startExecutionDaemonHandler() { +// +// // 启动一个异步线程,运行 Execution结果处理守护进程 +// CompletableFuture.runAsync( +// () -> realStartExecutionDaemonHandler() +// ); +// +// } +// +// private void realStartExecutionDaemonHandler() { +// +// while (true) { +// +// while (WAIT_EXECUTION_RESULT_LIST.size() == 0) { +// try { +// // no execution result need to handle +// +// // wait for 5 seconds +// log.debug("realStartExecutionDaemonHandler start to sleep waiting for result !"); +// TimeUnit.SECONDS.sleep(5); +// +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// } +// +// // has result to handle , just handle one result at one time +// String resultKey = WAIT_EXECUTION_RESULT_LIST +// .keys() +// .nextElement(); +// +// log.debug( +// "current result key is [{}]", +// resultKey +// ); +// +// +// CompletableFuture> executionResultFuture = +// CompletableFuture +// .supplyAsync( +// () -> { +// // 修改相应的参数 +// commandReaderConfig.setStreamKey(resultKey); +// // listener container 实际上是根据这个绑定的 +// commandReaderConfig.setGroup(resultKey); +// // 必须归零 +// commandReaderConfig.setExecutionResult(null); +// +// // 构造 resultKey对应的 Redis Stream Listener Container +// buildStreamReader +// .buildStreamReader(commandReaderConfig); +// +// // 获得结果 +// ArrayList s = new ArrayList<>( +// List.of("no no no") +// ); +// +// try { +// s = CompletableFuture +// .supplyAsync( +// () -> { +// while (true) { +// // todo 多条命令时,这里只能获取到一个结果 +// if (CollectionUtils.isNotEmpty(commandReaderConfig.getExecutionResult())) { +// return commandReaderConfig.getExecutionResult(); +// } +// +// try { +// TimeUnit.SECONDS.sleep(3); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// } +// } +// ) +// // 获取相应的结果 +// .get( +// MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT, +// TimeUnit.SECONDS +// ); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } catch (ExecutionException e) { +// throw new RuntimeException(e); +// } catch (TimeoutException e) { +// throw new RuntimeException(e); +// } +// +// +// return s; +// } +// ); +// +// CompletableFuture> falloutTimeFuture = CompletableFuture.supplyAsync( +// () -> { +// try { +// TimeUnit.SECONDS.sleep(MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// +// return null; +// } +// ); +// +// // 获取结果,然后销毁Stream Listener Container +// CompletableFuture complete = CompletableFuture +// .anyOf( +// falloutTimeFuture, +// executionResultFuture +// ); +// +// complete +// .whenComplete( +// (result, e) -> { +// +// log.debug( +// "execution result are => {}", +// result +// ); +// +// // 持久化存储对应的结果 +// ExecutionLog executionLog = WAIT_EXECUTION_RESULT_LIST.get(resultKey); +// executionLog.setAcTime(TimeUtils.currentTime()); +// executionLog.setResultContent(String.valueOf(commandReaderConfig.getExecutionResult())); +// executionLog.setResultCode( +// CollectionUtils.isEmpty((Collection) result) ? 1 : 0 +// ); +// executionLog.setRecordId(commandReaderConfig.getRecordId()); +// +// +// // 保存操作 +// executionLogService.save(executionLog); +// +// // 清除此次任务的内容 +// WAIT_EXECUTION_RESULT_LIST.remove(resultKey); +// log.info( +// "[Execution] - command {} result are {} result code is {} ,whole process are complete !", +// executionLog.getCommandList(), +// executionLog.getResultContent(), +// executionLog.getResultCode() +// ); +// } +// ); +// +// // very important +// // stuck the main thread , otherwise it will create a dead loop +// complete.join(); +// +// } +// +// } +// +// +//}