[server][ executor]- 优化部分代码

This commit is contained in:
zeaslity
2023-02-28 16:59:22 +08:00
parent 65ed141697
commit fbcff7ee42
3 changed files with 66 additions and 9 deletions

View File

@@ -16,6 +16,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@@ -160,6 +161,30 @@ public class CommandExecutor {
process process
); );
// 获取到命令执行的结果, 即时执行的命令
final int[] processExitCode = new int[0];
final Duration[] commandExecDuration = new Duration[1];
process
.onExit()
.thenAccept(
pro -> {
int exitValue = pro.exitValue();
processExitCode[0] = exitValue;
ProcessHandle.Info info = pro.info();
commandExecDuration[0] = info
.totalCpuDuration()
.get();
log.info(
"任务 [ {} ]命令执行完成,执行时间为 [ {} ], 执行命令的结果为 {}",
info.commandLine(),
info.totalCpuDuration(),
exitValue
);
}
);
boolean commandExecComplete = false; boolean commandExecComplete = false;
try { try {
@@ -182,8 +207,6 @@ public class CommandExecutor {
throw new RuntimeException(e); throw new RuntimeException(e);
} finally { } finally {
System.out.println("process isAlive = " + process.isAlive());
// 任务提前执行结束,或者超过了最长等待时间 // 任务提前执行结束,或者超过了最长等待时间
// 判断命令是否正确处理完成 // 判断命令是否正确处理完成
if (!commandExecComplete) { if (!commandExecComplete) {
@@ -212,9 +235,11 @@ public class CommandExecutor {
//commandExecLogCache.PrintCommandCachedLog(streamKey); //commandExecLogCache.PrintCommandCachedLog(streamKey);
// 关停任务执行的缓存日志收集 BufferedReader 否则无法终止 // 关停任务执行的缓存日志收集 BufferedReader 否则无法终止
commandExecLogCache.StopExecLogBufferedReader( commandExecLogCache.StopExecLogCollect(
streamKey, streamKey,
process process,
processExitCode[0],
commandExecDuration[0]
); );
// 异步执行日志的发送工作 // 异步执行日志的发送工作
@@ -255,7 +280,6 @@ public class CommandExecutor {
} }
@Deprecated @Deprecated
private ByteBuffer cvToByteBuffer(InputStream inputStream) throws IOException { private ByteBuffer cvToByteBuffer(InputStream inputStream) throws IOException {

View File

@@ -18,6 +18,7 @@ import javax.annotation.Resource;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@@ -48,7 +49,7 @@ public class CommandExecLogCache {
/** /**
* 固定单进程,用于缓存命令执行日志,关闭命令输入管道 * 固定单进程,用于缓存命令执行日志,关闭命令输入管道
*/ */
private static ExecutorService LogCacheDaemonThread = null; private static ExecutorService LogCacheDaemonThread;
static { static {
@@ -161,6 +162,8 @@ public class CommandExecLogCache {
commandCachedLog::add commandCachedLog::add
); );
// 对于即时执行完成的任务,需要在这里增加尾巴内容
log.debug( log.debug(
"命令代码 [ {} ] 的执行日志内容为 {} ", "命令代码 [ {} ] 的执行日志内容为 {} ",
streamKey, streamKey,
@@ -174,7 +177,7 @@ public class CommandExecLogCache {
* 对于一些没有中止的任务,必须要手动将读取的 InputStream流关闭 * 对于一些没有中止的任务,必须要手动将读取的 InputStream流关闭
* 否则部分任务的日志无法收集 * 否则部分任务的日志无法收集
*/ */
public void StopExecLogBufferedReader(String streamKey, Process process) { public void StopExecLogCollect(String streamKey, Process process, int commandExitValue, Duration duration) {
BufferedReader bufferedReader = CommandLogBufferedReaderMap.get(streamKey); BufferedReader bufferedReader = CommandLogBufferedReaderMap.get(streamKey);
@@ -184,6 +187,19 @@ public class CommandExecLogCache {
bufferedReader bufferedReader
); );
// 延迟任务在此时的process还没有关闭需要获取其中的信息
if (ObjectUtils.isNotEmpty(process) && process.isAlive()) {
duration = process
.info()
.totalCpuDuration()
.get();
commandExitValue = process.exitValue();
}
Duration finalDuration = duration;
int finalCommandExitValue = commandExitValue;
LogCacheDaemonThread.submit( LogCacheDaemonThread.submit(
() -> { () -> {
try { try {
@@ -202,6 +218,25 @@ public class CommandExecLogCache {
streamKey streamKey
); );
// 添加任务结束的一些信息
String execTimeCostString = String.format(
"execution time-cost is => [ %s ]",
finalDuration
);
String execResultString = String.format(
"execution result code is => [ %s ]",
finalCommandExitValue
);
ArrayList<String> commandExecCachedLog = CachedCommandLogMap.get(streamKey);
commandExecCachedLog.add("--------------- command result are as above --------------------");
commandExecCachedLog.add(execTimeCostString);
commandExecCachedLog.add(execResultString);
// 从缓存读取其中去掉这个内容
CommandLogBufferedReaderMap.remove(streamKey); CommandLogBufferedReaderMap.remove(streamKey);
} }
}); });

View File

@@ -14,7 +14,6 @@ import org.springframework.data.redis.core.RedisTemplate;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.concurrent.ArrayBlockingQueue;
@Configuration @Configuration
@Slf4j(topic = "Octopus Message Handler") @Slf4j(topic = "Octopus Message Handler")
@@ -60,7 +59,6 @@ public class OMessageHandlerServer {
octopusMessage octopusMessage
); );
// 获取Agent的版本信息 // 获取Agent的版本信息
if (octopusMessage if (octopusMessage
.getUuid() .getUuid()