[agent][executor]- bug - 10

This commit is contained in:
zeaslity
2023-02-26 14:45:27 +08:00
parent e0e2078ae0
commit 2b69b1f73e
2 changed files with 21 additions and 17 deletions

View File

@@ -2,10 +2,8 @@ package io.wdd.agent.executor;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import io.wdd.agent.config.utils.AgentCommonThreadPool; import io.wdd.agent.config.utils.AgentCommonThreadPool;
import io.wdd.agent.executor.reply.SimpleStreamSender;
import io.wdd.agent.executor.reply.StreamSender; import io.wdd.agent.executor.reply.StreamSender;
import io.wdd.agent.executor.thread.CommandExecLogCache; import io.wdd.agent.executor.thread.CommandExecLogCache;
import io.wdd.agent.message.OMessageToServerSender;
import io.wdd.common.beans.executor.ExecutionMessage; import io.wdd.common.beans.executor.ExecutionMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessage;
import lombok.SneakyThrows; import lombok.SneakyThrows;
@@ -28,15 +26,10 @@ public class CommandExecutor {
@Resource @Resource
StreamSender streamSender; StreamSender streamSender;
@Resource
SimpleStreamSender simpleStreamSender;
@Resource @Resource
CommandExecLogCache commandExecLogCache; CommandExecLogCache commandExecLogCache;
@Resource
OMessageToServerSender toServerSender;
/** /**
* 一个命令执行的最长等待时间 * 一个命令执行的最长等待时间
*/ */

View File

@@ -38,7 +38,11 @@ public class CommandExecLogCache {
/** /**
* 存储每一个任务对应的 日志读取BufferedReader * 存储每一个任务对应的 日志读取BufferedReader
*/ */
private static HashMap<String, BufferedReader> CommandLogReaderMap = new HashMap<>(); private static HashMap<String, BufferedReader> CommandLogBufferedReaderMap = new HashMap<>();
/**
* 存储每一个任务对应的 日志读取 InputStream Reader
*/
private static HashMap<String, InputStreamReader> CommandLogInputReaderMap = new HashMap<>();
@Resource @Resource
SimpleStreamSender simpleStreamSender; SimpleStreamSender simpleStreamSender;
@@ -101,7 +105,6 @@ public class CommandExecLogCache {
); );
// add the command // add the command
commandCachedLog.add(execCommandString); commandCachedLog.add(execCommandString);
commandCachedLog.add(execTimeString); commandCachedLog.add(execTimeString);
@@ -111,16 +114,20 @@ public class CommandExecLogCache {
log.debug("doCacheLog 开始从process的结果中获取日志缓存"); log.debug("doCacheLog 开始从process的结果中获取日志缓存");
// read from input stream and store to the cacheArrayList // read from input stream and store to the cacheArrayList
BufferedReader bufferedReader = new BufferedReader( InputStreamReader inputStreamReader = new InputStreamReader(
new InputStreamReader( process.getInputStream()
process.getInputStream()
)
); );
BufferedReader bufferedReader = new BufferedReader(
inputStreamReader
);
// 缓存这个 日志读取器 // 缓存这个 日志读取器
CommandLogReaderMap.put( CommandLogBufferedReaderMap.put(
streamKey, streamKey,
bufferedReader bufferedReader
); );
CommandLogInputReaderMap.put(streamKey,
inputStreamReader);
// !! 此处会阻塞 // !! 此处会阻塞
// 阻塞读取命令执行日志的输出流 // 阻塞读取命令执行日志的输出流
@@ -150,12 +157,14 @@ public class CommandExecLogCache {
*/ */
public void StopExecLogBufferedReader(String streamKey, Process process) { public void StopExecLogBufferedReader(String streamKey, Process process) {
BufferedReader bufferedReader = CommandLogReaderMap.get(streamKey); BufferedReader bufferedReader = CommandLogBufferedReaderMap.get(streamKey);
InputStreamReader inputStreamReader = CommandLogInputReaderMap.get(streamKey);
log.debug( log.debug(
"开始关停任务 [ {} ]对应的日志读取BufferedReader {}", "开始关停任务 [ {} ]对应的日志读取 BufferedReader {} InputStream Reader {}",
streamKey, streamKey,
bufferedReader bufferedReader,
inputStreamReader
); );
if (ObjectUtils.isNotEmpty(bufferedReader)) { if (ObjectUtils.isNotEmpty(bufferedReader)) {
@@ -166,6 +175,8 @@ public class CommandExecLogCache {
.getInputStream() .getInputStream()
.close(); .close();
inputStreamReader.close();
bufferedReader.close(); bufferedReader.close();
} catch (IOException e) { } catch (IOException e) {