> commandListComplete, boolean needResultReplay, boolean durationTask) {
-
- return ExecutionMessage
- .builder()
- .resultKey(resultKey)
- .type(type)
- .singleLineCommand(commandList)
- .multiLineCommand(commandListComplete)
- .needResultReplay(needResultReplay)
- .durationTask(durationTask)
- .build();
- }
-
-
}
diff --git a/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java b/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java
index 1295831..3ff7089 100644
--- a/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java
+++ b/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java
@@ -1,204 +1,203 @@
-package io.wdd.rpc.execute.service;
-
-
-import io.wdd.common.utils.TimeUtils;
-import io.wdd.rpc.execute.config.CommandReaderConfig;
-import io.wdd.rpc.execute.config.ExecutionLog;
-import io.wdd.rpc.execute.result.BuildStreamReader;
-import io.wdd.server.service.ExecutionLogService;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.springframework.context.annotation.Lazy;
-import org.springframework.stereotype.Service;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.Resource;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.*;
-
-/**
- * 1. [waiting strategy ]
- * 2. [build the redis stream listener]
- * 3. [call persistence]
- */
-//@Service
-@Slf4j
-@Lazy
-@Deprecated
-public class ExecutionResultDaemonHandler {
-
- /**
- * store all execution result key
- *
- * which means there are execution running , waiting for their result to handle
- */
- public static final ConcurrentHashMap WAIT_EXECUTION_RESULT_LIST = new ConcurrentHashMap<>(32);
- private final int MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT = 70;
-
- @Resource
- BuildStreamReader buildStreamReader;
-
- @Resource
- CommandReaderConfig commandReaderConfig;
-
- @Resource
- ExecutionLogService executionLogService;
-
- @PostConstruct
- public void startExecutionDaemonHandler() {
-
- // 启动一个异步线程,运行 Execution结果处理守护进程
- CompletableFuture.runAsync(
- () -> realStartExecutionDaemonHandler()
- );
-
- }
-
- private void realStartExecutionDaemonHandler() {
-
- while (true) {
-
- while (WAIT_EXECUTION_RESULT_LIST.size() == 0) {
- try {
- // no execution result need to handle
-
- // wait for 5 seconds
- log.debug("realStartExecutionDaemonHandler start to sleep waiting for result !");
- TimeUnit.SECONDS.sleep(5);
-
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- // has result to handle , just handle one result at one time
- String resultKey = WAIT_EXECUTION_RESULT_LIST
- .keys()
- .nextElement();
-
- log.debug(
- "current result key is [{}]",
- resultKey
- );
-
-
- CompletableFuture> executionResultFuture =
- CompletableFuture
- .supplyAsync(
- () -> {
- // 修改相应的参数
- commandReaderConfig.setStreamKey(resultKey);
- // listener container 实际上是根据这个绑定的
- commandReaderConfig.setGroup(resultKey);
- // 必须归零
- commandReaderConfig.setExecutionResult(null);
-
- // 构造 resultKey对应的 Redis Stream Listener Container
- buildStreamReader
- .buildStreamReader(commandReaderConfig);
-
- // 获得结果
- ArrayList s = new ArrayList<>(
- List.of("no no no")
- );
-
- try {
- s = CompletableFuture
- .supplyAsync(
- () -> {
- while (true) {
- // todo 多条命令时,这里只能获取到一个结果
- if (CollectionUtils.isNotEmpty(commandReaderConfig.getExecutionResult())) {
- return commandReaderConfig.getExecutionResult();
- }
-
- try {
- TimeUnit.SECONDS.sleep(3);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
- )
- // 获取相应的结果
- .get(
- MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT,
- TimeUnit.SECONDS
- );
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
- } catch (TimeoutException e) {
- throw new RuntimeException(e);
- }
-
-
- return s;
- }
- );
-
- CompletableFuture> falloutTimeFuture = CompletableFuture.supplyAsync(
- () -> {
- try {
- TimeUnit.SECONDS.sleep(MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
-
- return null;
- }
- );
-
- // 获取结果,然后销毁Stream Listener Container
- CompletableFuture