[ server ] [ execution ]- optimize some controller - 1

This commit is contained in:
zeaslity
2023-01-16 17:11:43 +08:00
parent 9332ca5533
commit 41396e024c
13 changed files with 523 additions and 200 deletions

View File

@@ -0,0 +1,34 @@
package io.wdd.common.beans.status;
import lombok.Getter;
import lombok.Setter;
/**
* AgentHealthy状态描述实体类
* Agent存货状态描述
*/
@Getter
public enum AgentHealthyStatusEnum {
FAILED("FAILED", "Agent存活状态为 失败"),
HEALTHY("HEALTHY", "Agent存活状态为 存活"),
UNKNOWN("UNKNOWN", "Agent存活状态 未知");
String status;
String description;
AgentHealthyStatusEnum(String status, String description) {
this.description = description;
this.status = status;
}
}

View File

@@ -1,5 +1,7 @@
package io.wdd.rpc.controller; package io.wdd.rpc.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.wdd.common.beans.response.R; import io.wdd.common.beans.response.R;
import io.wdd.rpc.execute.result.BuildStreamReader; import io.wdd.rpc.execute.result.BuildStreamReader;
import io.wdd.rpc.execute.service.CoreExecutionService; import io.wdd.rpc.execute.service.CoreExecutionService;
@@ -17,7 +19,8 @@ import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.AGENT_STATUS_RED
import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER; import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER;
@RestController @RestController
@RequestMapping("octopus/server/executor") @RequestMapping("/octopus/server/executor")
@Api("Agent执行命令的Controller")
public class ExecutionController { public class ExecutionController {
@Resource @Resource
@@ -37,9 +40,16 @@ public class ExecutionController {
String streamKey = ""; String streamKey = "";
if (StringUtils.isEmpty(type)) { if (StringUtils.isEmpty(type)) {
streamKey = coreExecutionService.SendCommandToAgent(topicName, commandList); streamKey = coreExecutionService.SendCommandToAgent(
topicName,
commandList
);
} else { } else {
streamKey = coreExecutionService.SendCommandToAgent(topicName, type, commandList); streamKey = coreExecutionService.SendCommandToAgent(
topicName,
type,
commandList
);
} }
return R.ok(streamKey); return R.ok(streamKey);
@@ -50,7 +60,10 @@ public class ExecutionController {
@RequestParam(value = "streamKey") String streamKey @RequestParam(value = "streamKey") String streamKey
) { ) {
buildStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER , streamKey); buildStreamReader.registerStreamReader(
COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER,
streamKey
);
} }
@@ -59,18 +72,58 @@ public class ExecutionController {
@RequestParam(value = "streamKey") String streamKey @RequestParam(value = "streamKey") String streamKey
) { ) {
buildStreamReader.registerStreamReader(AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER , streamKey); buildStreamReader.registerStreamReader(
AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER,
streamKey
);
} }
@PostMapping("/agentUpdate") // auth required
public void AgentUpdate( @PostMapping("/AgentUpdate")
@ApiOperation("控制Agent升级的接口")
public R<String> AgentUpdate(
@RequestParam(value = "agentTopicName") String agentTopicName @RequestParam(value = "agentTopicName") String agentTopicName
) { ) {
return R.ok(
coreExecutionService
.SendCommandToAgent(
agentTopicName,
"AgentUpdate",
null
));
}
@PostMapping("/AgentReboot")
@ApiOperation("控制Agent重启的接口")
public R<String> AgentReboot(
@RequestParam(value = "agentTopicName") String agentTopicName
) {
return R.ok(
coreExecutionService
.SendCommandToAgent(
agentTopicName,
"AgentRestart",
null
));
}
@PostMapping("/AgentShutdown")
@ApiOperation("控制Agent关闭的接口")
public R<String> AgentShutdown(
@RequestParam(value = "agentTopicName") String agentTopicName
) {
return R.ok(
coreExecutionService
.SendCommandToAgent(
agentTopicName,
"AgentShutdown",
null
));
} }

View File

