diff --git a/.run/Agent-dev-1.run.xml b/.run/Agent-dev-1.run.xml
index f6486b5..cb65223 100644
--- a/.run/Agent-dev-1.run.xml
+++ b/.run/Agent-dev-1.run.xml
@@ -26,7 +26,7 @@
-
+
diff --git a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java
index 63e05d3..895c07b 100644
--- a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java
+++ b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java
@@ -44,6 +44,12 @@ public class CommandExecutor {
*/
int processMaxWaitSeconds = 60;
+ /**
+ * 持久化命令执行的最长等待时间
+ * 8小时 8*60*60 =
+ */
+ int durationTaskMaxWaitSeconds = 8 * 60 * 60;
+
/**
* handle command from octopus server
*
@@ -101,28 +107,32 @@ public class CommandExecutor {
Process process = processBuilder.start();
- // start a backend thread to daemon the process
- // wait for processMaxWaitSeconds and kill the process if it's still alived
- if (!durationTask) {
- // 不是持久化任务,那么需要为每一条命令执行,设置超时保护机制
- // 2023年2月23日 同时执行log日志部分移动至此部分处理
- AgentCommonThreadPool.pool.submit(
- DaemonCommandProcessAndCollectLog(
- process,
- processMaxWaitSeconds,
- countDownLatch,
- needResultReplay,
- streamKey,
- octopusMessage
- ));
+ // 守护进程 对每一条任务的超时时间进行限制,避免卡死
+ int commandExecWaitTimeout = processMaxWaitSeconds;
+ if (durationTask) {
+ // 持久化任务的超时等待时间会非常长,但还是采用相同的机制
+ log.info("当前命令为持久化任务,将在 {} 秒之后返回日志",
+ durationTaskMaxWaitSeconds);
+ commandExecWaitTimeout = durationTaskMaxWaitSeconds;
}
- // 缓存让命令处理日志,并且打印
+ // 缓存 命令处理日志
commandExecLogCache.cacheLog(
streamKey,
process
);
+ // 2023年2月23日 同时执行log日志部分移动至此部分处理
+ AgentCommonThreadPool.pool.submit(
+ DaemonCommandProcessAndCollectLog(
+ process,
+ commandExecWaitTimeout,
+ countDownLatch,
+ needResultReplay,
+ streamKey,
+ octopusMessage
+ ));
+
// get the command result must also be a timeout smaller than the process
int waitFor = process.waitFor();
@@ -154,7 +164,7 @@ public class CommandExecutor {
return processResult;
}
- private Runnable DaemonCommandProcessAndCollectLog(Process process, int processMaxWaitSeconds, CountDownLatch countDownLatch, boolean needResultReplay, String streamKey, OctopusMessage octopusMessage) {
+ private Runnable DaemonCommandProcessAndCollectLog(Process process, int commandExecWaitTimeout, CountDownLatch countDownLatch, boolean needResultReplay, String streamKey, OctopusMessage octopusMessage) {
return () -> {
boolean commandExecComplete = false;
@@ -162,12 +172,12 @@ public class CommandExecutor {
try {
log.debug(
"命令执行守护进程开始等待 {} s for the result",
- processMaxWaitSeconds
+ commandExecWaitTimeout
);
// 使用 countDownLatch 进行超时等待
commandExecComplete = countDownLatch.await(
- processMaxWaitSeconds,
+ commandExecWaitTimeout,
TimeUnit.SECONDS
);
@@ -184,12 +194,17 @@ public class CommandExecutor {
.info()
.commandLine()
.get(),
- processMaxWaitSeconds
+ commandExecWaitTimeout
);
}
+
// 日志操作,如果需要显示回传,需要将日志发送回相应的
- collectCommandLogAndRepeat(streamKey, needResultReplay, octopusMessage);
+ collectCommandLogAndRepeat(
+ streamKey,
+ needResultReplay,
+ octopusMessage
+ );
// 只有当该进程还存活,执行关闭操作
if (process.isAlive()) {
@@ -203,13 +218,30 @@ public class CommandExecutor {
};
}
+ /**
+ * 日志回传操作
+ * 默认发送值Redis中
+ * 如果需要持久化,则需要发送至RabbitMQ中
+ *
+ * @param streamKey
+ * @param needResultReplay
+ * @param octopusMessage
+ */
private void collectCommandLogAndRepeat(String streamKey, boolean needResultReplay, OctopusMessage octopusMessage) {
// 获取到命令执行的结果, 此操作会清除掉缓存
ArrayList commandExecLogCachedLog = commandExecLogCache.getCacheLog(streamKey);
+ log.debug(
+ "从缓存中获取到的命令执行日志为: {}",
+ commandExecLogCachedLog
+ );
+
// 简单的发送到StreamSender
- simpleStreamSender.sendLog(streamKey, commandExecLogCachedLog);
+ simpleStreamSender.sendLog(
+ streamKey,
+ commandExecLogCachedLog
+ );
// 需要恢复相应的消息
if (needResultReplay) {
diff --git a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java
index 12f5b29..5bb8885 100644
--- a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java
+++ b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java
@@ -3,8 +3,8 @@ package io.wdd.agent.executor.thread;
import io.wdd.common.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
import java.io.BufferedReader;
import java.io.InputStream;
@@ -22,7 +22,7 @@ import java.util.HashMap;
public class CommandExecLogCache {
// 存储命令执行缓存日志
- public static HashMap> CachedCommandLog = new HashMap<>();
+ public static HashMap> CachedCommandLogMap = new HashMap<>();
/**
* 存储命令执行为空的默认空结果
@@ -52,13 +52,20 @@ public class CommandExecLogCache {
TimeUtils.currentTimeString()
);
+ String execResultString = String.format(
+ "execution result is => [ %s ]",
+ process.exitValue()
+ );
+
// add the command
- commandCachedLog.add("");
commandCachedLog.add(execCommandString);
commandCachedLog.add(execTimeString);
+ commandCachedLog.add(execResultString);
commandCachedLog.add("--------------- command result are as below --------------------");
commandCachedLog.add("");
+ Assert.notNull(process.getInputStream(),"命令执行结果输出为空!");
+
// cache the real command logs
doCacheLog(
streamKey,
@@ -78,10 +85,10 @@ public class CommandExecLogCache {
public ArrayList getCacheLog(String streamKey) {
// 获取
- ArrayList execLogCacheArrayList = CachedCommandLog.getOrDefault(streamKey,EmptyCommandLog);
+ ArrayList execLogCacheArrayList = CachedCommandLogMap.getOrDefault(streamKey, EmptyCommandLog);
// 清除Key
- CachedCommandLog.remove(streamKey);
+ CachedCommandLogMap.remove(streamKey);
return execLogCacheArrayList;
}
@@ -95,7 +102,6 @@ public class CommandExecLogCache {
*/
private void doCacheLog(String streamKey, InputStream commandLogStream, ArrayList commandCachedLog) {
-
// read from input stream and store to the cacheArrayList
new BufferedReader(new InputStreamReader(commandLogStream))
.lines()
@@ -104,7 +110,7 @@ public class CommandExecLogCache {
);
log.debug(
- "current streamKey is {} and CacheLog is {}",
+ "current streamKey is [ {} ] and CacheLog is [ {} ]",
streamKey,
commandCachedLog
);
@@ -121,7 +127,7 @@ public class CommandExecLogCache {
int keyToIndex = this.hashStreamKeyToCachedArrayListIndex(streamKey);
- return CachedCommandLog.get(keyToIndex);
+ return CachedCommandLogMap.get(keyToIndex);
}
@@ -132,7 +138,7 @@ public class CommandExecLogCache {
* @return
*/
private int hashStreamKeyToCachedArrayListIndex(String streamKey) {
- int size = CachedCommandLog.size();
+ int size = CachedCommandLogMap.size();
return Math.abs(streamKey.hashCode() % size);
}