[ server ] [ scheduler ]- script scheduler optimize proceed

This commit is contained in:
zeaslity
2023-01-18 17:23:23 +08:00
parent f6045783a2
commit ac27f1c702
8 changed files with 168 additions and 48 deletions

View File

@@ -6,6 +6,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.time.LocalDateTime;
import java.util.List;
@Data
@@ -16,7 +17,7 @@ public class ExecutionMessage {
/**
* 用于区分 ExecutionMessage的类型
* 直接执行预定函数,则为 Nacos配置中的 方法名称,例如 AgentUpdate AgentReboot
* 直接执行预定函数,则为 Nacos配置中的 方法名称,例如 AgentUpdate AgentReboot
*/
private String type;
@@ -24,15 +25,18 @@ public class ExecutionMessage {
/**
* add in 2023-1-17
* 页面定时脚本任务 需要传递完整的命令列表
* 页面定时脚本任务 需要传递完整的命令列表
*/
private List<List<String>> scriptCommandList;
private String resultKey;
public static String GetResultKey(String topicName) {
return topicName + "-Execution:" + TimeUtils.currentTimeStringFullSplit();
}
public static String GetFutureResultKey(String topicName, LocalDateTime futureExecutionTime) {
return topicName + "-Execution:" + TimeUtils.localDateTimeString(futureExecutionTime);
}
}

View File

@@ -132,12 +132,12 @@
<optional>true</optional>
</dependency>
<dependency>
<!--<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
</dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@@ -1,5 +1,6 @@
package io.wdd.rpc.execute.service;
import java.util.HashMap;
import java.util.List;
@@ -20,6 +21,8 @@ public interface CoreExecutionService {
String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete);
String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey);
List<String> SendCommandToAgent(List<String> agentTopicNameList, String type, List<String> command);
@@ -35,4 +38,16 @@ public interface CoreExecutionService {
List<String> SendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList);
/**
* 通常为 页面定时脚本任务调用
*
* @param agentTopicNameList 目标Agent的TopicName列表
* @param type 任务类型
* @param completeCommandList 完整的类型
* @param atnFutureKey 由于脚本任务为延迟调用,故需要提前生成未来的ResultKey
* @return 每个Agent只返回一个 ResultKeyScript脚本的结果全部拼接到一起全部的resultKey
*/
List<String> SendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList, HashMap<String, String> atnFutureKey);
}

View File