@@ -1,75 +0,0 @@
package io.wdd.rpc.controller;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.swagger.annotations.*;
import io.wdd.rpc.scheduler.config.UpdateJobBean;
import io.wdd.rpc.scheduler.service.OctopusQuartzService;
import org.quartz.JobDataMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
@RestController
@Api(value = "quartz增删改查相关API")
@RequestMapping(value = "/quartz")
public class QuartzCRUDController {
@Autowired
OctopusQuartzService octopusQuartzService;
// @ApiOperation(value = "使用quartz添加job")
// @RequestMapping(value = "/addJob/{jobUUID}", method = RequestMethod.POST)
// public void addQuartzJob(@ApiParam(name = "jobUUID") @PathVariable("jobUUID") String jobUUID, @ApiParam(name = "JobXXXBean") @RequestBody JobXXXBean jobXXXBean) {
//
// if (jobXXXBean.getOpenBean() != null) {
// JobDataMap jobDataMap = new JobDataMap();
// jobDataMap.put("key01", jobXXXBean.getKey01());
// jobDataMap.put("key02", jobXXXBean.getKey02());
// jobDataMap.put("key03", jobXXXBean.getKey03());
// jobDataMap.put("jobTimeCron", jobXXXBean.getJobTimeCron());
// jobDataMap.put("key04", jobXXXBean.getKey04());
// octopusQuartzService.addJob(Job1.class, jobUUID, jobUUID, jobXXXBean.getJobTimeCron(), jobDataMap);
// } else {
// throw new BadRequestException("参数错误");
// }
// }
@ApiOperation(value = "使用quartz查询所有job")
@RequestMapping(value = "/queryAllJob", method = RequestMethod.GET)
public List<Map<String, Object>> queryAllQuartzJob() {
List<Map<String, Object>> list = octopusQuartzService.queryAllJob();
return list;
}
@ApiOperation(value = "使用quartz查询所有运行job")
@RequestMapping(value = "/queryRunJob", method = RequestMethod.GET)
public List<Map<String, Object>> queryRunQuartzJob() {
List<Map<String, Object>> list = octopusQuartzService.queryRunJob();
return list;
}
@ApiOperation(value = "使用quartz删除job")
@RequestMapping(value = "/deleteJob/{jobUUID}", method = RequestMethod.DELETE)
public void deleteJob(@ApiParam(name = "jobUUID") @PathVariable("jobUUID") String jobUUID) {
octopusQuartzService.deleteJob(jobUUID, jobUUID);
}
@ApiOperation(value = "使用quartz修改job的cron时间")
@RequestMapping(value = "/updateJob/{jobUUID}", method = RequestMethod.PUT)
public void deleteJob(@ApiParam(name = "jobUUID") @PathVariable("jobUUID") String jobUUID, @ApiParam(name = "jobCronTime") @RequestBody UpdateJobBean updateJobBean) {
octopusQuartzService.updateJob(jobUUID, jobUUID, updateJobBean.getJobCronTime());
}
}

View File

@@ -0,0 +1,79 @@
package io.wdd.rpc.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.wdd.common.beans.response.R;
import io.wdd.rpc.scheduler.config.UpdateJobBean;
import io.wdd.rpc.scheduler.service.QuartzSchedulerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
@RestController
@Api(value = "定时任务控制中心的Controller")
@RequestMapping(value = "/octopus/server/scheduler")
public class SchedulerController {
@Autowired
QuartzSchedulerService octopusQuartzService;
@ApiOperation(value = "查询所有job")
@GetMapping(value = "/queryAllJob")
public R<List<Map<String, Object>>> queryAllQuartzJob() {
return R.ok(octopusQuartzService.queryAllJob());
}
@ApiOperation(value = "查询所有运行job")
@PostMapping(value = "/queryRunJob")
public R<List<Map<String, Object>>> queryRunQuartzJob() {
return R.ok(octopusQuartzService.queryRunJob());
}
@ApiOperation(value = "删除一个job")
@PostMapping(value = "/deleteJob/{jobName}")
public R<String> deleteJob(
@ApiParam(name = "jobName") @RequestParam("jobName") String jobName
) {
boolean deleteJob = octopusQuartzService.deleteJob(
jobName,
jobName
);
String result = String.format(
"删除任务[ %s ]结果为 [ %s ]",
jobName,
deleteJob
);
if (deleteJob) {
return R.ok(result);
} else {
return R.failed(result);
}
}
@ApiOperation(value = "修改job的cron时间")
@PostMapping(value = "/updateJob/{jobName}")
public void deleteJob(
@ApiParam(name = "jobName") @PathVariable("jobName") String jobName, @ApiParam(name = "jobCronTime") @RequestBody UpdateJobBean updateJobBean
) {
octopusQuartzService.updateJob(
jobName,
jobName,
updateJobBean.getJobCronTime()
);
}
}

View File

@@ -0,0 +1,36 @@
package io.wdd.rpc.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.wdd.common.beans.response.R;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
import static io.wdd.rpc.status.MonitorAllAgentStatus.ALL_AGENT_HEALTHY_STATUS_MAP;
import static io.wdd.rpc.status.MonitorAllAgentStatus.HEALTHY_STATUS_AGENT_LIST_MAP;
@RestController
@Api("Agent运行状态Controller")
@RequestMapping("/octopus/server/status")
public class StatusController {
@ApiOperation("获取所有Agent的运行状态")
@GetMapping("/agentStatus")
public R<Map> GetAllAgentHealthyStatus(){
return R.ok(ALL_AGENT_HEALTHY_STATUS_MAP);
}
@ApiOperation("获取 状态-Agent Map")
@GetMapping("/statusAgent")
public R<Map> GetHealthyStatusAgentList(){
return R.ok(HEALTHY_STATUS_AGENT_LIST_MAP);
}
}

View File

