diff --git a/common/src/main/java/io/wdd/common/beans/executor/ExecutionMessage.java b/common/src/main/java/io/wdd/common/beans/executor/ExecutionMessage.java index 3f63fde..3459ce4 100644 --- a/common/src/main/java/io/wdd/common/beans/executor/ExecutionMessage.java +++ b/common/src/main/java/io/wdd/common/beans/executor/ExecutionMessage.java @@ -6,6 +6,7 @@ import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.SuperBuilder; +import java.time.LocalDateTime; import java.util.List; @Data @@ -16,7 +17,7 @@ public class ExecutionMessage { /** * 用于区分 ExecutionMessage的类型 - * 直接执行预定函数,则为 Nacos配置中的 方法名称,例如 AgentUpdate AgentReboot + * 直接执行预定函数,则为 Nacos配置中的 方法名称,例如 AgentUpdate AgentReboot */ private String type; @@ -24,15 +25,18 @@ public class ExecutionMessage { /** * add in 2023-1-17 - * 页面定时脚本任务 需要传递完整的命令列表 + * 页面定时脚本任务 需要传递完整的命令列表 */ private List> scriptCommandList; private String resultKey; public static String GetResultKey(String topicName) { - return topicName + "-Execution:" + TimeUtils.currentTimeStringFullSplit(); } + public static String GetFutureResultKey(String topicName, LocalDateTime futureExecutionTime) { + return topicName + "-Execution:" + TimeUtils.localDateTimeString(futureExecutionTime); + } + } diff --git a/pom.xml b/pom.xml index 981c465..4fc0dc5 100644 --- a/pom.xml +++ b/pom.xml @@ -132,12 +132,12 @@ true - + org.springframework.boot diff --git a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java index c3bfbf0..2dfe87f 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java @@ -1,5 +1,6 @@ package io.wdd.rpc.execute.service; +import java.util.HashMap; import java.util.List; @@ -20,6 +21,8 @@ public interface CoreExecutionService { String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete); + String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey); + List SendCommandToAgent(List agentTopicNameList, String type, List command); @@ -35,4 +38,16 @@ public interface CoreExecutionService { List SendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList); + /** + * 通常为 页面定时脚本任务调用 + * + * @param agentTopicNameList 目标Agent的TopicName列表 + * @param type 任务类型 + * @param completeCommandList 完整的类型 + * @param atnFutureKey 由于脚本任务为延迟调用,故需要提前生成未来的ResultKey + * @return 每个Agent只返回一个 ResultKey(Script脚本的结果全部拼接到一起),全部的resultKey + */ + List SendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList, HashMap atnFutureKey); + + } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java index 16b23ff..8a0b7c8 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java @@ -10,11 +10,13 @@ import io.wdd.rpc.execute.config.ExecutionLog; import io.wdd.rpc.execute.result.BuildStreamReader; import io.wdd.rpc.message.sender.ToAgentMessageSender; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.time.LocalDateTime; +import java.util.HashMap; import java.util.List; import java.util.stream.Collectors; @@ -69,6 +71,12 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { @Override public String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete) { + return this.SendCommandToAgent(agentTopicName, type, commandList, commandListComplete, null); + } + + @Override + public String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey) { + // 检查agentTopicName是否存在 if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { log.error("agentTopicName异常!"); @@ -77,8 +85,11 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { // 归一化type类型 不行 - // 生成 ResultKey - String resultKey = ExecutionMessage.GetResultKey(agentTopicName); + String resultKey = futureKey; + // 判定是否是 FutureKey + if (StringUtils.isEmpty(agentTopicName)) { + resultKey = ExecutionMessage.GetResultKey(agentTopicName); + } // 构造 Execution Command对应的消息体 ExecutionMessage executionMessage = this @@ -201,6 +212,23 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { } + @Override + public List SendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList, HashMap atnFutureKey) { + + return agentTopicNameList + .stream() + .map( + agentTopicName -> this.SendCommandToAgent( + agentTopicName, + type, + null, + completeCommandList, + atnFutureKey.get(agentTopicName) + ) + ) + .collect(Collectors.toList()); + } + @Deprecated private OctopusMessage generateOctopusMessage(String agentTopicName, String resultKey, String type, List commandList, List> commandListComplete) { diff --git a/server/src/main/java/io/wdd/rpc/scheduler/beans/ScriptSchedulerDTO.java b/server/src/main/java/io/wdd/rpc/scheduler/beans/ScriptSchedulerDTO.java index 1bf5483..7731996 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/beans/ScriptSchedulerDTO.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/beans/ScriptSchedulerDTO.java @@ -1,11 +1,13 @@ package io.wdd.rpc.scheduler.beans; import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.SuperBuilder; +import java.util.HashMap; import java.util.List; @ApiModel("定时脚本任务-中间转换状态-实体类") @@ -15,10 +17,25 @@ import java.util.List; @SuperBuilder(toBuilder = true) public class ScriptSchedulerDTO extends ScriptSchedulerVO{ + /** + * 脚本解析之后的完整命令行 + */ + @ApiModelProperty(" 脚本解析之后的完整命令行") List> completeCommandList; + /** + * 存储内容为,目标执行机器agentTopicName列表 + */ + @ApiModelProperty("目标执行机器agentTopicName列表") List targetMachineList; + @Deprecated List resultKeyList; + /** + * 存储内容为 agentTopicName -- FutureResultKey + */ + @ApiModelProperty("agentTopicName -- FutureResultKey") + HashMap agentTopicNameToFutureResultKeyMap; + } diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/QuartzSchedulerServiceImpl.java b/server/src/main/java/io/wdd/rpc/scheduler/service/QuartzSchedulerServiceImpl.java index 24e193c..19d14c7 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/QuartzSchedulerServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/QuartzSchedulerServiceImpl.java @@ -1,5 +1,6 @@ package io.wdd.rpc.scheduler.service; +import io.wdd.common.beans.executor.ExecutionMessage; import io.wdd.common.handler.MyRuntimeException; import io.wdd.common.utils.FunctionReader; import io.wdd.common.utils.TimeUtils; @@ -25,7 +26,6 @@ import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.time.LocalDateTime; import java.util.*; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static io.wdd.rpc.scheduler.beans.ScriptSchedulerVO.SCHEDULE_MISSION_GROUP_NAME; @@ -73,26 +73,93 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService { AgentScriptSchedulerJob.class, scriptSchedulerDTO.getSchedulerUuid(), SCHEDULE_MISSION_GROUP_NAME, - 1, // 立即开始本次任务 + 0, + // 立即开始本次任务 1ms wait scriptSchedulerDTO.getCronExpress(), dataMap ); - try { - TimeUnit.SECONDS.sleep(2); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + HashMap futureExecutionResultKey = getFutureExecutionResultKey(scriptSchedulerDTO); + + log.info("futureExecutionResultKey is => {}", futureExecutionResultKey); + // persistent the script scheduled mission // dto should store more info ScriptSchedulerPO scriptSchedulerPO = convertToScriptSchedulerPO(scriptSchedulerDTO); - log.info(String.valueOf(scriptSchedulerPO)); + log.info("scriptSchedulerPO is => {}", scriptSchedulerPO); // scriptSchedulerService.save(scriptSchedulerPO); return scriptSchedulerDTO.getResultKeyList(); } + + /** + * @param scriptSchedulerDTO dto + * @return dto中存储的 未来ExecutionKey对象 + */ + private HashMap getFutureExecutionResultKey(ScriptSchedulerDTO scriptSchedulerDTO) { + + ArrayList time = getLastNextExecutionTime( + scriptSchedulerDTO.getSchedulerUuid(), + SCHEDULE_MISSION_GROUP_NAME + ); + + LocalDateTime nextExecutionTime = time.get(1); + + HashMap keyMap = scriptSchedulerDTO.getAgentTopicNameToFutureResultKeyMap(); + + // 为每一个目标Agent都要设置相应的 FutureKey + scriptSchedulerDTO + .getTargetMachineList() + .stream() + .forEach( + targetMachine -> { + keyMap.put( + targetMachine, + ExecutionMessage.GetFutureResultKey( + targetMachine, + nextExecutionTime + ) + ); + } + ); + + return keyMap; + } + + /** + * 定时任务通常在将来执行,所以需要手动获取到下次的执行时间 + * + * @param missionName 任务名称 + * @param missionGroupName 任务Group组名 + * @return 上次调度时间 下次执行时间 + */ + public ArrayList getLastNextExecutionTime(String missionName, String missionGroupName) { + // 获取JobDetail存储上次调度时间和下次执行时间 + try { + Trigger schedulerTrigger = scheduler.getTrigger( + new TriggerKey( + missionName, + missionGroupName + ) + ); + + ArrayList result = new ArrayList<>(); + + LocalDateTime last_schedule_time = TimeUtils.cvFromDate(schedulerTrigger.getFinalFireTime()); + LocalDateTime next_schedule_time = TimeUtils.cvFromDate(schedulerTrigger.getNextFireTime()); + + result.add(last_schedule_time); + result.add(next_schedule_time); + + return result; + + } catch (SchedulerException e) { + throw new RuntimeException(e); + } + } + /** * 转换 中间层 --> 持久层 * @@ -109,26 +176,6 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService { .targetMachine(String.valueOf(dto.getTargetMachineList())) .build(); - // 获取JobDetail存储上次调度时间和下次执行时间 - try { - Trigger schedulerTrigger = scheduler.getTrigger( - new TriggerKey( - dto.getSchedulerUuid(), - SCHEDULE_MISSION_GROUP_NAME - ) - ); - - - LocalDateTime last_schedule_time = TimeUtils.cvFromDate(schedulerTrigger.getFinalFireTime()); - LocalDateTime next_schedule_time = TimeUtils.cvFromDate(schedulerTrigger.getNextFireTime()); - - schedulerPO.setLastScheduleTime(last_schedule_time); - schedulerPO.setNextScheduleTime(next_schedule_time); - - } catch (SchedulerException e) { - throw new RuntimeException(e); - } - return schedulerPO; } @@ -161,19 +208,28 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService { throw new MyRuntimeException("target machine wrong !"); } ArrayList targetMachineList = new ArrayList<>(); - for (String s : targetMachineSplit) { - targetMachineList.add(s); - } + Collections.addAll( + targetMachineList, + targetMachineSplit + ); // 生成DTO对象 ScriptSchedulerDTO dto = new ScriptSchedulerDTO(); - BeanUtils.copyProperties(dto, - scriptSchedulerVO); + BeanUtils.copyProperties( + dto, + scriptSchedulerVO + ); + + // 初始化属性值 + dto.setAgentTopicNameToFutureResultKeyMap( + new HashMap(16) + ); // 设置属性值 dto.setCompleteCommandList(completeCommandList); dto.setTargetMachineList(targetMachineList); + // 生成 scheduler uuid String uuid = RandomStringUtils.randomAlphabetic(32); dto.setSchedulerUuid(uuid); @@ -267,7 +323,8 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService { * @param jobClass 任务实现类 * @param jobName 任务名称(建议唯一) * @param jobGroupName 任务组名 - * @param startTime + * @param startTime 单位为ms,传入 0 + * 表示立即开始 * @param cronJobExpression 时间表达式 (如:0/5 * * * * ? ) * @param jobData 参数 */ @@ -308,7 +365,7 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService { .startAt( DateBuilder.futureDate( startTime, - IntervalUnit.SECOND + IntervalUnit.MILLISECOND ) ) .withSchedule( @@ -328,6 +385,7 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService { jobDetail.getJobDataMap() ); + } catch (Exception e) { e.printStackTrace(); throw new MyRuntimeException("add job error!"); diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java b/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java index a2d5d45..0985f5f 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java @@ -33,6 +33,10 @@ public class AgentApplyScheduledScript { // 将 resultKeyList 放入这个DTO中 scriptSchedulerDTO.setResultKeyList(resultKeyList); + // 需要更新数据库 + // 关联性数据库 + + } diff --git a/server/src/main/resources/bootstrap.yml b/server/src/main/resources/bootstrap.yml index 77c7df1..c6cd9da 100644 --- a/server/src/main/resources/bootstrap.yml +++ b/server/src/main/resources/bootstrap.yml @@ -18,9 +18,3 @@ spring: extension-configs: - group: local data-id: common-local.yaml - -debug: true -logging: - level: - io.wdd.server: - debug \ No newline at end of file