diff --git a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerExecutor.java b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerExecutor.java index 1ddba13..5042287 100644 --- a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerExecutor.java +++ b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerExecutor.java @@ -7,10 +7,12 @@ import io.wdd.agent.executor.FunctionExecutor; import io.wdd.common.beans.executor.ExecutionMessage; import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessageType; +import org.apache.commons.collections.CollectionUtils; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.io.IOException; +import java.util.List; import static io.wdd.agent.config.utils.NacosConfigurationCollector.ALL_FUNCTION_MAP; @@ -26,7 +28,6 @@ public class OMHandlerExecutor extends AbstractOctopusMessageHandler { @Resource ObjectMapper objectMapper; - @Override public boolean handle(OctopusMessage octopusMessage) { @@ -36,6 +37,8 @@ public class OMHandlerExecutor extends AbstractOctopusMessageHandler { return next.handle(octopusMessage); } + boolean isScheduledScript = false; + try { // 需要首先解析成 ExecutionMessage @@ -45,13 +48,16 @@ public class OMHandlerExecutor extends AbstractOctopusMessageHandler { } ); + // add in 2023-1-17 + if (CollectionUtils.isNotEmpty(executionMessage.getScriptCommandList())) { + // 传递的是 页面定时任务脚本 + isScheduledScript = true; + } + String executionType = executionMessage.getType(); - - if (ALL_FUNCTION_MAP.containsKey(executionType)) { // execute the exist function - functionExecutor.execute(executionMessage); - + functionExecutor.execute(executionMessage, isScheduledScript); } else { // handle command commandExecutor.execute(executionMessage); diff --git a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java index f208d08..2244e79 100644 --- a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java @@ -37,7 +37,7 @@ public class CommandExecutor { /** * handle command from octopus server * - * @param executionMessage get from EXECUTOR_HANDLERju + * @param executionMessage get from EXECUTOR_HANDLER */ public void execute(ExecutionMessage executionMessage) { this.execute( diff --git a/agent/src/main/java/io/wdd/agent/executor/FunctionExecutor.java b/agent/src/main/java/io/wdd/agent/executor/FunctionExecutor.java index fd51f9a..707a680 100644 --- a/agent/src/main/java/io/wdd/agent/executor/FunctionExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/FunctionExecutor.java @@ -27,34 +27,39 @@ public class FunctionExecutor { @Resource NacosConfigurationCollector nacosConfigurationCollector; - public void execute(ExecutionMessage executionMessage) { - String resultKey = executionMessage.getResultKey(); + public void executeScriptScheduler(ExecutionMessage executionMessage) { - List> commandList = ALL_FUNCTION_MAP.get(executionMessage.getType()); - - this.execute(resultKey, commandList); - - /*Method execute = null; - - try { - execute = Class.forName(functionShellScriptFileName).getMethod("execute", String.class); - ReflectionUtils.invokeMethod(execute, functionShellScriptFileName, resultKey); - - } catch (NoSuchMethodException | ClassNotFoundException e) { - throw new MyRuntimeException(" Function Executor Reflection Error ! {} + {}", e.getCause(), e.getMessage()); - }*/ + executionMessage.getScriptCommandList(); } - private void execute(String streamKey, List> commandList) { + public void execute(ExecutionMessage executionMessage, boolean isScheduledScript) { -// List> commandList = functionReader.ReadFileToCommandList(functionFileName); + // 拿到 resultKey + String resultKey = executionMessage.getResultKey(); - log.info("[ Function Executor ] all commands are ==> {}", commandList); + List> completeCommandList; + if (isScheduledScript) { + // 检测到是 页面定时任务脚本 + completeCommandList = executionMessage.getScriptCommandList(); + } else { + // 从 ALL_FUNCTION_MAP本地容器中(Nacos配置中获取)获取到 功能脚本的内容 + completeCommandList = ALL_FUNCTION_MAP.get(executionMessage.getType()); + } - Iterator> iterator = commandList.iterator(); + this.execute(resultKey, completeCommandList); + + } + + + private void execute(String streamKey, List> completeCommandList) { + + + log.info("[ Function Executor ] all commands are ==> {}", completeCommandList); + + Iterator> iterator = completeCommandList.iterator(); while (iterator.hasNext()) { int execute = commandExecutor.execute(streamKey, iterator.next()); diff --git a/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java b/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java index dfdac7a..23eaed8 100644 --- a/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java +++ b/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java @@ -50,7 +50,7 @@ public class TestCommandExecutorController { System.out.println("executionMessage = " + executionMessage); - functionExecutor.execute(executionMessage); + functionExecutor.execute(executionMessage, false); return R.ok(streamKey); } diff --git a/common/src/main/java/io/wdd/common/beans/executor/ExecutionMessage.java b/common/src/main/java/io/wdd/common/beans/executor/ExecutionMessage.java index 2b949df..3f63fde 100644 --- a/common/src/main/java/io/wdd/common/beans/executor/ExecutionMessage.java +++ b/common/src/main/java/io/wdd/common/beans/executor/ExecutionMessage.java @@ -14,10 +14,20 @@ import java.util.List; @SuperBuilder(toBuilder = true) public class ExecutionMessage { + /** + * 用于区分 ExecutionMessage的类型 + * 直接执行预定函数,则为 Nacos配置中的 方法名称,例如 AgentUpdate AgentReboot + */ private String type; private List commandList; + /** + * add in 2023-1-17 + * 页面定时脚本任务 需要传递完整的命令列表 + */ + private List> scriptCommandList; + private String resultKey; public static String GetResultKey(String topicName) { diff --git a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java index e964a56..c3bfbf0 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java @@ -5,22 +5,34 @@ import java.util.List; public interface CoreExecutionService { - String SendCommandToAgent(String agentTopicName, String command); String SendCommandToAgent(String agentTopicName, List commandList); - /** * @param agentTopicName agent唯一表示名 * @param type 任务执行类型 - * @param command 任务列表内容 + * @param commandList 任务列表内容 * @return redis中的 result key */ - String SendCommandToAgent(String agentTopicName, String type, List command); + String SendCommandToAgent(String agentTopicName, String type, List commandList); + + + String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete); + List SendCommandToAgent(List agentTopicNameList, String type, List command); + /** + * 通常为 页面定时脚本任务调用 + * + * @param agentTopicNameList 目标Agent的TopicName列表 + * @param type 任务类型 + * @param completeCommandList 完整的类型 + * @return 每个Agent只返回一个 ResultKey(Script脚本的结果全部拼接到一起),全部的resultKey + */ + List SendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList); + } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java index 6b5a4ea..16b23ff 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java @@ -26,7 +26,7 @@ import static io.wdd.rpc.scheduler.service.status.MonitorAllAgentStatus.ALL_AGEN public class CoreExecutionServiceImpl implements CoreExecutionService { @Resource - ToAgentMessageSender messageSender; + ToAgentMessageSender toAgentMessageSender; @Resource ObjectMapper objectMapper; @@ -38,21 +38,37 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { BuildStreamReader buildStreamReader; @Override - public String SendCommandToAgent(String agentagentTopicName, String command) { - return this.SendCommandToAgent(agentagentTopicName, - List.of(command)); + public String SendCommandToAgent(String agentTopicName, String command) { + return this.SendCommandToAgent( + agentTopicName, + List.of(command) + ); } @Override - public String SendCommandToAgent(String agentagentTopicName, List commandList) { - return this.SendCommandToAgent(agentagentTopicName, - "manual-command", - commandList); + public String SendCommandToAgent(String agentTopicName, List commandList) { + return this.SendCommandToAgent( + agentTopicName, + "manual-command", + commandList + ); } @Override public String SendCommandToAgent(String agentTopicName, String type, List commandList) { + return SendCommandToAgent( + agentTopicName, + type, + commandList, + null + ); + + } + + @Override + public String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete) { + // 检查agentTopicName是否存在 if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { log.error("agentTopicName异常!"); @@ -61,54 +77,51 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { // 归一化type类型 不行 + // 生成 ResultKey + String resultKey = ExecutionMessage.GetResultKey(agentTopicName); + // 构造 Execution Command对应的消息体 - ExecutionMessage executionMessage = - ExecutionMessage.builder() - .type(type) - .commandList(commandList) - .resultKey(ExecutionMessage.GetResultKey(agentTopicName)) - .build(); - - - String executionMessageString; - try { - executionMessageString = objectMapper.writeValueAsString(executionMessage); - - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - - OctopusMessage octopusMessage = OctopusMessage - .builder() - .type(OctopusMessageType.EXECUTOR) - .init_time(LocalDateTime.now()) - .content(executionMessageString) - .uuid(agentTopicName) - .build(); - - String resultKey = executionMessage.getResultKey(); + ExecutionMessage executionMessage = this + .generateExecutionMessage( + agentTopicName, + commandList, + resultKey, + commandListComplete + ); + OctopusMessage octopusMessage = this.generateOctopusMessage( + agentTopicName, + executionMessage + ); // send the message - messageSender.send(octopusMessage); + toAgentMessageSender.send(octopusMessage); // set up the stream read group String group = redisTemplate .opsForStream() - .createGroup(resultKey, - resultKey); - log.info("set consumer group [{}] for the stream key with => [ {} ]", - group, - resultKey); + .createGroup( + resultKey, + resultKey + ); + log.debug( + "set consumer group [{}] for the stream key with => [ {} ]", + group, + resultKey + ); // change the redis stream listener container // createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey); // construct the persistent Bean - ExecutionLog executionLog = buildPersistentLogBeanFromOctopusMessage(octopusMessage, - executionMessage); + ExecutionLog executionLog = buildPersistentLogBeanFromOctopusMessage( + octopusMessage, + executionMessage + ); // send resultKey to ExecutionResultDaemonHandler - WAIT_EXECUTION_RESULT_LIST.put(resultKey, - executionLog); + WAIT_EXECUTION_RESULT_LIST.put( + resultKey, + executionLog + ); // help gc executionMessage = null; @@ -117,6 +130,26 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { return resultKey; } + private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) { + + try { + + return OctopusMessage + .builder() + .type(OctopusMessageType.EXECUTOR) + .init_time(LocalDateTime.now()) + .uuid(agentTopicName) + .content( + objectMapper.writeValueAsString(executionMessage) + ) + .build(); + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + } + private ExecutionLog buildPersistentLogBeanFromOctopusMessage(OctopusMessage octopusMessage, ExecutionMessage executionMessage) { ExecutionLog executionLog = new ExecutionLog(); executionLog.setAgentTopicName(octopusMessage.getUuid()); @@ -130,29 +163,83 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { @Override public List SendCommandToAgent(List agentagentTopicNameList, String type, List command) { + return agentagentTopicNameList .stream() .map( - agentTopicName -> this.SendCommandToAgent - ( - agentTopicName, - type, - command) + agentTopicName -> this + .SendCommandToAgent + ( + agentTopicName, + type, + command + ) ) .collect(Collectors.toList()); } - @Deprecated - private OctopusMessage generateOctopusMessage(String agentTopicName, String type, List commandList) { + /** + * @param agentTopicNameList 目标Agent的TopicName列表 + * @param type 任务类型 + * @param completeCommandList 完整的类型 + * @return + */ + @Override + public List SendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList) { + + return agentTopicNameList + .stream() + .map( + agentTopicName -> this.SendCommandToAgent( + agentTopicName, + type, + null, + completeCommandList + ) + ) + .collect(Collectors.toList()); - return null; } + @Deprecated - private ExecutionMessage generateExecutionMessage(String type, List commandList, String resultKey) { + private OctopusMessage generateOctopusMessage(String agentTopicName, String resultKey, String type, List commandList, List> commandListComplete) { - return null; + ExecutionMessage executionMessage = this.generateExecutionMessage( + type, + commandList, + resultKey, + commandListComplete + ); + + String executionMessageString; + + try { + executionMessageString = objectMapper.writeValueAsString(executionMessage); + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + return OctopusMessage + .builder() + .type(OctopusMessageType.EXECUTOR) + .init_time(LocalDateTime.now()) + .content(executionMessageString) + .uuid(agentTopicName) + .build(); + } + + private ExecutionMessage generateExecutionMessage(String type, List commandList, String resultKey, List> commandListComplete) { + + return ExecutionMessage + .builder() + .resultKey(resultKey) + .type(type) + .commandList(commandList) + .scriptCommandList(commandListComplete) + .build(); } diff --git a/server/src/main/java/io/wdd/rpc/scheduler/beans/ScriptSchedulerDTO.java b/server/src/main/java/io/wdd/rpc/scheduler/beans/ScriptSchedulerDTO.java index 113d8fb..017a0bb 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/beans/ScriptSchedulerDTO.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/beans/ScriptSchedulerDTO.java @@ -19,4 +19,6 @@ public class ScriptSchedulerDTO extends ScriptSchedulerVO{ List targetMachineList; + List resultKeyList; + } diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/QuartzSchedulerServiceImpl.java b/server/src/main/java/io/wdd/rpc/scheduler/service/QuartzSchedulerServiceImpl.java index 8bdf171..e0c9a77 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/QuartzSchedulerServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/QuartzSchedulerServiceImpl.java @@ -92,6 +92,8 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService { */ private ScriptSchedulerPO convertToScriptSchedulerPO(ScriptSchedulerDTO dto) { // todo should be a static method + + return ScriptSchedulerPO .builder() .cronExpress(dto.getCronExpress()) @@ -288,7 +290,8 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService { jobDetail, trigger ); - log.info( + + log.debug( "jobDataMap: {}", jobDetail.getJobDataMap() ); diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java b/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java index 3fc8565..a82d70d 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java @@ -3,7 +3,6 @@ package io.wdd.rpc.scheduler.service.script; import io.wdd.rpc.execute.service.CoreExecutionService; import io.wdd.rpc.scheduler.beans.ScriptSchedulerDTO; -import io.wdd.server.beans.po.ScriptSchedulerPO; import lombok.extern.log4j.Log4j; import org.springframework.stereotype.Service; @@ -22,16 +21,17 @@ public class AgentApplyScheduledScript { public void apply(ScriptSchedulerDTO scriptSchedulerDTO) { - List> commandList = scriptSchedulerDTO.getCommandList(); + List> completeCommandList = scriptSchedulerDTO.getCommandList(); List targetMachineList = scriptSchedulerDTO.getTargetMachineList(); - targetMachineList - .stream() - .map( - targetMachine -> { - coreExecutionService.SendCommandToAgent() - } - ) + List resultKeyList = coreExecutionService.SendCommandToAgentComplete( + targetMachineList, + "Scheduled Script", + completeCommandList + ); + + // 将 resultKeyList 放入这个DTO中 + scriptSchedulerDTO.setResultKeyList(resultKeyList); } diff --git a/server/src/test/java/io/wdd/server/ServerApplicationTests.java b/server/src/test/java/io/wdd/server/ServerApplicationTests.java index 656ffbb..29724a0 100644 --- a/server/src/test/java/io/wdd/server/ServerApplicationTests.java +++ b/server/src/test/java/io/wdd/server/ServerApplicationTests.java @@ -1,13 +1,74 @@ package io.wdd.server; +import io.wdd.rpc.execute.service.CoreExecutionService; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; + @SpringBootTest class ServerApplicationTests { + + @Resource + CoreExecutionService coreExecutionService; + @Test - void contextLoads() { + void testCoreExecutionCompleteScript() { + + ArrayList command1 = new ArrayList<>( + List.of( + "echo", + "yes" + ) + ); + + ArrayList command2 = new ArrayList<>( + List.of( + "apt-get", + "update" + ) + ); + + ArrayList command3 = new ArrayList<>( + List.of( + "echo", + "no" + ) + ); + + ArrayList command4 = new ArrayList<>( + List.of( + "apt-get", + "install", + "nginx", + "-y" + ) + ); + + List> completeScript = new ArrayList<>(); + completeScript.add(command1); + completeScript.add(command2); + completeScript.add(command3); + completeScript.add(command4); + + + ArrayList targetMachineList = new ArrayList<>( + List.of( + "Chengdu-amd64-98-98066f" + ) + ); + + List resultList = coreExecutionService.SendCommandToAgentComplete( + targetMachineList, + "Scheduled Script", + completeScript + ); + + + System.out.println("resultList = " + resultList); } }