@@ -6,15 +6,21 @@ import java.util.List;
public interface CoreExecutionService { public interface CoreExecutionService {
String SendCommandToAgent(String topicName, String command); String SendCommandToAgent(String agentTopicName, String command);
String SendCommandToAgent(String topicName, List<String> commandList); String SendCommandToAgent(String agentTopicName, List<String> commandList);
String SendCommandToAgent(String topicName, String type, List<String> command); /**
* @param agentTopicName agent唯一表示名
* @param type 任务执行类型
* @param command 任务列表内容
* @return redis中的 result key
*/
String SendCommandToAgent(String agentTopicName, String type, List<String> command);
List<String> SendCommandToAgent(List<String> topicNameList, String type, List<String> command); List<String> SendCommandToAgent(List<String> agentTopicNameList, String type, List<String> command);
} }

View File

@@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.beans.executor.ExecutionMessage; import io.wdd.common.beans.executor.ExecutionMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessageType; import io.wdd.common.beans.rabbitmq.OctopusMessageType;
import io.wdd.common.handler.MyRuntimeException;
import io.wdd.rpc.execute.config.ExecutionLog; import io.wdd.rpc.execute.config.ExecutionLog;
import io.wdd.rpc.execute.result.BuildStreamReader; import io.wdd.rpc.execute.result.BuildStreamReader;
import io.wdd.rpc.message.sender.ToAgentMessageSender; import io.wdd.rpc.message.sender.ToAgentMessageSender;
@@ -18,6 +19,7 @@ import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static io.wdd.rpc.execute.service.ExecutionResultDaemonHandler.WAIT_EXECUTION_RESULT_LIST; import static io.wdd.rpc.execute.service.ExecutionResultDaemonHandler.WAIT_EXECUTION_RESULT_LIST;
import static io.wdd.rpc.status.MonitorAllAgentStatus.ALL_AGENT_TOPIC_NAME_LIST;
@Service @Service
@Slf4j @Slf4j
@@ -36,20 +38,26 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
BuildStreamReader buildStreamReader; BuildStreamReader buildStreamReader;
@Override @Override
public String SendCommandToAgent(String topicName, String command) { public String SendCommandToAgent(String agentagentTopicName, String command) {
return this.SendCommandToAgent(topicName, return this.SendCommandToAgent(agentagentTopicName,
List.of(command)); List.of(command));
} }
@Override @Override
public String SendCommandToAgent(String topicName, List<String> commandList) { public String SendCommandToAgent(String agentagentTopicName, List<String> commandList) {
return this.SendCommandToAgent(topicName, return this.SendCommandToAgent(agentagentTopicName,
"manual-command", "manual-command",
commandList); commandList);
} }
@Override @Override
public String SendCommandToAgent(String topicName, String type, List<String> commandList) { public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList) {
// 检查agentTopicName是否存在
if (!ALL_AGENT_TOPIC_NAME_LIST.contains(agentTopicName)) {
log.error("agentTopicName异常!");
throw new MyRuntimeException("agentTopicName异常!");
}
// 归一化type类型 不行 // 归一化type类型 不行
@@ -58,7 +66,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
ExecutionMessage.builder() ExecutionMessage.builder()
.type(type) .type(type)
.commandList(commandList) .commandList(commandList)
.resultKey(ExecutionMessage.GetResultKey(topicName)) .resultKey(ExecutionMessage.GetResultKey(agentTopicName))
.build(); .build();
@@ -75,7 +83,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
.type(OctopusMessageType.EXECUTOR) .type(OctopusMessageType.EXECUTOR)
.init_time(LocalDateTime.now()) .init_time(LocalDateTime.now())
.content(executionMessageString) .content(executionMessageString)
.uuid(topicName) .uuid(agentTopicName)
.build(); .build();
String resultKey = executionMessage.getResultKey(); String resultKey = executionMessage.getResultKey();
@@ -96,14 +104,15 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
// createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey); // createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey);
// construct the persistent Bean // construct the persistent Bean
ExecutionLog executionLog = buildPersistentLogBeanFromOctopusMessage(octopusMessage, executionMessage); ExecutionLog executionLog = buildPersistentLogBeanFromOctopusMessage(octopusMessage,
executionMessage);
// send resultKey to ExecutionResultDaemonHandler // send resultKey to ExecutionResultDaemonHandler
WAIT_EXECUTION_RESULT_LIST.put(resultKey, WAIT_EXECUTION_RESULT_LIST.put(resultKey,
executionLog); executionLog);
// help gc // help gc
executionMessage = null; executionMessage = null;
octopusMessage =null; octopusMessage = null;
return resultKey; return resultKey;
} }
@@ -120,20 +129,21 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
@Override @Override
public List<String> SendCommandToAgent(List<String> topicNameList, String type, List<String> command) { public List<String> SendCommandToAgent(List<String> agentagentTopicNameList, String type, List<String> command) {
return topicNameList.stream() return agentagentTopicNameList
.map( .stream()
topicName -> { .map(
return this.SendCommandToAgent(topicName, agentTopicName -> this.SendCommandToAgent
type, (
command); agentTopicName,
} type,
) command)
.collect(Collectors.toList()); )
.collect(Collectors.toList());
} }
@Deprecated @Deprecated
private OctopusMessage generateOctopusMessage(String topicName, String type, List<String> commandList) { private OctopusMessage generateOctopusMessage(String agentTopicName, String type, List<String> commandList) {
return null; return null;
} }

