From d3b36270c54de28b6f60f190514c978ababa7e7e Mon Sep 17 00:00:00 2001 From: zeaslity Date: Sat, 25 Feb 2023 21:56:30 +0800 Subject: [PATCH] [agent][executor]- bug - 3 --- .../wdd/agent/executor/CommandExecutor.java | 159 +++++++++--------- .../executor/thread/CommandExecLogCache.java | 5 +- 2 files changed, 80 insertions(+), 84 deletions(-) diff --git a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java index b6395ef..456e327 100644 --- a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java @@ -11,7 +11,6 @@ import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.utils.TimeUtils; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.ObjectUtils; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; @@ -108,7 +107,7 @@ public class CommandExecutor { Process process = processBuilder.start(); // 守护进程 对每一条任务的超时时间进行限制,避免卡死 - int commandExecWaitTimeout = processMaxWaitSeconds; + int commandExecWaitTimeout; if (durationTask) { // 持久化任务的超时等待时间会非常长,但还是采用相同的机制 log.info( @@ -116,11 +115,13 @@ public class CommandExecutor { durationTaskMaxWaitSeconds ); commandExecWaitTimeout = durationTaskMaxWaitSeconds; + } else { + commandExecWaitTimeout = processMaxWaitSeconds; } // 2023年2月23日 同时执行log日志部分移动至此部分处理 AgentCommonThreadPool.pool.submit( - DaemonCommandProcessAndCollectLog( + () -> DaemonCommandProcessAndCollectLog( process, commandExecWaitTimeout, countDownLatch, @@ -129,19 +130,15 @@ public class CommandExecutor { octopusMessage )); + // 等待1秒钟, 使得有时间进行后端任务的完成 + TimeUnit.SECONDS.sleep(1); + // get the command result must also be a timeout smaller than the process - int waitFor = process.waitFor(); + // 此处会把主线程卡死, forever终结 + processResult = process.waitFor(); - - // get the process result - if (ObjectUtils.isNotEmpty(waitFor) && ObjectUtils.isNotEmpty(process.info())) { - - // 命令执行完成, countDownLatch计数 - countDownLatch.countDown(); - } - - // 设置 命令执行退出返回值 - processResult = process.exitValue(); + // 命令执行完成, countDownLatch计数 + countDownLatch.countDown(); log.debug( "current shell command {} result is {}", @@ -160,14 +157,36 @@ public class CommandExecutor { return processResult; } - private Runnable DaemonCommandProcessAndCollectLog(Process process, int commandExecWaitTimeout, CountDownLatch countDownLatch, boolean needResultReplay, String streamKey, OctopusMessage octopusMessage) { - return () -> { + private void DaemonCommandProcessAndCollectLog(Process process, int commandExecWaitTimeout, CountDownLatch countDownLatch, boolean needResultReplay, String streamKey, OctopusMessage octopusMessage) { - boolean commandExecComplete = false; + boolean commandExecComplete = false; - try { - log.debug( - "[ {} ] 命令执行守护进程开始等待 {} 秒", + try { + log.debug( + "[ {} ] 命令执行守护进程开始等待 {} 秒", + process + .info() + .commandLine() + .get(), + commandExecWaitTimeout + ); + + // 使用 countDownLatch 进行超时等待 + commandExecComplete = countDownLatch.await( + commandExecWaitTimeout, + TimeUnit.SECONDS + ); + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + + + // 任务提前执行结束,或者超过了最长等待时间 + // 判断命令是否正确处理完成 + if (!commandExecComplete) { + log.warn( + "Command [ {} ] stuck for {} s, destroy the command process!", process .info() .commandLine() @@ -175,77 +194,53 @@ public class CommandExecutor { commandExecWaitTimeout ); - // 使用 countDownLatch 进行超时等待 - commandExecComplete = countDownLatch.await( - commandExecWaitTimeout, - TimeUnit.SECONDS - ); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } finally { - - - - // 任务提前执行结束,或者超过了最长等待时间 - // 判断命令是否正确处理完成 - if (!commandExecComplete) { - log.warn( - "Command [ {} ] stuck for {} s, destroy the command process!", + // 命令没有正确的处理完成,需要手动关停程序 + // 只有当该进程还存活,执行关闭操作, 没有返回结果的命令如 ping baidu.com 只有在线程关闭之后才可以有结果s + if (process.isAlive()) { + // shutdown the process + log.debug( + "开始销毁命令 [ {} ] 执行进程!", process .info() .commandLine() - .get(), - commandExecWaitTimeout + .get() ); + // 对线程进行debug + commandExecLogCache.debugProcessStreams(process); - // 命令没有正确的处理完成,需要手动关停程序 - // 只有当该进程还存活,执行关闭操作, 没有返回结果的命令如 ping baidu.com 只有在线程关闭之后才可以有结果s - if (process.isAlive()) { - // shutdown the process - log.debug( - "开始销毁命令 [ {} ] 执行进程!", - process - .info() - .commandLine() - .get() - ); - - // 对线程进行debug - commandExecLogCache.debugProcessStreams(process); - - process.destroy(); - } - } - - // 缓存 命令处理日志 - commandExecLogCache.cacheLog( - streamKey, - process - ); - - // 日志操作,如果需要显示回传,需要将日志发送回相应的 - collectCommandLogAndRepeat( - streamKey, - needResultReplay, - octopusMessage - ); - - // 执行到这里,说明整个任务流程结束 - log.debug( - "命令 [ {} ]执行全流程结束!", - process - .info() - .commandLine() - .get() - ); - if (process.isAlive()) { - // shutdown the process - process.destroyForcibly(); + process.destroy(); } } - }; + + // 缓存 命令处理日志 + commandExecLogCache.cacheLog( + streamKey, + process + ); + + // 日志操作,如果需要显示回传,需要将日志发送回相应的 + collectCommandLogAndRepeat( + streamKey, + needResultReplay, + octopusMessage + ); + + // 执行到这里,说明整个任务流程结束 + log.debug( + "命令 [ {} ]执行全流程结束!", + process + .info() + .commandLine() + .get() + ); + if (process.isAlive()) { + // shutdown the process + process.destroyForcibly(); + } + } + } /** diff --git a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java index 9b0e30f..5458172 100644 --- a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java +++ b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java @@ -1,6 +1,7 @@ package io.wdd.agent.executor.thread; +import io.wdd.common.handler.MyRuntimeException; import io.wdd.common.utils.TimeUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ObjectUtils; @@ -42,9 +43,9 @@ public class CommandExecLogCache { process.info() ); - if (ObjectUtils.isEmpty(process.exitValue())) { + if (ObjectUtils.isEmpty(process.info())) { log.error("process is null ! cache log error !"); - return; + throw new MyRuntimeException(); } ArrayList commandCachedLog = new ArrayList<>(128);