diff --git a/agent/src/main/java/io/wdd/agent/agent/AgentRebootUpdateService.java b/agent/src/main/java/io/wdd/agent/agent/AgentRebootUpdateService.java index 4806b74..d7a5da4 100644 --- a/agent/src/main/java/io/wdd/agent/agent/AgentRebootUpdateService.java +++ b/agent/src/main/java/io/wdd/agent/agent/AgentRebootUpdateService.java @@ -129,7 +129,9 @@ public class AgentRebootUpdateService { // 最终执行关机操作 commandExecutor.execute( streamKey, - shutdownCommand + shutdownCommand, + false, + false ); } } diff --git a/agent/src/main/java/io/wdd/agent/config/message/config/OctopusRabbitMQAdminConfig.java b/agent/src/main/java/io/wdd/agent/config/message/OctopusRabbitMQAdminConfig.java similarity index 92% rename from agent/src/main/java/io/wdd/agent/config/message/config/OctopusRabbitMQAdminConfig.java rename to agent/src/main/java/io/wdd/agent/config/message/OctopusRabbitMQAdminConfig.java index d5cdc26..c82d3fb 100644 --- a/agent/src/main/java/io/wdd/agent/config/message/config/OctopusRabbitMQAdminConfig.java +++ b/agent/src/main/java/io/wdd/agent/config/message/OctopusRabbitMQAdminConfig.java @@ -1,4 +1,4 @@ -package io.wdd.agent.config.message.config; +package io.wdd.agent.config.message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; diff --git a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerExecutor.java b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerExecutor.java index 019aa01..3310c36 100644 --- a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerExecutor.java +++ b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerExecutor.java @@ -49,7 +49,7 @@ public class OMHandlerExecutor extends AbstractOctopusMessageHandler { // add in 2023-1-17 - if (CollectionUtils.isNotEmpty(executionMessage.getScriptCommandList())) { + if (CollectionUtils.isNotEmpty(executionMessage.getMultiLineCommand())) { // 传递的是 页面定时任务脚本 functionExecutor.execute(executionMessage, true); return true; diff --git a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java index e73f5ce..3165683 100644 --- a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java @@ -3,7 +3,7 @@ package io.wdd.agent.executor; import com.google.common.io.ByteStreams; import io.wdd.agent.config.utils.AgentCommonThreadPool; import io.wdd.agent.executor.redis.StreamSender; -import io.wdd.agent.executor.thread.LogToArrayListCache; +import io.wdd.agent.executor.thread.CommandExecLogCache; import io.wdd.common.beans.executor.ExecutionMessage; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -15,8 +15,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -28,12 +27,10 @@ public class CommandExecutor { StreamSender streamSender; @Resource - LogToArrayListCache logToArrayListCache; + CommandExecLogCache commandExecLogCache; int processMaxWaitSeconds = 60; - ExecutorService DaemonCommandProcess = Executors.newFixedThreadPool(1); - /** * handle command from octopus server * @@ -46,43 +43,52 @@ public class CommandExecutor { AgentCommonThreadPool.pool.submit( () -> this.execute( executionMessage.getResultKey(), - executionMessage.getCommandList() + executionMessage.getSingleLineCommand(), + executionMessage.isNeedResultReplay(), + executionMessage.isDurationTask() ) ); } - public int execute(String streamKey, List command) { + public int execute(String streamKey, List command, boolean needResultReplay, boolean durationTask) { ProcessBuilder processBuilder = new ProcessBuilder(command); return this.processExecute( streamKey, - processBuilder + processBuilder, + needResultReplay, + durationTask ); } - public int execute(String streamKey, String... command) { + public int execute(String streamKey, boolean needResultReplay, boolean durationTask, String... command) { ProcessBuilder processBuilder = new ProcessBuilder(command); return this.processExecute( streamKey, - processBuilder + processBuilder, + needResultReplay, + durationTask ); } - private int processExecute(String streamKey, ProcessBuilder processBuilder) { + /** + * 执行最底层命令行操作的代码 + */ + private int processExecute(String streamKey, ProcessBuilder processBuilder, boolean needResultReplay, boolean durationTask) { + // 重定向,错误日志到标准输出中 processBuilder.redirectErrorStream(true); - //processBuilder.inheritIO(); - //processBuilder.directory(new File(System.getProperty("user.home"))); + // processBuilder.inheritIO(); + // processBuilder.directory(new File(System.getProperty("user.home"))); int processResult = 233; try { - // 开始执行命令之前,需要进行打印 log.debug( "current shell command {}", @@ -90,19 +96,25 @@ public class CommandExecutor { ); // 开始执行命令操作 + CountDownLatch countDownLatch = new CountDownLatch(1); + Process process = processBuilder.start(); // start a backend thread to daemon the process // wait for processMaxWaitSeconds and kill the process if it's still alived - DaemonCommandProcess.submit( - StopStuckCommandProcess( - process, - processMaxWaitSeconds - )); - + if (!durationTask) { + // 不是持久化任务,那么需要为每一条命令执行,设置超时保护机制 + // 2023年2月23日 同时执行log日志部分移动至此部分处理 + AgentCommonThreadPool.pool.submit( + StopStuckCommandProcess( + process, + processMaxWaitSeconds, + countDownLatch + )); + } // 缓存让命令处理日志,并且打印 - logToArrayListCache.cacheLog( + commandExecLogCache.cacheLog( streamKey, process ); @@ -110,21 +122,21 @@ public class CommandExecutor { // start to send the result log //streamSender.startToWaitLog(streamKey); - // todo this will stuck the process and rabbitmq message will reentry the queue // get the command result must also be a timeout smaller than the process - boolean waitFor = process.waitFor( - 50, - TimeUnit.SECONDS - ); - - + int waitFor = process.waitFor(); // end send logs //streamSender.endWaitLog(streamKey); // get the process result if (ObjectUtils.isNotEmpty(waitFor) && ObjectUtils.isNotEmpty(process)) { + + // 命令执行完成, countDownLatch计数 + countDownLatch.countDown(); + + // 设置 命令执行退出返回值 processResult = process.exitValue(); + } log.debug( @@ -133,7 +145,6 @@ public class CommandExecutor { processResult ); - } catch (IOException | InterruptedException e) { log.error( "Shell command error ! {} + {}", @@ -145,38 +156,53 @@ public class CommandExecutor { return processResult; } - private Runnable StopStuckCommandProcess(Process process, int processMaxWaitSeconds) { + private Runnable StopStuckCommandProcess(Process process, int processMaxWaitSeconds, CountDownLatch countDownLatch) { return () -> { - try { + boolean commandExecComplete = false; + + try { log.debug( - "daemon thread start to wait for {} s for the result", + "命令执行守护进程开始等待 {} s for the result", processMaxWaitSeconds ); - TimeUnit.SECONDS.sleep(processMaxWaitSeconds); + // 使用 countDownLatch 进行超时等待 + commandExecComplete = countDownLatch.await( + processMaxWaitSeconds, + TimeUnit.SECONDS + ); - if (process.isAlive()) { + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + // 任务提前执行结束,或者超过了最长等待时间 + // 判断命令是否正确处理完成 + if (!commandExecComplete) { log.warn( - "Command [ {} ] stuck for {} s, destroy the command process !", + "Command [ {} ] stuck for {} s, destroy the command process!", process .info() .commandLine() .get(), processMaxWaitSeconds ); - - // shutdown the process - process.destroyForcibly(); } - } catch (InterruptedException e) { - throw new RuntimeException(e); + // 只有当该进程还存活,执行关闭操作 + if (process.isAlive()) { + // shutdown the process + process.destroyForcibly(); + + } + + } }; } + @Deprecated private ByteBuffer cvToByteBuffer(InputStream inputStream) throws IOException { byte[] toByteArray = ByteStreams.toByteArray(inputStream); @@ -204,8 +230,8 @@ public class CommandExecutor { TimeUnit.SECONDS.sleep(1); // clear the log Cache Thread scope - logToArrayListCache - .getExecutionCmdCachedLogArrayList(streamKey) + commandExecLogCache + .getCommandExecLogCacheArrayList(streamKey) .clear(); // clear the stream sender diff --git a/agent/src/main/java/io/wdd/agent/executor/FunctionExecutor.java b/agent/src/main/java/io/wdd/agent/executor/FunctionExecutor.java index dd521fb..38d91c4 100644 --- a/agent/src/main/java/io/wdd/agent/executor/FunctionExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/FunctionExecutor.java @@ -29,39 +29,50 @@ public class FunctionExecutor { ExecutorFunctionNacosCollector executorFunctionNacosCollector; - - public void execute(ExecutionMessage executionMessage, boolean isScheduledScript) { // 拿到 resultKey String resultKey = executionMessage.getResultKey(); - List> completeCommandList; + List> multiLineCommand; if (isScheduledScript) { // 检测到是 页面定时任务脚本 - completeCommandList = executionMessage.getScriptCommandList(); + multiLineCommand = executionMessage.getMultiLineCommand(); } else { // 从 ALL_FUNCTION_MAP本地容器中(Nacos配置中获取)获取到 功能脚本的内容 - completeCommandList = ALL_FUNCTION_MAP.get(executionMessage.getType()); + multiLineCommand = ALL_FUNCTION_MAP.get(executionMessage.getType()); } // 防止阻塞消息队列中的其他信息,需要使用异步执行 AgentCommonThreadPool.pool.submit( - () -> this.execute(resultKey, completeCommandList) + () -> this.doExecuteFunction( + resultKey, + multiLineCommand, + executionMessage.isNeedResultReplay(), + executionMessage.isDurationTask() + ) ); } - private void execute(String streamKey, List> completeCommandList) { + private void doExecuteFunction(String streamKey, List> multiLineCommand, boolean needResultReplay, boolean durationTask) { + log.info( + "[ Function Executor ] all commands are ==> {}", + multiLineCommand + ); - log.info("[ Function Executor ] all commands are ==> {}", completeCommandList); - - Iterator> iterator = completeCommandList.iterator(); + Iterator> iterator = multiLineCommand.iterator(); + // 此处的策略为,将所有的命令是为一个整体,必须完整执行才可,完整返回结果 while (iterator.hasNext()) { - int execute = commandExecutor.execute(streamKey, iterator.next()); + int execute = commandExecutor.execute( + streamKey, + iterator.next(), + needResultReplay, + durationTask + ); if (execute != 0) { log.error("command list execute failed !"); @@ -69,30 +80,38 @@ public class FunctionExecutor { } } + // 清除命令行的执行日志缓存 commandExecutor.clearCommandCache(streamKey); } @Bean - private void daemonListenToNacosFunctions(){ + private void daemonListenToNacosFunctions() { // add listener to listen to the real-time change of the Function Shell Scripts try { - NacosConfigService.addListener(executorFunctionNacosCollector.executorFunctionDataId + "." + executorFunctionNacosCollector.fileExtension, executorFunctionNacosCollector.group, new Listener() { - @Override - public Executor getExecutor() { - return null; - } + NacosConfigService.addListener( + executorFunctionNacosCollector.executorFunctionDataId + "." + executorFunctionNacosCollector.fileExtension, + executorFunctionNacosCollector.group, + new Listener() { + @Override + public Executor getExecutor() { + return null; + } - @Override - public void receiveConfigInfo(String s) { + @Override + public void receiveConfigInfo(String s) { - log.info("detected nacos function shell update ! {}", s); + log.info( + "detected nacos function shell update ! {}", + s + ); - executorFunctionNacosCollector.parseNacosFunctionYamlToMap(s); + executorFunctionNacosCollector.parseNacosFunctionYamlToMap(s); - } - }); + } + } + ); } catch (NacosException e) { throw new RuntimeException(e); diff --git a/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java b/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java index aad6fed..aca246b 100644 --- a/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java +++ b/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java @@ -6,7 +6,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.wdd.agent.config.beans.executor.CommandLog; import io.wdd.agent.config.beans.executor.StreamSenderEntity; import io.wdd.common.utils.TimeUtils; -import io.wdd.agent.executor.thread.LogToArrayListCache; +import io.wdd.agent.executor.thread.CommandExecLogCache; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.beanutils.BeanUtils; @@ -39,7 +39,7 @@ public class StreamSender { ObjectMapper objectMapper; @Resource - LogToArrayListCache logToArrayListCache; + CommandExecLogCache commandExecLogCache; private final HashMap AllNeededStreamSender = new HashMap<>(16); @@ -63,7 +63,7 @@ public class StreamSender { StreamSenderEntity streamSender = StreamSenderEntity .builder() - .cachedCommandLog(logToArrayListCache.getExecutionCmdCachedLogArrayList(streamKey)) + .cachedCommandLog(commandExecLogCache.getCommandExecLogCacheArrayList(streamKey)) .waitToSendLog(true) .startIndex(0) .streamKey(streamKey) diff --git a/agent/src/main/java/io/wdd/agent/executor/thread/LogToArrayListCache.java b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java similarity index 61% rename from agent/src/main/java/io/wdd/agent/executor/thread/LogToArrayListCache.java rename to agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java index 817e05f..5fd844c 100644 --- a/agent/src/main/java/io/wdd/agent/executor/thread/LogToArrayListCache.java +++ b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java @@ -9,32 +9,36 @@ import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import java.util.ArrayList; -import java.util.List; +import java.util.HashMap; /** * utils to cache store the command execution logs + * 2023年2月22日 同样需要取出存着的日志 */ @Component @Slf4j -public class LogToArrayListCache { +public class CommandExecLogCache { - // concurrent command execute logs - public static List> CachedCommandLog = new ArrayList<>( - List.of( - new ArrayList<>(256), - new ArrayList<>(256), - new ArrayList<>(256), - new ArrayList<>(256), - new ArrayList<>(256) - ) - ); + // 存储命令执行缓存日志 + public static HashMap> CachedCommandLog = new HashMap<>(); + /** + * 存储命令执行为空的默认空结果 + */ + private static ArrayList EmptyCommandLog = new ArrayList<>(); + + /** + * 缓存命令执行日志 + * + * @param streamKey 根据 命令执行结果Key 定位缓存队列 + * @param process 植物执行队列 + */ public void cacheLog(String streamKey, Process process) { - ArrayList commandCachedLog = this.getExecutionCmdCachedLogArrayList(streamKey); + ArrayList commandCachedLog = new ArrayList<>(128); - String format = String.format( + String execCommandString = String.format( "execution command are => [ %s ]", process .info() @@ -44,21 +48,47 @@ public class LogToArrayListCache { // add the command commandCachedLog.add(""); - commandCachedLog.add(format); + commandCachedLog.add(execCommandString); commandCachedLog.add("--------------- command result are as below --------------------"); commandCachedLog.add(""); // cache the real command logs doCacheLog( streamKey, - process.getInputStream() + process.getInputStream(), + commandCachedLog ); } - private void doCacheLog(String streamKey, InputStream commandLogStream) { - ArrayList commandCachedLog = this.getExecutionCmdCachedLogArrayList(streamKey); + /** + * 获取缓存的 命令执行缓存日志 + * + * @param streamKey + * @return + */ + public ArrayList getCacheLog(String streamKey) { + + // 获取 + ArrayList execLogCacheArrayList = CachedCommandLog.getOrDefault(streamKey,EmptyCommandLog); + + // 清除Key + CachedCommandLog.remove(streamKey); + + + return execLogCacheArrayList; + } + + /** + * 实际执行命令缓存操作 + * + * @param streamKey + * @param commandLogStream + * @param commandCachedLog + */ + private void doCacheLog(String streamKey, InputStream commandLogStream, ArrayList commandCachedLog) { + // read from input stream and store to the cacheArrayList new BufferedReader(new InputStreamReader(commandLogStream)) @@ -74,7 +104,14 @@ public class LogToArrayListCache { ); } - public ArrayList getExecutionCmdCachedLogArrayList(String streamKey) { + /** + * 获取到命令执行日志 缓存存储实际的ArrayList + * + * @param streamKey + * @return + */ + @Deprecated + public ArrayList getCommandExecLogCacheArrayList(String streamKey) { int keyToIndex = this.hashStreamKeyToCachedArrayListIndex(streamKey); @@ -82,15 +119,23 @@ public class LogToArrayListCache { } + /** + * 根据返回的ResultKey计算出来,缓存的位置 + * + * @param streamKey + * @return + */ private int hashStreamKeyToCachedArrayListIndex(String streamKey) { int size = CachedCommandLog.size(); return Math.abs(streamKey.hashCode() % size); } + @Deprecated private int hashStreamKeyToCachedArrayListIndexWithProblem(String streamKey) { - int size = CachedCommandLog.size(); + return 0; + /*int size = CachedCommandLog.size(); int result = Math.abs(streamKey.hashCode() % size); boolean hasRehashed = false; @@ -123,7 +168,7 @@ public class LogToArrayListCache { } } - return result; + return result;*/ } } diff --git a/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java b/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java index 23eaed8..85793ac 100644 --- a/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java +++ b/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java @@ -25,7 +25,7 @@ public class TestCommandExecutorController { FunctionExecutor functionExecutor; - @PostMapping("comand") + /*@PostMapping("comand") public R testFor( @RequestParam(value = "streamKey") String streamKey, @RequestParam(value = "command") List command @@ -33,7 +33,7 @@ public class TestCommandExecutorController { commandExecutor.execute(streamKey, command); return R.ok(streamKey); - } + }*/ @PostMapping("linuxFile") @@ -45,7 +45,7 @@ public class TestCommandExecutorController { ExecutionMessage executionMessage = ExecutionMessage.builder() .resultKey(streamKey) .type(messageType) - .commandList(Collections.singletonList(messageType)) + .singleLineCommand(Collections.singletonList(messageType)) .build(); System.out.println("executionMessage = " + executionMessage); diff --git a/agent/src/test/java/io/wdd/agent/InitRabbitMQTest.java b/agent/src/test/java/io/wdd/agent/InitRabbitMQTest.java index b913c2e..7182525 100644 --- a/agent/src/test/java/io/wdd/agent/InitRabbitMQTest.java +++ b/agent/src/test/java/io/wdd/agent/InitRabbitMQTest.java @@ -19,6 +19,8 @@ public class InitRabbitMQTest { String format = String.format("C:\\program files\\powershell\\7\\pwsh.exe /c dir %s | findstr \"Desktop\"", homeDirectory); commandExecutor.execute("sasda", + false, + false, "C:\\program files\\powershell\\7\\pwsh.exe", "pwd"); diff --git a/common/src/main/java/io/wdd/common/beans/executor/ExecutionMessage.java b/common/src/main/java/io/wdd/common/beans/executor/ExecutionMessage.java index 3459ce4..74848f4 100644 --- a/common/src/main/java/io/wdd/common/beans/executor/ExecutionMessage.java +++ b/common/src/main/java/io/wdd/common/beans/executor/ExecutionMessage.java @@ -1,5 +1,6 @@ package io.wdd.common.beans.executor; +import com.fasterxml.jackson.annotation.JsonProperty; import io.wdd.common.utils.TimeUtils; import lombok.AllArgsConstructor; import lombok.Data; @@ -15,26 +16,59 @@ import java.util.List; @SuperBuilder(toBuilder = true) public class ExecutionMessage { + /** + * 2023年2月22日 + * 是否需要返回 命令行的处理调用结果 + * 通过 MQ返回 + */ + @JsonProperty(defaultValue = "false") + boolean needResultReplay; + + /** + * 2023年2月22日 + * 是否是长时间持续执行任务 + */ + @JsonProperty(defaultValue = "false") + boolean durationTask; + /** * 用于区分 ExecutionMessage的类型 * 直接执行预定函数,则为 Nacos配置中的 方法名称,例如 AgentUpdate AgentReboot */ private String type; - private List commandList; - + /** + * 只有一行的命令行 + */ + private List singleLineCommand; /** * add in 2023-1-17 * 页面定时脚本任务 需要传递完整的命令列表 */ - private List> scriptCommandList; + private List> multiLineCommand; + /** + * 词条执行命令的返回结果在Redis中的ResultKey + */ private String resultKey; + /** + * 生成 Command结果的 resultKey + * + * @param topicName + * @return + */ public static String GetResultKey(String topicName) { return topicName + "-Execution:" + TimeUtils.currentTimeStringFullSplit(); } + /** + * 延迟执行任务,执行的Key为未来的,生成这个和Key + * + * @param topicName + * @param futureExecutionTime + * @return + */ public static String GetFutureResultKey(String topicName, LocalDateTime futureExecutionTime) { return topicName + "-Execution:" + TimeUtils.localDateTimeString(futureExecutionTime); } diff --git a/common/src/main/java/io/wdd/common/beans/executor/ExecutorFunctionMessage.java b/common/src/main/java/io/wdd/common/beans/executor/ExecutorFunctionMessage.java index 75b7b27..e51dc95 100644 --- a/common/src/main/java/io/wdd/common/beans/executor/ExecutorFunctionMessage.java +++ b/common/src/main/java/io/wdd/common/beans/executor/ExecutorFunctionMessage.java @@ -9,6 +9,7 @@ import lombok.experimental.SuperBuilder; @NoArgsConstructor @AllArgsConstructor @SuperBuilder(toBuilder = true) +@Deprecated public class ExecutorFunctionMessage { String functionName; diff --git a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java index 58516c7..100e84b 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java @@ -10,40 +10,43 @@ public interface CoreExecutionService { String SendCommandToAgent(String agentTopicName, List commandList); - /** - * 调用 单行命令脚本的 最底层函数 - * - * @param agentTopicName agent唯一表示名 - * @param type 任务执行类型 - * @param commandList 任务列表内容 - * @return redis中的 result key - */ + String SendCommandToAgent(String agentTopicName, String type, List commandList); - String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete); - - /** - * - * 调用 完整脚本的 最底层函数 - * - * @param agentTopicName - * @param type - * @param commandList - * @param commandListComplete - * @param futureKey - * @return resultKey 本次操作在Redis中记录的结果Key - */ - String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey); - - List SendCommandToAgent(List agentTopicNameList, String type, List command); /** - * 通常为 页面定时脚本任务调用 + * 调用 单行命令脚本的 最底层函数 * - * @param agentTopicNameList 目标Agent的TopicName列表 - * @param type 任务类型 + * @param agentTopicName + * @param type + * @param commandList + * @param needResultReplay + * @param futureKey + * @param durationTask + * @return + */ + String SendCommandToAgent( + String agentTopicName, + String type, + List commandList, + boolean needResultReplay, + String futureKey, + boolean durationTask + ); + + + /** ------------------------------------------------- */ + + + String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete); + + /** + * 通常为 页面定时脚本任务调用 + * + * @param agentTopicNameList 目标Agent的TopicName列表 + * @param type 任务类型 * @param completeCommandList 完整的类型 * @return 每个Agent只返回一个 ResultKey(Script脚本的结果全部拼接到一起),全部的resultKey */ @@ -51,24 +54,39 @@ public interface CoreExecutionService { /** - * 通常为 页面定时脚本任务调用 + * 通常为 页面定时脚本任务调用 * - * @param agentTopicNameList 目标Agent的TopicName列表 - * @param type 任务类型 + * @param agentTopicNameList 目标Agent的TopicName列表 + * @param type 任务类型 * @param completeCommandList 完整的类型 - * @param atnFutureKey 由于脚本任务为延迟调用,故需要提前生成未来的ResultKey + * @param atnFutureKey 由于脚本任务为延迟调用,故需要提前生成未来的ResultKey * @return 每个Agent只返回一个 ResultKey(Script脚本的结果全部拼接到一起),全部的resultKey */ List SendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList, HashMap atnFutureKey); + String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey); + /** - * 向所有的Agent发送命令 + * 调用 完整脚本的 最底层函数 + * + * @param agentTopicName * @param type * @param commandList - * @return - + * @param commandListComplete + * @param futureKey + * @param durationTask + * @return resultKey 本次操作在Redis中记录的结果Key */ - @Deprecated - List SendCommandToAgentAll(String type, List commandList); + String SendCommandToAgent( + String agentTopicName, + String type, + List commandList, + List> commandListComplete, + boolean needResultReplay, + String futureKey, + boolean durationTask + ); + + } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java index 4611d96..8e4b928 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java @@ -61,11 +61,27 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { agentTopicName, type, commandList, - null + false, + null, + false ); } + @Override + public String SendCommandToAgent(String agentTopicName, String type, List commandList, boolean needResultReplay, String futureKey, boolean durationTask) { + + return this.SendCommandToAgent( + agentTopicName, + type, + commandList, + null, + needResultReplay, + futureKey, + durationTask + ); + } + @Override public String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete) { @@ -81,6 +97,21 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { @Override public String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey) { + return this.SendCommandToAgent( + agentTopicName, + type, + commandList, + commandListComplete, + false, + futureKey, + false + ); + + } + + @Override + public String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { + // 检查agentTopicName是否存在 if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { log.error( @@ -92,7 +123,6 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { } // 归一化type类型 不行 - String resultKey = futureKey; // 判定是否是 FutureKey if (null == futureKey) { @@ -105,7 +135,9 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { type, commandList, resultKey, - commandListComplete + commandListComplete, + needResultReplay, + durationTask ); OctopusMessage octopusMessage = this.generateOctopusMessage( agentTopicName, @@ -122,6 +154,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { resultKey, resultKey ); + log.debug( "set consumer group [{}] for the stream key with => [ {} ]", group, @@ -132,10 +165,10 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { // createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey); // construct the persistent Bean - ExecutionLog executionLog = buildPersistentLogBeanFromOctopusMessage( + /*ExecutionLog executionLog = buildPersistentLogBeanFromOctopusMessage( octopusMessage, executionMessage - ); + );*/ // send resultKey to ExecutionResultDaemonHandler // 当批量执行,产生大量的resultKey的时候,会出现线程爆炸,导致所有的全部失效 /*WAIT_EXECUTION_RESULT_LIST.put( @@ -148,6 +181,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { octopusMessage = null; return resultKey; + } private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) { @@ -174,7 +208,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { ExecutionLog executionLog = new ExecutionLog(); executionLog.setAgentTopicName(octopusMessage.getUuid()); executionLog.setResultKey((String) octopusMessage.getContent()); - executionLog.setCommandList(String.valueOf(executionMessage.getCommandList())); + executionLog.setCommandList(String.valueOf(executionMessage.getSingleLineCommand())); executionLog.setType(executionMessage.getType()); executionLog.setResultKey(executionMessage.getResultKey()); return executionLog; @@ -238,12 +272,6 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { .collect(Collectors.toList()); } - @Override - public List SendCommandToAgentAll(String type, List commandList) { - - return null; - } - @Deprecated private OctopusMessage generateOctopusMessage(String agentTopicName, String resultKey, String type, List commandList, List> commandListComplete) { @@ -253,7 +281,9 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { type, commandList, resultKey, - commandListComplete + commandListComplete, + false, + false ); String executionMessageString; @@ -274,14 +304,16 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { .build(); } - private ExecutionMessage generateExecutionMessage(String type, List commandList, String resultKey, List> commandListComplete) { + private ExecutionMessage generateExecutionMessage(String type, List commandList, String resultKey, List> commandListComplete, boolean needResultReplay, boolean durationTask) { return ExecutionMessage .builder() .resultKey(resultKey) .type(type) - .commandList(commandList) - .scriptCommandList(commandListComplete) + .singleLineCommand(commandList) + .multiLineCommand(commandListComplete) + .needResultReplay(needResultReplay) + .durationTask(durationTask) .build(); }