View File

@@ -8,7 +8,7 @@ import org.springframework.scheduling.quartz.QuartzJobBean;
import javax.annotation.Resource; import javax.annotation.Resource;
public class MonitorAllAgentStatusJob extends QuartzJobBean { public class AgentStatusMonitorJob extends QuartzJobBean {
@Resource @Resource
MonitorAllAgentStatus monitorAllAgentStatus; MonitorAllAgentStatus monitorAllAgentStatus;

View File

@@ -2,7 +2,7 @@ package io.wdd.rpc.scheduler.service;
import io.wdd.rpc.scheduler.job.AgentRunMetricStatusJob; import io.wdd.rpc.scheduler.job.AgentRunMetricStatusJob;
import io.wdd.rpc.scheduler.job.MonitorAllAgentStatusJob; import io.wdd.rpc.scheduler.job.AgentStatusMonitorJob;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.quartz.CronExpression; import org.quartz.CronExpression;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@@ -22,7 +22,7 @@ import static io.wdd.rpc.status.AgentRuntimeMetricStatus.METRIC_REPORT_TIME_PINC
public class BuildStatusScheduleTask { public class BuildStatusScheduleTask {
@Resource @Resource
OctopusQuartzService octopusQuartzService; QuartzSchedulerService octopusQuartzService;
@Value(value = "${octopus.status.healthy.cron}") @Value(value = "${octopus.status.healthy.cron}")
String healthyCronTimeExpress; String healthyCronTimeExpress;
@@ -99,7 +99,7 @@ public class BuildStatusScheduleTask {
// build the Job // build the Job
octopusQuartzService.addJob( octopusQuartzService.addJob(
MonitorAllAgentStatusJob.class, AgentStatusMonitorJob.class,
"monitorAllAgentStatusJob", "monitorAllAgentStatusJob",
JOB_GROUP_NAME, JOB_GROUP_NAME,
healthyCheckStartDelaySeconds, healthyCheckStartDelaySeconds,

View File

@@ -7,7 +7,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
public interface OctopusQuartzService { public interface QuartzSchedulerService {
boolean addJob(OctopusQuartzJob quartzJob); boolean addJob(OctopusQuartzJob quartzJob);
@@ -43,13 +44,14 @@ public interface OctopusQuartzService {
void updateJob(String jobName, String jobGroupName, String jobTime); void updateJob(String jobName, String jobGroupName, String jobTime);
/** /**
* 删除一个任务job * 删除一个任务job
*
* @param jobName * @param jobName
* @param jobGroupName * @param jobGroupName
* @return
*/ */
void deleteJob(String jobName, String jobGroupName); boolean deleteJob(String jobName, String jobGroupName);
/** /**
* 暂停一个任务job * 暂停一个任务job
@@ -85,6 +87,10 @@ public interface OctopusQuartzService {
List<Map<String, Object>> queryRunJob(); List<Map<String, Object>> queryRunJob();
/**
* 获取所有的触发器
*
* */
List<Trigger> queryAllTrigger(); List<Trigger> queryAllTrigger();

View File

@@ -24,7 +24,7 @@ import static org.quartz.TriggerBuilder.newTrigger;
*/ */
@Slf4j @Slf4j
@Service @Service
public class OctopusQuartzServiceImpl implements OctopusQuartzService { public class QuartzSchedulerServiceImpl implements QuartzSchedulerService {
@Autowired @Autowired
private Scheduler scheduler; private Scheduler scheduler;
@@ -48,12 +48,12 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
/** /**
* 增加一个job * 增加一个job
* *
* @param jobClass 任务实现类 * @param jobClass 任务实现类
* @param jobName 任务名称 * @param jobName 任务名称
* @param jobGroupName 任务组名 * @param jobGroupName 任务组名
* @param jobRunTimePinch 时间表达式 (这是每隔多少秒为一次任务) * @param jobRunTimePinch 时间表达式 (这是每隔多少秒为一次任务)
* @param jobRunRepeatTimes 运行的次数 <0:表示不限次数 * @param jobRunRepeatTimes 运行的次数 <0:表示不限次数
* @param jobData 参数 * @param jobData 参数
*/ */
@Override @Override
public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobRunTimePinch, int jobRunRepeatTimes, Map jobData) { public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobRunTimePinch, int jobRunRepeatTimes, Map jobData) {
@@ -61,17 +61,25 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
// 任务名称和组构成任务key // 任务名称和组构成任务key
JobDetail jobDetail = JobBuilder JobDetail jobDetail = JobBuilder
.newJob(jobClass) .newJob(jobClass)
.withIdentity(jobName, jobGroupName) .withIdentity(
jobName,
jobGroupName
)
.build(); .build();
// 设置job参数 // 设置job参数
if (jobData != null && jobData.size() > 0) { if (jobData != null && jobData.size() > 0) {
jobDetail.getJobDataMap().putAll(jobData); jobDetail
.getJobDataMap()
.putAll(jobData);
} }
// 使用simpleTrigger规则 // 使用simpleTrigger规则
Trigger trigger = newTrigger() Trigger trigger = newTrigger()
.withIdentity(jobName, jobGroupName) .withIdentity(
jobName,
jobGroupName
)
.withSchedule( .withSchedule(
SimpleScheduleBuilder SimpleScheduleBuilder
.repeatSecondlyForTotalCount( .repeatSecondlyForTotalCount(
@@ -82,9 +90,17 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
.startNow() .startNow()
.build(); .build();
log.debug("jobDataMap: {}", jobDetail.getJobDataMap().getWrappedMap()); log.debug(
"jobDataMap: {}",
jobDetail
.getJobDataMap()
.getWrappedMap()
);
scheduler.scheduleJob(jobDetail, trigger); scheduler.scheduleJob(
jobDetail,
trigger
);
} catch (SchedulerException e) { } catch (SchedulerException e) {
e.printStackTrace(); e.printStackTrace();
@@ -108,10 +124,18 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
// 创建jobDetail实例绑定Job实现类 // 创建jobDetail实例绑定Job实现类
// 指明job的名称所在组的名称以及绑定job类 // 指明job的名称所在组的名称以及绑定job类
// 任务名称和组构成任务key // 任务名称和组构成任务key
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build(); JobDetail jobDetail = JobBuilder
.newJob(jobClass)
.withIdentity(
jobName,
jobGroupName
)
.build();
// 设置job参数 // 设置job参数
if (jobData != null && jobData.size() > 0) { if (jobData != null && jobData.size() > 0) {
jobDetail.getJobDataMap().putAll(jobData); jobDetail
.getJobDataMap()
.putAll(jobData);
} }
// 定义调度触发规则 // 定义调度触发规则
@@ -124,9 +148,15 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
} }
Trigger trigger = newTrigger() Trigger trigger = newTrigger()
.withIdentity(jobName, jobGroupName) .withIdentity(
jobName,
jobGroupName
)
.startAt( .startAt(
DateBuilder.futureDate(startTime, IntervalUnit.SECOND) DateBuilder.futureDate(
startTime,
IntervalUnit.SECOND
)
) )
.withSchedule( .withSchedule(
CronScheduleBuilder.cronSchedule(cronJobExpression) CronScheduleBuilder.cronSchedule(cronJobExpression)
@@ -135,8 +165,14 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
.build(); .build();
// 把作业和触发器注册到任务调度中 // 把作业和触发器注册到任务调度中
scheduler.scheduleJob(jobDetail, trigger); scheduler.scheduleJob(
log.info("jobDataMap: {}", jobDetail.getJobDataMap()); jobDetail,
trigger
);
log.info(
"jobDataMap: {}",
jobDetail.getJobDataMap()
);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
@@ -154,12 +190,25 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
@Override @Override
public void updateJob(String jobName, String jobGroupName, String jobTime) { public void updateJob(String jobName, String jobGroupName, String jobTime) {
try { try {
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName); TriggerKey triggerKey = TriggerKey.triggerKey(
jobName,
jobGroupName
);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
log.info("new jobTime: {}", jobTime); log.info(
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).build(); "new jobTime: {}",
jobTime
);
trigger = trigger
.getTriggerBuilder()
.withIdentity(triggerKey)
.withSchedule(CronScheduleBuilder.cronSchedule(jobTime))
.build();
// 重启触发器 // 重启触发器
scheduler.rescheduleJob(triggerKey, trigger); scheduler.rescheduleJob(
triggerKey,
trigger
);
} catch (SchedulerException e) { } catch (SchedulerException e) {
e.printStackTrace(); e.printStackTrace();
throw new MyRuntimeException("update job error!"); throw new MyRuntimeException("update job error!");
@@ -171,15 +220,22 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
* *
* @param jobName 任务名称 * @param jobName 任务名称
* @param jobGroupName 任务组名 * @param jobGroupName 任务组名
* @return
*/ */
@Override @Override
public void deleteJob(String jobName, String jobGroupName) { public boolean deleteJob(String jobName, String jobGroupName) {
try { try {
scheduler.deleteJob(new JobKey(jobName, jobGroupName)); scheduler.deleteJob(new JobKey(
jobName,
jobGroupName
));
return true;
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace();
throw new MyRuntimeException("delete job error!"); throw new MyRuntimeException("delete job error!");
} }
} }
/** /**
@@ -191,7 +247,10 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
@Override @Override
public void pauseJob(String jobName, String jobGroupName) { public void pauseJob(String jobName, String jobGroupName) {
try { try {
JobKey jobKey = JobKey.jobKey(jobName, jobGroupName); JobKey jobKey = JobKey.jobKey(
jobName,
jobGroupName
);
scheduler.pauseJob(jobKey); scheduler.pauseJob(jobKey);
} catch (SchedulerException e) { } catch (SchedulerException e) {
e.printStackTrace(); e.printStackTrace();
@@ -208,7 +267,10 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
@Override @Override
public void resumeJob(String jobName, String jobGroupName) { public void resumeJob(String jobName, String jobGroupName) {
try { try {
JobKey jobKey = JobKey.jobKey(jobName, jobGroupName); JobKey jobKey = JobKey.jobKey(
jobName,
jobGroupName
);
scheduler.resumeJob(jobKey); scheduler.resumeJob(jobKey);
} catch (SchedulerException e) { } catch (SchedulerException e) {
e.printStackTrace(); e.printStackTrace();
@@ -225,7 +287,10 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
@Override @Override
public void runAJobNow(String jobName, String jobGroupName) { public void runAJobNow(String jobName, String jobGroupName) {
try { try {
JobKey jobKey = JobKey.jobKey(jobName, jobGroupName); JobKey jobKey = JobKey.jobKey(
jobName,
jobGroupName
);
scheduler.triggerJob(jobKey); scheduler.triggerJob(jobKey);
} catch (SchedulerException e) { } catch (SchedulerException e) {
e.printStackTrace(); e.printStackTrace();
@@ -246,19 +311,40 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
Set<JobKey> jobKeys = scheduler.getJobKeys(matcher); Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
jobList = new ArrayList<Map<String, Object>>(); jobList = new ArrayList<Map<String, Object>>();
for (JobKey jobKey : jobKeys) { for (JobKey jobKey : jobKeys) {
log.info("maps: {}", scheduler.getJobDetail(jobKey).getJobDataMap().getWrappedMap()); log.info(
"maps: {}",
scheduler
.getJobDetail(jobKey)
.getJobDataMap()
.getWrappedMap()
);
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey); List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
for (Trigger trigger : triggers) { for (Trigger trigger : triggers) {
Map<String, Object> map = new HashMap<>(); Map<String, Object> map = new HashMap<>();
map.put("jobName", jobKey.getName()); map.put(
map.put("jobGroupName", jobKey.getGroup()); "jobName",
map.put("description", "触发器:" + trigger.getKey()); jobKey.getName()
);
map.put(
"jobGroupName",
jobKey.getGroup()
);
map.put(
"description",
"触发器:" + trigger.getKey()
);
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey()); Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
map.put("jobStatus", triggerState.name()); map.put(
"jobStatus",
triggerState.name()
);
if (trigger instanceof CronTrigger) { if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger; CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression(); String cronExpression = cronTrigger.getCronExpression();
map.put("jobTime", cronExpression); map.put(
"jobTime",
cronExpression
);
} }
jobList.add(map); jobList.add(map);
} }
@@ -286,15 +372,30 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
JobDetail jobDetail = executingJob.getJobDetail(); JobDetail jobDetail = executingJob.getJobDetail();
JobKey jobKey = jobDetail.getKey(); JobKey jobKey = jobDetail.getKey();
Trigger trigger = executingJob.getTrigger(); Trigger trigger = executingJob.getTrigger();
map.put("jobName", jobKey.getName()); map.put(
map.put("jobGroupName", jobKey.getGroup()); "jobName",
map.put("description", "触发器:" + trigger.getKey()); jobKey.getName()
);
map.put(
"jobGroupName",
jobKey.getGroup()
);
map.put(
"description",
"触发器:" + trigger.getKey()
);
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey()); Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
map.put("jobStatus", triggerState.name()); map.put(
"jobStatus",
triggerState.name()
);
if (trigger instanceof CronTrigger) { if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger; CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression(); String cronExpression = cronTrigger.getCronExpression();
map.put("jobTime", cronExpression); map.put(
"jobTime",
cronExpression
);
} }
jobList.add(map); jobList.add(map);
} }
@@ -310,17 +411,21 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
try { try {
return scheduler.getTriggerKeys( return scheduler
GroupMatcher.groupEquals(JOB_GROUP_NAME) .getTriggerKeys(
).stream().map( GroupMatcher.groupEquals(JOB_GROUP_NAME)
triggerKey -> { )
try { .stream()
return scheduler.getTrigger(triggerKey); .map(
} catch (SchedulerException e) { triggerKey -> {
throw new RuntimeException(e); try {
} return scheduler.getTrigger(triggerKey);
} } catch (SchedulerException e) {
).collect(Collectors.toList()); throw new RuntimeException(e);
}
}
)
.collect(Collectors.toList());
} catch (SchedulerException e) { } catch (SchedulerException e) {
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@@ -1,23 +1,18 @@
package io.wdd.rpc.status; package io.wdd.rpc.status;
import com.fasterxml.jackson.databind.ObjectMapper; import io.wdd.common.beans.status.AgentHealthyStatusEnum;
import io.wdd.common.beans.status.OctopusStatusMessage; import io.wdd.common.beans.status.OctopusStatusMessage;
import io.wdd.common.utils.TimeUtils; import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.scheduler.service.BuildStatusScheduleTask; import io.wdd.rpc.scheduler.service.BuildStatusScheduleTask;
import io.wdd.rpc.scheduler.service.OctopusQuartzService;
import io.wdd.server.beans.vo.ServerInfoVO; import io.wdd.server.beans.vo.ServerInfoVO;
import io.wdd.server.coreService.CoreServerService; import io.wdd.server.coreService.CoreServerService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.quartz.Trigger;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.ArrayList; import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@@ -26,6 +21,9 @@ import static io.wdd.common.beans.status.OctopusStatusMessage.HEALTHY_STATUS_MES
import static io.wdd.rpc.status.AgentRuntimeMetricStatus.ALL_HEALTHY_AGENT_TOPIC_NAMES; import static io.wdd.rpc.status.AgentRuntimeMetricStatus.ALL_HEALTHY_AGENT_TOPIC_NAMES;
/** /**
* 更新频率被类 BuildStatusScheduleTask.class控制
*
*
* 获取所有注册的Agent * 获取所有注册的Agent
* <p> * <p>
* 发送状态检查信息, agent需要update相应的HashMap的值 * 发送状态检查信息, agent需要update相应的HashMap的值
@@ -41,10 +39,25 @@ import static io.wdd.rpc.status.AgentRuntimeMetricStatus.ALL_HEALTHY_AGENT_TOPIC
public class MonitorAllAgentStatus { public class MonitorAllAgentStatus {
private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5; private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5;
/**
* 存储 状态对应Agent列表的Map
* Agent的状态描述为 AgentHealthyStatusEnum
* HEALTHY -> ["agentTopicName-1" "agentTopicName-2"]
* FAILED -> ["agentTopicName-1" "agentTopicName-2"]
*/
public static final Map<String, List<String>> HEALTHY_STATUS_AGENT_LIST_MAP = new HashMap<>();
private HashMap<String, String> AGENT_HEALTHY_INIT_MAP; /**
private List<String> ALL_AGENT_TOPICNAME_LIST; * 存储所有Agent状态的Map
*
* 内容为 agentTopicName-健康状态
*/
public static final Map<String, String> ALL_AGENT_HEALTHY_STATUS_MAP = new HashMap<>();
/**
* 存储所有的AgentTopicName的缓存
*/
public static final Set<String> ALL_AGENT_TOPIC_NAME_LIST = new HashSet<>();
@Resource @Resource
RedisTemplate redisTemplate; RedisTemplate redisTemplate;
@Resource @Resource
@@ -53,6 +66,7 @@ public class MonitorAllAgentStatus {
CoreServerService coreServerService; CoreServerService coreServerService;
@Resource @Resource
BuildStatusScheduleTask buildStatusScheduleTask; BuildStatusScheduleTask buildStatusScheduleTask;
private HashMap<String, String> AGENT_HEALTHY_INIT_MAP;
public void go() { public void go() {
@@ -61,8 +75,14 @@ public class MonitorAllAgentStatus {
// 1. 获取所有注册的Agent // 1. 获取所有注册的Agent
// todo need to cache this // todo need to cache this
List<ServerInfoVO> allAgentInfo = coreServerService.serverGetAll(); List<ServerInfoVO> allAgentInfo = coreServerService.serverGetAll();
Assert.notEmpty(allAgentInfo,"not agent registered ! skip the agent healthy status check !"); Assert.notEmpty(allAgentInfo,
ALL_AGENT_TOPICNAME_LIST = allAgentInfo.stream().map(ServerInfoVO::getTopicName).collect(Collectors.toList()); "not agent registered ! skip the agent healthy status check !");
Set<String> collect = allAgentInfo.stream()
.map(ServerInfoVO::getTopicName)
.collect(Collectors.toSet());
ALL_AGENT_TOPIC_NAME_LIST.clear();
ALL_AGENT_TOPIC_NAME_LIST.addAll(collect);
// 1.1 检查 Agent状态保存数据结构是否正常 // 1.1 检查 Agent状态保存数据结构是否正常
checkOrCreateRedisHealthyKey(); checkOrCreateRedisHealthyKey();
@@ -90,56 +110,102 @@ public class MonitorAllAgentStatus {
// build the redis all agent healthy map struct // build the redis all agent healthy map struct
HashMap<String, String> initMap = new HashMap<>(32); HashMap<String, String> initMap = new HashMap<>(32);
ALL_AGENT_TOPICNAME_LIST.stream().forEach( ALL_AGENT_TOPICNAME_LIST.stream()
agentTopicName -> { .forEach(
initMap.put(agentTopicName, "0"); agentTopicName -> {
} initMap.put(agentTopicName,
); "0");
initMap.put("updateTime", TimeUtils.currentTimeString()); }
);
initMap.put("updateTime",
TimeUtils.currentTimeString());
// cache this map struct // cache this map struct
AGENT_HEALTHY_INIT_MAP = initMap; AGENT_HEALTHY_INIT_MAP = initMap;
redisTemplate.opsForHash().putAll(ALL_AGENT_STATUS_REDIS_KEY, initMap); redisTemplate.opsForHash()
.putAll(ALL_AGENT_STATUS_REDIS_KEY,
initMap);
} }
} }
private void buildAndSendAgentHealthMessage() { private void buildAndSendAgentHealthMessage() {
List<OctopusStatusMessage> collect = ALL_AGENT_TOPICNAME_LIST.stream().map( List<OctopusStatusMessage> collect = ALL_AGENT_TOPICNAME_LIST
agentTopicName -> OctopusStatusMessage.builder() .stream()
.agentTopicName(agentTopicName) .map(
.type(HEALTHY_STATUS_MESSAGE_TYPE) agentTopicName -> OctopusStatusMessage
.build() .builder()
).collect(Collectors.toList()); .agentTopicName(agentTopicName)
.type(HEALTHY_STATUS_MESSAGE_TYPE)
.build()
)
.collect(Collectors.toList());
collectAgentStatus.statusMessageToAgent(collect); collectAgentStatus.statusMessageToAgent(collect);
} }
private void updateAllAgentHealthyStatus() { private void updateAllAgentHealthyStatus() {
List statusList = redisTemplate.opsForHash().multiGet( List statusList = redisTemplate
ALL_AGENT_STATUS_REDIS_KEY, .opsForHash()
ALL_AGENT_TOPICNAME_LIST); .multiGet(
ALL_AGENT_STATUS_REDIS_KEY,
ALL_AGENT_TOPICNAME_LIST);
// current log to console is ok // current log to console is ok
HashMap<String, String> tmp = new HashMap<>(32); // agent-topic-name : STATUS(healthy, failed, unknown)
HashMap<String, String> agentStatusMap = new HashMap<>(32);
for (int i = 0; i < ALL_AGENT_TOPICNAME_LIST.size(); i++) { for (int i = 0; i < ALL_AGENT_TOPICNAME_LIST.size(); i++) {
tmp.put( agentStatusMap.put(
ALL_AGENT_TOPICNAME_LIST.get(i), ALL_AGENT_TOPICNAME_LIST.get(i),
uniformHealthyStatus(String.valueOf(statusList.get(i))) uniformHealthyStatus(String.valueOf(statusList.get(i)))
); );
} }
String currentTimeString = TimeUtils.currentTimeString(); String currentTimeString = TimeUtils.currentTimeString();
log.info("[ AGENT HEALTHY CHECK ] time is {} , result are => {}", currentTimeString, tmp); log.info("[ AGENT HEALTHY CHECK ] time is {} , result are => {}",
currentTimeString,
agentStatusMap);
// 2023-01-16
ALL_AGENT_HEALTHY_STATUS_MAP.clear();
ALL_AGENT_HEALTHY_STATUS_MAP.putAll(agentStatusMap);
// 2023-01-16
Map<String, List<String>> statusAgentListMap = agentStatusMap
.entrySet()
.stream()
.collect(
Collectors.groupingBy(
Map.Entry::getValue
)
)
.entrySet()
.stream()
.collect(
Collectors.toMap(
entry -> entry.getKey(),
entry -> entry
.getValue()
.stream()
.map(
Map.Entry::getKey
)
.collect(Collectors.toList())
)
);
HEALTHY_STATUS_AGENT_LIST_MAP.putAll(statusAgentListMap);
log.debug("Agent存活状态 状态-Agent名称-Map 已经更新了");
// help gc // help gc
tmp = null; agentStatusMap = null;
// Trigger调用Agent Metric 任务 // Trigger调用Agent Metric 任务
ArrayList<String> allHealthyAgentTopicNames = new ArrayList<>(32); ArrayList<String> allHealthyAgentTopicNames = new ArrayList<>(32);
for (int i = 0; i < statusList.size(); i++) { for (int i = 0; i < statusList.size(); i++) {
if (statusList.get(i).equals("1")) { if (statusList.get(i)
.equals("1")) {
allHealthyAgentTopicNames.add(ALL_AGENT_TOPICNAME_LIST.get(i)); allHealthyAgentTopicNames.add(ALL_AGENT_TOPICNAME_LIST.get(i));
} }
} }
@@ -148,19 +214,22 @@ public class MonitorAllAgentStatus {
buildStatusScheduleTask.buildAgentMetricScheduleTask(); buildStatusScheduleTask.buildAgentMetricScheduleTask();
// update time // update time
AGENT_HEALTHY_INIT_MAP.put("updateTime", currentTimeString); AGENT_HEALTHY_INIT_MAP.put("updateTime",
currentTimeString);
// init the healthy map // init the healthy map
redisTemplate.opsForHash().putAll(ALL_AGENT_STATUS_REDIS_KEY, AGENT_HEALTHY_INIT_MAP); redisTemplate.opsForHash()
.putAll(ALL_AGENT_STATUS_REDIS_KEY,
AGENT_HEALTHY_INIT_MAP);
} }
private String uniformHealthyStatus(String agentStatus) { private String uniformHealthyStatus(String agentStatus) {
switch (agentStatus) { switch (agentStatus) {
case "0": case "0":
return "FAILED"; return AgentHealthyStatusEnum.FAILED.getStatus();
case "1": case "1":
return "HEALTHY"; return AgentHealthyStatusEnum.HEALTHY.getStatus();
default: default:
return "UNKNOWN"; return AgentHealthyStatusEnum.UNKNOWN.getStatus();
} }
} }