[ agent ] [ executor ]- 优化Agent执行Command部分的代码

This commit is contained in:
IceDerce
2023-02-23 22:02:52 +08:00
parent 28a26f8341
commit 3bd38aaf51
11 changed files with 159 additions and 110 deletions

View File

@@ -131,7 +131,8 @@ public class AgentRebootUpdateService {
streamKey,
shutdownCommand,
false,
false
false,
null
);
}
}

View File

@@ -51,7 +51,7 @@ public class OMHandlerExecutor extends AbstractOctopusMessageHandler {
// add in 2023-1-17
if (CollectionUtils.isNotEmpty(executionMessage.getMultiLineCommand())) {
// 传递的是 页面定时任务脚本
functionExecutor.execute(executionMessage, true);
functionExecutor.execute(octopusMessage, executionMessage, true);
return true;
}
@@ -59,10 +59,10 @@ public class OMHandlerExecutor extends AbstractOctopusMessageHandler {
String executionType = executionMessage.getType();
if (ALL_FUNCTION_MAP.containsKey(executionType)) {
// execute the exist function
functionExecutor.execute(executionMessage, false);
functionExecutor.execute(octopusMessage, executionMessage, false);
} else {
// handle command
commandExecutor.execute(executionMessage);
commandExecutor.execute(octopusMessage, executionMessage);
}
} catch (IOException e) {

View File

@@ -2,9 +2,13 @@ package io.wdd.agent.executor;
import com.google.common.io.ByteStreams;
import io.wdd.agent.config.utils.AgentCommonThreadPool;
import io.wdd.agent.executor.redis.StreamSender;
import io.wdd.agent.executor.reply.SimpleStreamSender;
import io.wdd.agent.executor.reply.StreamSender;
import io.wdd.agent.executor.thread.CommandExecLogCache;
import io.wdd.agent.message.OMessageToServerSender;
import io.wdd.common.beans.executor.ExecutionMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.utils.TimeUtils;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
@@ -14,6 +18,7 @@ import javax.annotation.Resource;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -25,10 +30,18 @@ public class CommandExecutor {
@Resource
StreamSender streamSender;
@Resource
SimpleStreamSender simpleStreamSender;
@Resource
CommandExecLogCache commandExecLogCache;
@Resource
OMessageToServerSender toServerSender;
/**
* 一个命令执行的最长等待时间
*/
int processMaxWaitSeconds = 60;
/**
@@ -36,7 +49,7 @@ public class CommandExecutor {
*
* @param executionMessage get from EXECUTOR_HANDLER
*/
public void execute(ExecutionMessage executionMessage) {
public void execute(OctopusMessage octopusMessage, ExecutionMessage executionMessage) {
// todo 需要长时间执行的任务 与目前的系统设计存在冲突 防卡死后台进程`出现问题
// 防止阻塞消息队列中的其他信息,需要使用异步执行
@@ -45,13 +58,14 @@ public class CommandExecutor {
executionMessage.getResultKey(),
executionMessage.getSingleLineCommand(),
executionMessage.isNeedResultReplay(),
executionMessage.isDurationTask()
executionMessage.isDurationTask(),
octopusMessage
)
);
}
public int execute(String streamKey, List<String> command, boolean needResultReplay, boolean durationTask) {
public int execute(String streamKey, List<String> command, boolean needResultReplay, boolean durationTask, OctopusMessage octopusMessage) {
ProcessBuilder processBuilder = new ProcessBuilder(command);
@@ -59,28 +73,15 @@ public class CommandExecutor {
streamKey,
processBuilder,
needResultReplay,
durationTask
durationTask,
octopusMessage
);
}
public int execute(String streamKey, boolean needResultReplay, boolean durationTask, String... command) {
ProcessBuilder processBuilder = new ProcessBuilder(command);
return this.processExecute(
streamKey,
processBuilder,
needResultReplay,
durationTask
);
}
/**
* 执行最底层命令行操作的代码
*/
private int processExecute(String streamKey, ProcessBuilder processBuilder, boolean needResultReplay, boolean durationTask) {
private int processExecute(String streamKey, ProcessBuilder processBuilder, boolean needResultReplay, boolean durationTask, OctopusMessage octopusMessage) {
// 重定向,错误日志到标准输出中
processBuilder.redirectErrorStream(true);
@@ -106,10 +107,13 @@ public class CommandExecutor {
// 不是持久化任务,那么需要为每一条命令执行,设置超时保护机制
// 2023年2月23日 同时执行log日志部分移动至此部分处理
AgentCommonThreadPool.pool.submit(
StopStuckCommandProcess(
DaemonCommandProcessAndCollectLog(
process,
processMaxWaitSeconds,
countDownLatch
countDownLatch,
needResultReplay,
streamKey,
octopusMessage
));
}
@@ -119,15 +123,9 @@ public class CommandExecutor {
process
);
// start to send the result log
//streamSender.startToWaitLog(streamKey);
// get the command result must also be a timeout smaller than the process
int waitFor = process.waitFor();
// end send logs
//streamSender.endWaitLog(streamKey);
// get the process result
if (ObjectUtils.isNotEmpty(waitFor) && ObjectUtils.isNotEmpty(process)) {
@@ -156,7 +154,7 @@ public class CommandExecutor {
return processResult;
}
private Runnable StopStuckCommandProcess(Process process, int processMaxWaitSeconds, CountDownLatch countDownLatch) {
private Runnable DaemonCommandProcessAndCollectLog(Process process, int processMaxWaitSeconds, CountDownLatch countDownLatch, boolean needResultReplay, String streamKey, OctopusMessage octopusMessage) {
return () -> {
boolean commandExecComplete = false;
@@ -190,6 +188,9 @@ public class CommandExecutor {
);
}
// 日志操作,如果需要显示回传,需要将日志发送回相应的
collectCommandLogAndRepeat(streamKey, needResultReplay, octopusMessage);
// 只有当该进程还存活,执行关闭操作
if (process.isAlive()) {
// shutdown the process
@@ -202,6 +203,29 @@ public class CommandExecutor {
};
}
private void collectCommandLogAndRepeat(String streamKey, boolean needResultReplay, OctopusMessage octopusMessage) {
// 获取到命令执行的结果, 此操作会清除掉缓存
ArrayList<String> commandExecLogCachedLog = commandExecLogCache.getCacheLog(streamKey);
// 简单的发送到StreamSender
simpleStreamSender.sendLog(streamKey, commandExecLogCachedLog);
// 需要恢复相应的消息
if (needResultReplay) {
log.debug("需要准确回复执行命令结果");
// 构造相应的数据
octopusMessage.setAc_time(TimeUtils.currentFormatTime());
octopusMessage.setResult(commandExecLogCachedLog);
// 发送返回执行完成的 OM结果
toServerSender.send(octopusMessage);
}
}
@Deprecated
private ByteBuffer cvToByteBuffer(InputStream inputStream) throws IOException {
@@ -224,6 +248,7 @@ public class CommandExecutor {
@SneakyThrows
@Deprecated
public void clearCommandCache(String streamKey) {
// wait

View File

@@ -5,6 +5,7 @@ import com.alibaba.nacos.api.exception.NacosException;
import io.wdd.agent.config.utils.AgentCommonThreadPool;
import io.wdd.agent.executor.config.ExecutorFunctionNacosCollector;
import io.wdd.common.beans.executor.ExecutionMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
@@ -29,7 +30,7 @@ public class FunctionExecutor {
ExecutorFunctionNacosCollector executorFunctionNacosCollector;
public void execute(ExecutionMessage executionMessage, boolean isScheduledScript) {
public void execute(OctopusMessage octopusMessage, ExecutionMessage executionMessage, boolean isScheduledScript) {
// 拿到 resultKey
String resultKey = executionMessage.getResultKey();
@@ -49,14 +50,15 @@ public class FunctionExecutor {
resultKey,
multiLineCommand,
executionMessage.isNeedResultReplay(),
executionMessage.isDurationTask()
executionMessage.isDurationTask(),
octopusMessage
)
);
}
private void doExecuteFunction(String streamKey, List<List<String>> multiLineCommand, boolean needResultReplay, boolean durationTask) {
private void doExecuteFunction(String streamKey, List<List<String>> multiLineCommand, boolean needResultReplay, boolean durationTask, OctopusMessage octopusMessage) {
log.info(
"[ Function Executor ] all commands are ==> {}",
@@ -71,7 +73,8 @@ public class FunctionExecutor {
streamKey,
iterator.next(),
needResultReplay,
durationTask
durationTask,
octopusMessage
);
if (execute != 0) {
@@ -80,11 +83,12 @@ public class FunctionExecutor {
}
}
// 清除命令行的执行日志缓存
commandExecutor.clearCommandCache(streamKey);
}
/**
* 后台监听Nacos的配置实时跟新从Nacos中修改过的 函数命令脚本的内容
*/
@Bean
private void daemonListenToNacosFunctions() {
@@ -107,8 +111,8 @@ public class FunctionExecutor {
s
);
// 解析获取到的功能脚本的字符串,然后解析为缓存
executorFunctionNacosCollector.parseNacosFunctionYamlToMap(s);
}
}
);

View File

@@ -0,0 +1,71 @@
package io.wdd.agent.executor.reply;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.connection.stream.StringRecord;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* 简单的发送缓存的命令执行Log到Redis的StreamKey中
*/
@Service
@Slf4j
public class SimpleStreamSender {
@Resource
RedisTemplate redisTemplate;
@Resource
ObjectMapper objectMapper;
/**
* 根据StreamKey 发送命令日志到Redis中
*
* @param streamKey 返回结果的Key
* @return
*/
public boolean sendLog(String streamKey, ArrayList<String> commandExecLogCachedLog) {
return this.send(streamKey, commandExecLogCachedLog);
}
private boolean send(String streamKey, List<String> content) {
try {
String resultContent = objectMapper.writeValueAsString(content);
// 构造Stream类型的数据结构然后进行传输
HashMap<String, String> map = new HashMap<>();
map.put(TimeUtils.currentTimeString(), resultContent);
log.debug("redis stream sender message is {}", map);
StringRecord stringRecord = StreamRecords
.string(map).withStreamKey(streamKey);
// 发送内容到Redis中
RecordId recordId = redisTemplate.opsForStream().add(stringRecord);
// 返回结果
return ObjectUtils.isNotEmpty(recordId);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -1,4 +1,4 @@
package io.wdd.agent.executor.redis;
package io.wdd.agent.executor.reply;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -48,6 +48,7 @@ public class StreamSender {
@SneakyThrows
@Deprecated
private static Map generateFakeData() {
String random = RandomStringUtils.random(16);
CommandLog commandLog = new CommandLog();

View File

@@ -1,6 +1,7 @@
package io.wdd.agent.executor.thread;
import io.wdd.common.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Component;
@@ -46,9 +47,15 @@ public class CommandExecLogCache {
.get()
);
String execTimeString = String.format(
"execution time is => [ %s ]",
TimeUtils.currentTimeString()
);
// add the command
commandCachedLog.add("");
commandCachedLog.add(execCommandString);
commandCachedLog.add(execTimeString);
commandCachedLog.add("--------------- command result are as below --------------------");
commandCachedLog.add("");
@@ -76,7 +83,6 @@ public class CommandExecLogCache {
// 清除Key
CachedCommandLog.remove(streamKey);
return execLogCacheArrayList;
}

View File

@@ -1,32 +0,0 @@
package io.wdd.agent.executor.thread;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.wdd.common.beans.response.R;
import java.util.concurrent.*;
//public class DaemonLogThread {
//
// private static final ExecutorService executorService;
//
// static {
//
// ThreadFactory daemonLogThread = new ThreadFactoryBuilder()
// .setDaemon(true)
// .setNameFormat("BackendToRedisThread")
// .setPriority(1)
// .build();
//
// executorService = Executors.newSingleThreadExecutor(daemonLogThread);
//
// }
//
// public static Future<?> start(Runnable backendToRedisStream) {
//
// return executorService.submit(backendToRedisStream);
// }
//
// public static void stop(Runnable backendToRedisStream) {
// executorService.shutdownNow();
// }
//}

View File

@@ -50,7 +50,7 @@ public class TestCommandExecutorController {
System.out.println("executionMessage = " + executionMessage);
functionExecutor.execute(executionMessage, false);
functionExecutor.execute(null, executionMessage, false);
return R.ok(streamKey);
}

View File

@@ -32,14 +32,16 @@ public class OMessageToServerSender {
RabbitTemplate rabbitTemplate;
/**
* 发送消息至MQ中 队列为 OCTOPUS_TO_SERVER 信息需要有Server自行处理
*
* @param octopusMessage 需要发送的OM信息
* @return
*/
public boolean send(OctopusMessage octopusMessage) {
octopusMessage.setAc_time(LocalDateTime.now());
// send to Queue -- InitToServer
log.info("send Message to Server = {}", octopusMessage);
log.debug("send Message to Server = {}", octopusMessage);
try {
@@ -52,7 +54,7 @@ public class OMessageToServerSender {
} catch (JsonProcessingException e) {
log.error("Failed to send message to Serv er ! = {}", octopusMessage);
log.error("Failed to send message to Server ! = {}", octopusMessage);
throw new MyRuntimeException(e);
}

View File

@@ -1,29 +0,0 @@
package io.wdd.agent;
import io.wdd.agent.executor.CommandExecutor;
import javax.annotation.Resource;
//@SpringBootTest
public class InitRabbitMQTest {
@Resource
CommandExecutor commandExecutor;
// @Test
void testInitSendInfo() {
String homeDirectory = System.getProperty("user.home");
String format = String.format("C:\\program files\\powershell\\7\\pwsh.exe /c dir %s | findstr \"Desktop\"", homeDirectory);
commandExecutor.execute("sasda",
false,
false,
"C:\\program files\\powershell\\7\\pwsh.exe",
"pwd");
}
}