diff --git a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java index fdd49db..2b3f65e 100644 --- a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java @@ -8,8 +8,6 @@ import io.wdd.agent.executor.thread.CommandExecLogCache; import io.wdd.agent.message.OMessageToServerSender; import io.wdd.common.beans.executor.ExecutionMessage; import io.wdd.common.beans.rabbitmq.OctopusMessage; -import io.wdd.common.handler.MyRuntimeException; -import io.wdd.common.utils.TimeUtils; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Configuration; @@ -19,12 +17,9 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; @Configuration @@ -135,6 +130,7 @@ public class CommandExecutor { )); // 缓存 命令处理日志 + // 如果是特别简单的命令,必须要放在此处才可以,否则会导致 无法收集 commandExecLogCache.cacheLog( streamKey, process @@ -193,12 +189,11 @@ public class CommandExecutor { System.out.println("process isAlive = " + process.isAlive()); - // 任务提前执行结束,或者超过了最长等待时间 // 判断命令是否正确处理完成 if (!commandExecComplete) { log.warn( - "Command [ {} ] stuck for {} s, destroy the command process!", + "任务 [ {} ]执行超过了最长等待时间 {} 秒, destroy the command process!", process .info() .commandLine() @@ -217,65 +212,38 @@ public class CommandExecutor { .commandLine() .get() ); - - //process.destroy(); - try { - - /*byte[] bytes = process - .getInputStream() - .readAllBytes(); - - String s = new String( - bytes, - StandardCharsets.UTF_8 - ); - - log.debug( - "从process中获取到的 所有字符内容为 {}", - s - );*/ - - // 关闭这个命令执行的inputStream - log.debug( - "关闭Process [ {} ]命令执行的inputStream", - process.info() - ); - process - .getInputStream() - .close(); - - } catch (IOException e) { - throw new RuntimeException(e); - } - - try { - Process exitProcess = process - .onExit() - .get( - commandExecWaitTimeout, - TimeUnit.SECONDS - ); - - - commandExecLogCache.debugProcessStreams(exitProcess); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - throw new MyRuntimeException(e); - } } + + // 关停任务执行的缓存日志收集 BufferedReader 否则无法终止 + commandExecLogCache.StopExecLogBufferedReader(streamKey); + } - // 日志操作,如果需要显示回传,需要将日志发送回相应的 - /*collectCommandLogAndRepeat( - streamKey, - needResultReplay, - octopusMessage - );*/ + // 异步执行日志的发送工作 + //commandExecLogCache.CollectAndSendExecLog(streamKey, needResultReplay, octopusMessage); // 执行到这里,说明整个任务流程结束(超时结束) log.debug( - "命令 [ {} ] 执行全流程结束!", - process + "命令 [ {} ] 执行全流程结束! 开始释放所有资源", + process.info() ); + + // 释放所有的资源 + + try { + process + .getInputStream() + .close(); + process + .getOutputStream() + .close(); + process + .getErrorStream() + .close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + if (process.isAlive()) { // shutdown the process process.destroyForcibly(); @@ -284,45 +252,7 @@ public class CommandExecutor { } - /** - * 日志回传操作 - * 默认发送值Redis中 - * 如果需要持久化,则需要发送至RabbitMQ中 - * - * @param streamKey - * @param needResultReplay - * @param octopusMessage - */ - private void collectCommandLogAndRepeat(String streamKey, boolean needResultReplay, OctopusMessage octopusMessage) { - // 获取到命令执行的结果, 此操作会清除掉缓存 - ArrayList commandExecLogCachedLog = commandExecLogCache.getCacheLog(streamKey); - - log.debug( - "从缓存中获取到的命令执行日志为: {}", - commandExecLogCachedLog - ); - - // 简单的发送到StreamSender - simpleStreamSender.sendLog( - streamKey, - commandExecLogCachedLog - ); - - // 需要恢复相应的消息 - if (needResultReplay) { - log.debug("需要准确回复执行命令结果"); - - // 构造相应的数据 - octopusMessage.setAc_time(TimeUtils.currentFormatTime()); - octopusMessage.setResult(commandExecLogCachedLog); - - // 发送返回执行完成的 OM结果 - toServerSender.send(octopusMessage); - - } - - } @Deprecated private ByteBuffer cvToByteBuffer(InputStream inputStream) throws IOException { diff --git a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java index b2110cc..ae2c7d6 100644 --- a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java +++ b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java @@ -1,13 +1,19 @@ package io.wdd.agent.executor.thread; +import io.wdd.agent.config.utils.AgentCommonThreadPool; +import io.wdd.agent.executor.reply.SimpleStreamSender; +import io.wdd.agent.message.OMessageToServerSender; +import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.handler.MyRuntimeException; import io.wdd.common.utils.TimeUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ObjectUtils; import org.springframework.stereotype.Service; +import javax.annotation.Resource; import java.io.BufferedReader; +import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.HashMap; @@ -26,7 +32,17 @@ public class CommandExecLogCache { */ private static final ArrayList EmptyCommandLog = new ArrayList<>(); // 存储命令执行缓存日志 - public static HashMap> CachedCommandLogMap = new HashMap<>(); + private static HashMap> CachedCommandLogMap = new HashMap<>(); + /** + * 存储每一个任务对应的 日志读取BufferedReader + */ + private static HashMap CommandLogReaderMap = new HashMap<>(); + + @Resource + SimpleStreamSender simpleStreamSender; + + @Resource + OMessageToServerSender toServerSender; /** * 缓存命令执行日志 @@ -57,13 +73,152 @@ public class CommandExecLogCache { } + /** + * 实际执行命令缓存操作 + * + * @param streamKey + */ + private void doCacheLog(String streamKey, Process process) { + + ArrayList commandCachedLog = new ArrayList<>(128); + + String execCommandString = String.format( + "execution command are => [ %s ]", + process + .info() + .commandLine() + .get() + ); + + String execTimeString = String.format( + "execution time is => [ %s ]", + TimeUtils.currentTimeString() + ); + + String execResultString = String.format( + "execution result is => [ %s ]", + process.exitValue() + ); + + // add the command + commandCachedLog.add(execCommandString); + commandCachedLog.add(execTimeString); + commandCachedLog.add(execResultString); + commandCachedLog.add("--------------- command result are as below --------------------"); + commandCachedLog.add(""); + + log.debug("doCacheLog 开始从process的结果中获取日志缓存"); + + // read from input stream and store to the cacheArrayList + BufferedReader bufferedReader = new BufferedReader( + new InputStreamReader( + process.getInputStream() + ) + ); + // 缓存这个 日志读取器 + CommandLogReaderMap.put( + streamKey, + bufferedReader + ); + + + // !! 此处会阻塞 + // 阻塞读取命令执行日志的输出流 + bufferedReader + .lines() + .forEach( + commandCachedLog::add + ); + + log.debug( + "命令代码 [ {} ] 的执行日志内容为 {} ", + streamKey, + commandCachedLog + ); + } + + /** + * 对于一些没有中止的任务,必须要手动将读取的 InputStream流关闭 + * 否则部分任务的日志无法收集 + */ + public void StopExecLogBufferedReader(String streamKey) { + + BufferedReader bufferedReader = CommandLogReaderMap.get(streamKey); + if (ObjectUtils.isNotEmpty(bufferedReader)) { + try { + + log.debug("开始关停任务 [ {} ]对应的日志读取后端", + streamKey); + + bufferedReader.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + } + + /** + * 日志回传操作 + * 默认发送值Redis中 + * 如果需要持久化,则需要发送至RabbitMQ中 + * + * @param streamKey + * @param needResultReplay + * @param octopusMessage + */ + public void CollectAndSendExecLog(String streamKey, boolean needResultReplay, OctopusMessage octopusMessage) { + + // 日志操作,如果需要显示回传,需要将日志发送回相应的MQ中 + // 使用异步的方式 + AgentCommonThreadPool.pool.submit( + () -> + collectCommandLogAndRepeat( + streamKey, + needResultReplay, + octopusMessage + ) + ); + } + + private void collectCommandLogAndRepeat(String streamKey, boolean needResultReplay, OctopusMessage octopusMessage) { + + // 获取到命令执行的结果, 此操作会清除掉缓存 + ArrayList commandExecLogCachedLog = GetAndCleanExecCacheLog(streamKey); + + log.debug( + "从缓存中获取到的命令执行日志为: {}", + commandExecLogCachedLog + ); + + // 简单的发送到StreamSender + simpleStreamSender.sendLog( + streamKey, + commandExecLogCachedLog + ); + + // 需要 回复 相应的消息 + if (needResultReplay) { + log.debug("需要准确回复执行命令结果"); + + // 构造相应的数据 + octopusMessage.setAc_time(TimeUtils.currentFormatTime()); + octopusMessage.setResult(commandExecLogCachedLog); + + // 发送返回执行完成的 OM结果 + toServerSender.send(octopusMessage); + + } + + } + /** * 获取缓存的 命令执行缓存日志 * * @param streamKey * @return */ - public ArrayList getCacheLog(String streamKey) { + private ArrayList GetAndCleanExecCacheLog(String streamKey) { // 获取 ArrayList execLogCacheArrayList = CachedCommandLogMap.getOrDefault( @@ -72,7 +227,7 @@ public class CommandExecLogCache { ); // 清除Key - //CachedCommandLogMap.remove(streamKey); + CachedCommandLogMap.remove(streamKey); return execLogCacheArrayList; } @@ -113,51 +268,6 @@ public class CommandExecLogCache { } - /** - * 实际执行命令缓存操作 - * - * @param streamKey - */ - private void doCacheLog(String streamKey, Process process) { - - ArrayList commandCachedLog = new ArrayList<>(128); - - String execCommandString = String.format( - "execution command are => [ %s ]", - process - .info() - .commandLine() - .get() - ); - - String execTimeString = String.format( - "execution time is => [ %s ]", - TimeUtils.currentTimeString() - ); - - // add the command - commandCachedLog.add(""); - commandCachedLog.add(execCommandString); - commandCachedLog.add(execTimeString); - commandCachedLog.add("--------------- command result are as below --------------------"); - commandCachedLog.add(""); - - log.debug("doCacheLog 开始从process的结果中获取日志缓存"); - - // read from input stream and store to the cacheArrayList - new BufferedReader(new InputStreamReader(process.getInputStream())) - .lines() - .forEach( - commandCachedLog::add - ); - - log.debug( - "current streamKey is [ {} ] and CacheLog is [ {} ]", - streamKey, - commandCachedLog - ); - } - /** * 获取到命令执行日志 缓存存储实际的ArrayList *