From 4812756408587f5aa1c7ce46a7b78c7c20a81708 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Mon, 16 Jan 2023 18:02:36 +0800 Subject: [PATCH] [ server ] [ execution ]- optimize some controller - 2 --- .../rpc/controller/SchedulerController.java | 20 +++- .../service/CoreExecutionServiceImpl.java | 4 +- .../rpc/scheduler/config/UpdateJobBean.java | 24 ---- .../job/AgentRunMetricStatusJob.java | 10 +- .../service/BuildStatusScheduleTask.java | 11 +- .../wdd/rpc/status/MonitorAllAgentStatus.java | 112 +++++++++++------- 6 files changed, 100 insertions(+), 81 deletions(-) delete mode 100644 server/src/main/java/io/wdd/rpc/scheduler/config/UpdateJobBean.java diff --git a/server/src/main/java/io/wdd/rpc/controller/SchedulerController.java b/server/src/main/java/io/wdd/rpc/controller/SchedulerController.java index 7552239..87add3f 100644 --- a/server/src/main/java/io/wdd/rpc/controller/SchedulerController.java +++ b/server/src/main/java/io/wdd/rpc/controller/SchedulerController.java @@ -5,8 +5,8 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import io.wdd.common.beans.response.R; -import io.wdd.rpc.scheduler.config.UpdateJobBean; import io.wdd.rpc.scheduler.service.QuartzSchedulerService; +import org.quartz.Trigger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @@ -38,7 +38,7 @@ public class SchedulerController { } @ApiOperation(value = "删除一个job") - @PostMapping(value = "/deleteJob/{jobName}") + @PostMapping(value = "/deleteJob/") public R deleteJob( @ApiParam(name = "jobName") @RequestParam("jobName") String jobName ) { @@ -64,13 +64,25 @@ public class SchedulerController { @ApiOperation(value = "修改job的cron时间") @PostMapping(value = "/updateJob/{jobName}") public void deleteJob( - @ApiParam(name = "jobName") @PathVariable("jobName") String jobName, @ApiParam(name = "jobCronTime") @RequestBody UpdateJobBean updateJobBean + @ApiParam(name = "jobName") @RequestParam("jobName") String jobName, + @ApiParam(name = "jobCronTime") @RequestParam("jobCronTime") String jobCronTime ) { octopusQuartzService.updateJob( jobName, jobName, - updateJobBean.getJobCronTime() + jobCronTime + ); + + } + + + @ApiOperation(value = "查询所有的触发器Trigger") + @GetMapping(value = "/allTriggers") + public R> queryAllTriggers() { + + return R.ok( + octopusQuartzService.queryAllTrigger() ); } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java index e55c347..9168fe3 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java @@ -19,7 +19,7 @@ import java.util.List; import java.util.stream.Collectors; import static io.wdd.rpc.execute.service.ExecutionResultDaemonHandler.WAIT_EXECUTION_RESULT_LIST; -import static io.wdd.rpc.status.MonitorAllAgentStatus.ALL_AGENT_TOPIC_NAME_LIST; +import static io.wdd.rpc.status.MonitorAllAgentStatus.ALL_AGENT_TOPIC_NAME_SET; @Service @Slf4j @@ -54,7 +54,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { public String SendCommandToAgent(String agentTopicName, String type, List commandList) { // 检查agentTopicName是否存在 - if (!ALL_AGENT_TOPIC_NAME_LIST.contains(agentTopicName)) { + if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { log.error("agentTopicName异常!"); throw new MyRuntimeException("agentTopicName异常!"); } diff --git a/server/src/main/java/io/wdd/rpc/scheduler/config/UpdateJobBean.java b/server/src/main/java/io/wdd/rpc/scheduler/config/UpdateJobBean.java deleted file mode 100644 index bad2417..0000000 --- a/server/src/main/java/io/wdd/rpc/scheduler/config/UpdateJobBean.java +++ /dev/null @@ -1,24 +0,0 @@ -package io.wdd.rpc.scheduler.config; - -import com.fasterxml.jackson.annotation.JsonInclude; -import io.swagger.annotations.ApiModel; -import io.swagger.annotations.ApiModelProperty; - -/** - * @author Andya - * @create 2021/04/01 - */ -@ApiModel(value = "更新job cron时间参数") -@JsonInclude(JsonInclude.Include.NON_NULL) -public class UpdateJobBean { - @ApiModelProperty(value = "jobTime的cron表达式", example = "0 0 1 * * ?") - String jobCronTime; - - public String getJobCronTime() { - return jobCronTime; - } - - public void setJobCronTime(String jobCronTime) { - this.jobCronTime = jobCronTime; - } -} diff --git a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentRunMetricStatusJob.java b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentRunMetricStatusJob.java index 4a4ecbc..21d2458 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentRunMetricStatusJob.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentRunMetricStatusJob.java @@ -16,15 +16,19 @@ public class AgentRunMetricStatusJob extends QuartzJobBean { @Resource AgentRuntimeMetricStatus agentRuntimeMetricStatus; - @Override protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { // 从JobDetailContext中获取相应的信息 - JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap(); + JobDataMap jobDataMap = jobExecutionContext + .getJobDetail() + .getJobDataMap(); // 执行Agent Metric 状态收集任务 - agentRuntimeMetricStatus.collect((Integer) jobDataMap.get(METRIC_REPORT_TIMES_COUNT), (Integer) jobDataMap.get(METRIC_REPORT_TIME_PINCH)); + agentRuntimeMetricStatus.collect( + (Integer) jobDataMap.get(METRIC_REPORT_TIMES_COUNT), + (Integer) jobDataMap.get(METRIC_REPORT_TIME_PINCH) + ); // todo 机构设计状态会被存储至 Redis Stream Key 中 // AgentTopicName-Metric diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java b/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java index 2ef7b3f..b0854bd 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java @@ -30,6 +30,9 @@ public class BuildStatusScheduleTask { @Value(value = "${octopus.status.healthy.start-delay}") int healthyCheckStartDelaySeconds; + @Value(value = "${octopus.status.metric.cron}") + int metricReportCronExpress; + @Value(value = "${octopus.status.metric.pinch}") int metricReportTimePinch; @@ -64,16 +67,16 @@ public class BuildStatusScheduleTask { long totalSeconds = (nextValidTime.getTime() - now.getTime()) / 1000; metricReportTimesCount = (int) (totalSeconds / metricReportTimePinch) - 1; - System.out.println("totalSeconds = " + totalSeconds); - System.out.println("metricReportTimesCount = " + metricReportTimesCount); + /*System.out.println("totalSeconds = " + totalSeconds); + System.out.println("metricReportTimesCount = " + metricReportTimesCount);*/ } catch (ParseException e) { throw new RuntimeException(e); } HashMap map = new HashMap(); - map.put(METRIC_REPORT_TIME_PINCH,metricReportTimePinch); - map.put(METRIC_REPORT_TIMES_COUNT,metricReportTimesCount); + map.put(METRIC_REPORT_TIME_PINCH, metricReportTimePinch); + map.put(METRIC_REPORT_TIMES_COUNT, metricReportTimesCount); // build the Job 只发送一次消息,然后让Agent获取消息 (重复间隔,重复次数) 进行相应的处理! // todo 解决创建太多对象的问题,需要缓存相应的内容 diff --git a/server/src/main/java/io/wdd/rpc/status/MonitorAllAgentStatus.java b/server/src/main/java/io/wdd/rpc/status/MonitorAllAgentStatus.java index 20de995..dddf582 100644 --- a/server/src/main/java/io/wdd/rpc/status/MonitorAllAgentStatus.java +++ b/server/src/main/java/io/wdd/rpc/status/MonitorAllAgentStatus.java @@ -22,8 +22,8 @@ import static io.wdd.rpc.status.AgentRuntimeMetricStatus.ALL_HEALTHY_AGENT_TOPIC /** * 更新频率被类 BuildStatusScheduleTask.class控制 - * - * + *

+ *

* 获取所有注册的Agent *

* 发送状态检查信息, agent需要update相应的HashMap的值 @@ -38,26 +38,24 @@ import static io.wdd.rpc.status.AgentRuntimeMetricStatus.ALL_HEALTHY_AGENT_TOPIC @Slf4j public class MonitorAllAgentStatus { - private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5; /** - * 存储 状态对应Agent列表的Map + * 存储 状态对应Agent列表的Map * Agent的状态描述为 AgentHealthyStatusEnum * HEALTHY -> ["agentTopicName-1", "agentTopicName-2"] * FAILED -> ["agentTopicName-1", "agentTopicName-2"] */ public static final Map> HEALTHY_STATUS_AGENT_LIST_MAP = new HashMap<>(); - /** * 存储所有Agent状态的Map - * + *

* 内容为 agentTopicName-健康状态 */ public static final Map ALL_AGENT_HEALTHY_STATUS_MAP = new HashMap<>(); - /** - * 存储所有的AgentTopicName的缓存 + * 存储所有的AgentTopicName的缓存 */ - public static final Set ALL_AGENT_TOPIC_NAME_LIST = new HashSet<>(); + public static final Set ALL_AGENT_TOPIC_NAME_SET = new HashSet<>(); + private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5; @Resource RedisTemplate redisTemplate; @Resource @@ -66,6 +64,7 @@ public class MonitorAllAgentStatus { CoreServerService coreServerService; @Resource BuildStatusScheduleTask buildStatusScheduleTask; + private List ALL_AGENT_TOPIC_NAME_LIST; private HashMap AGENT_HEALTHY_INIT_MAP; public void go() { @@ -75,14 +74,19 @@ public class MonitorAllAgentStatus { // 1. 获取所有注册的Agent // todo need to cache this List allAgentInfo = coreServerService.serverGetAll(); - Assert.notEmpty(allAgentInfo, - "not agent registered ! skip the agent healthy status check !"); + Assert.notEmpty( + allAgentInfo, + "not agent registered ! skip the agent healthy status check !" + ); - Set collect = allAgentInfo.stream() - .map(ServerInfoVO::getTopicName) - .collect(Collectors.toSet()); - ALL_AGENT_TOPIC_NAME_LIST.clear(); - ALL_AGENT_TOPIC_NAME_LIST.addAll(collect); + ALL_AGENT_TOPIC_NAME_LIST = allAgentInfo + .stream() + .map(ServerInfoVO::getTopicName) + .collect(Collectors.toList()); + + // 2023-01-16 + ALL_AGENT_TOPIC_NAME_SET.clear(); + ALL_AGENT_TOPIC_NAME_SET.addAll(ALL_AGENT_TOPIC_NAME_LIST); // 1.1 检查 Agent状态保存数据结构是否正常 checkOrCreateRedisHealthyKey(); @@ -110,28 +114,38 @@ public class MonitorAllAgentStatus { // build the redis all agent healthy map struct HashMap initMap = new HashMap<>(32); - ALL_AGENT_TOPICNAME_LIST.stream() - .forEach( - agentTopicName -> { - initMap.put(agentTopicName, - "0"); - } - ); - initMap.put("updateTime", - TimeUtils.currentTimeString()); + ALL_AGENT_TOPIC_NAME_LIST + .stream() + .forEach( + agentTopicName -> { + initMap.put( + agentTopicName, + "0" + ); + } + ); + + initMap.put( + "updateTime", + TimeUtils.currentTimeString() + ); // cache this map struct AGENT_HEALTHY_INIT_MAP = initMap; - redisTemplate.opsForHash() - .putAll(ALL_AGENT_STATUS_REDIS_KEY, - initMap); + // create the healthy redis structure + redisTemplate + .opsForHash() + .putAll( + ALL_AGENT_STATUS_REDIS_KEY, + initMap + ); } } private void buildAndSendAgentHealthMessage() { - List collect = ALL_AGENT_TOPICNAME_LIST + List collect = ALL_AGENT_TOPIC_NAME_LIST .stream() .map( agentTopicName -> OctopusStatusMessage @@ -146,26 +160,29 @@ public class MonitorAllAgentStatus { } private void updateAllAgentHealthyStatus() { + List statusList = redisTemplate .opsForHash() .multiGet( ALL_AGENT_STATUS_REDIS_KEY, - ALL_AGENT_TOPICNAME_LIST); - + ALL_AGENT_TOPIC_NAME_LIST + ); // current log to console is ok // agent-topic-name : STATUS(healthy, failed, unknown) HashMap agentStatusMap = new HashMap<>(32); - for (int i = 0; i < ALL_AGENT_TOPICNAME_LIST.size(); i++) { + for (int i = 0; i < ALL_AGENT_TOPIC_NAME_LIST.size(); i++) { agentStatusMap.put( - ALL_AGENT_TOPICNAME_LIST.get(i), + ALL_AGENT_TOPIC_NAME_LIST.get(i), uniformHealthyStatus(String.valueOf(statusList.get(i))) ); } String currentTimeString = TimeUtils.currentTimeString(); - log.info("[ AGENT HEALTHY CHECK ] time is {} , result are => {}", - currentTimeString, - agentStatusMap); + log.info( + "[ AGENT HEALTHY CHECK ] time is {} , result are => {}", + currentTimeString, + agentStatusMap + ); // 2023-01-16 ALL_AGENT_HEALTHY_STATUS_MAP.clear(); @@ -204,9 +221,10 @@ public class MonitorAllAgentStatus { // Trigger调用Agent Metric 任务 ArrayList allHealthyAgentTopicNames = new ArrayList<>(32); for (int i = 0; i < statusList.size(); i++) { - if (statusList.get(i) - .equals("1")) { - allHealthyAgentTopicNames.add(ALL_AGENT_TOPICNAME_LIST.get(i)); + if (statusList + .get(i) + .equals("1")) { + allHealthyAgentTopicNames.add(ALL_AGENT_TOPIC_NAME_LIST.get(i)); } } ALL_HEALTHY_AGENT_TOPIC_NAMES = allHealthyAgentTopicNames; @@ -214,12 +232,18 @@ public class MonitorAllAgentStatus { buildStatusScheduleTask.buildAgentMetricScheduleTask(); // update time - AGENT_HEALTHY_INIT_MAP.put("updateTime", - currentTimeString); + AGENT_HEALTHY_INIT_MAP.put( + "updateTime", + currentTimeString + ); + // init the healthy map - redisTemplate.opsForHash() - .putAll(ALL_AGENT_STATUS_REDIS_KEY, - AGENT_HEALTHY_INIT_MAP); + redisTemplate + .opsForHash() + .putAll( + ALL_AGENT_STATUS_REDIS_KEY, + AGENT_HEALTHY_INIT_MAP + ); } private String uniformHealthyStatus(String agentStatus) {