diff --git a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java index df78de5..55a2aa9 100644 --- a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java @@ -2,10 +2,8 @@ package io.wdd.agent.executor; import com.google.common.io.ByteStreams; import io.wdd.agent.config.utils.AgentCommonThreadPool; -import io.wdd.agent.executor.reply.SimpleStreamSender; import io.wdd.agent.executor.reply.StreamSender; import io.wdd.agent.executor.thread.CommandExecLogCache; -import io.wdd.agent.message.OMessageToServerSender; import io.wdd.common.beans.executor.ExecutionMessage; import io.wdd.common.beans.rabbitmq.OctopusMessage; import lombok.SneakyThrows; @@ -28,15 +26,10 @@ public class CommandExecutor { @Resource StreamSender streamSender; - @Resource - SimpleStreamSender simpleStreamSender; @Resource CommandExecLogCache commandExecLogCache; - @Resource - OMessageToServerSender toServerSender; - /** * 一个命令执行的最长等待时间 */ diff --git a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java index e55cd15..1635a67 100644 --- a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java +++ b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java @@ -38,7 +38,11 @@ public class CommandExecLogCache { /** * 存储每一个任务对应的 日志读取BufferedReader */ - private static HashMap CommandLogReaderMap = new HashMap<>(); + private static HashMap CommandLogBufferedReaderMap = new HashMap<>(); + /** + * 存储每一个任务对应的 日志读取 InputStream Reader + */ + private static HashMap CommandLogInputReaderMap = new HashMap<>(); @Resource SimpleStreamSender simpleStreamSender; @@ -101,7 +105,6 @@ public class CommandExecLogCache { ); - // add the command commandCachedLog.add(execCommandString); commandCachedLog.add(execTimeString); @@ -111,16 +114,20 @@ public class CommandExecLogCache { log.debug("doCacheLog 开始从process的结果中获取日志缓存"); // read from input stream and store to the cacheArrayList - BufferedReader bufferedReader = new BufferedReader( - new InputStreamReader( - process.getInputStream() - ) + InputStreamReader inputStreamReader = new InputStreamReader( + process.getInputStream() ); + BufferedReader bufferedReader = new BufferedReader( + inputStreamReader + ); + // 缓存这个 日志读取器 - CommandLogReaderMap.put( + CommandLogBufferedReaderMap.put( streamKey, bufferedReader ); + CommandLogInputReaderMap.put(streamKey, + inputStreamReader); // !! 此处会阻塞 // 阻塞读取命令执行日志的输出流 @@ -150,12 +157,14 @@ public class CommandExecLogCache { */ public void StopExecLogBufferedReader(String streamKey, Process process) { - BufferedReader bufferedReader = CommandLogReaderMap.get(streamKey); + BufferedReader bufferedReader = CommandLogBufferedReaderMap.get(streamKey); + InputStreamReader inputStreamReader = CommandLogInputReaderMap.get(streamKey); log.debug( - "开始关停任务 [ {} ]对应的日志读取BufferedReader {}", + "开始关停任务 [ {} ]对应的日志读取 BufferedReader {} InputStream Reader {}", streamKey, - bufferedReader + bufferedReader, + inputStreamReader ); if (ObjectUtils.isNotEmpty(bufferedReader)) { @@ -166,6 +175,8 @@ public class CommandExecLogCache { .getInputStream() .close(); + inputStreamReader.close(); + bufferedReader.close(); } catch (IOException e) {