From 41396e024c83ed6f6a6b4a6c2e6a8c82f2188296 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Mon, 16 Jan 2023 17:11:43 +0800 Subject: [PATCH] [ server ] [ execution ]- optimize some controller - 1 --- .../beans/status/AgentHealthyStatusEnum.java | 34 +++ .../rpc/controller/ExecutionController.java | 67 +++++- .../rpc/controller/QuartzCRUDController.java | 75 ------- .../rpc/controller/SchedulerController.java | 79 +++++++ .../wdd/rpc/controller/StatusController.java | 36 +++ .../execute/service/CoreExecutionService.java | 14 +- .../service/CoreExecutionServiceImpl.java | 50 +++-- ...tusJob.java => AgentStatusMonitorJob.java} | 2 +- .../service/BuildStatusScheduleTask.java | 6 +- ...rvice.java => QuartzSchedulerService.java} | 12 +- ...l.java => QuartzSchedulerServiceImpl.java} | 205 +++++++++++++----- .../wdd/rpc/status/MonitorAllAgentStatus.java | 143 ++++++++---- ...nMapper.xml => QuartzSchedulerService.xml} | 0 13 files changed, 523 insertions(+), 200 deletions(-) create mode 100644 common/src/main/java/io/wdd/common/beans/status/AgentHealthyStatusEnum.java delete mode 100644 server/src/main/java/io/wdd/rpc/controller/QuartzCRUDController.java create mode 100644 server/src/main/java/io/wdd/rpc/controller/SchedulerController.java create mode 100644 server/src/main/java/io/wdd/rpc/controller/StatusController.java rename server/src/main/java/io/wdd/rpc/scheduler/job/{MonitorAllAgentStatusJob.java => AgentStatusMonitorJob.java} (93%) rename server/src/main/java/io/wdd/rpc/scheduler/service/{OctopusQuartzService.java => QuartzSchedulerService.java} (92%) rename server/src/main/java/io/wdd/rpc/scheduler/service/{OctopusQuartzServiceImpl.java => QuartzSchedulerServiceImpl.java} (60%) rename server/src/main/resources/mapper/{ServerAppRelationMapper.xml => QuartzSchedulerService.xml} (100%) diff --git a/common/src/main/java/io/wdd/common/beans/status/AgentHealthyStatusEnum.java b/common/src/main/java/io/wdd/common/beans/status/AgentHealthyStatusEnum.java new file mode 100644 index 0000000..2a1cd98 --- /dev/null +++ b/common/src/main/java/io/wdd/common/beans/status/AgentHealthyStatusEnum.java @@ -0,0 +1,34 @@ +package io.wdd.common.beans.status; + +import lombok.Getter; +import lombok.Setter; + +/** + * AgentHealthy状态描述实体类 + * Agent存货状态描述 + */ +@Getter +public enum AgentHealthyStatusEnum { + + FAILED("FAILED", "Agent存活状态为 失败"), + + HEALTHY("HEALTHY", "Agent存活状态为 存活"), + + UNKNOWN("UNKNOWN", "Agent存活状态 未知"); + + String status; + + + String description; + + + + AgentHealthyStatusEnum(String status, String description) { + this.description = description; + this.status = status; + } + + + + +} diff --git a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java index bba4b7d..ad59f90 100644 --- a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java +++ b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java @@ -1,5 +1,7 @@ package io.wdd.rpc.controller; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; import io.wdd.common.beans.response.R; import io.wdd.rpc.execute.result.BuildStreamReader; import io.wdd.rpc.execute.service.CoreExecutionService; @@ -17,7 +19,8 @@ import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.AGENT_STATUS_RED import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER; @RestController -@RequestMapping("octopus/server/executor") +@RequestMapping("/octopus/server/executor") +@Api("Agent执行命令的Controller") public class ExecutionController { @Resource @@ -37,9 +40,16 @@ public class ExecutionController { String streamKey = ""; if (StringUtils.isEmpty(type)) { - streamKey = coreExecutionService.SendCommandToAgent(topicName, commandList); + streamKey = coreExecutionService.SendCommandToAgent( + topicName, + commandList + ); } else { - streamKey = coreExecutionService.SendCommandToAgent(topicName, type, commandList); + streamKey = coreExecutionService.SendCommandToAgent( + topicName, + type, + commandList + ); } return R.ok(streamKey); @@ -50,7 +60,10 @@ public class ExecutionController { @RequestParam(value = "streamKey") String streamKey ) { - buildStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER , streamKey); + buildStreamReader.registerStreamReader( + COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, + streamKey + ); } @@ -59,18 +72,58 @@ public class ExecutionController { @RequestParam(value = "streamKey") String streamKey ) { - buildStreamReader.registerStreamReader(AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER , streamKey); + buildStreamReader.registerStreamReader( + AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER, + streamKey + ); } - @PostMapping("/agentUpdate") - public void AgentUpdate( + // auth required + @PostMapping("/AgentUpdate") + @ApiOperation("控制Agent升级的接口") + public R AgentUpdate( @RequestParam(value = "agentTopicName") String agentTopicName ) { + return R.ok( + coreExecutionService + .SendCommandToAgent( + agentTopicName, + "AgentUpdate", + null + )); + } + @PostMapping("/AgentReboot") + @ApiOperation("控制Agent重启的接口") + public R AgentReboot( + @RequestParam(value = "agentTopicName") String agentTopicName + ) { + return R.ok( + coreExecutionService + .SendCommandToAgent( + agentTopicName, + "AgentRestart", + null + )); + } + + @PostMapping("/AgentShutdown") + @ApiOperation("控制Agent关闭的接口") + public R AgentShutdown( + @RequestParam(value = "agentTopicName") String agentTopicName + ) { + + return R.ok( + coreExecutionService + .SendCommandToAgent( + agentTopicName, + "AgentShutdown", + null + )); } diff --git a/server/src/main/java/io/wdd/rpc/controller/QuartzCRUDController.java b/server/src/main/java/io/wdd/rpc/controller/QuartzCRUDController.java deleted file mode 100644 index f524175..0000000 --- a/server/src/main/java/io/wdd/rpc/controller/QuartzCRUDController.java +++ /dev/null @@ -1,75 +0,0 @@ -package io.wdd.rpc.controller; - - -import com.fasterxml.jackson.annotation.JsonInclude; -import io.swagger.annotations.*; -import io.wdd.rpc.scheduler.config.UpdateJobBean; -import io.wdd.rpc.scheduler.service.OctopusQuartzService; -import org.quartz.JobDataMap; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.*; - -import java.util.List; -import java.util.Map; - -@RestController -@Api(value = "quartz增删改查相关API") -@RequestMapping(value = "/quartz") -public class QuartzCRUDController { - - @Autowired - OctopusQuartzService octopusQuartzService; - -// @ApiOperation(value = "使用quartz添加job") -// @RequestMapping(value = "/addJob/{jobUUID}", method = RequestMethod.POST) -// public void addQuartzJob(@ApiParam(name = "jobUUID") @PathVariable("jobUUID") String jobUUID, @ApiParam(name = "JobXXXBean") @RequestBody JobXXXBean jobXXXBean) { -// -// if (jobXXXBean.getOpenBean() != null) { -// JobDataMap jobDataMap = new JobDataMap(); -// jobDataMap.put("key01", jobXXXBean.getKey01()); -// jobDataMap.put("key02", jobXXXBean.getKey02()); -// jobDataMap.put("key03", jobXXXBean.getKey03()); -// jobDataMap.put("jobTimeCron", jobXXXBean.getJobTimeCron()); -// jobDataMap.put("key04", jobXXXBean.getKey04()); -// octopusQuartzService.addJob(Job1.class, jobUUID, jobUUID, jobXXXBean.getJobTimeCron(), jobDataMap); -// } else { -// throw new BadRequestException("参数错误"); -// } -// } - - - @ApiOperation(value = "使用quartz查询所有job") - @RequestMapping(value = "/queryAllJob", method = RequestMethod.GET) - public List> queryAllQuartzJob() { - - List> list = octopusQuartzService.queryAllJob(); - return list; - } - - - @ApiOperation(value = "使用quartz查询所有运行job") - @RequestMapping(value = "/queryRunJob", method = RequestMethod.GET) - public List> queryRunQuartzJob() { - - List> list = octopusQuartzService.queryRunJob(); - return list; - } - - @ApiOperation(value = "使用quartz删除job") - @RequestMapping(value = "/deleteJob/{jobUUID}", method = RequestMethod.DELETE) - public void deleteJob(@ApiParam(name = "jobUUID") @PathVariable("jobUUID") String jobUUID) { - - octopusQuartzService.deleteJob(jobUUID, jobUUID); - } - - - @ApiOperation(value = "使用quartz修改job的cron时间") - @RequestMapping(value = "/updateJob/{jobUUID}", method = RequestMethod.PUT) - public void deleteJob(@ApiParam(name = "jobUUID") @PathVariable("jobUUID") String jobUUID, @ApiParam(name = "jobCronTime") @RequestBody UpdateJobBean updateJobBean) { - - octopusQuartzService.updateJob(jobUUID, jobUUID, updateJobBean.getJobCronTime()); - - } -} - - diff --git a/server/src/main/java/io/wdd/rpc/controller/SchedulerController.java b/server/src/main/java/io/wdd/rpc/controller/SchedulerController.java new file mode 100644 index 0000000..7552239 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/controller/SchedulerController.java @@ -0,0 +1,79 @@ +package io.wdd.rpc.controller; + + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import io.wdd.common.beans.response.R; +import io.wdd.rpc.scheduler.config.UpdateJobBean; +import io.wdd.rpc.scheduler.service.QuartzSchedulerService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +import java.util.List; +import java.util.Map; + +@RestController +@Api(value = "定时任务控制中心的Controller") +@RequestMapping(value = "/octopus/server/scheduler") +public class SchedulerController { + + @Autowired + QuartzSchedulerService octopusQuartzService; + + + @ApiOperation(value = "查询所有job") + @GetMapping(value = "/queryAllJob") + public R>> queryAllQuartzJob() { + + return R.ok(octopusQuartzService.queryAllJob()); + } + + + @ApiOperation(value = "查询所有运行job") + @PostMapping(value = "/queryRunJob") + public R>> queryRunQuartzJob() { + + return R.ok(octopusQuartzService.queryRunJob()); + } + + @ApiOperation(value = "删除一个job") + @PostMapping(value = "/deleteJob/{jobName}") + public R deleteJob( + @ApiParam(name = "jobName") @RequestParam("jobName") String jobName + ) { + + boolean deleteJob = octopusQuartzService.deleteJob( + jobName, + jobName + ); + String result = String.format( + "删除任务[ %s ]结果为 [ %s ]", + jobName, + deleteJob + ); + + if (deleteJob) { + return R.ok(result); + } else { + return R.failed(result); + } + } + + + @ApiOperation(value = "修改job的cron时间") + @PostMapping(value = "/updateJob/{jobName}") + public void deleteJob( + @ApiParam(name = "jobName") @PathVariable("jobName") String jobName, @ApiParam(name = "jobCronTime") @RequestBody UpdateJobBean updateJobBean + ) { + + octopusQuartzService.updateJob( + jobName, + jobName, + updateJobBean.getJobCronTime() + ); + + } +} + + diff --git a/server/src/main/java/io/wdd/rpc/controller/StatusController.java b/server/src/main/java/io/wdd/rpc/controller/StatusController.java new file mode 100644 index 0000000..abb52ec --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/controller/StatusController.java @@ -0,0 +1,36 @@ +package io.wdd.rpc.controller; + + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.wdd.common.beans.response.R; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Map; + +import static io.wdd.rpc.status.MonitorAllAgentStatus.ALL_AGENT_HEALTHY_STATUS_MAP; +import static io.wdd.rpc.status.MonitorAllAgentStatus.HEALTHY_STATUS_AGENT_LIST_MAP; + +@RestController +@Api("Agent运行状态Controller") +@RequestMapping("/octopus/server/status") +public class StatusController { + + + @ApiOperation("获取所有Agent的运行状态") + @GetMapping("/agentStatus") + public R GetAllAgentHealthyStatus(){ + + return R.ok(ALL_AGENT_HEALTHY_STATUS_MAP); + } + + @ApiOperation("获取 状态-Agent Map") + @GetMapping("/statusAgent") + public R GetHealthyStatusAgentList(){ + + return R.ok(HEALTHY_STATUS_AGENT_LIST_MAP); + } + +} diff --git a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java index 4ff4e16..e964a56 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java @@ -6,15 +6,21 @@ import java.util.List; public interface CoreExecutionService { - String SendCommandToAgent(String topicName, String command); + String SendCommandToAgent(String agentTopicName, String command); - String SendCommandToAgent(String topicName, List commandList); + String SendCommandToAgent(String agentTopicName, List commandList); - String SendCommandToAgent(String topicName, String type, List command); + /** + * @param agentTopicName agent唯一表示名 + * @param type 任务执行类型 + * @param command 任务列表内容 + * @return redis中的 result key + */ + String SendCommandToAgent(String agentTopicName, String type, List command); - List SendCommandToAgent(List topicNameList, String type, List command); + List SendCommandToAgent(List agentTopicNameList, String type, List command); } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java index cecad85..e55c347 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.wdd.common.beans.executor.ExecutionMessage; import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessageType; +import io.wdd.common.handler.MyRuntimeException; import io.wdd.rpc.execute.config.ExecutionLog; import io.wdd.rpc.execute.result.BuildStreamReader; import io.wdd.rpc.message.sender.ToAgentMessageSender; @@ -18,6 +19,7 @@ import java.util.List; import java.util.stream.Collectors; import static io.wdd.rpc.execute.service.ExecutionResultDaemonHandler.WAIT_EXECUTION_RESULT_LIST; +import static io.wdd.rpc.status.MonitorAllAgentStatus.ALL_AGENT_TOPIC_NAME_LIST; @Service @Slf4j @@ -36,20 +38,26 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { BuildStreamReader buildStreamReader; @Override - public String SendCommandToAgent(String topicName, String command) { - return this.SendCommandToAgent(topicName, + public String SendCommandToAgent(String agentagentTopicName, String command) { + return this.SendCommandToAgent(agentagentTopicName, List.of(command)); } @Override - public String SendCommandToAgent(String topicName, List commandList) { - return this.SendCommandToAgent(topicName, + public String SendCommandToAgent(String agentagentTopicName, List commandList) { + return this.SendCommandToAgent(agentagentTopicName, "manual-command", commandList); } @Override - public String SendCommandToAgent(String topicName, String type, List commandList) { + public String SendCommandToAgent(String agentTopicName, String type, List commandList) { + + // 检查agentTopicName是否存在 + if (!ALL_AGENT_TOPIC_NAME_LIST.contains(agentTopicName)) { + log.error("agentTopicName异常!"); + throw new MyRuntimeException("agentTopicName异常!"); + } // 归一化type类型 不行 @@ -58,7 +66,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { ExecutionMessage.builder() .type(type) .commandList(commandList) - .resultKey(ExecutionMessage.GetResultKey(topicName)) + .resultKey(ExecutionMessage.GetResultKey(agentTopicName)) .build(); @@ -75,7 +83,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { .type(OctopusMessageType.EXECUTOR) .init_time(LocalDateTime.now()) .content(executionMessageString) - .uuid(topicName) + .uuid(agentTopicName) .build(); String resultKey = executionMessage.getResultKey(); @@ -96,14 +104,15 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { // createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey); // construct the persistent Bean - ExecutionLog executionLog = buildPersistentLogBeanFromOctopusMessage(octopusMessage, executionMessage); + ExecutionLog executionLog = buildPersistentLogBeanFromOctopusMessage(octopusMessage, + executionMessage); // send resultKey to ExecutionResultDaemonHandler WAIT_EXECUTION_RESULT_LIST.put(resultKey, executionLog); // help gc executionMessage = null; - octopusMessage =null; + octopusMessage = null; return resultKey; } @@ -120,20 +129,21 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { @Override - public List SendCommandToAgent(List topicNameList, String type, List command) { - return topicNameList.stream() - .map( - topicName -> { - return this.SendCommandToAgent(topicName, - type, - command); - } - ) - .collect(Collectors.toList()); + public List SendCommandToAgent(List agentagentTopicNameList, String type, List command) { + return agentagentTopicNameList + .stream() + .map( + agentTopicName -> this.SendCommandToAgent + ( + agentTopicName, + type, + command) + ) + .collect(Collectors.toList()); } @Deprecated - private OctopusMessage generateOctopusMessage(String topicName, String type, List commandList) { + private OctopusMessage generateOctopusMessage(String agentTopicName, String type, List commandList) { return null; } diff --git a/server/src/main/java/io/wdd/rpc/scheduler/job/MonitorAllAgentStatusJob.java b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentStatusMonitorJob.java similarity index 93% rename from server/src/main/java/io/wdd/rpc/scheduler/job/MonitorAllAgentStatusJob.java rename to server/src/main/java/io/wdd/rpc/scheduler/job/AgentStatusMonitorJob.java index 3279ca4..8616bf2 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/job/MonitorAllAgentStatusJob.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentStatusMonitorJob.java @@ -8,7 +8,7 @@ import org.springframework.scheduling.quartz.QuartzJobBean; import javax.annotation.Resource; -public class MonitorAllAgentStatusJob extends QuartzJobBean { +public class AgentStatusMonitorJob extends QuartzJobBean { @Resource MonitorAllAgentStatus monitorAllAgentStatus; diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java b/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java index 8568830..2ef7b3f 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java @@ -2,7 +2,7 @@ package io.wdd.rpc.scheduler.service; import io.wdd.rpc.scheduler.job.AgentRunMetricStatusJob; -import io.wdd.rpc.scheduler.job.MonitorAllAgentStatusJob; +import io.wdd.rpc.scheduler.job.AgentStatusMonitorJob; import lombok.extern.slf4j.Slf4j; import org.quartz.CronExpression; import org.springframework.beans.factory.annotation.Value; @@ -22,7 +22,7 @@ import static io.wdd.rpc.status.AgentRuntimeMetricStatus.METRIC_REPORT_TIME_PINC public class BuildStatusScheduleTask { @Resource - OctopusQuartzService octopusQuartzService; + QuartzSchedulerService octopusQuartzService; @Value(value = "${octopus.status.healthy.cron}") String healthyCronTimeExpress; @@ -99,7 +99,7 @@ public class BuildStatusScheduleTask { // build the Job octopusQuartzService.addJob( - MonitorAllAgentStatusJob.class, + AgentStatusMonitorJob.class, "monitorAllAgentStatusJob", JOB_GROUP_NAME, healthyCheckStartDelaySeconds, diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzService.java b/server/src/main/java/io/wdd/rpc/scheduler/service/QuartzSchedulerService.java similarity index 92% rename from server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzService.java rename to server/src/main/java/io/wdd/rpc/scheduler/service/QuartzSchedulerService.java index 2d0955d..36b9ca2 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzService.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/QuartzSchedulerService.java @@ -7,7 +7,8 @@ import java.util.List; import java.util.Map; -public interface OctopusQuartzService { +public interface QuartzSchedulerService { + boolean addJob(OctopusQuartzJob quartzJob); @@ -43,13 +44,14 @@ public interface OctopusQuartzService { void updateJob(String jobName, String jobGroupName, String jobTime); - /** * 删除一个任务job + * * @param jobName * @param jobGroupName + * @return */ - void deleteJob(String jobName, String jobGroupName); + boolean deleteJob(String jobName, String jobGroupName); /** * 暂停一个任务job @@ -85,6 +87,10 @@ public interface OctopusQuartzService { List> queryRunJob(); + /** + * 获取所有的触发器 + * + * */ List queryAllTrigger(); diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzServiceImpl.java b/server/src/main/java/io/wdd/rpc/scheduler/service/QuartzSchedulerServiceImpl.java similarity index 60% rename from server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzServiceImpl.java rename to server/src/main/java/io/wdd/rpc/scheduler/service/QuartzSchedulerServiceImpl.java index 4b46df0..073a3c3 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/QuartzSchedulerServiceImpl.java @@ -24,7 +24,7 @@ import static org.quartz.TriggerBuilder.newTrigger; */ @Slf4j @Service -public class OctopusQuartzServiceImpl implements OctopusQuartzService { +public class QuartzSchedulerServiceImpl implements QuartzSchedulerService { @Autowired private Scheduler scheduler; @@ -48,12 +48,12 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { /** * 增加一个job * - * @param jobClass 任务实现类 - * @param jobName 任务名称 - * @param jobGroupName 任务组名 - * @param jobRunTimePinch 时间表达式 (这是每隔多少秒为一次任务) - * @param jobRunRepeatTimes 运行的次数 (<0:表示不限次数) - * @param jobData 参数 + * @param jobClass 任务实现类 + * @param jobName 任务名称 + * @param jobGroupName 任务组名 + * @param jobRunTimePinch 时间表达式 (这是每隔多少秒为一次任务) + * @param jobRunRepeatTimes 运行的次数 (<0:表示不限次数) + * @param jobData 参数 */ @Override public void addJob(Class jobClass, String jobName, String jobGroupName, int jobRunTimePinch, int jobRunRepeatTimes, Map jobData) { @@ -61,17 +61,25 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { // 任务名称和组构成任务key JobDetail jobDetail = JobBuilder .newJob(jobClass) - .withIdentity(jobName, jobGroupName) + .withIdentity( + jobName, + jobGroupName + ) .build(); // 设置job参数 if (jobData != null && jobData.size() > 0) { - jobDetail.getJobDataMap().putAll(jobData); + jobDetail + .getJobDataMap() + .putAll(jobData); } // 使用simpleTrigger规则 Trigger trigger = newTrigger() - .withIdentity(jobName, jobGroupName) + .withIdentity( + jobName, + jobGroupName + ) .withSchedule( SimpleScheduleBuilder .repeatSecondlyForTotalCount( @@ -82,9 +90,17 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { .startNow() .build(); - log.debug("jobDataMap: {}", jobDetail.getJobDataMap().getWrappedMap()); + log.debug( + "jobDataMap: {}", + jobDetail + .getJobDataMap() + .getWrappedMap() + ); - scheduler.scheduleJob(jobDetail, trigger); + scheduler.scheduleJob( + jobDetail, + trigger + ); } catch (SchedulerException e) { e.printStackTrace(); @@ -108,10 +124,18 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { // 创建jobDetail实例,绑定Job实现类 // 指明job的名称,所在组的名称,以及绑定job类 // 任务名称和组构成任务key - JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build(); + JobDetail jobDetail = JobBuilder + .newJob(jobClass) + .withIdentity( + jobName, + jobGroupName + ) + .build(); // 设置job参数 if (jobData != null && jobData.size() > 0) { - jobDetail.getJobDataMap().putAll(jobData); + jobDetail + .getJobDataMap() + .putAll(jobData); } // 定义调度触发规则 @@ -124,9 +148,15 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { } Trigger trigger = newTrigger() - .withIdentity(jobName, jobGroupName) + .withIdentity( + jobName, + jobGroupName + ) .startAt( - DateBuilder.futureDate(startTime, IntervalUnit.SECOND) + DateBuilder.futureDate( + startTime, + IntervalUnit.SECOND + ) ) .withSchedule( CronScheduleBuilder.cronSchedule(cronJobExpression) @@ -135,8 +165,14 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { .build(); // 把作业和触发器注册到任务调度中 - scheduler.scheduleJob(jobDetail, trigger); - log.info("jobDataMap: {}", jobDetail.getJobDataMap()); + scheduler.scheduleJob( + jobDetail, + trigger + ); + log.info( + "jobDataMap: {}", + jobDetail.getJobDataMap() + ); } catch (Exception e) { e.printStackTrace(); @@ -154,12 +190,25 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { @Override public void updateJob(String jobName, String jobGroupName, String jobTime) { try { - TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName); + TriggerKey triggerKey = TriggerKey.triggerKey( + jobName, + jobGroupName + ); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); - log.info("new jobTime: {}", jobTime); - trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).build(); + log.info( + "new jobTime: {}", + jobTime + ); + trigger = trigger + .getTriggerBuilder() + .withIdentity(triggerKey) + .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)) + .build(); // 重启触发器 - scheduler.rescheduleJob(triggerKey, trigger); + scheduler.rescheduleJob( + triggerKey, + trigger + ); } catch (SchedulerException e) { e.printStackTrace(); throw new MyRuntimeException("update job error!"); @@ -171,15 +220,22 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { * * @param jobName 任务名称 * @param jobGroupName 任务组名 + * @return */ @Override - public void deleteJob(String jobName, String jobGroupName) { + public boolean deleteJob(String jobName, String jobGroupName) { + try { - scheduler.deleteJob(new JobKey(jobName, jobGroupName)); + scheduler.deleteJob(new JobKey( + jobName, + jobGroupName + )); + return true; + } catch (Exception e) { - e.printStackTrace(); throw new MyRuntimeException("delete job error!"); } + } /** @@ -191,7 +247,10 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { @Override public void pauseJob(String jobName, String jobGroupName) { try { - JobKey jobKey = JobKey.jobKey(jobName, jobGroupName); + JobKey jobKey = JobKey.jobKey( + jobName, + jobGroupName + ); scheduler.pauseJob(jobKey); } catch (SchedulerException e) { e.printStackTrace(); @@ -208,7 +267,10 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { @Override public void resumeJob(String jobName, String jobGroupName) { try { - JobKey jobKey = JobKey.jobKey(jobName, jobGroupName); + JobKey jobKey = JobKey.jobKey( + jobName, + jobGroupName + ); scheduler.resumeJob(jobKey); } catch (SchedulerException e) { e.printStackTrace(); @@ -225,7 +287,10 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { @Override public void runAJobNow(String jobName, String jobGroupName) { try { - JobKey jobKey = JobKey.jobKey(jobName, jobGroupName); + JobKey jobKey = JobKey.jobKey( + jobName, + jobGroupName + ); scheduler.triggerJob(jobKey); } catch (SchedulerException e) { e.printStackTrace(); @@ -246,19 +311,40 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { Set jobKeys = scheduler.getJobKeys(matcher); jobList = new ArrayList>(); for (JobKey jobKey : jobKeys) { - log.info("maps: {}", scheduler.getJobDetail(jobKey).getJobDataMap().getWrappedMap()); + log.info( + "maps: {}", + scheduler + .getJobDetail(jobKey) + .getJobDataMap() + .getWrappedMap() + ); List triggers = scheduler.getTriggersOfJob(jobKey); for (Trigger trigger : triggers) { Map map = new HashMap<>(); - map.put("jobName", jobKey.getName()); - map.put("jobGroupName", jobKey.getGroup()); - map.put("description", "触发器:" + trigger.getKey()); + map.put( + "jobName", + jobKey.getName() + ); + map.put( + "jobGroupName", + jobKey.getGroup() + ); + map.put( + "description", + "触发器:" + trigger.getKey() + ); Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey()); - map.put("jobStatus", triggerState.name()); + map.put( + "jobStatus", + triggerState.name() + ); if (trigger instanceof CronTrigger) { CronTrigger cronTrigger = (CronTrigger) trigger; String cronExpression = cronTrigger.getCronExpression(); - map.put("jobTime", cronExpression); + map.put( + "jobTime", + cronExpression + ); } jobList.add(map); } @@ -286,15 +372,30 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { JobDetail jobDetail = executingJob.getJobDetail(); JobKey jobKey = jobDetail.getKey(); Trigger trigger = executingJob.getTrigger(); - map.put("jobName", jobKey.getName()); - map.put("jobGroupName", jobKey.getGroup()); - map.put("description", "触发器:" + trigger.getKey()); + map.put( + "jobName", + jobKey.getName() + ); + map.put( + "jobGroupName", + jobKey.getGroup() + ); + map.put( + "description", + "触发器:" + trigger.getKey() + ); Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey()); - map.put("jobStatus", triggerState.name()); + map.put( + "jobStatus", + triggerState.name() + ); if (trigger instanceof CronTrigger) { CronTrigger cronTrigger = (CronTrigger) trigger; String cronExpression = cronTrigger.getCronExpression(); - map.put("jobTime", cronExpression); + map.put( + "jobTime", + cronExpression + ); } jobList.add(map); } @@ -310,17 +411,21 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { try { - return scheduler.getTriggerKeys( - GroupMatcher.groupEquals(JOB_GROUP_NAME) - ).stream().map( - triggerKey -> { - try { - return scheduler.getTrigger(triggerKey); - } catch (SchedulerException e) { - throw new RuntimeException(e); - } - } - ).collect(Collectors.toList()); + return scheduler + .getTriggerKeys( + GroupMatcher.groupEquals(JOB_GROUP_NAME) + ) + .stream() + .map( + triggerKey -> { + try { + return scheduler.getTrigger(triggerKey); + } catch (SchedulerException e) { + throw new RuntimeException(e); + } + } + ) + .collect(Collectors.toList()); } catch (SchedulerException e) { throw new RuntimeException(e); diff --git a/server/src/main/java/io/wdd/rpc/status/MonitorAllAgentStatus.java b/server/src/main/java/io/wdd/rpc/status/MonitorAllAgentStatus.java index e4d3ba5..20de995 100644 --- a/server/src/main/java/io/wdd/rpc/status/MonitorAllAgentStatus.java +++ b/server/src/main/java/io/wdd/rpc/status/MonitorAllAgentStatus.java @@ -1,23 +1,18 @@ package io.wdd.rpc.status; -import com.fasterxml.jackson.databind.ObjectMapper; +import io.wdd.common.beans.status.AgentHealthyStatusEnum; import io.wdd.common.beans.status.OctopusStatusMessage; import io.wdd.common.utils.TimeUtils; import io.wdd.rpc.scheduler.service.BuildStatusScheduleTask; -import io.wdd.rpc.scheduler.service.OctopusQuartzService; import io.wdd.server.beans.vo.ServerInfoVO; import io.wdd.server.coreService.CoreServerService; import lombok.extern.slf4j.Slf4j; -import org.quartz.Trigger; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import org.springframework.util.Assert; import javax.annotation.Resource; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -26,6 +21,9 @@ import static io.wdd.common.beans.status.OctopusStatusMessage.HEALTHY_STATUS_MES import static io.wdd.rpc.status.AgentRuntimeMetricStatus.ALL_HEALTHY_AGENT_TOPIC_NAMES; /** + * 更新频率被类 BuildStatusScheduleTask.class控制 + * + * * 获取所有注册的Agent *

* 发送状态检查信息, agent需要update相应的HashMap的值 @@ -41,10 +39,25 @@ import static io.wdd.rpc.status.AgentRuntimeMetricStatus.ALL_HEALTHY_AGENT_TOPIC public class MonitorAllAgentStatus { private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5; + /** + * 存储 状态对应Agent列表的Map + * Agent的状态描述为 AgentHealthyStatusEnum + * HEALTHY -> ["agentTopicName-1", "agentTopicName-2"] + * FAILED -> ["agentTopicName-1", "agentTopicName-2"] + */ + public static final Map> HEALTHY_STATUS_AGENT_LIST_MAP = new HashMap<>(); - private HashMap AGENT_HEALTHY_INIT_MAP; - private List ALL_AGENT_TOPICNAME_LIST; + /** + * 存储所有Agent状态的Map + * + * 内容为 agentTopicName-健康状态 + */ + public static final Map ALL_AGENT_HEALTHY_STATUS_MAP = new HashMap<>(); + /** + * 存储所有的AgentTopicName的缓存 + */ + public static final Set ALL_AGENT_TOPIC_NAME_LIST = new HashSet<>(); @Resource RedisTemplate redisTemplate; @Resource @@ -53,6 +66,7 @@ public class MonitorAllAgentStatus { CoreServerService coreServerService; @Resource BuildStatusScheduleTask buildStatusScheduleTask; + private HashMap AGENT_HEALTHY_INIT_MAP; public void go() { @@ -61,8 +75,14 @@ public class MonitorAllAgentStatus { // 1. 获取所有注册的Agent // todo need to cache this List allAgentInfo = coreServerService.serverGetAll(); - Assert.notEmpty(allAgentInfo,"not agent registered ! skip the agent healthy status check !"); - ALL_AGENT_TOPICNAME_LIST = allAgentInfo.stream().map(ServerInfoVO::getTopicName).collect(Collectors.toList()); + Assert.notEmpty(allAgentInfo, + "not agent registered ! skip the agent healthy status check !"); + + Set collect = allAgentInfo.stream() + .map(ServerInfoVO::getTopicName) + .collect(Collectors.toSet()); + ALL_AGENT_TOPIC_NAME_LIST.clear(); + ALL_AGENT_TOPIC_NAME_LIST.addAll(collect); // 1.1 检查 Agent状态保存数据结构是否正常 checkOrCreateRedisHealthyKey(); @@ -90,56 +110,102 @@ public class MonitorAllAgentStatus { // build the redis all agent healthy map struct HashMap initMap = new HashMap<>(32); - ALL_AGENT_TOPICNAME_LIST.stream().forEach( - agentTopicName -> { - initMap.put(agentTopicName, "0"); - } - ); - initMap.put("updateTime", TimeUtils.currentTimeString()); + ALL_AGENT_TOPICNAME_LIST.stream() + .forEach( + agentTopicName -> { + initMap.put(agentTopicName, + "0"); + } + ); + initMap.put("updateTime", + TimeUtils.currentTimeString()); // cache this map struct AGENT_HEALTHY_INIT_MAP = initMap; - redisTemplate.opsForHash().putAll(ALL_AGENT_STATUS_REDIS_KEY, initMap); + redisTemplate.opsForHash() + .putAll(ALL_AGENT_STATUS_REDIS_KEY, + initMap); } } private void buildAndSendAgentHealthMessage() { - List collect = ALL_AGENT_TOPICNAME_LIST.stream().map( - agentTopicName -> OctopusStatusMessage.builder() - .agentTopicName(agentTopicName) - .type(HEALTHY_STATUS_MESSAGE_TYPE) - .build() - ).collect(Collectors.toList()); + List collect = ALL_AGENT_TOPICNAME_LIST + .stream() + .map( + agentTopicName -> OctopusStatusMessage + .builder() + .agentTopicName(agentTopicName) + .type(HEALTHY_STATUS_MESSAGE_TYPE) + .build() + ) + .collect(Collectors.toList()); collectAgentStatus.statusMessageToAgent(collect); } private void updateAllAgentHealthyStatus() { - List statusList = redisTemplate.opsForHash().multiGet( - ALL_AGENT_STATUS_REDIS_KEY, - ALL_AGENT_TOPICNAME_LIST); + List statusList = redisTemplate + .opsForHash() + .multiGet( + ALL_AGENT_STATUS_REDIS_KEY, + ALL_AGENT_TOPICNAME_LIST); // current log to console is ok - HashMap tmp = new HashMap<>(32); + // agent-topic-name : STATUS(healthy, failed, unknown) + HashMap agentStatusMap = new HashMap<>(32); for (int i = 0; i < ALL_AGENT_TOPICNAME_LIST.size(); i++) { - tmp.put( + agentStatusMap.put( ALL_AGENT_TOPICNAME_LIST.get(i), uniformHealthyStatus(String.valueOf(statusList.get(i))) ); } String currentTimeString = TimeUtils.currentTimeString(); - log.info("[ AGENT HEALTHY CHECK ] time is {} , result are => {}", currentTimeString, tmp); + log.info("[ AGENT HEALTHY CHECK ] time is {} , result are => {}", + currentTimeString, + agentStatusMap); + + // 2023-01-16 + ALL_AGENT_HEALTHY_STATUS_MAP.clear(); + ALL_AGENT_HEALTHY_STATUS_MAP.putAll(agentStatusMap); + + // 2023-01-16 + Map> statusAgentListMap = agentStatusMap + .entrySet() + .stream() + .collect( + Collectors.groupingBy( + Map.Entry::getValue + ) + ) + .entrySet() + .stream() + .collect( + Collectors.toMap( + entry -> entry.getKey(), + entry -> entry + .getValue() + .stream() + .map( + Map.Entry::getKey + ) + .collect(Collectors.toList()) + ) + ); + HEALTHY_STATUS_AGENT_LIST_MAP.putAll(statusAgentListMap); + log.debug("Agent存活状态 状态-Agent名称-Map 已经更新了"); + // help gc - tmp = null; + agentStatusMap = null; // Trigger调用Agent Metric 任务 ArrayList allHealthyAgentTopicNames = new ArrayList<>(32); for (int i = 0; i < statusList.size(); i++) { - if (statusList.get(i).equals("1")) { + if (statusList.get(i) + .equals("1")) { allHealthyAgentTopicNames.add(ALL_AGENT_TOPICNAME_LIST.get(i)); } } @@ -148,19 +214,22 @@ public class MonitorAllAgentStatus { buildStatusScheduleTask.buildAgentMetricScheduleTask(); // update time - AGENT_HEALTHY_INIT_MAP.put("updateTime", currentTimeString); + AGENT_HEALTHY_INIT_MAP.put("updateTime", + currentTimeString); // init the healthy map - redisTemplate.opsForHash().putAll(ALL_AGENT_STATUS_REDIS_KEY, AGENT_HEALTHY_INIT_MAP); + redisTemplate.opsForHash() + .putAll(ALL_AGENT_STATUS_REDIS_KEY, + AGENT_HEALTHY_INIT_MAP); } private String uniformHealthyStatus(String agentStatus) { switch (agentStatus) { case "0": - return "FAILED"; + return AgentHealthyStatusEnum.FAILED.getStatus(); case "1": - return "HEALTHY"; + return AgentHealthyStatusEnum.HEALTHY.getStatus(); default: - return "UNKNOWN"; + return AgentHealthyStatusEnum.UNKNOWN.getStatus(); } } diff --git a/server/src/main/resources/mapper/ServerAppRelationMapper.xml b/server/src/main/resources/mapper/QuartzSchedulerService.xml similarity index 100% rename from server/src/main/resources/mapper/ServerAppRelationMapper.xml rename to server/src/main/resources/mapper/QuartzSchedulerService.xml