diff --git a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerStatus.java b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerStatus.java index 31ea0b5..8807dc2 100644 --- a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerStatus.java +++ b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerStatus.java @@ -3,6 +3,7 @@ package io.wdd.agent.config.message.handler; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import io.wdd.agent.status.AgentStatusCollector; import io.wdd.agent.status.HealthyReporter; import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessageType; @@ -11,7 +12,7 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; -import static io.wdd.common.beans.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE; +import static io.wdd.common.beans.status.OctopusStatusMessage.*; @Component public class OMHandlerStatus extends AbstractOctopusMessageHandler { @@ -22,6 +23,9 @@ public class OMHandlerStatus extends AbstractOctopusMessageHandler { @Resource HealthyReporter healthyReporter; + @Resource + AgentStatusCollector agentStatusCollector; + @Override public boolean handle(OctopusMessage octopusMessage) { @@ -40,6 +44,16 @@ public class OMHandlerStatus extends AbstractOctopusMessageHandler { if (statusType.equals(HEALTHY_STATUS_MESSAGE_TYPE)) { // healthy check healthyReporter.report(); + } else if (statusType.equals(ALL_STATUS_MESSAGE_TYPE)) { + // all status report + agentStatusCollector.sendAgentStatusToRedis(); + } else if (statusType.equals(METRIC_STATUS_MESSAGE_TYPE)) { + // metric status + } else if (statusType.equals(APP_STATUS_MESSAGE_TYPE)) { + // app status report + } else { + // unknown + } @@ -47,7 +61,6 @@ public class OMHandlerStatus extends AbstractOctopusMessageHandler { throw new RuntimeException(e); } - return true; } } diff --git a/agent/src/main/java/io/wdd/agent/status/AgentStatusCollector.java b/agent/src/main/java/io/wdd/agent/status/AgentStatusCollector.java index 0a6f178..6873d5a 100644 --- a/agent/src/main/java/io/wdd/agent/status/AgentStatusCollector.java +++ b/agent/src/main/java/io/wdd/agent/status/AgentStatusCollector.java @@ -109,7 +109,7 @@ public class AgentStatusCollector { // agent boot up 120s then start to report its status // at the fix rate of 15s - @Scheduled(initialDelay = ReportInitDelay, fixedRate = ReportFixedRate) + /*@Scheduled(initialDelay = ReportInitDelay, fixedRate = ReportFixedRate)*/ public void sendAgentStatusToRedis() { try { diff --git a/common/src/main/java/io/wdd/common/beans/status/OctopusStatusMessage.java b/common/src/main/java/io/wdd/common/beans/status/OctopusStatusMessage.java index 89539a2..00c918a 100644 --- a/common/src/main/java/io/wdd/common/beans/status/OctopusStatusMessage.java +++ b/common/src/main/java/io/wdd/common/beans/status/OctopusStatusMessage.java @@ -14,10 +14,13 @@ public class OctopusStatusMessage { // below two will be used by both server and agent public static final String ALL_AGENT_STATUS_REDIS_KEY = "ALL_AGENT_STATUS"; public static final String HEALTHY_STATUS_MESSAGE_TYPE = "ping"; + public static final String ALL_STATUS_MESSAGE_TYPE = "all"; + public static final String METRIC_STATUS_MESSAGE_TYPE = "metric"; + public static final String APP_STATUS_MESSAGE_TYPE = "app"; /** * which kind of status should be return - * short => short time message + * metric => short time message * all => all agent status message * healthy => check for healthy * */ diff --git a/server/src/main/java/io/wdd/rpc/scheduler/beans/OctopusQuartzJob.java b/server/src/main/java/io/wdd/rpc/scheduler/beans/OctopusQuartzJob.java index ee06b54..b5e3096 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/beans/OctopusQuartzJob.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/beans/OctopusQuartzJob.java @@ -5,6 +5,7 @@ import lombok.Data; import java.io.Serializable; +@Deprecated @Data public class OctopusQuartzJob implements Serializable { diff --git a/server/src/main/java/io/wdd/rpc/scheduler/beans/OctopusQuartzLog.java b/server/src/main/java/io/wdd/rpc/scheduler/beans/OctopusQuartzLog.java index 55bb1a8..3ffbf99 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/beans/OctopusQuartzLog.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/beans/OctopusQuartzLog.java @@ -1,15 +1,22 @@ package io.wdd.rpc.scheduler.beans; import com.baomidou.mybatisplus.annotation.IdType; -import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; +import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; import java.io.Serializable; import java.time.LocalDateTime; @Data +@NoArgsConstructor +@AllArgsConstructor +@ApiModel("Octopus 定时任务的持久化存储信息 ") +@SuperBuilder(toBuilder = true) public class OctopusQuartzLog implements Serializable { @ApiModelProperty(value = "ID", hidden = true) diff --git a/server/src/main/java/io/wdd/rpc/scheduler/job/MonitorAllAgentStatusJob.java b/server/src/main/java/io/wdd/rpc/scheduler/job/MonitorAllAgentStatusJob.java index fd1696b..cf2f2fd 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/job/MonitorAllAgentStatusJob.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/job/MonitorAllAgentStatusJob.java @@ -22,7 +22,7 @@ public class MonitorAllAgentStatusJob extends QuartzJobBean { protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { // get the jobMetaMap - //JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap(); + JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap(); // actually execute the monitor service monitorAllAgentStatus.go(); diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java b/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java new file mode 100644 index 0000000..82536d2 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java @@ -0,0 +1,50 @@ +package io.wdd.rpc.scheduler.service; + + +import com.alibaba.nacos.api.config.annotation.NacosValue; +import io.wdd.rpc.scheduler.job.MonitorAllAgentStatusJob; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; + +@Component +@Slf4j +public class BuildStatusScheduleTask { + + @Resource + OctopusQuartzService octopusQuartzService; + + @Value(value = "${octopus.status.healthy.cron}") + String healthyCronTimeExpress; + + @Value(value = "${octopus.status.healthy.start-delay}") + int healthyCheckStartDelaySeconds; + + + @PostConstruct + public void buildAll(){ + + buildMonitorAllAgentStatusScheduleTask(); + + } + + public void buildMonitorAllAgentStatusScheduleTask(){ + + + // build the Job + octopusQuartzService.addJob( + MonitorAllAgentStatusJob.class, + "monitorAllAgentStatusJob", + "monitorAllAgentStatusJob", + healthyCheckStartDelaySeconds, + healthyCronTimeExpress, + null + ); + + + + } +} diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzService.java b/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzService.java index aafaa2d..441ab38 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzService.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzService.java @@ -23,13 +23,15 @@ public interface OctopusQuartzService { /** * 增加一个任务job - * @param jobClass 任务job实现类 - * @param jobName 任务job名称(保证唯一性) - * @param jobGroupName 任务job组名 - * @param jobTime 任务时间表达式 - * @param jobData 任务参数 + * + * @param jobClass 任务job实现类 + * @param jobName 任务job名称(保证唯一性) + * @param jobGroupName 任务job组名 + * @param startTime + * @param cronJobExpression 任务时间表达式 + * @param jobData 任务参数 */ - void addJob(Class jobClass, String jobName, String jobGroupName, String jobTime, Map jobData); + void addJob(Class jobClass, String jobName, String jobGroupName, int startTime, String cronJobExpression, Map jobData); /** * 修改一个任务job diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzServiceImpl.java b/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzServiceImpl.java index e63d4a8..635ea9b 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzServiceImpl.java @@ -3,10 +3,10 @@ package io.wdd.rpc.scheduler.service; import io.wdd.common.handler.MyRuntimeException; import io.wdd.rpc.scheduler.beans.OctopusQuartzJob; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; import org.quartz.*; import org.quartz.DateBuilder.IntervalUnit; import org.quartz.impl.matchers.GroupMatcher; -import org.quartz.impl.triggers.CronTriggerImpl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.quartz.QuartzJobBean; import org.springframework.stereotype.Service; @@ -46,44 +46,30 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { /** * 增加一个job * - * @param jobClass - * 任务实现类 - * @param jobName - * 任务名称 - * @param jobGroupName - * 任务组名 - * @param jobTime - * 时间表达式 (这是每隔多少秒为一次任务) - * @param jobTimes - * 运行的次数 (<0:表示不限次数) - * @param jobData - * 参数 + * @param jobClass 任务实现类 + * @param jobName 任务名称 + * @param jobGroupName 任务组名 + * @param jobTime 时间表达式 (这是每隔多少秒为一次任务) + * @param jobTimes 运行的次数 (<0:表示不限次数) + * @param jobData 参数 */ @Override public void addJob(Class jobClass, String jobName, String jobGroupName, int jobTime, int jobTimes, Map jobData) { try { // 任务名称和组构成任务key - JobDetail jobDetail = JobBuilder - .newJob(jobClass) - .withIdentity(jobName, jobGroupName) - .build(); + JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build(); // 设置job参数 - if(jobData!= null && jobData.size()>0){ + if (jobData != null && jobData.size() > 0) { jobDetail.getJobDataMap().putAll(jobData); } // 使用simpleTrigger规则 Trigger trigger = null; if (jobTimes < 0) { - trigger = newTrigger().withIdentity(jobName, jobGroupName) - .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime)) - .startNow().build(); + trigger = newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime)).startNow().build(); } else { - trigger = newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder - .repeatSecondlyForever(1) - .withIntervalInSeconds(jobTime).withRepeatCount(jobTimes)) - .startNow().build(); + trigger = newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime).withRepeatCount(jobTimes)).startNow().build(); } log.info("jobDataMap: {}", jobDetail.getJobDataMap().getWrappedMap()); scheduler.scheduleJob(jobDetail, trigger); @@ -96,39 +82,35 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { /** * 增加一个job * - * @param jobClass - * 任务实现类 - * @param jobName - * 任务名称(建议唯一) - * @param jobGroupName - * 任务组名 - * @param cronJobExpression - * 时间表达式 (如:0/5 * * * * ? ) - * @param jobData - * 参数 + * @param jobClass 任务实现类 + * @param jobName 任务名称(建议唯一) + * @param jobGroupName 任务组名 + * @param startTime + * @param cronJobExpression 时间表达式 (如:0/5 * * * * ? ) + * @param jobData 参数 */ @Override - public void addJob(Class jobClass, String jobName, String jobGroupName, String cronJobExpression, Map jobData) { + public void addJob(Class jobClass, String jobName, String jobGroupName, int startTime, String cronJobExpression, Map jobData) { try { // 创建jobDetail实例,绑定Job实现类 // 指明job的名称,所在组的名称,以及绑定job类 // 任务名称和组构成任务key - JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName) - .build(); + JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build(); // 设置job参数 - if(jobData!= null && jobData.size()>0){ + if (jobData != null && jobData.size() > 0) { jobDetail.getJobDataMap().putAll(jobData); } // 定义调度触发规则 // 使用cornTrigger规则 // 触发器key - Trigger trigger = newTrigger() - .withIdentity(jobName, jobGroupName) - .startAt(DateBuilder.futureDate(1, IntervalUnit.SECOND)) - .withSchedule(CronScheduleBuilder.cronSchedule(cronJobExpression)) - .startNow() - .build(); + + // uniform the start time + if (ObjectUtils.isEmpty(startTime) || startTime == 0) { + startTime = 1; + } + + Trigger trigger = newTrigger().withIdentity(jobName, jobGroupName).startAt(DateBuilder.futureDate(startTime, IntervalUnit.SECOND)).withSchedule(CronScheduleBuilder.cronSchedule(cronJobExpression)).startNow().build(); // 把作业和触发器注册到任务调度中 scheduler.scheduleJob(jobDetail, trigger); @@ -153,8 +135,7 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); log.info("new jobTime: {}", jobTime); - trigger = trigger.getTriggerBuilder().withIdentity(triggerKey) - .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).build(); + trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).build(); // 重启触发器 scheduler.rescheduleJob(triggerKey, trigger); } catch (SchedulerException e) { @@ -166,10 +147,8 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { /** * 删除任务一个job * - * @param jobName - * 任务名称 - * @param jobGroupName - * 任务组名 + * @param jobName 任务名称 + * @param jobGroupName 任务组名 */ @Override public void deleteJob(String jobName, String jobGroupName) {