diff --git a/agent/src/main/java/io/wdd/agent/agent/AgentRebootUpdateService.java b/agent/src/main/java/io/wdd/agent/agent/AgentRebootUpdateService.java index d7a5da4..1110b70 100644 --- a/agent/src/main/java/io/wdd/agent/agent/AgentRebootUpdateService.java +++ b/agent/src/main/java/io/wdd/agent/agent/AgentRebootUpdateService.java @@ -131,7 +131,8 @@ public class AgentRebootUpdateService { streamKey, shutdownCommand, false, - false + false, + null ); } } diff --git a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerExecutor.java b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerExecutor.java index 3310c36..b5e3702 100644 --- a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerExecutor.java +++ b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerExecutor.java @@ -51,7 +51,7 @@ public class OMHandlerExecutor extends AbstractOctopusMessageHandler { // add in 2023-1-17 if (CollectionUtils.isNotEmpty(executionMessage.getMultiLineCommand())) { // 传递的是 页面定时任务脚本 - functionExecutor.execute(executionMessage, true); + functionExecutor.execute(octopusMessage, executionMessage, true); return true; } @@ -59,10 +59,10 @@ public class OMHandlerExecutor extends AbstractOctopusMessageHandler { String executionType = executionMessage.getType(); if (ALL_FUNCTION_MAP.containsKey(executionType)) { // execute the exist function - functionExecutor.execute(executionMessage, false); + functionExecutor.execute(octopusMessage, executionMessage, false); } else { // handle command - commandExecutor.execute(executionMessage); + commandExecutor.execute(octopusMessage, executionMessage); } } catch (IOException e) { diff --git a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java index 3165683..63e05d3 100644 --- a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java @@ -2,9 +2,13 @@ package io.wdd.agent.executor; import com.google.common.io.ByteStreams; import io.wdd.agent.config.utils.AgentCommonThreadPool; -import io.wdd.agent.executor.redis.StreamSender; +import io.wdd.agent.executor.reply.SimpleStreamSender; +import io.wdd.agent.executor.reply.StreamSender; import io.wdd.agent.executor.thread.CommandExecLogCache; +import io.wdd.agent.message.OMessageToServerSender; import io.wdd.common.beans.executor.ExecutionMessage; +import io.wdd.common.beans.rabbitmq.OctopusMessage; +import io.wdd.common.utils.TimeUtils; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ObjectUtils; @@ -14,6 +18,7 @@ import javax.annotation.Resource; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -25,10 +30,18 @@ public class CommandExecutor { @Resource StreamSender streamSender; + @Resource + SimpleStreamSender simpleStreamSender; @Resource CommandExecLogCache commandExecLogCache; + @Resource + OMessageToServerSender toServerSender; + + /** + * 一个命令执行的最长等待时间 + */ int processMaxWaitSeconds = 60; /** @@ -36,7 +49,7 @@ public class CommandExecutor { * * @param executionMessage get from EXECUTOR_HANDLER */ - public void execute(ExecutionMessage executionMessage) { + public void execute(OctopusMessage octopusMessage, ExecutionMessage executionMessage) { // todo 需要长时间执行的任务 与目前的系统设计存在冲突 防卡死后台进程`出现问题 // 防止阻塞消息队列中的其他信息,需要使用异步执行 @@ -45,13 +58,14 @@ public class CommandExecutor { executionMessage.getResultKey(), executionMessage.getSingleLineCommand(), executionMessage.isNeedResultReplay(), - executionMessage.isDurationTask() + executionMessage.isDurationTask(), + octopusMessage ) ); } - public int execute(String streamKey, List command, boolean needResultReplay, boolean durationTask) { + public int execute(String streamKey, List command, boolean needResultReplay, boolean durationTask, OctopusMessage octopusMessage) { ProcessBuilder processBuilder = new ProcessBuilder(command); @@ -59,28 +73,15 @@ public class CommandExecutor { streamKey, processBuilder, needResultReplay, - durationTask + durationTask, + octopusMessage ); } - public int execute(String streamKey, boolean needResultReplay, boolean durationTask, String... command) { - - ProcessBuilder processBuilder = new ProcessBuilder(command); - - return this.processExecute( - streamKey, - processBuilder, - needResultReplay, - durationTask - ); - - } - - /** * 执行最底层命令行操作的代码 */ - private int processExecute(String streamKey, ProcessBuilder processBuilder, boolean needResultReplay, boolean durationTask) { + private int processExecute(String streamKey, ProcessBuilder processBuilder, boolean needResultReplay, boolean durationTask, OctopusMessage octopusMessage) { // 重定向,错误日志到标准输出中 processBuilder.redirectErrorStream(true); @@ -106,10 +107,13 @@ public class CommandExecutor { // 不是持久化任务,那么需要为每一条命令执行,设置超时保护机制 // 2023年2月23日 同时执行log日志部分移动至此部分处理 AgentCommonThreadPool.pool.submit( - StopStuckCommandProcess( + DaemonCommandProcessAndCollectLog( process, processMaxWaitSeconds, - countDownLatch + countDownLatch, + needResultReplay, + streamKey, + octopusMessage )); } @@ -119,15 +123,9 @@ public class CommandExecutor { process ); - // start to send the result log - //streamSender.startToWaitLog(streamKey); - // get the command result must also be a timeout smaller than the process int waitFor = process.waitFor(); - // end send logs - //streamSender.endWaitLog(streamKey); - // get the process result if (ObjectUtils.isNotEmpty(waitFor) && ObjectUtils.isNotEmpty(process)) { @@ -156,7 +154,7 @@ public class CommandExecutor { return processResult; } - private Runnable StopStuckCommandProcess(Process process, int processMaxWaitSeconds, CountDownLatch countDownLatch) { + private Runnable DaemonCommandProcessAndCollectLog(Process process, int processMaxWaitSeconds, CountDownLatch countDownLatch, boolean needResultReplay, String streamKey, OctopusMessage octopusMessage) { return () -> { boolean commandExecComplete = false; @@ -190,6 +188,9 @@ public class CommandExecutor { ); } + // 日志操作,如果需要显示回传,需要将日志发送回相应的 + collectCommandLogAndRepeat(streamKey, needResultReplay, octopusMessage); + // 只有当该进程还存活,执行关闭操作 if (process.isAlive()) { // shutdown the process @@ -202,6 +203,29 @@ public class CommandExecutor { }; } + private void collectCommandLogAndRepeat(String streamKey, boolean needResultReplay, OctopusMessage octopusMessage) { + + // 获取到命令执行的结果, 此操作会清除掉缓存 + ArrayList commandExecLogCachedLog = commandExecLogCache.getCacheLog(streamKey); + + // 简单的发送到StreamSender + simpleStreamSender.sendLog(streamKey, commandExecLogCachedLog); + + // 需要恢复相应的消息 + if (needResultReplay) { + log.debug("需要准确回复执行命令结果"); + + // 构造相应的数据 + octopusMessage.setAc_time(TimeUtils.currentFormatTime()); + octopusMessage.setResult(commandExecLogCachedLog); + + // 发送返回执行完成的 OM结果 + toServerSender.send(octopusMessage); + + } + + } + @Deprecated private ByteBuffer cvToByteBuffer(InputStream inputStream) throws IOException { @@ -224,6 +248,7 @@ public class CommandExecutor { @SneakyThrows + @Deprecated public void clearCommandCache(String streamKey) { // wait diff --git a/agent/src/main/java/io/wdd/agent/executor/FunctionExecutor.java b/agent/src/main/java/io/wdd/agent/executor/FunctionExecutor.java index 38d91c4..df35aeb 100644 --- a/agent/src/main/java/io/wdd/agent/executor/FunctionExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/FunctionExecutor.java @@ -5,6 +5,7 @@ import com.alibaba.nacos.api.exception.NacosException; import io.wdd.agent.config.utils.AgentCommonThreadPool; import io.wdd.agent.executor.config.ExecutorFunctionNacosCollector; import io.wdd.common.beans.executor.ExecutionMessage; +import io.wdd.common.beans.rabbitmq.OctopusMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Service; @@ -29,7 +30,7 @@ public class FunctionExecutor { ExecutorFunctionNacosCollector executorFunctionNacosCollector; - public void execute(ExecutionMessage executionMessage, boolean isScheduledScript) { + public void execute(OctopusMessage octopusMessage, ExecutionMessage executionMessage, boolean isScheduledScript) { // 拿到 resultKey String resultKey = executionMessage.getResultKey(); @@ -49,14 +50,15 @@ public class FunctionExecutor { resultKey, multiLineCommand, executionMessage.isNeedResultReplay(), - executionMessage.isDurationTask() + executionMessage.isDurationTask(), + octopusMessage ) ); } - private void doExecuteFunction(String streamKey, List> multiLineCommand, boolean needResultReplay, boolean durationTask) { + private void doExecuteFunction(String streamKey, List> multiLineCommand, boolean needResultReplay, boolean durationTask, OctopusMessage octopusMessage) { log.info( "[ Function Executor ] all commands are ==> {}", @@ -71,7 +73,8 @@ public class FunctionExecutor { streamKey, iterator.next(), needResultReplay, - durationTask + durationTask, + octopusMessage ); if (execute != 0) { @@ -80,11 +83,12 @@ public class FunctionExecutor { } } - // 清除命令行的执行日志缓存 - commandExecutor.clearCommandCache(streamKey); } + /** + * 后台监听Nacos的配置,实时跟新从Nacos中修改过的 函数命令脚本的内容 + */ @Bean private void daemonListenToNacosFunctions() { @@ -107,8 +111,8 @@ public class FunctionExecutor { s ); + // 解析获取到的功能脚本的字符串,然后解析为缓存 executorFunctionNacosCollector.parseNacosFunctionYamlToMap(s); - } } ); diff --git a/agent/src/main/java/io/wdd/agent/executor/reply/SimpleStreamSender.java b/agent/src/main/java/io/wdd/agent/executor/reply/SimpleStreamSender.java new file mode 100644 index 0000000..3f7c1a1 --- /dev/null +++ b/agent/src/main/java/io/wdd/agent/executor/reply/SimpleStreamSender.java @@ -0,0 +1,71 @@ +package io.wdd.agent.executor.reply; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.wdd.common.utils.TimeUtils; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.connection.stream.StreamRecords; +import org.springframework.data.redis.connection.stream.StringRecord; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** + * 简单的发送缓存的命令执行Log到Redis的StreamKey中 + */ +@Service +@Slf4j +public class SimpleStreamSender { + + @Resource + RedisTemplate redisTemplate; + + @Resource + ObjectMapper objectMapper; + + /** + * 根据StreamKey 发送命令日志到Redis中 + * + * @param streamKey 返回结果的Key + * @return + */ + public boolean sendLog(String streamKey, ArrayList commandExecLogCachedLog) { + + return this.send(streamKey, commandExecLogCachedLog); + + } + + private boolean send(String streamKey, List content) { + + try { + + String resultContent = objectMapper.writeValueAsString(content); + + // 构造Stream类型的数据结构,然后进行传输 + HashMap map = new HashMap<>(); + map.put(TimeUtils.currentTimeString(), resultContent); + + log.debug("redis stream sender message is {}", map); + + StringRecord stringRecord = StreamRecords + .string(map).withStreamKey(streamKey); + + // 发送内容到Redis中 + RecordId recordId = redisTemplate.opsForStream().add(stringRecord); + + // 返回结果 + return ObjectUtils.isNotEmpty(recordId); + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + } + +} diff --git a/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java b/agent/src/main/java/io/wdd/agent/executor/reply/StreamSender.java similarity index 99% rename from agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java rename to agent/src/main/java/io/wdd/agent/executor/reply/StreamSender.java index aca246b..7782c6c 100644 --- a/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java +++ b/agent/src/main/java/io/wdd/agent/executor/reply/StreamSender.java @@ -1,4 +1,4 @@ -package io.wdd.agent.executor.redis; +package io.wdd.agent.executor.reply; import com.fasterxml.jackson.core.JsonProcessingException; @@ -48,6 +48,7 @@ public class StreamSender { @SneakyThrows + @Deprecated private static Map generateFakeData() { String random = RandomStringUtils.random(16); CommandLog commandLog = new CommandLog(); diff --git a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java index 5fd844c..12f5b29 100644 --- a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java +++ b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java @@ -1,6 +1,7 @@ package io.wdd.agent.executor.thread; +import io.wdd.common.utils.TimeUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.stereotype.Component; @@ -46,9 +47,15 @@ public class CommandExecLogCache { .get() ); + String execTimeString = String.format( + "execution time is => [ %s ]", + TimeUtils.currentTimeString() + ); + // add the command commandCachedLog.add(""); commandCachedLog.add(execCommandString); + commandCachedLog.add(execTimeString); commandCachedLog.add("--------------- command result are as below --------------------"); commandCachedLog.add(""); @@ -76,7 +83,6 @@ public class CommandExecLogCache { // 清除Key CachedCommandLog.remove(streamKey); - return execLogCacheArrayList; } diff --git a/agent/src/main/java/io/wdd/agent/executor/thread/DaemonLogThread.java b/agent/src/main/java/io/wdd/agent/executor/thread/DaemonLogThread.java deleted file mode 100644 index 07fb10a..0000000 --- a/agent/src/main/java/io/wdd/agent/executor/thread/DaemonLogThread.java +++ /dev/null @@ -1,32 +0,0 @@ -package io.wdd.agent.executor.thread; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import io.wdd.common.beans.response.R; - -import java.util.concurrent.*; - -//public class DaemonLogThread { -// -// private static final ExecutorService executorService; -// -// static { -// -// ThreadFactory daemonLogThread = new ThreadFactoryBuilder() -// .setDaemon(true) -// .setNameFormat("BackendToRedisThread") -// .setPriority(1) -// .build(); -// -// executorService = Executors.newSingleThreadExecutor(daemonLogThread); -// -// } -// -// public static Future start(Runnable backendToRedisStream) { -// -// return executorService.submit(backendToRedisStream); -// } -// -// public static void stop(Runnable backendToRedisStream) { -// executorService.shutdownNow(); -// } -//} diff --git a/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java b/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java index 85793ac..8ee3848 100644 --- a/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java +++ b/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java @@ -50,7 +50,7 @@ public class TestCommandExecutorController { System.out.println("executionMessage = " + executionMessage); - functionExecutor.execute(executionMessage, false); + functionExecutor.execute(null, executionMessage, false); return R.ok(streamKey); } diff --git a/agent/src/main/java/io/wdd/agent/message/OMessageToServerSender.java b/agent/src/main/java/io/wdd/agent/message/OMessageToServerSender.java index 439b2c0..8c5db8d 100644 --- a/agent/src/main/java/io/wdd/agent/message/OMessageToServerSender.java +++ b/agent/src/main/java/io/wdd/agent/message/OMessageToServerSender.java @@ -32,14 +32,16 @@ public class OMessageToServerSender { RabbitTemplate rabbitTemplate; - + /** + * 发送消息至MQ中, 队列为 OCTOPUS_TO_SERVER, 信息需要有Server自行处理 + * + * @param octopusMessage 需要发送的OM信息 + * @return + */ public boolean send(OctopusMessage octopusMessage) { - octopusMessage.setAc_time(LocalDateTime.now()); - // send to Queue -- InitToServer - - log.info("send Message to Server = {}", octopusMessage); + log.debug("send Message to Server = {}", octopusMessage); try { @@ -52,7 +54,7 @@ public class OMessageToServerSender { } catch (JsonProcessingException e) { - log.error("Failed to send message to Serv er ! = {}", octopusMessage); + log.error("Failed to send message to Server ! = {}", octopusMessage); throw new MyRuntimeException(e); } diff --git a/agent/src/test/java/io/wdd/agent/InitRabbitMQTest.java b/agent/src/test/java/io/wdd/agent/InitRabbitMQTest.java deleted file mode 100644 index 7182525..0000000 --- a/agent/src/test/java/io/wdd/agent/InitRabbitMQTest.java +++ /dev/null @@ -1,29 +0,0 @@ -package io.wdd.agent; - -import io.wdd.agent.executor.CommandExecutor; - -import javax.annotation.Resource; - -//@SpringBootTest -public class InitRabbitMQTest { - - @Resource - CommandExecutor commandExecutor; - - -// @Test - void testInitSendInfo() { - - String homeDirectory = System.getProperty("user.home"); - - String format = String.format("C:\\program files\\powershell\\7\\pwsh.exe /c dir %s | findstr \"Desktop\"", homeDirectory); - - commandExecutor.execute("sasda", - false, - false, - "C:\\program files\\powershell\\7\\pwsh.exe", - "pwd"); - - - } -}