[ server ] [ scheduler ]- script scheduler - 3

This commit is contained in:
zeaslity
2023-01-17 16:27:35 +08:00
parent 8ef3b271b1
commit e080d3f858
11 changed files with 280 additions and 94 deletions

View File

@@ -7,10 +7,12 @@ import io.wdd.agent.executor.FunctionExecutor;
import io.wdd.common.beans.executor.ExecutionMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;
import static io.wdd.agent.config.utils.NacosConfigurationCollector.ALL_FUNCTION_MAP;
@@ -26,7 +28,6 @@ public class OMHandlerExecutor extends AbstractOctopusMessageHandler {
@Resource
ObjectMapper objectMapper;
@Override
public boolean handle(OctopusMessage octopusMessage) {
@@ -36,6 +37,8 @@ public class OMHandlerExecutor extends AbstractOctopusMessageHandler {
return next.handle(octopusMessage);
}
boolean isScheduledScript = false;
try {
// 需要首先解析成 ExecutionMessage
@@ -45,13 +48,16 @@ public class OMHandlerExecutor extends AbstractOctopusMessageHandler {
}
);
// add in 2023-1-17
if (CollectionUtils.isNotEmpty(executionMessage.getScriptCommandList())) {
// 传递的是 页面定时任务脚本
isScheduledScript = true;
}
String executionType = executionMessage.getType();
if (ALL_FUNCTION_MAP.containsKey(executionType)) {
// execute the exist function
functionExecutor.execute(executionMessage);
functionExecutor.execute(executionMessage, isScheduledScript);
} else {
// handle command
commandExecutor.execute(executionMessage);

View File

@@ -37,7 +37,7 @@ public class CommandExecutor {
/**
* handle command from octopus server
*
* @param executionMessage get from EXECUTOR_HANDLERju
* @param executionMessage get from EXECUTOR_HANDLER
*/
public void execute(ExecutionMessage executionMessage) {
this.execute(

View File

@@ -27,34 +27,39 @@ public class FunctionExecutor {
@Resource
NacosConfigurationCollector nacosConfigurationCollector;
public void execute(ExecutionMessage executionMessage) {
String resultKey = executionMessage.getResultKey();
public void executeScriptScheduler(ExecutionMessage executionMessage) {
List<List<String>> commandList = ALL_FUNCTION_MAP.get(executionMessage.getType());
this.execute(resultKey, commandList);
/*Method execute = null;
try {
execute = Class.forName(functionShellScriptFileName).getMethod("execute", String.class);
ReflectionUtils.invokeMethod(execute, functionShellScriptFileName, resultKey);
} catch (NoSuchMethodException | ClassNotFoundException e) {
throw new MyRuntimeException(" Function Executor Reflection Error ! {} + {}", e.getCause(), e.getMessage());
}*/
executionMessage.getScriptCommandList();
}
private void execute(String streamKey, List<List<String>> commandList) {
public void execute(ExecutionMessage executionMessage, boolean isScheduledScript) {
// List<List<String>> commandList = functionReader.ReadFileToCommandList(functionFileName);
// 拿到 resultKey
String resultKey = executionMessage.getResultKey();
log.info("[ Function Executor ] all commands are ==> {}", commandList);
List<List<String>> completeCommandList;
if (isScheduledScript) {
// 检测到是 页面定时任务脚本
completeCommandList = executionMessage.getScriptCommandList();
} else {
// 从 ALL_FUNCTION_MAP本地容器中(Nacos配置中获取)获取到 功能脚本的内容
completeCommandList = ALL_FUNCTION_MAP.get(executionMessage.getType());
}
Iterator<List<String>> iterator = commandList.iterator();
this.execute(resultKey, completeCommandList);
}
private void execute(String streamKey, List<List<String>> completeCommandList) {
log.info("[ Function Executor ] all commands are ==> {}", completeCommandList);
Iterator<List<String>> iterator = completeCommandList.iterator();
while (iterator.hasNext()) {
int execute = commandExecutor.execute(streamKey, iterator.next());

View File

@@ -50,7 +50,7 @@ public class TestCommandExecutorController {
System.out.println("executionMessage = " + executionMessage);
functionExecutor.execute(executionMessage);
functionExecutor.execute(executionMessage, false);
return R.ok(streamKey);
}

View File

@@ -14,10 +14,20 @@ import java.util.List;
@SuperBuilder(toBuilder = true)
public class ExecutionMessage {
/**
* 用于区分 ExecutionMessage的类型
* 直接执行预定函数,则为 Nacos配置中的 方法名称,例如 AgentUpdate AgentReboot
*/
private String type;
private List<String> commandList;
/**
* add in 2023-1-17
* 页面定时脚本任务 需要传递完整的命令列表
*/
private List<List<String>> scriptCommandList;
private String resultKey;
public static String GetResultKey(String topicName) {

View File

@@ -5,22 +5,34 @@ import java.util.List;
public interface CoreExecutionService {
String SendCommandToAgent(String agentTopicName, String command);
String SendCommandToAgent(String agentTopicName, List<String> commandList);
/**
* @param agentTopicName agent唯一表示名
* @param type 任务执行类型
* @param command 任务列表内容
* @param commandList 任务列表内容
* @return redis中的 result key
*/
String SendCommandToAgent(String agentTopicName, String type, List<String> command);
String SendCommandToAgent(String agentTopicName, String type, List<String> commandList);
String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete);
List<String> SendCommandToAgent(List<String> agentTopicNameList, String type, List<String> command);
/**
* 通常为 页面定时脚本任务调用
*
* @param agentTopicNameList 目标Agent的TopicName列表
* @param type 任务类型
* @param completeCommandList 完整的类型
* @return 每个Agent只返回一个 ResultKeyScript脚本的结果全部拼接到一起全部的resultKey
*/
List<String> SendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList);
}

View File

@@ -26,7 +26,7 @@ import static io.wdd.rpc.scheduler.service.status.MonitorAllAgentStatus.ALL_AGEN
public class CoreExecutionServiceImpl implements CoreExecutionService {
@Resource
ToAgentMessageSender messageSender;
ToAgentMessageSender toAgentMessageSender;
@Resource
ObjectMapper objectMapper;
@@ -38,21 +38,37 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
BuildStreamReader buildStreamReader;
@Override
public String SendCommandToAgent(String agentagentTopicName, String command) {
return this.SendCommandToAgent(agentagentTopicName,
List.of(command));
public String SendCommandToAgent(String agentTopicName, String command) {
return this.SendCommandToAgent(
agentTopicName,
List.of(command)
);
}
@Override
public String SendCommandToAgent(String agentagentTopicName, List<String> commandList) {
return this.SendCommandToAgent(agentagentTopicName,
public String SendCommandToAgent(String agentTopicName, List<String> commandList) {
return this.SendCommandToAgent(
agentTopicName,
"manual-command",
commandList);
commandList
);
}
@Override
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList) {
return SendCommandToAgent(
agentTopicName,
type,
commandList,
null
);
}
@Override
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete) {
// 检查agentTopicName是否存在
if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) {
log.error("agentTopicName异常!");
@@ -61,54 +77,51 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
// 归一化type类型 不行
// 生成 ResultKey
String resultKey = ExecutionMessage.GetResultKey(agentTopicName);
// 构造 Execution Command对应的消息体
ExecutionMessage executionMessage =
ExecutionMessage.builder()
.type(type)
.commandList(commandList)
.resultKey(ExecutionMessage.GetResultKey(agentTopicName))
.build();
String executionMessageString;
try {
executionMessageString = objectMapper.writeValueAsString(executionMessage);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
OctopusMessage octopusMessage = OctopusMessage
.builder()
.type(OctopusMessageType.EXECUTOR)
.init_time(LocalDateTime.now())
.content(executionMessageString)
.uuid(agentTopicName)
.build();
String resultKey = executionMessage.getResultKey();
ExecutionMessage executionMessage = this
.generateExecutionMessage(
agentTopicName,
commandList,
resultKey,
commandListComplete
);
OctopusMessage octopusMessage = this.generateOctopusMessage(
agentTopicName,
executionMessage
);
// send the message
messageSender.send(octopusMessage);
toAgentMessageSender.send(octopusMessage);
// set up the stream read group
String group = redisTemplate
.opsForStream()
.createGroup(resultKey,
resultKey);
log.info("set consumer group [{}] for the stream key with => [ {} ]",
.createGroup(
resultKey,
resultKey
);
log.debug(
"set consumer group [{}] for the stream key with => [ {} ]",
group,
resultKey);
resultKey
);
// change the redis stream listener container
// createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey);
// construct the persistent Bean
ExecutionLog executionLog = buildPersistentLogBeanFromOctopusMessage(octopusMessage,
executionMessage);
ExecutionLog executionLog = buildPersistentLogBeanFromOctopusMessage(
octopusMessage,
executionMessage
);
// send resultKey to ExecutionResultDaemonHandler
WAIT_EXECUTION_RESULT_LIST.put(resultKey,
executionLog);
WAIT_EXECUTION_RESULT_LIST.put(
resultKey,
executionLog
);
// help gc
executionMessage = null;
@@ -117,6 +130,26 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
return resultKey;
}
private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) {
try {
return OctopusMessage
.builder()
.type(OctopusMessageType.EXECUTOR)
.init_time(LocalDateTime.now())
.uuid(agentTopicName)
.content(
objectMapper.writeValueAsString(executionMessage)
)
.build();
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
private ExecutionLog buildPersistentLogBeanFromOctopusMessage(OctopusMessage octopusMessage, ExecutionMessage executionMessage) {
ExecutionLog executionLog = new ExecutionLog();
executionLog.setAgentTopicName(octopusMessage.getUuid());
@@ -130,29 +163,83 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
@Override
public List<String> SendCommandToAgent(List<String> agentagentTopicNameList, String type, List<String> command) {
return agentagentTopicNameList
.stream()
.map(
agentTopicName -> this.SendCommandToAgent
agentTopicName -> this
.SendCommandToAgent
(
agentTopicName,
type,
command)
command
)
)
.collect(Collectors.toList());
}
@Deprecated
private OctopusMessage generateOctopusMessage(String agentTopicName, String type, List<String> commandList) {
/**
* @param agentTopicNameList 目标Agent的TopicName列表
* @param type 任务类型
* @param completeCommandList 完整的类型
* @return
*/
@Override
public List<String> SendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.SendCommandToAgent(
agentTopicName,
type,
null,
completeCommandList
)
)
.collect(Collectors.toList());
return null;
}
@Deprecated
private ExecutionMessage generateExecutionMessage(String type, List<String> commandList, String resultKey) {
private OctopusMessage generateOctopusMessage(String agentTopicName, String resultKey, String type, List<String> commandList, List<List<String>> commandListComplete) {
return null;
ExecutionMessage executionMessage = this.generateExecutionMessage(
type,
commandList,
resultKey,
commandListComplete
);
String executionMessageString;
try {
executionMessageString = objectMapper.writeValueAsString(executionMessage);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return OctopusMessage
.builder()
.type(OctopusMessageType.EXECUTOR)
.init_time(LocalDateTime.now())
.content(executionMessageString)
.uuid(agentTopicName)
.build();
}
private ExecutionMessage generateExecutionMessage(String type, List<String> commandList, String resultKey, List<List<String>> commandListComplete) {
return ExecutionMessage
.builder()
.resultKey(resultKey)
.type(type)
.commandList(commandList)
.scriptCommandList(commandListComplete)
.build();
}

View File

@@ -19,4 +19,6 @@ public class ScriptSchedulerDTO extends ScriptSchedulerVO{
List<String> targetMachineList;
List<String> resultKeyList;
}

View File

@@ -92,6 +92,8 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService {
*/
private ScriptSchedulerPO convertToScriptSchedulerPO(ScriptSchedulerDTO dto) {
// todo should be a static method
return ScriptSchedulerPO
.builder()
.cronExpress(dto.getCronExpress())
@@ -288,7 +290,8 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService {
jobDetail,
trigger
);
log.info(
log.debug(
"jobDataMap: {}",
jobDetail.getJobDataMap()
);

View File

@@ -3,7 +3,6 @@ package io.wdd.rpc.scheduler.service.script;
import io.wdd.rpc.execute.service.CoreExecutionService;
import io.wdd.rpc.scheduler.beans.ScriptSchedulerDTO;
import io.wdd.server.beans.po.ScriptSchedulerPO;
import lombok.extern.log4j.Log4j;
import org.springframework.stereotype.Service;
@@ -22,16 +21,17 @@ public class AgentApplyScheduledScript {
public void apply(ScriptSchedulerDTO scriptSchedulerDTO) {
List<List<String>> commandList = scriptSchedulerDTO.getCommandList();
List<List<String>> completeCommandList = scriptSchedulerDTO.getCommandList();
List<String> targetMachineList = scriptSchedulerDTO.getTargetMachineList();
targetMachineList
.stream()
.map(
targetMachine -> {
coreExecutionService.SendCommandToAgent()
}
)
List<String> resultKeyList = coreExecutionService.SendCommandToAgentComplete(
targetMachineList,
"Scheduled Script",
completeCommandList
);
// 将 resultKeyList 放入这个DTO中
scriptSchedulerDTO.setResultKeyList(resultKeyList);
}

View File

@@ -1,13 +1,74 @@
package io.wdd.server;
import io.wdd.rpc.execute.service.CoreExecutionService;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
@SpringBootTest
class ServerApplicationTests {
@Resource
CoreExecutionService coreExecutionService;
@Test
void contextLoads() {
void testCoreExecutionCompleteScript() {
ArrayList<String> command1 = new ArrayList<>(
List.of(
"echo",
"yes"
)
);
ArrayList<String> command2 = new ArrayList<>(
List.of(
"apt-get",
"update"
)
);
ArrayList<String> command3 = new ArrayList<>(
List.of(
"echo",
"no"
)
);
ArrayList<String> command4 = new ArrayList<>(
List.of(
"apt-get",
"install",
"nginx",
"-y"
)
);
List<List<String>> completeScript = new ArrayList<>();
completeScript.add(command1);
completeScript.add(command2);
completeScript.add(command3);
completeScript.add(command4);
ArrayList<String> targetMachineList = new ArrayList<>(
List.of(
"Chengdu-amd64-98-98066f"
)
);
List<String> resultList = coreExecutionService.SendCommandToAgentComplete(
targetMachineList,
"Scheduled Script",
completeScript
);
System.out.println("resultList = " + resultList);
}
}