From 2ba0f1f74fdac20ac8fa304ba67f49c586079264 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Sun, 26 Feb 2023 16:17:08 +0800 Subject: [PATCH] [agent][executor]- bug - 15 --- .../executor/thread/CommandExecLogCache.java | 88 ++++++++++++------- 1 file changed, 57 insertions(+), 31 deletions(-) diff --git a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java index 1ecd01d..b694ea7 100644 --- a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java +++ b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java @@ -3,6 +3,7 @@ package io.wdd.agent.executor.thread; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.wdd.agent.config.utils.AgentCommonThreadPool; import io.wdd.agent.executor.reply.SimpleStreamSender; import io.wdd.agent.message.OMessageToServerSender; @@ -19,6 +20,9 @@ import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.HashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; /** @@ -44,6 +48,22 @@ public class CommandExecLogCache { */ private static final HashMap CommandLogInputReaderMap = new HashMap<>(); + /** + * 固定单进程,用于缓存命令执行日志,关闭命令输入管道 + */ + private static ExecutorService LogCacheDaemonThread = null; + + static { + + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("exec-log-cache") + .setPriority(7) + .build(); + + LogCacheDaemonThread = Executors.newSingleThreadExecutor(threadFactory); + } + @Resource SimpleStreamSender simpleStreamSender; @@ -137,19 +157,24 @@ public class CommandExecLogCache { inputStreamReader ); - log.debug("开始获取bufferedReader中每一行的内容"); - // !! 此处会阻塞 - // 阻塞读取命令执行日志的输出流 - bufferedReader - .lines() - .forEach( - commandCachedLog::add - ); + // 使用统一进程执行缓存和关停动作 + LogCacheDaemonThread.submit( + () -> { + log.debug("开始获取bufferedReader中每一行的内容"); + // !! 此处会阻塞 + // 阻塞读取命令执行日志的输出流 + bufferedReader + .lines() + .forEach( + commandCachedLog::add + ); - log.debug( - "命令代码 [ {} ] 的执行日志内容为 {} ", - streamKey, - commandCachedLog + log.debug( + "命令代码 [ {} ] 的执行日志内容为 {} ", + streamKey, + commandCachedLog + ); + } ); } @@ -169,29 +194,30 @@ public class CommandExecLogCache { inputStreamReader ); - try { - // 关闭command的输出流 - process - .getInputStream() - .close(); + LogCacheDaemonThread.submit( + () -> { + try { + // 关闭command的输出流 + process + .getInputStream() + .close(); - inputStreamReader.close(); + inputStreamReader.close(); - bufferedReader.close(); - - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - log.debug( - "关闭 [{}]的各种输入流读取器成功 !", - streamKey - ); - - CommandLogBufferedReaderMap.remove(streamKey); - CommandLogInputReaderMap.remove(streamKey); - } + bufferedReader.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + log.debug( + "关闭 [{}]的各种输入流读取器成功 !", + streamKey + ); + CommandLogBufferedReaderMap.remove(streamKey); + CommandLogInputReaderMap.remove(streamKey); + } + }); } /**