[agent][executor]- bug - 3

This commit is contained in:
zeaslity
2023-02-25 21:56:30 +08:00
parent 3c245b126e
commit d3b36270c5
2 changed files with 80 additions and 84 deletions

View File

@@ -11,7 +11,6 @@ import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.utils.TimeUtils; import io.wdd.common.utils.TimeUtils;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource; import javax.annotation.Resource;
@@ -108,7 +107,7 @@ public class CommandExecutor {
Process process = processBuilder.start(); Process process = processBuilder.start();
// 守护进程 对每一条任务的超时时间进行限制,避免卡死 // 守护进程 对每一条任务的超时时间进行限制,避免卡死
int commandExecWaitTimeout = processMaxWaitSeconds; int commandExecWaitTimeout;
if (durationTask) { if (durationTask) {
// 持久化任务的超时等待时间会非常长,但还是采用相同的机制 // 持久化任务的超时等待时间会非常长,但还是采用相同的机制
log.info( log.info(
@@ -116,11 +115,13 @@ public class CommandExecutor {
durationTaskMaxWaitSeconds durationTaskMaxWaitSeconds
); );
commandExecWaitTimeout = durationTaskMaxWaitSeconds; commandExecWaitTimeout = durationTaskMaxWaitSeconds;
} else {
commandExecWaitTimeout = processMaxWaitSeconds;
} }
// 2023年2月23日 同时执行log日志部分移动至此部分处理 // 2023年2月23日 同时执行log日志部分移动至此部分处理
AgentCommonThreadPool.pool.submit( AgentCommonThreadPool.pool.submit(
DaemonCommandProcessAndCollectLog( () -> DaemonCommandProcessAndCollectLog(
process, process,
commandExecWaitTimeout, commandExecWaitTimeout,
countDownLatch, countDownLatch,
@@ -129,19 +130,15 @@ public class CommandExecutor {
octopusMessage octopusMessage
)); ));
// 等待1秒钟, 使得有时间进行后端任务的完成
TimeUnit.SECONDS.sleep(1);
// get the command result must also be a timeout smaller than the process // get the command result must also be a timeout smaller than the process
int waitFor = process.waitFor(); // 此处会把主线程卡死, forever终结
processResult = process.waitFor();
// 命令执行完成, countDownLatch计数
// get the process result countDownLatch.countDown();
if (ObjectUtils.isNotEmpty(waitFor) && ObjectUtils.isNotEmpty(process.info())) {
// 命令执行完成, countDownLatch计数
countDownLatch.countDown();
}
// 设置 命令执行退出返回值
processResult = process.exitValue();
log.debug( log.debug(
"current shell command {} result is {}", "current shell command {} result is {}",
@@ -160,14 +157,36 @@ public class CommandExecutor {
return processResult; return processResult;
} }
private Runnable DaemonCommandProcessAndCollectLog(Process process, int commandExecWaitTimeout, CountDownLatch countDownLatch, boolean needResultReplay, String streamKey, OctopusMessage octopusMessage) { private void DaemonCommandProcessAndCollectLog(Process process, int commandExecWaitTimeout, CountDownLatch countDownLatch, boolean needResultReplay, String streamKey, OctopusMessage octopusMessage) {
return () -> {
boolean commandExecComplete = false; boolean commandExecComplete = false;
try { try {
log.debug( log.debug(
"[ {} ] 命令执行守护进程开始等待 {} 秒", "[ {} ] 命令执行守护进程开始等待 {} 秒",
process
.info()
.commandLine()
.get(),
commandExecWaitTimeout
);
// 使用 countDownLatch 进行超时等待
commandExecComplete = countDownLatch.await(
commandExecWaitTimeout,
TimeUnit.SECONDS
);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 任务提前执行结束,或者超过了最长等待时间
// 判断命令是否正确处理完成
if (!commandExecComplete) {
log.warn(
"Command [ {} ] stuck for {} s, destroy the command process!",
process process
.info() .info()
.commandLine() .commandLine()
@@ -175,77 +194,53 @@ public class CommandExecutor {
commandExecWaitTimeout commandExecWaitTimeout
); );
// 使用 countDownLatch 进行超时等待
commandExecComplete = countDownLatch.await(
commandExecWaitTimeout,
TimeUnit.SECONDS
);
} catch (InterruptedException e) { // 命令没有正确的处理完成,需要手动关停程序
throw new RuntimeException(e); // 只有当该进程还存活,执行关闭操作, 没有返回结果的命令如 ping baidu.com 只有在线程关闭之后才可以有结果s
} finally { if (process.isAlive()) {
// shutdown the process
log.debug(
"开始销毁命令 [ {} ] 执行进程!",
// 任务提前执行结束,或者超过了最长等待时间
// 判断命令是否正确处理完成
if (!commandExecComplete) {
log.warn(
"Command [ {} ] stuck for {} s, destroy the command process!",
process process
.info() .info()
.commandLine() .commandLine()
.get(), .get()
commandExecWaitTimeout
); );
// 对线程进行debug
commandExecLogCache.debugProcessStreams(process);
// 命令没有正确的处理完成,需要手动关停程序 process.destroy();
// 只有当该进程还存活,执行关闭操作, 没有返回结果的命令如 ping baidu.com 只有在线程关闭之后才可以有结果s
if (process.isAlive()) {
// shutdown the process
log.debug(
"开始销毁命令 [ {} ] 执行进程!",
process
.info()
.commandLine()
.get()
);
// 对线程进行debug
commandExecLogCache.debugProcessStreams(process);
process.destroy();
}
}
// 缓存 命令处理日志
commandExecLogCache.cacheLog(
streamKey,
process
);
// 日志操作,如果需要显示回传,需要将日志发送回相应的
collectCommandLogAndRepeat(
streamKey,
needResultReplay,
octopusMessage
);
// 执行到这里,说明整个任务流程结束
log.debug(
"命令 [ {} ]执行全流程结束!",
process
.info()
.commandLine()
.get()
);
if (process.isAlive()) {
// shutdown the process
process.destroyForcibly();
} }
} }
};
// 缓存 命令处理日志
commandExecLogCache.cacheLog(
streamKey,
process
);
// 日志操作,如果需要显示回传,需要将日志发送回相应的
collectCommandLogAndRepeat(
streamKey,
needResultReplay,
octopusMessage
);
// 执行到这里,说明整个任务流程结束
log.debug(
"命令 [ {} ]执行全流程结束!",
process
.info()
.commandLine()
.get()
);
if (process.isAlive()) {
// shutdown the process
process.destroyForcibly();
}
}
} }
/** /**

View File

@@ -1,6 +1,7 @@
package io.wdd.agent.executor.thread; package io.wdd.agent.executor.thread;
import io.wdd.common.handler.MyRuntimeException;
import io.wdd.common.utils.TimeUtils; import io.wdd.common.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.ObjectUtils;
@@ -42,9 +43,9 @@ public class CommandExecLogCache {
process.info() process.info()
); );
if (ObjectUtils.isEmpty(process.exitValue())) { if (ObjectUtils.isEmpty(process.info())) {
log.error("process is null ! cache log error !"); log.error("process is null ! cache log error !");
return; throw new MyRuntimeException();
} }
ArrayList<String> commandCachedLog = new ArrayList<>(128); ArrayList<String> commandCachedLog = new ArrayList<>(128);