[agent][executor]- bug - 15

This commit is contained in:
zeaslity
2023-02-26 16:17:08 +08:00
parent b88cb01300
commit 2ba0f1f74f

View File

@@ -3,6 +3,7 @@ package io.wdd.agent.executor.thread;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.wdd.agent.config.utils.AgentCommonThreadPool; import io.wdd.agent.config.utils.AgentCommonThreadPool;
import io.wdd.agent.executor.reply.SimpleStreamSender; import io.wdd.agent.executor.reply.SimpleStreamSender;
import io.wdd.agent.message.OMessageToServerSender; import io.wdd.agent.message.OMessageToServerSender;
@@ -19,6 +20,9 @@ import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/** /**
@@ -44,6 +48,22 @@ public class CommandExecLogCache {
*/ */
private static final HashMap<String, InputStreamReader> CommandLogInputReaderMap = new HashMap<>(); private static final HashMap<String, InputStreamReader> CommandLogInputReaderMap = new HashMap<>();
/**
* 固定单进程,用于缓存命令执行日志,关闭命令输入管道
*/
private static ExecutorService LogCacheDaemonThread = null;
static {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("exec-log-cache")
.setPriority(7)
.build();
LogCacheDaemonThread = Executors.newSingleThreadExecutor(threadFactory);
}
@Resource @Resource
SimpleStreamSender simpleStreamSender; SimpleStreamSender simpleStreamSender;
@@ -137,19 +157,24 @@ public class CommandExecLogCache {
inputStreamReader inputStreamReader
); );
log.debug("开始获取bufferedReader中每一行的内容"); // 使用统一进程执行缓存和关停动作
// !! 此处会阻塞 LogCacheDaemonThread.submit(
// 阻塞读取命令执行日志的输出流 () -> {
bufferedReader log.debug("开始获取bufferedReader中每一行的内容");
.lines() // !! 此处会阻塞
.forEach( // 阻塞读取命令执行日志的输出流
commandCachedLog::add bufferedReader
); .lines()
.forEach(
commandCachedLog::add
);
log.debug( log.debug(
"命令代码 [ {} ] 的执行日志内容为 {} ", "命令代码 [ {} ] 的执行日志内容为 {} ",
streamKey, streamKey,
commandCachedLog commandCachedLog
);
}
); );
} }
@@ -169,29 +194,30 @@ public class CommandExecLogCache {
inputStreamReader inputStreamReader
); );
try { LogCacheDaemonThread.submit(
// 关闭command的输出流 () -> {
process try {
.getInputStream() // 关闭command的输出流
.close(); process
.getInputStream()
.close();
inputStreamReader.close(); inputStreamReader.close();
bufferedReader.close(); bufferedReader.close();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
log.debug(
"关闭 [{}]的各种输入流读取器成功 !",
streamKey
);
CommandLogBufferedReaderMap.remove(streamKey);
CommandLogInputReaderMap.remove(streamKey);
}
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
log.debug(
"关闭 [{}]的各种输入流读取器成功 !",
streamKey
);
CommandLogBufferedReaderMap.remove(streamKey);
CommandLogInputReaderMap.remove(streamKey);
}
});
} }
/** /**