From fbcff7ee42b4704b92d981df290675159862d37e Mon Sep 17 00:00:00 2001 From: zeaslity Date: Tue, 28 Feb 2023 16:59:22 +0800 Subject: [PATCH] =?UTF-8?q?[server][=20executor]-=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E9=83=A8=E5=88=86=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../wdd/agent/executor/CommandExecutor.java | 34 +++++++++++++--- .../executor/thread/CommandExecLogCache.java | 39 ++++++++++++++++++- .../handler/OMessageHandlerServer.java | 2 - 3 files changed, 66 insertions(+), 9 deletions(-) diff --git a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java index b805cb4..a0f530a 100644 --- a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java @@ -16,6 +16,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -160,6 +161,30 @@ public class CommandExecutor { process ); + // 获取到命令执行的结果, 即时执行的命令 + final int[] processExitCode = new int[0]; + final Duration[] commandExecDuration = new Duration[1]; + process + .onExit() + .thenAccept( + pro -> { + int exitValue = pro.exitValue(); + + processExitCode[0] = exitValue; + ProcessHandle.Info info = pro.info(); + commandExecDuration[0] = info + .totalCpuDuration() + .get(); + + log.info( + "任务 [ {} ]命令执行完成,执行时间为 [ {} ], 执行命令的结果为 {}", + info.commandLine(), + info.totalCpuDuration(), + exitValue + ); + } + ); + boolean commandExecComplete = false; try { @@ -182,8 +207,6 @@ public class CommandExecutor { throw new RuntimeException(e); } finally { - System.out.println("process isAlive = " + process.isAlive()); - // 任务提前执行结束,或者超过了最长等待时间 // 判断命令是否正确处理完成 if (!commandExecComplete) { @@ -212,9 +235,11 @@ public class CommandExecutor { //commandExecLogCache.PrintCommandCachedLog(streamKey); // 关停任务执行的缓存日志收集 BufferedReader 否则无法终止 - commandExecLogCache.StopExecLogBufferedReader( + commandExecLogCache.StopExecLogCollect( streamKey, - process + process, + processExitCode[0], + commandExecDuration[0] ); // 异步执行日志的发送工作 @@ -255,7 +280,6 @@ public class CommandExecutor { } - @Deprecated private ByteBuffer cvToByteBuffer(InputStream inputStream) throws IOException { diff --git a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java index 2499504..fd1aafe 100644 --- a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java +++ b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java @@ -18,6 +18,7 @@ import javax.annotation.Resource; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.concurrent.ExecutorService; @@ -48,7 +49,7 @@ public class CommandExecLogCache { /** * 固定单进程,用于缓存命令执行日志,关闭命令输入管道 */ - private static ExecutorService LogCacheDaemonThread = null; + private static ExecutorService LogCacheDaemonThread; static { @@ -161,6 +162,8 @@ public class CommandExecLogCache { commandCachedLog::add ); + // 对于即时执行完成的任务,需要在这里增加尾巴内容 + log.debug( "命令代码 [ {} ] 的执行日志内容为 {} ", streamKey, @@ -174,7 +177,7 @@ public class CommandExecLogCache { * 对于一些没有中止的任务,必须要手动将读取的 InputStream流关闭 * 否则部分任务的日志无法收集 */ - public void StopExecLogBufferedReader(String streamKey, Process process) { + public void StopExecLogCollect(String streamKey, Process process, int commandExitValue, Duration duration) { BufferedReader bufferedReader = CommandLogBufferedReaderMap.get(streamKey); @@ -184,6 +187,19 @@ public class CommandExecLogCache { bufferedReader ); + // 延迟任务,在此时的process还没有关闭,需要获取其中的信息 + if (ObjectUtils.isNotEmpty(process) && process.isAlive()) { + duration = process + .info() + .totalCpuDuration() + .get(); + + commandExitValue = process.exitValue(); + + } + + Duration finalDuration = duration; + int finalCommandExitValue = commandExitValue; LogCacheDaemonThread.submit( () -> { try { @@ -202,6 +218,25 @@ public class CommandExecLogCache { streamKey ); + // 添加任务结束的一些信息 + String execTimeCostString = String.format( + "execution time-cost is => [ %s ]", + finalDuration + ); + + String execResultString = String.format( + "execution result code is => [ %s ]", + finalCommandExitValue + ); + + ArrayList commandExecCachedLog = CachedCommandLogMap.get(streamKey); + + commandExecCachedLog.add("--------------- command result are as above --------------------"); + commandExecCachedLog.add(execTimeCostString); + commandExecCachedLog.add(execResultString); + + + // 从缓存读取其中去掉这个内容 CommandLogBufferedReaderMap.remove(streamKey); } }); diff --git a/server/src/main/java/io/wdd/rpc/message/handler/OMessageHandlerServer.java b/server/src/main/java/io/wdd/rpc/message/handler/OMessageHandlerServer.java index 82ca28a..570ff21 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/OMessageHandlerServer.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/OMessageHandlerServer.java @@ -14,7 +14,6 @@ import org.springframework.data.redis.core.RedisTemplate; import javax.annotation.Resource; import java.io.IOException; import java.util.ArrayDeque; -import java.util.concurrent.ArrayBlockingQueue; @Configuration @Slf4j(topic = "Octopus Message Handler") @@ -60,7 +59,6 @@ public class OMessageHandlerServer { octopusMessage ); - // 获取Agent的版本信息 if (octopusMessage .getUuid()