[agent][executor]- 优化command executor的代码

This commit is contained in:
zeaslity
2023-02-23 21:12:11 +08:00
parent 24033bacc1
commit 28a26f8341
13 changed files with 330 additions and 151 deletions

View File

@@ -129,7 +129,9 @@ public class AgentRebootUpdateService {
// 最终执行关机操作 // 最终执行关机操作
commandExecutor.execute( commandExecutor.execute(
streamKey, streamKey,
shutdownCommand shutdownCommand,
false,
false
); );
} }
} }

View File

@@ -1,4 +1,4 @@
package io.wdd.agent.config.message.config; package io.wdd.agent.config.message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitAdmin;

View File

@@ -49,7 +49,7 @@ public class OMHandlerExecutor extends AbstractOctopusMessageHandler {
// add in 2023-1-17 // add in 2023-1-17
if (CollectionUtils.isNotEmpty(executionMessage.getScriptCommandList())) { if (CollectionUtils.isNotEmpty(executionMessage.getMultiLineCommand())) {
// 传递的是 页面定时任务脚本 // 传递的是 页面定时任务脚本
functionExecutor.execute(executionMessage, true); functionExecutor.execute(executionMessage, true);
return true; return true;

View File

@@ -3,7 +3,7 @@ package io.wdd.agent.executor;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import io.wdd.agent.config.utils.AgentCommonThreadPool; import io.wdd.agent.config.utils.AgentCommonThreadPool;
import io.wdd.agent.executor.redis.StreamSender; import io.wdd.agent.executor.redis.StreamSender;
import io.wdd.agent.executor.thread.LogToArrayListCache; import io.wdd.agent.executor.thread.CommandExecLogCache;
import io.wdd.common.beans.executor.ExecutionMessage; import io.wdd.common.beans.executor.ExecutionMessage;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -15,8 +15,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@@ -28,12 +27,10 @@ public class CommandExecutor {
StreamSender streamSender; StreamSender streamSender;
@Resource @Resource
LogToArrayListCache logToArrayListCache; CommandExecLogCache commandExecLogCache;
int processMaxWaitSeconds = 60; int processMaxWaitSeconds = 60;
ExecutorService DaemonCommandProcess = Executors.newFixedThreadPool(1);
/** /**
* handle command from octopus server * handle command from octopus server
* *
@@ -46,43 +43,52 @@ public class CommandExecutor {
AgentCommonThreadPool.pool.submit( AgentCommonThreadPool.pool.submit(
() -> this.execute( () -> this.execute(
executionMessage.getResultKey(), executionMessage.getResultKey(),
executionMessage.getCommandList() executionMessage.getSingleLineCommand(),
executionMessage.isNeedResultReplay(),
executionMessage.isDurationTask()
) )
); );
} }
public int execute(String streamKey, List<String> command) { public int execute(String streamKey, List<String> command, boolean needResultReplay, boolean durationTask) {
ProcessBuilder processBuilder = new ProcessBuilder(command); ProcessBuilder processBuilder = new ProcessBuilder(command);
return this.processExecute( return this.processExecute(
streamKey, streamKey,
processBuilder processBuilder,
needResultReplay,
durationTask
); );
} }
public int execute(String streamKey, String... command) { public int execute(String streamKey, boolean needResultReplay, boolean durationTask, String... command) {
ProcessBuilder processBuilder = new ProcessBuilder(command); ProcessBuilder processBuilder = new ProcessBuilder(command);
return this.processExecute( return this.processExecute(
streamKey, streamKey,
processBuilder processBuilder,
needResultReplay,
durationTask
); );
} }
private int processExecute(String streamKey, ProcessBuilder processBuilder) { /**
* 执行最底层命令行操作的代码
*/
private int processExecute(String streamKey, ProcessBuilder processBuilder, boolean needResultReplay, boolean durationTask) {
// 重定向,错误日志到标准输出中 // 重定向,错误日志到标准输出中
processBuilder.redirectErrorStream(true); processBuilder.redirectErrorStream(true);
//processBuilder.inheritIO(); // processBuilder.inheritIO();
//processBuilder.directory(new File(System.getProperty("user.home"))); // processBuilder.directory(new File(System.getProperty("user.home")));
int processResult = 233; int processResult = 233;
try { try {
// 开始执行命令之前,需要进行打印 // 开始执行命令之前,需要进行打印
log.debug( log.debug(
"current shell command {}", "current shell command {}",
@@ -90,19 +96,25 @@ public class CommandExecutor {
); );
// 开始执行命令操作 // 开始执行命令操作
CountDownLatch countDownLatch = new CountDownLatch(1);
Process process = processBuilder.start(); Process process = processBuilder.start();
// start a backend thread to daemon the process // start a backend thread to daemon the process
// wait for processMaxWaitSeconds and kill the process if it's still alived // wait for processMaxWaitSeconds and kill the process if it's still alived
DaemonCommandProcess.submit( if (!durationTask) {
// 不是持久化任务,那么需要为每一条命令执行,设置超时保护机制
// 2023年2月23日 同时执行log日志部分移动至此部分处理
AgentCommonThreadPool.pool.submit(
StopStuckCommandProcess( StopStuckCommandProcess(
process, process,
processMaxWaitSeconds processMaxWaitSeconds,
countDownLatch
)); ));
}
// 缓存让命令处理日志,并且打印 // 缓存让命令处理日志,并且打印
logToArrayListCache.cacheLog( commandExecLogCache.cacheLog(
streamKey, streamKey,
process process
); );
@@ -110,21 +122,21 @@ public class CommandExecutor {
// start to send the result log // start to send the result log
//streamSender.startToWaitLog(streamKey); //streamSender.startToWaitLog(streamKey);
// todo this will stuck the process and rabbitmq message will reentry the queue
// get the command result must also be a timeout smaller than the process // get the command result must also be a timeout smaller than the process
boolean waitFor = process.waitFor( int waitFor = process.waitFor();
50,
TimeUnit.SECONDS
);
// end send logs // end send logs
//streamSender.endWaitLog(streamKey); //streamSender.endWaitLog(streamKey);
// get the process result // get the process result
if (ObjectUtils.isNotEmpty(waitFor) && ObjectUtils.isNotEmpty(process)) { if (ObjectUtils.isNotEmpty(waitFor) && ObjectUtils.isNotEmpty(process)) {
// 命令执行完成, countDownLatch计数
countDownLatch.countDown();
// 设置 命令执行退出返回值
processResult = process.exitValue(); processResult = process.exitValue();
} }
log.debug( log.debug(
@@ -133,7 +145,6 @@ public class CommandExecutor {
processResult processResult
); );
} catch (IOException | InterruptedException e) { } catch (IOException | InterruptedException e) {
log.error( log.error(
"Shell command error ! {} + {}", "Shell command error ! {} + {}",
@@ -145,38 +156,53 @@ public class CommandExecutor {
return processResult; return processResult;
} }
private Runnable StopStuckCommandProcess(Process process, int processMaxWaitSeconds) { private Runnable StopStuckCommandProcess(Process process, int processMaxWaitSeconds, CountDownLatch countDownLatch) {
return () -> { return () -> {
try {
boolean commandExecComplete = false;
try {
log.debug( log.debug(
"daemon thread start to wait for {} s for the result", "命令执行守护进程开始等待 {} s for the result",
processMaxWaitSeconds processMaxWaitSeconds
); );
TimeUnit.SECONDS.sleep(processMaxWaitSeconds); // 使用 countDownLatch 进行超时等待
commandExecComplete = countDownLatch.await(
processMaxWaitSeconds,
TimeUnit.SECONDS
);
if (process.isAlive()) { } catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 任务提前执行结束,或者超过了最长等待时间
// 判断命令是否正确处理完成
if (!commandExecComplete) {
log.warn( log.warn(
"Command [ {} ] stuck for {} s, destroy the command process !", "Command [ {} ] stuck for {} s, destroy the command process!",
process process
.info() .info()
.commandLine() .commandLine()
.get(), .get(),
processMaxWaitSeconds processMaxWaitSeconds
); );
// shutdown the process
process.destroyForcibly();
} }
} catch (InterruptedException e) { // 只有当该进程还存活,执行关闭操作
throw new RuntimeException(e); if (process.isAlive()) {
// shutdown the process
process.destroyForcibly();
}
} }
}; };
} }
@Deprecated
private ByteBuffer cvToByteBuffer(InputStream inputStream) throws IOException { private ByteBuffer cvToByteBuffer(InputStream inputStream) throws IOException {
byte[] toByteArray = ByteStreams.toByteArray(inputStream); byte[] toByteArray = ByteStreams.toByteArray(inputStream);
@@ -204,8 +230,8 @@ public class CommandExecutor {
TimeUnit.SECONDS.sleep(1); TimeUnit.SECONDS.sleep(1);
// clear the log Cache Thread scope // clear the log Cache Thread scope
logToArrayListCache commandExecLogCache
.getExecutionCmdCachedLogArrayList(streamKey) .getCommandExecLogCacheArrayList(streamKey)
.clear(); .clear();
// clear the stream sender // clear the stream sender

View File

@@ -29,39 +29,50 @@ public class FunctionExecutor {
ExecutorFunctionNacosCollector executorFunctionNacosCollector; ExecutorFunctionNacosCollector executorFunctionNacosCollector;
public void execute(ExecutionMessage executionMessage, boolean isScheduledScript) { public void execute(ExecutionMessage executionMessage, boolean isScheduledScript) {
// 拿到 resultKey // 拿到 resultKey
String resultKey = executionMessage.getResultKey(); String resultKey = executionMessage.getResultKey();
List<List<String>> completeCommandList; List<List<String>> multiLineCommand;
if (isScheduledScript) { if (isScheduledScript) {
// 检测到是 页面定时任务脚本 // 检测到是 页面定时任务脚本
completeCommandList = executionMessage.getScriptCommandList(); multiLineCommand = executionMessage.getMultiLineCommand();
} else { } else {
// 从 ALL_FUNCTION_MAP本地容器中(Nacos配置中获取)获取到 功能脚本的内容 // 从 ALL_FUNCTION_MAP本地容器中(Nacos配置中获取)获取到 功能脚本的内容
completeCommandList = ALL_FUNCTION_MAP.get(executionMessage.getType()); multiLineCommand = ALL_FUNCTION_MAP.get(executionMessage.getType());
} }
// 防止阻塞消息队列中的其他信息,需要使用异步执行 // 防止阻塞消息队列中的其他信息,需要使用异步执行
AgentCommonThreadPool.pool.submit( AgentCommonThreadPool.pool.submit(
() -> this.execute(resultKey, completeCommandList) () -> this.doExecuteFunction(
resultKey,
multiLineCommand,
executionMessage.isNeedResultReplay(),
executionMessage.isDurationTask()
)
); );
} }
private void execute(String streamKey, List<List<String>> completeCommandList) { private void doExecuteFunction(String streamKey, List<List<String>> multiLineCommand, boolean needResultReplay, boolean durationTask) {
log.info(
"[ Function Executor ] all commands are ==> {}",
multiLineCommand
);
log.info("[ Function Executor ] all commands are ==> {}", completeCommandList); Iterator<List<String>> iterator = multiLineCommand.iterator();
Iterator<List<String>> iterator = completeCommandList.iterator();
// 此处的策略为,将所有的命令是为一个整体,必须完整执行才可,完整返回结果
while (iterator.hasNext()) { while (iterator.hasNext()) {
int execute = commandExecutor.execute(streamKey, iterator.next()); int execute = commandExecutor.execute(
streamKey,
iterator.next(),
needResultReplay,
durationTask
);
if (execute != 0) { if (execute != 0) {
log.error("command list execute failed !"); log.error("command list execute failed !");
@@ -69,16 +80,20 @@ public class FunctionExecutor {
} }
} }
// 清除命令行的执行日志缓存
commandExecutor.clearCommandCache(streamKey); commandExecutor.clearCommandCache(streamKey);
} }
@Bean @Bean
private void daemonListenToNacosFunctions(){ private void daemonListenToNacosFunctions() {
// add listener to listen to the real-time change of the Function Shell Scripts // add listener to listen to the real-time change of the Function Shell Scripts
try { try {
NacosConfigService.addListener(executorFunctionNacosCollector.executorFunctionDataId + "." + executorFunctionNacosCollector.fileExtension, executorFunctionNacosCollector.group, new Listener() { NacosConfigService.addListener(
executorFunctionNacosCollector.executorFunctionDataId + "." + executorFunctionNacosCollector.fileExtension,
executorFunctionNacosCollector.group,
new Listener() {
@Override @Override
public Executor getExecutor() { public Executor getExecutor() {
return null; return null;
@@ -87,12 +102,16 @@ public class FunctionExecutor {
@Override @Override
public void receiveConfigInfo(String s) { public void receiveConfigInfo(String s) {
log.info("detected nacos function shell update ! {}", s); log.info(
"detected nacos function shell update ! {}",
s
);
executorFunctionNacosCollector.parseNacosFunctionYamlToMap(s); executorFunctionNacosCollector.parseNacosFunctionYamlToMap(s);
} }
}); }
);
} catch (NacosException e) { } catch (NacosException e) {
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@@ -6,7 +6,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.agent.config.beans.executor.CommandLog; import io.wdd.agent.config.beans.executor.CommandLog;
import io.wdd.agent.config.beans.executor.StreamSenderEntity; import io.wdd.agent.config.beans.executor.StreamSenderEntity;
import io.wdd.common.utils.TimeUtils; import io.wdd.common.utils.TimeUtils;
import io.wdd.agent.executor.thread.LogToArrayListCache; import io.wdd.agent.executor.thread.CommandExecLogCache;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.beanutils.BeanUtils;
@@ -39,7 +39,7 @@ public class StreamSender {
ObjectMapper objectMapper; ObjectMapper objectMapper;
@Resource @Resource
LogToArrayListCache logToArrayListCache; CommandExecLogCache commandExecLogCache;
private final HashMap<String, StreamSenderEntity> AllNeededStreamSender = new HashMap<>(16); private final HashMap<String, StreamSenderEntity> AllNeededStreamSender = new HashMap<>(16);
@@ -63,7 +63,7 @@ public class StreamSender {
StreamSenderEntity streamSender = StreamSenderEntity StreamSenderEntity streamSender = StreamSenderEntity
.builder() .builder()
.cachedCommandLog(logToArrayListCache.getExecutionCmdCachedLogArrayList(streamKey)) .cachedCommandLog(commandExecLogCache.getCommandExecLogCacheArrayList(streamKey))
.waitToSendLog(true) .waitToSendLog(true)
.startIndex(0) .startIndex(0)
.streamKey(streamKey) .streamKey(streamKey)

View File

@@ -9,32 +9,36 @@ import java.io.BufferedReader;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.HashMap;
/** /**
* utils to cache store the command execution logs * utils to cache store the command execution logs
* 2023年2月22日 同样需要取出存着的日志
*/ */
@Component @Component
@Slf4j @Slf4j
public class LogToArrayListCache { public class CommandExecLogCache {
// concurrent command execute logs // 存储命令执行缓存日志
public static List<ArrayList<String>> CachedCommandLog = new ArrayList<>( public static HashMap<String, ArrayList<String>> CachedCommandLog = new HashMap<>();
List.of(
new ArrayList<>(256),
new ArrayList<>(256),
new ArrayList<>(256),
new ArrayList<>(256),
new ArrayList<>(256)
)
);
/**
* 存储命令执行为空的默认空结果
*/
private static ArrayList<String> EmptyCommandLog = new ArrayList<>();
/**
* 缓存命令执行日志
*
* @param streamKey 根据 命令执行结果Key 定位缓存队列
* @param process 植物执行队列
*/
public void cacheLog(String streamKey, Process process) { public void cacheLog(String streamKey, Process process) {
ArrayList<String> commandCachedLog = this.getExecutionCmdCachedLogArrayList(streamKey); ArrayList<String> commandCachedLog = new ArrayList<>(128);
String format = String.format( String execCommandString = String.format(
"execution command are => [ %s ]", "execution command are => [ %s ]",
process process
.info() .info()
@@ -44,21 +48,47 @@ public class LogToArrayListCache {
// add the command // add the command
commandCachedLog.add(""); commandCachedLog.add("");
commandCachedLog.add(format); commandCachedLog.add(execCommandString);
commandCachedLog.add("--------------- command result are as below --------------------"); commandCachedLog.add("--------------- command result are as below --------------------");
commandCachedLog.add(""); commandCachedLog.add("");
// cache the real command logs // cache the real command logs
doCacheLog( doCacheLog(
streamKey, streamKey,
process.getInputStream() process.getInputStream(),
commandCachedLog
); );
} }
private void doCacheLog(String streamKey, InputStream commandLogStream) {
ArrayList<String> commandCachedLog = this.getExecutionCmdCachedLogArrayList(streamKey); /**
* 获取缓存的 命令执行缓存日志
*
* @param streamKey
* @return
*/
public ArrayList<String> getCacheLog(String streamKey) {
// 获取
ArrayList<String> execLogCacheArrayList = CachedCommandLog.getOrDefault(streamKey,EmptyCommandLog);
// 清除Key
CachedCommandLog.remove(streamKey);
return execLogCacheArrayList;
}
/**
* 实际执行命令缓存操作
*
* @param streamKey
* @param commandLogStream
* @param commandCachedLog
*/
private void doCacheLog(String streamKey, InputStream commandLogStream, ArrayList<String> commandCachedLog) {
// read from input stream and store to the cacheArrayList // read from input stream and store to the cacheArrayList
new BufferedReader(new InputStreamReader(commandLogStream)) new BufferedReader(new InputStreamReader(commandLogStream))
@@ -74,7 +104,14 @@ public class LogToArrayListCache {
); );
} }
public ArrayList<String> getExecutionCmdCachedLogArrayList(String streamKey) { /**
* 获取到命令执行日志 缓存存储实际的ArrayList
*
* @param streamKey
* @return
*/
@Deprecated
public ArrayList<String> getCommandExecLogCacheArrayList(String streamKey) {
int keyToIndex = this.hashStreamKeyToCachedArrayListIndex(streamKey); int keyToIndex = this.hashStreamKeyToCachedArrayListIndex(streamKey);
@@ -82,15 +119,23 @@ public class LogToArrayListCache {
} }
/**
* 根据返回的ResultKey计算出来缓存的位置
*
* @param streamKey
* @return
*/
private int hashStreamKeyToCachedArrayListIndex(String streamKey) { private int hashStreamKeyToCachedArrayListIndex(String streamKey) {
int size = CachedCommandLog.size(); int size = CachedCommandLog.size();
return Math.abs(streamKey.hashCode() % size); return Math.abs(streamKey.hashCode() % size);
} }
@Deprecated
private int hashStreamKeyToCachedArrayListIndexWithProblem(String streamKey) { private int hashStreamKeyToCachedArrayListIndexWithProblem(String streamKey) {
int size = CachedCommandLog.size(); return 0;
/*int size = CachedCommandLog.size();
int result = Math.abs(streamKey.hashCode() % size); int result = Math.abs(streamKey.hashCode() % size);
boolean hasRehashed = false; boolean hasRehashed = false;
@@ -123,7 +168,7 @@ public class LogToArrayListCache {
} }
} }
return result; return result;*/
} }
} }

View File

@@ -25,7 +25,7 @@ public class TestCommandExecutorController {
FunctionExecutor functionExecutor; FunctionExecutor functionExecutor;
@PostMapping("comand") /*@PostMapping("comand")
public R<String> testFor( public R<String> testFor(
@RequestParam(value = "streamKey") String streamKey, @RequestParam(value = "streamKey") String streamKey,
@RequestParam(value = "command") List<String> command @RequestParam(value = "command") List<String> command
@@ -33,7 +33,7 @@ public class TestCommandExecutorController {
commandExecutor.execute(streamKey, command); commandExecutor.execute(streamKey, command);
return R.ok(streamKey); return R.ok(streamKey);
} }*/
@PostMapping("linuxFile") @PostMapping("linuxFile")
@@ -45,7 +45,7 @@ public class TestCommandExecutorController {
ExecutionMessage executionMessage = ExecutionMessage.builder() ExecutionMessage executionMessage = ExecutionMessage.builder()
.resultKey(streamKey) .resultKey(streamKey)
.type(messageType) .type(messageType)
.commandList(Collections.singletonList(messageType)) .singleLineCommand(Collections.singletonList(messageType))
.build(); .build();
System.out.println("executionMessage = " + executionMessage); System.out.println("executionMessage = " + executionMessage);

View File

@@ -19,6 +19,8 @@ public class InitRabbitMQTest {
String format = String.format("C:\\program files\\powershell\\7\\pwsh.exe /c dir %s | findstr \"Desktop\"", homeDirectory); String format = String.format("C:\\program files\\powershell\\7\\pwsh.exe /c dir %s | findstr \"Desktop\"", homeDirectory);
commandExecutor.execute("sasda", commandExecutor.execute("sasda",
false,
false,
"C:\\program files\\powershell\\7\\pwsh.exe", "C:\\program files\\powershell\\7\\pwsh.exe",
"pwd"); "pwd");

View File

@@ -1,5 +1,6 @@
package io.wdd.common.beans.executor; package io.wdd.common.beans.executor;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.wdd.common.utils.TimeUtils; import io.wdd.common.utils.TimeUtils;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
@@ -15,26 +16,59 @@ import java.util.List;
@SuperBuilder(toBuilder = true) @SuperBuilder(toBuilder = true)
public class ExecutionMessage { public class ExecutionMessage {
/**
* 2023年2月22日
* 是否需要返回 命令行的处理调用结果
* 通过 MQ返回
*/
@JsonProperty(defaultValue = "false")
boolean needResultReplay;
/**
* 2023年2月22日
* 是否是长时间持续执行任务
*/
@JsonProperty(defaultValue = "false")
boolean durationTask;
/** /**
* 用于区分 ExecutionMessage的类型 * 用于区分 ExecutionMessage的类型
* 直接执行预定函数,则为 Nacos配置中的 方法名称,例如 AgentUpdate AgentReboot * 直接执行预定函数,则为 Nacos配置中的 方法名称,例如 AgentUpdate AgentReboot
*/ */
private String type; private String type;
private List<String> commandList; /**
* 只有一行的命令行
*/
private List<String> singleLineCommand;
/** /**
* add in 2023-1-17 * add in 2023-1-17
* 页面定时脚本任务 需要传递完整的命令列表 * 页面定时脚本任务 需要传递完整的命令列表
*/ */
private List<List<String>> scriptCommandList; private List<List<String>> multiLineCommand;
/**
* 词条执行命令的返回结果在Redis中的ResultKey
*/
private String resultKey; private String resultKey;
/**
* 生成 Command结果的 resultKey
*
* @param topicName
* @return
*/
public static String GetResultKey(String topicName) { public static String GetResultKey(String topicName) {
return topicName + "-Execution:" + TimeUtils.currentTimeStringFullSplit(); return topicName + "-Execution:" + TimeUtils.currentTimeStringFullSplit();
} }
/**
* 延迟执行任务执行的Key为未来的生成这个和Key
*
* @param topicName
* @param futureExecutionTime
* @return
*/
public static String GetFutureResultKey(String topicName, LocalDateTime futureExecutionTime) { public static String GetFutureResultKey(String topicName, LocalDateTime futureExecutionTime) {
return topicName + "-Execution:" + TimeUtils.localDateTimeString(futureExecutionTime); return topicName + "-Execution:" + TimeUtils.localDateTimeString(futureExecutionTime);
} }

View File

@@ -9,6 +9,7 @@ import lombok.experimental.SuperBuilder;
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
@SuperBuilder(toBuilder = true) @SuperBuilder(toBuilder = true)
@Deprecated
public class ExecutorFunctionMessage { public class ExecutorFunctionMessage {
String functionName; String functionName;

View File

@@ -10,34 +10,37 @@ public interface CoreExecutionService {
String SendCommandToAgent(String agentTopicName, List<String> commandList); String SendCommandToAgent(String agentTopicName, List<String> commandList);
/**
* 调用 单行命令脚本的 最底层函数
*
* @param agentTopicName agent唯一表示名
* @param type 任务执行类型
* @param commandList 任务列表内容
* @return redis中的 result key
*/
String SendCommandToAgent(String agentTopicName, String type, List<String> commandList); String SendCommandToAgent(String agentTopicName, String type, List<String> commandList);
String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete); List<String> SendCommandToAgent(List<String> agentTopicNameList, String type, List<String> command);
/** /**
* * 调用 单行命令脚本的 最底层函数
* 调用 完整脚本的 最底层函数
* *
* @param agentTopicName * @param agentTopicName
* @param type * @param type
* @param commandList * @param commandList
* @param commandListComplete * @param needResultReplay
* @param futureKey * @param futureKey
* @return resultKey 本次操作在Redis中记录的结果Key * @param durationTask
* @return
*/ */
String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey); String SendCommandToAgent(
String agentTopicName,
String type,
List<String> commandList,
boolean needResultReplay,
String futureKey,
boolean durationTask
);
List<String> SendCommandToAgent(List<String> agentTopicNameList, String type, List<String> command); /** ------------------------------------------------- */
String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete);
/** /**
* 通常为 页面定时脚本任务调用 * 通常为 页面定时脚本任务调用
@@ -62,13 +65,28 @@ public interface CoreExecutionService {
List<String> SendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList, HashMap<String, String> atnFutureKey); List<String> SendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList, HashMap<String, String> atnFutureKey);
String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey);
/** /**
* 向所有的Agent发送命令 * 调用 完整脚本的 最底层函数
*
* @param agentTopicName
* @param type * @param type
* @param commandList * @param commandList
* @return * @param commandListComplete
* @param futureKey
* @param durationTask
* @return resultKey 本次操作在Redis中记录的结果Key
*/ */
@Deprecated String SendCommandToAgent(
List<String> SendCommandToAgentAll(String type, List<String> commandList); String agentTopicName,
String type,
List<String> commandList,
List<List<String>> commandListComplete,
boolean needResultReplay,
String futureKey,
boolean durationTask
);
} }

View File

@@ -61,11 +61,27 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
agentTopicName, agentTopicName,
type, type,
commandList, commandList,
null false,
null,
false
); );
} }
@Override
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, boolean needResultReplay, String futureKey, boolean durationTask) {
return this.SendCommandToAgent(
agentTopicName,
type,
commandList,
null,
needResultReplay,
futureKey,
durationTask
);
}
@Override @Override
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete) { public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete) {
@@ -81,6 +97,21 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
@Override @Override
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey) { public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey) {
return this.SendCommandToAgent(
agentTopicName,
type,
commandList,
commandListComplete,
false,
futureKey,
false
);
}
@Override
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
// 检查agentTopicName是否存在 // 检查agentTopicName是否存在
if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) {
log.error( log.error(
@@ -92,7 +123,6 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
} }
// 归一化type类型 不行 // 归一化type类型 不行
String resultKey = futureKey; String resultKey = futureKey;
// 判定是否是 FutureKey // 判定是否是 FutureKey
if (null == futureKey) { if (null == futureKey) {
@@ -105,7 +135,9 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
type, type,
commandList, commandList,
resultKey, resultKey,
commandListComplete commandListComplete,
needResultReplay,
durationTask
); );
OctopusMessage octopusMessage = this.generateOctopusMessage( OctopusMessage octopusMessage = this.generateOctopusMessage(
agentTopicName, agentTopicName,
@@ -122,6 +154,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
resultKey, resultKey,
resultKey resultKey
); );
log.debug( log.debug(
"set consumer group [{}] for the stream key with => [ {} ]", "set consumer group [{}] for the stream key with => [ {} ]",
group, group,
@@ -132,10 +165,10 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
// createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey); // createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey);
// construct the persistent Bean // construct the persistent Bean
ExecutionLog executionLog = buildPersistentLogBeanFromOctopusMessage( /*ExecutionLog executionLog = buildPersistentLogBeanFromOctopusMessage(
octopusMessage, octopusMessage,
executionMessage executionMessage
); );*/
// send resultKey to ExecutionResultDaemonHandler // send resultKey to ExecutionResultDaemonHandler
// 当批量执行,产生大量的resultKey的时候,会出现线程爆炸,导致所有的全部失效 // 当批量执行,产生大量的resultKey的时候,会出现线程爆炸,导致所有的全部失效
/*WAIT_EXECUTION_RESULT_LIST.put( /*WAIT_EXECUTION_RESULT_LIST.put(
@@ -148,6 +181,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
octopusMessage = null; octopusMessage = null;
return resultKey; return resultKey;
} }
private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) { private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) {
@@ -174,7 +208,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
ExecutionLog executionLog = new ExecutionLog(); ExecutionLog executionLog = new ExecutionLog();
executionLog.setAgentTopicName(octopusMessage.getUuid()); executionLog.setAgentTopicName(octopusMessage.getUuid());
executionLog.setResultKey((String) octopusMessage.getContent()); executionLog.setResultKey((String) octopusMessage.getContent());
executionLog.setCommandList(String.valueOf(executionMessage.getCommandList())); executionLog.setCommandList(String.valueOf(executionMessage.getSingleLineCommand()));
executionLog.setType(executionMessage.getType()); executionLog.setType(executionMessage.getType());
executionLog.setResultKey(executionMessage.getResultKey()); executionLog.setResultKey(executionMessage.getResultKey());
return executionLog; return executionLog;
@@ -238,12 +272,6 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
@Override
public List<String> SendCommandToAgentAll(String type, List<String> commandList) {
return null;
}
@Deprecated @Deprecated
private OctopusMessage generateOctopusMessage(String agentTopicName, String resultKey, String type, List<String> commandList, List<List<String>> commandListComplete) { private OctopusMessage generateOctopusMessage(String agentTopicName, String resultKey, String type, List<String> commandList, List<List<String>> commandListComplete) {
@@ -253,7 +281,9 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
type, type,
commandList, commandList,
resultKey, resultKey,
commandListComplete commandListComplete,
false,
false
); );
String executionMessageString; String executionMessageString;
@@ -274,14 +304,16 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
.build(); .build();
} }
private ExecutionMessage generateExecutionMessage(String type, List<String> commandList, String resultKey, List<List<String>> commandListComplete) { private ExecutionMessage generateExecutionMessage(String type, List<String> commandList, String resultKey, List<List<String>> commandListComplete, boolean needResultReplay, boolean durationTask) {
return ExecutionMessage return ExecutionMessage
.builder() .builder()
.resultKey(resultKey) .resultKey(resultKey)
.type(type) .type(type)
.commandList(commandList) .singleLineCommand(commandList)
.scriptCommandList(commandListComplete) .multiLineCommand(commandListComplete)
.needResultReplay(needResultReplay)
.durationTask(durationTask)
.build(); .build();
} }