@@ -10,11 +10,13 @@ import io.wdd.rpc.execute.config.ExecutionLog;
import io.wdd.rpc.execute.result.BuildStreamReader;
import io.wdd.rpc.message.sender.ToAgentMessageSender;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
@@ -69,6 +71,12 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
@Override
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete) {
return this.SendCommandToAgent(agentTopicName, type, commandList, commandListComplete, null);
}
@Override
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey) {
// 检查agentTopicName是否存在
if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) {
log.error("agentTopicName异常!");
@@ -77,8 +85,11 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
// 归一化type类型 不行
// 生成 ResultKey
String resultKey = ExecutionMessage.GetResultKey(agentTopicName);
String resultKey = futureKey;
// 判定是否是 FutureKey
if (StringUtils.isEmpty(agentTopicName)) {
resultKey = ExecutionMessage.GetResultKey(agentTopicName);
}
// 构造 Execution Command对应的消息体
ExecutionMessage executionMessage = this
@@ -201,6 +212,23 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
}
@Override
public List<String> SendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList, HashMap<String, String> atnFutureKey) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.SendCommandToAgent(
agentTopicName,
type,
null,
completeCommandList,
atnFutureKey.get(agentTopicName)
)
)
.collect(Collectors.toList());
}
@Deprecated
private OctopusMessage generateOctopusMessage(String agentTopicName, String resultKey, String type, List<String> commandList, List<List<String>> commandListComplete) {

View File

@@ -1,11 +1,13 @@
package io.wdd.rpc.scheduler.beans;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.HashMap;
import java.util.List;
@ApiModel("定时脚本任务-中间转换状态-实体类")
@@ -15,10 +17,25 @@ import java.util.List;
@SuperBuilder(toBuilder = true)
public class ScriptSchedulerDTO extends ScriptSchedulerVO{
/**
* 脚本解析之后的完整命令行
*/
@ApiModelProperty(" 脚本解析之后的完整命令行")
List<List<String>> completeCommandList;
/**
* 存储内容为,目标执行机器agentTopicName列表
*/
@ApiModelProperty("目标执行机器agentTopicName列表")
List<String> targetMachineList;
@Deprecated
List<String> resultKeyList;
/**
* 存储内容为 agentTopicName -- FutureResultKey
*/
@ApiModelProperty("agentTopicName -- FutureResultKey")
HashMap<String, String> agentTopicNameToFutureResultKeyMap;
}

View File

@@ -1,5 +1,6 @@
package io.wdd.rpc.scheduler.service;
import io.wdd.common.beans.executor.ExecutionMessage;
import io.wdd.common.handler.MyRuntimeException;
import io.wdd.common.utils.FunctionReader;
import io.wdd.common.utils.TimeUtils;
@@ -25,7 +26,6 @@ import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static io.wdd.rpc.scheduler.beans.ScriptSchedulerVO.SCHEDULE_MISSION_GROUP_NAME;
@@ -73,26 +73,93 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService {
AgentScriptSchedulerJob.class,
scriptSchedulerDTO.getSchedulerUuid(),
SCHEDULE_MISSION_GROUP_NAME,
1, // 立即开始本次任务
0,
// 立即开始本次任务 1ms wait
scriptSchedulerDTO.getCronExpress(),
dataMap
);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
HashMap<String, String> futureExecutionResultKey = getFutureExecutionResultKey(scriptSchedulerDTO);
log.info("futureExecutionResultKey is => {}", futureExecutionResultKey);
// persistent the script scheduled mission
// dto should store more info
ScriptSchedulerPO scriptSchedulerPO = convertToScriptSchedulerPO(scriptSchedulerDTO);
log.info(String.valueOf(scriptSchedulerPO));
log.info("scriptSchedulerPO is => {}", scriptSchedulerPO);
// scriptSchedulerService.save(scriptSchedulerPO);
return scriptSchedulerDTO.getResultKeyList();
}
/**
* @param scriptSchedulerDTO dto
* @return dto中存储的 未来ExecutionKey对象
*/
private HashMap<String, String> getFutureExecutionResultKey(ScriptSchedulerDTO scriptSchedulerDTO) {
ArrayList<LocalDateTime> time = getLastNextExecutionTime(
scriptSchedulerDTO.getSchedulerUuid(),
SCHEDULE_MISSION_GROUP_NAME
);
LocalDateTime nextExecutionTime = time.get(1);
HashMap<String, String> keyMap = scriptSchedulerDTO.getAgentTopicNameToFutureResultKeyMap();
// 为每一个目标Agent都要设置相应的 FutureKey
scriptSchedulerDTO
.getTargetMachineList()
.stream()
.forEach(
targetMachine -> {
keyMap.put(
targetMachine,
ExecutionMessage.GetFutureResultKey(
targetMachine,
nextExecutionTime
)
);
}
);
return keyMap;
}
/**
* 定时任务通常在将来执行,所以需要手动获取到下次的执行时间
*
* @param missionName 任务名称
* @param missionGroupName 任务Group组名
* @return 上次调度时间 下次执行时间
*/
public ArrayList<LocalDateTime> getLastNextExecutionTime(String missionName, String missionGroupName) {
// 获取JobDetail存储上次调度时间和下次执行时间
try {
Trigger schedulerTrigger = scheduler.getTrigger(
new TriggerKey(
missionName,
missionGroupName
)
);
ArrayList<LocalDateTime> result = new ArrayList<>();
LocalDateTime last_schedule_time = TimeUtils.cvFromDate(schedulerTrigger.getFinalFireTime());
LocalDateTime next_schedule_time = TimeUtils.cvFromDate(schedulerTrigger.getNextFireTime());
result.add(last_schedule_time);
result.add(next_schedule_time);
return result;
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
}
/**
* 转换 中间层 --> 持久层
*
@@ -109,26 +176,6 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService {
.targetMachine(String.valueOf(dto.getTargetMachineList()))
.build();
// 获取JobDetail存储上次调度时间和下次执行时间
try {
Trigger schedulerTrigger = scheduler.getTrigger(
new TriggerKey(
dto.getSchedulerUuid(),
SCHEDULE_MISSION_GROUP_NAME
)
);
LocalDateTime last_schedule_time = TimeUtils.cvFromDate(schedulerTrigger.getFinalFireTime());
LocalDateTime next_schedule_time = TimeUtils.cvFromDate(schedulerTrigger.getNextFireTime());
schedulerPO.setLastScheduleTime(last_schedule_time);
schedulerPO.setNextScheduleTime(next_schedule_time);
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
return schedulerPO;
}
@@ -161,19 +208,28 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService {
throw new MyRuntimeException("target machine wrong !");
}
ArrayList<String> targetMachineList = new ArrayList<>();
for (String s : targetMachineSplit) {
targetMachineList.add(s);
}
Collections.addAll(
targetMachineList,
targetMachineSplit
);
// 生成DTO对象
ScriptSchedulerDTO dto = new ScriptSchedulerDTO();
BeanUtils.copyProperties(dto,
scriptSchedulerVO);
BeanUtils.copyProperties(
dto,
scriptSchedulerVO
);
// 初始化属性值
dto.setAgentTopicNameToFutureResultKeyMap(
new HashMap<String, String>(16)
);
// 设置属性值
dto.setCompleteCommandList(completeCommandList);
dto.setTargetMachineList(targetMachineList);
// 生成 scheduler uuid
String uuid = RandomStringUtils.randomAlphabetic(32);
dto.setSchedulerUuid(uuid);
@@ -267,7 +323,8 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService {
* @param jobClass 任务实现类
* @param jobName 任务名称(建议唯一)
* @param jobGroupName 任务组名
* @param startTime
* @param startTime 单位为ms传入 0
* 表示立即开始
* @param cronJobExpression 时间表达式 0/5 * * * * ?
* @param jobData 参数
*/
@@ -308,7 +365,7 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService {
.startAt(
DateBuilder.futureDate(
startTime,
IntervalUnit.SECOND
IntervalUnit.MILLISECOND
)
)
.withSchedule(
@@ -328,6 +385,7 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService {
jobDetail.getJobDataMap()
);
} catch (Exception e) {
e.printStackTrace();
throw new MyRuntimeException("add job error!");

View File

@@ -33,6 +33,10 @@ public class AgentApplyScheduledScript {
// 将 resultKeyList 放入这个DTO中
scriptSchedulerDTO.setResultKeyList(resultKeyList);
// 需要更新数据库
// 关联性数据库
}

View File

@@ -18,9 +18,3 @@ spring:
extension-configs:
- group: local
data-id: common-local.yaml
debug: true
logging:
level:
io.wdd.server:
debug