[agent][executor]- 优化command executor的代码

This commit is contained in:
zeaslity
2023-02-25 15:44:53 +08:00
parent 3bd38aaf51
commit 8187fe7baa
3 changed files with 69 additions and 31 deletions

View File

@@ -26,7 +26,7 @@
<env name="server.port" value="8001" /> <env name="server.port" value="8001" />
</envs> </envs>
<module name="agent" /> <module name="agent" />
<target name="root@10.250.0.18:22" /> <target name="UbuntuStation" />
<option name="SPRING_BOOT_MAIN_CLASS" value="io.wdd.agent.AgentApplication" /> <option name="SPRING_BOOT_MAIN_CLASS" value="io.wdd.agent.AgentApplication" />
<option name="VM_PARAMETERS" value="-Dserver.port=8001 -Dfile.encoding=utf-8 -Ddebug=true -Dlogging.level.io.wdd.agent=debug -Dspring.profiles.active=dev -Dspring.cloud.nacos.config.group=dev -Dspring.cloud.nacos.config.extension-configs[0].dataId=common-dev.yaml -Dspring.cloud.nacos.config.extension-configs[0].group=dev" /> <option name="VM_PARAMETERS" value="-Dserver.port=8001 -Dfile.encoding=utf-8 -Ddebug=true -Dlogging.level.io.wdd.agent=debug -Dspring.profiles.active=dev -Dspring.cloud.nacos.config.group=dev -Dspring.cloud.nacos.config.extension-configs[0].dataId=common-dev.yaml -Dspring.cloud.nacos.config.extension-configs[0].group=dev" />
<method v="2"> <method v="2">

View File

@@ -44,6 +44,12 @@ public class CommandExecutor {
*/ */
int processMaxWaitSeconds = 60; int processMaxWaitSeconds = 60;
/**
* 持久化命令执行的最长等待时间
* 8小时 8*60*60 =
*/
int durationTaskMaxWaitSeconds = 8 * 60 * 60;
/** /**
* handle command from octopus server * handle command from octopus server
* *
@@ -101,27 +107,31 @@ public class CommandExecutor {
Process process = processBuilder.start(); Process process = processBuilder.start();
// start a backend thread to daemon the process // 守护进程 对每一条任务的超时时间进行限制,避免卡死
// wait for processMaxWaitSeconds and kill the process if it's still alived int commandExecWaitTimeout = processMaxWaitSeconds;
if (!durationTask) { if (durationTask) {
// 不是持久化任务,那么需要为每一条命令执行,设置超时保护机制 // 持久化任务的超时等待时间会非常长,但还是采用相同的机制
log.info("当前命令为持久化任务,将在 {} 秒之后返回日志",
durationTaskMaxWaitSeconds);
commandExecWaitTimeout = durationTaskMaxWaitSeconds;
}
// 缓存 命令处理日志
commandExecLogCache.cacheLog(
streamKey,
process
);
// 2023年2月23日 同时执行log日志部分移动至此部分处理 // 2023年2月23日 同时执行log日志部分移动至此部分处理
AgentCommonThreadPool.pool.submit( AgentCommonThreadPool.pool.submit(
DaemonCommandProcessAndCollectLog( DaemonCommandProcessAndCollectLog(
process, process,
processMaxWaitSeconds, commandExecWaitTimeout,
countDownLatch, countDownLatch,
needResultReplay, needResultReplay,
streamKey, streamKey,
octopusMessage octopusMessage
)); ));
}
// 缓存让命令处理日志,并且打印
commandExecLogCache.cacheLog(
streamKey,
process
);
// get the command result must also be a timeout smaller than the process // get the command result must also be a timeout smaller than the process
int waitFor = process.waitFor(); int waitFor = process.waitFor();
@@ -154,7 +164,7 @@ public class CommandExecutor {
return processResult; return processResult;
} }
private Runnable DaemonCommandProcessAndCollectLog(Process process, int processMaxWaitSeconds, CountDownLatch countDownLatch, boolean needResultReplay, String streamKey, OctopusMessage octopusMessage) { private Runnable DaemonCommandProcessAndCollectLog(Process process, int commandExecWaitTimeout, CountDownLatch countDownLatch, boolean needResultReplay, String streamKey, OctopusMessage octopusMessage) {
return () -> { return () -> {
boolean commandExecComplete = false; boolean commandExecComplete = false;
@@ -162,12 +172,12 @@ public class CommandExecutor {
try { try {
log.debug( log.debug(
"命令执行守护进程开始等待 {} s for the result", "命令执行守护进程开始等待 {} s for the result",
processMaxWaitSeconds commandExecWaitTimeout
); );
// 使用 countDownLatch 进行超时等待 // 使用 countDownLatch 进行超时等待
commandExecComplete = countDownLatch.await( commandExecComplete = countDownLatch.await(
processMaxWaitSeconds, commandExecWaitTimeout,
TimeUnit.SECONDS TimeUnit.SECONDS
); );
@@ -184,12 +194,17 @@ public class CommandExecutor {
.info() .info()
.commandLine() .commandLine()
.get(), .get(),
processMaxWaitSeconds commandExecWaitTimeout
); );
} }
// 日志操作,如果需要显示回传,需要将日志发送回相应的 // 日志操作,如果需要显示回传,需要将日志发送回相应的
collectCommandLogAndRepeat(streamKey, needResultReplay, octopusMessage); collectCommandLogAndRepeat(
streamKey,
needResultReplay,
octopusMessage
);
// 只有当该进程还存活,执行关闭操作 // 只有当该进程还存活,执行关闭操作
if (process.isAlive()) { if (process.isAlive()) {
@@ -203,13 +218,30 @@ public class CommandExecutor {
}; };
} }
/**
* 日志回传操作
* 默认发送值Redis中
* 如果需要持久化则需要发送至RabbitMQ中
*
* @param streamKey
* @param needResultReplay
* @param octopusMessage
*/
private void collectCommandLogAndRepeat(String streamKey, boolean needResultReplay, OctopusMessage octopusMessage) { private void collectCommandLogAndRepeat(String streamKey, boolean needResultReplay, OctopusMessage octopusMessage) {
// 获取到命令执行的结果, 此操作会清除掉缓存 // 获取到命令执行的结果, 此操作会清除掉缓存
ArrayList<String> commandExecLogCachedLog = commandExecLogCache.getCacheLog(streamKey); ArrayList<String> commandExecLogCachedLog = commandExecLogCache.getCacheLog(streamKey);
log.debug(
"从缓存中获取到的命令执行日志为: {}",
commandExecLogCachedLog
);
// 简单的发送到StreamSender // 简单的发送到StreamSender
simpleStreamSender.sendLog(streamKey, commandExecLogCachedLog); simpleStreamSender.sendLog(
streamKey,
commandExecLogCachedLog
);
// 需要恢复相应的消息 // 需要恢复相应的消息
if (needResultReplay) { if (needResultReplay) {

View File

@@ -3,8 +3,8 @@ package io.wdd.agent.executor.thread;
import io.wdd.common.utils.TimeUtils; import io.wdd.common.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.InputStream; import java.io.InputStream;
@@ -22,7 +22,7 @@ import java.util.HashMap;
public class CommandExecLogCache { public class CommandExecLogCache {
// 存储命令执行缓存日志 // 存储命令执行缓存日志
public static HashMap<String, ArrayList<String>> CachedCommandLog = new HashMap<>(); public static HashMap<String, ArrayList<String>> CachedCommandLogMap = new HashMap<>();
/** /**
* 存储命令执行为空的默认空结果 * 存储命令执行为空的默认空结果
@@ -52,13 +52,20 @@ public class CommandExecLogCache {
TimeUtils.currentTimeString() TimeUtils.currentTimeString()
); );
String execResultString = String.format(
"execution result is => [ %s ]",
process.exitValue()
);
// add the command // add the command
commandCachedLog.add("");
commandCachedLog.add(execCommandString); commandCachedLog.add(execCommandString);
commandCachedLog.add(execTimeString); commandCachedLog.add(execTimeString);
commandCachedLog.add(execResultString);
commandCachedLog.add("--------------- command result are as below --------------------"); commandCachedLog.add("--------------- command result are as below --------------------");
commandCachedLog.add(""); commandCachedLog.add("");
Assert.notNull(process.getInputStream(),"命令执行结果输出为空!");
// cache the real command logs // cache the real command logs
doCacheLog( doCacheLog(
streamKey, streamKey,
@@ -78,10 +85,10 @@ public class CommandExecLogCache {
public ArrayList<String> getCacheLog(String streamKey) { public ArrayList<String> getCacheLog(String streamKey) {
// 获取 // 获取
ArrayList<String> execLogCacheArrayList = CachedCommandLog.getOrDefault(streamKey,EmptyCommandLog); ArrayList<String> execLogCacheArrayList = CachedCommandLogMap.getOrDefault(streamKey, EmptyCommandLog);
// 清除Key // 清除Key
CachedCommandLog.remove(streamKey); CachedCommandLogMap.remove(streamKey);
return execLogCacheArrayList; return execLogCacheArrayList;
} }
@@ -95,7 +102,6 @@ public class CommandExecLogCache {
*/ */
private void doCacheLog(String streamKey, InputStream commandLogStream, ArrayList<String> commandCachedLog) { private void doCacheLog(String streamKey, InputStream commandLogStream, ArrayList<String> commandCachedLog) {
// read from input stream and store to the cacheArrayList // read from input stream and store to the cacheArrayList
new BufferedReader(new InputStreamReader(commandLogStream)) new BufferedReader(new InputStreamReader(commandLogStream))
.lines() .lines()
@@ -104,7 +110,7 @@ public class CommandExecLogCache {
); );
log.debug( log.debug(
"current streamKey is {} and CacheLog is {}", "current streamKey is [ {} ] and CacheLog is [ {} ]",
streamKey, streamKey,
commandCachedLog commandCachedLog
); );
@@ -121,7 +127,7 @@ public class CommandExecLogCache {
int keyToIndex = this.hashStreamKeyToCachedArrayListIndex(streamKey); int keyToIndex = this.hashStreamKeyToCachedArrayListIndex(streamKey);
return CachedCommandLog.get(keyToIndex); return CachedCommandLogMap.get(keyToIndex);
} }
@@ -132,7 +138,7 @@ public class CommandExecLogCache {
* @return * @return
*/ */
private int hashStreamKeyToCachedArrayListIndex(String streamKey) { private int hashStreamKeyToCachedArrayListIndex(String streamKey) {
int size = CachedCommandLog.size(); int size = CachedCommandLogMap.size();
return Math.abs(streamKey.hashCode() % size); return Math.abs(streamKey.hashCode() % size);
} }