[ server ] - monitor all agent status - 3
This commit is contained in:
@@ -3,6 +3,7 @@ package io.wdd.agent.config.message.handler;
|
|||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import io.wdd.agent.status.AgentStatusCollector;
|
||||||
import io.wdd.agent.status.HealthyReporter;
|
import io.wdd.agent.status.HealthyReporter;
|
||||||
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||||
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
|
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
|
||||||
@@ -11,7 +12,7 @@ import org.springframework.stereotype.Component;
|
|||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
|
|
||||||
import static io.wdd.common.beans.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE;
|
import static io.wdd.common.beans.status.OctopusStatusMessage.*;
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
public class OMHandlerStatus extends AbstractOctopusMessageHandler {
|
public class OMHandlerStatus extends AbstractOctopusMessageHandler {
|
||||||
@@ -22,6 +23,9 @@ public class OMHandlerStatus extends AbstractOctopusMessageHandler {
|
|||||||
@Resource
|
@Resource
|
||||||
HealthyReporter healthyReporter;
|
HealthyReporter healthyReporter;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
AgentStatusCollector agentStatusCollector;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean handle(OctopusMessage octopusMessage) {
|
public boolean handle(OctopusMessage octopusMessage) {
|
||||||
|
|
||||||
@@ -40,6 +44,16 @@ public class OMHandlerStatus extends AbstractOctopusMessageHandler {
|
|||||||
if (statusType.equals(HEALTHY_STATUS_MESSAGE_TYPE)) {
|
if (statusType.equals(HEALTHY_STATUS_MESSAGE_TYPE)) {
|
||||||
// healthy check
|
// healthy check
|
||||||
healthyReporter.report();
|
healthyReporter.report();
|
||||||
|
} else if (statusType.equals(ALL_STATUS_MESSAGE_TYPE)) {
|
||||||
|
// all status report
|
||||||
|
agentStatusCollector.sendAgentStatusToRedis();
|
||||||
|
} else if (statusType.equals(METRIC_STATUS_MESSAGE_TYPE)) {
|
||||||
|
// metric status
|
||||||
|
} else if (statusType.equals(APP_STATUS_MESSAGE_TYPE)) {
|
||||||
|
// app status report
|
||||||
|
} else {
|
||||||
|
// unknown
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -47,7 +61,6 @@ public class OMHandlerStatus extends AbstractOctopusMessageHandler {
|
|||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -109,7 +109,7 @@ public class AgentStatusCollector {
|
|||||||
|
|
||||||
// agent boot up 120s then start to report its status
|
// agent boot up 120s then start to report its status
|
||||||
// at the fix rate of 15s
|
// at the fix rate of 15s
|
||||||
@Scheduled(initialDelay = ReportInitDelay, fixedRate = ReportFixedRate)
|
/*@Scheduled(initialDelay = ReportInitDelay, fixedRate = ReportFixedRate)*/
|
||||||
public void sendAgentStatusToRedis() {
|
public void sendAgentStatusToRedis() {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -14,10 +14,13 @@ public class OctopusStatusMessage {
|
|||||||
// below two will be used by both server and agent
|
// below two will be used by both server and agent
|
||||||
public static final String ALL_AGENT_STATUS_REDIS_KEY = "ALL_AGENT_STATUS";
|
public static final String ALL_AGENT_STATUS_REDIS_KEY = "ALL_AGENT_STATUS";
|
||||||
public static final String HEALTHY_STATUS_MESSAGE_TYPE = "ping";
|
public static final String HEALTHY_STATUS_MESSAGE_TYPE = "ping";
|
||||||
|
public static final String ALL_STATUS_MESSAGE_TYPE = "all";
|
||||||
|
public static final String METRIC_STATUS_MESSAGE_TYPE = "metric";
|
||||||
|
public static final String APP_STATUS_MESSAGE_TYPE = "app";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* which kind of status should be return
|
* which kind of status should be return
|
||||||
* short => short time message
|
* metric => short time message
|
||||||
* all => all agent status message
|
* all => all agent status message
|
||||||
* healthy => check for healthy
|
* healthy => check for healthy
|
||||||
* */
|
* */
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import lombok.Data;
|
|||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
@Data
|
@Data
|
||||||
public class OctopusQuartzJob implements Serializable {
|
public class OctopusQuartzJob implements Serializable {
|
||||||
|
|
||||||
|
|||||||
@@ -1,15 +1,22 @@
|
|||||||
package io.wdd.rpc.scheduler.beans;
|
package io.wdd.rpc.scheduler.beans;
|
||||||
|
|
||||||
import com.baomidou.mybatisplus.annotation.IdType;
|
import com.baomidou.mybatisplus.annotation.IdType;
|
||||||
import com.baomidou.mybatisplus.annotation.TableField;
|
|
||||||
import com.baomidou.mybatisplus.annotation.TableId;
|
import com.baomidou.mybatisplus.annotation.TableId;
|
||||||
|
import io.swagger.annotations.ApiModel;
|
||||||
import io.swagger.annotations.ApiModelProperty;
|
import io.swagger.annotations.ApiModelProperty;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
import lombok.experimental.SuperBuilder;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
@ApiModel("Octopus 定时任务的持久化存储信息 ")
|
||||||
|
@SuperBuilder(toBuilder = true)
|
||||||
public class OctopusQuartzLog implements Serializable {
|
public class OctopusQuartzLog implements Serializable {
|
||||||
|
|
||||||
@ApiModelProperty(value = "ID", hidden = true)
|
@ApiModelProperty(value = "ID", hidden = true)
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ public class MonitorAllAgentStatusJob extends QuartzJobBean {
|
|||||||
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
|
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
|
||||||
|
|
||||||
// get the jobMetaMap
|
// get the jobMetaMap
|
||||||
//JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
|
JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
|
||||||
|
|
||||||
// actually execute the monitor service
|
// actually execute the monitor service
|
||||||
monitorAllAgentStatus.go();
|
monitorAllAgentStatus.go();
|
||||||
|
|||||||
@@ -0,0 +1,50 @@
|
|||||||
|
package io.wdd.rpc.scheduler.service;
|
||||||
|
|
||||||
|
|
||||||
|
import com.alibaba.nacos.api.config.annotation.NacosValue;
|
||||||
|
import io.wdd.rpc.scheduler.job.MonitorAllAgentStatusJob;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
public class BuildStatusScheduleTask {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
OctopusQuartzService octopusQuartzService;
|
||||||
|
|
||||||
|
@Value(value = "${octopus.status.healthy.cron}")
|
||||||
|
String healthyCronTimeExpress;
|
||||||
|
|
||||||
|
@Value(value = "${octopus.status.healthy.start-delay}")
|
||||||
|
int healthyCheckStartDelaySeconds;
|
||||||
|
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void buildAll(){
|
||||||
|
|
||||||
|
buildMonitorAllAgentStatusScheduleTask();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void buildMonitorAllAgentStatusScheduleTask(){
|
||||||
|
|
||||||
|
|
||||||
|
// build the Job
|
||||||
|
octopusQuartzService.addJob(
|
||||||
|
MonitorAllAgentStatusJob.class,
|
||||||
|
"monitorAllAgentStatusJob",
|
||||||
|
"monitorAllAgentStatusJob",
|
||||||
|
healthyCheckStartDelaySeconds,
|
||||||
|
healthyCronTimeExpress,
|
||||||
|
null
|
||||||
|
);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -23,13 +23,15 @@ public interface OctopusQuartzService {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 增加一个任务job
|
* 增加一个任务job
|
||||||
|
*
|
||||||
* @param jobClass 任务job实现类
|
* @param jobClass 任务job实现类
|
||||||
* @param jobName 任务job名称(保证唯一性)
|
* @param jobName 任务job名称(保证唯一性)
|
||||||
* @param jobGroupName 任务job组名
|
* @param jobGroupName 任务job组名
|
||||||
* @param jobTime 任务时间表达式
|
* @param startTime
|
||||||
|
* @param cronJobExpression 任务时间表达式
|
||||||
* @param jobData 任务参数
|
* @param jobData 任务参数
|
||||||
*/
|
*/
|
||||||
void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, String jobTime, Map jobData);
|
void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int startTime, String cronJobExpression, Map jobData);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 修改一个任务job
|
* 修改一个任务job
|
||||||
|
|||||||
@@ -3,10 +3,10 @@ package io.wdd.rpc.scheduler.service;
|
|||||||
import io.wdd.common.handler.MyRuntimeException;
|
import io.wdd.common.handler.MyRuntimeException;
|
||||||
import io.wdd.rpc.scheduler.beans.OctopusQuartzJob;
|
import io.wdd.rpc.scheduler.beans.OctopusQuartzJob;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.lang3.ObjectUtils;
|
||||||
import org.quartz.*;
|
import org.quartz.*;
|
||||||
import org.quartz.DateBuilder.IntervalUnit;
|
import org.quartz.DateBuilder.IntervalUnit;
|
||||||
import org.quartz.impl.matchers.GroupMatcher;
|
import org.quartz.impl.matchers.GroupMatcher;
|
||||||
import org.quartz.impl.triggers.CronTriggerImpl;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.scheduling.quartz.QuartzJobBean;
|
import org.springframework.scheduling.quartz.QuartzJobBean;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
@@ -46,44 +46,30 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
|
|||||||
/**
|
/**
|
||||||
* 增加一个job
|
* 增加一个job
|
||||||
*
|
*
|
||||||
* @param jobClass
|
* @param jobClass 任务实现类
|
||||||
* 任务实现类
|
* @param jobName 任务名称
|
||||||
* @param jobName
|
* @param jobGroupName 任务组名
|
||||||
* 任务名称
|
* @param jobTime 时间表达式 (这是每隔多少秒为一次任务)
|
||||||
* @param jobGroupName
|
* @param jobTimes 运行的次数 (<0:表示不限次数)
|
||||||
* 任务组名
|
* @param jobData 参数
|
||||||
* @param jobTime
|
|
||||||
* 时间表达式 (这是每隔多少秒为一次任务)
|
|
||||||
* @param jobTimes
|
|
||||||
* 运行的次数 (<0:表示不限次数)
|
|
||||||
* @param jobData
|
|
||||||
* 参数
|
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobTime, int jobTimes, Map jobData) {
|
public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobTime, int jobTimes, Map jobData) {
|
||||||
try {
|
try {
|
||||||
// 任务名称和组构成任务key
|
// 任务名称和组构成任务key
|
||||||
JobDetail jobDetail = JobBuilder
|
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build();
|
||||||
.newJob(jobClass)
|
|
||||||
.withIdentity(jobName, jobGroupName)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
// 设置job参数
|
// 设置job参数
|
||||||
if(jobData!= null && jobData.size()>0){
|
if (jobData != null && jobData.size() > 0) {
|
||||||
jobDetail.getJobDataMap().putAll(jobData);
|
jobDetail.getJobDataMap().putAll(jobData);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 使用simpleTrigger规则
|
// 使用simpleTrigger规则
|
||||||
Trigger trigger = null;
|
Trigger trigger = null;
|
||||||
if (jobTimes < 0) {
|
if (jobTimes < 0) {
|
||||||
trigger = newTrigger().withIdentity(jobName, jobGroupName)
|
trigger = newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime)).startNow().build();
|
||||||
.withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime))
|
|
||||||
.startNow().build();
|
|
||||||
} else {
|
} else {
|
||||||
trigger = newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder
|
trigger = newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime).withRepeatCount(jobTimes)).startNow().build();
|
||||||
.repeatSecondlyForever(1)
|
|
||||||
.withIntervalInSeconds(jobTime).withRepeatCount(jobTimes))
|
|
||||||
.startNow().build();
|
|
||||||
}
|
}
|
||||||
log.info("jobDataMap: {}", jobDetail.getJobDataMap().getWrappedMap());
|
log.info("jobDataMap: {}", jobDetail.getJobDataMap().getWrappedMap());
|
||||||
scheduler.scheduleJob(jobDetail, trigger);
|
scheduler.scheduleJob(jobDetail, trigger);
|
||||||
@@ -96,39 +82,35 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
|
|||||||
/**
|
/**
|
||||||
* 增加一个job
|
* 增加一个job
|
||||||
*
|
*
|
||||||
* @param jobClass
|
* @param jobClass 任务实现类
|
||||||
* 任务实现类
|
* @param jobName 任务名称(建议唯一)
|
||||||
* @param jobName
|
* @param jobGroupName 任务组名
|
||||||
* 任务名称(建议唯一)
|
* @param startTime
|
||||||
* @param jobGroupName
|
* @param cronJobExpression 时间表达式 (如:0/5 * * * * ? )
|
||||||
* 任务组名
|
* @param jobData 参数
|
||||||
* @param cronJobExpression
|
|
||||||
* 时间表达式 (如:0/5 * * * * ? )
|
|
||||||
* @param jobData
|
|
||||||
* 参数
|
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, String cronJobExpression, Map jobData) {
|
public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int startTime, String cronJobExpression, Map jobData) {
|
||||||
try {
|
try {
|
||||||
// 创建jobDetail实例,绑定Job实现类
|
// 创建jobDetail实例,绑定Job实现类
|
||||||
// 指明job的名称,所在组的名称,以及绑定job类
|
// 指明job的名称,所在组的名称,以及绑定job类
|
||||||
// 任务名称和组构成任务key
|
// 任务名称和组构成任务key
|
||||||
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName)
|
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build();
|
||||||
.build();
|
|
||||||
// 设置job参数
|
// 设置job参数
|
||||||
if(jobData!= null && jobData.size()>0){
|
if (jobData != null && jobData.size() > 0) {
|
||||||
jobDetail.getJobDataMap().putAll(jobData);
|
jobDetail.getJobDataMap().putAll(jobData);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 定义调度触发规则
|
// 定义调度触发规则
|
||||||
// 使用cornTrigger规则
|
// 使用cornTrigger规则
|
||||||
// 触发器key
|
// 触发器key
|
||||||
Trigger trigger = newTrigger()
|
|
||||||
.withIdentity(jobName, jobGroupName)
|
// uniform the start time
|
||||||
.startAt(DateBuilder.futureDate(1, IntervalUnit.SECOND))
|
if (ObjectUtils.isEmpty(startTime) || startTime == 0) {
|
||||||
.withSchedule(CronScheduleBuilder.cronSchedule(cronJobExpression))
|
startTime = 1;
|
||||||
.startNow()
|
}
|
||||||
.build();
|
|
||||||
|
Trigger trigger = newTrigger().withIdentity(jobName, jobGroupName).startAt(DateBuilder.futureDate(startTime, IntervalUnit.SECOND)).withSchedule(CronScheduleBuilder.cronSchedule(cronJobExpression)).startNow().build();
|
||||||
|
|
||||||
// 把作业和触发器注册到任务调度中
|
// 把作业和触发器注册到任务调度中
|
||||||
scheduler.scheduleJob(jobDetail, trigger);
|
scheduler.scheduleJob(jobDetail, trigger);
|
||||||
@@ -153,8 +135,7 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
|
|||||||
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
|
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
|
||||||
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
|
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
|
||||||
log.info("new jobTime: {}", jobTime);
|
log.info("new jobTime: {}", jobTime);
|
||||||
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
|
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).build();
|
||||||
.withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).build();
|
|
||||||
// 重启触发器
|
// 重启触发器
|
||||||
scheduler.rescheduleJob(triggerKey, trigger);
|
scheduler.rescheduleJob(triggerKey, trigger);
|
||||||
} catch (SchedulerException e) {
|
} catch (SchedulerException e) {
|
||||||
@@ -166,10 +147,8 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
|
|||||||
/**
|
/**
|
||||||
* 删除任务一个job
|
* 删除任务一个job
|
||||||
*
|
*
|
||||||
* @param jobName
|
* @param jobName 任务名称
|
||||||
* 任务名称
|
* @param jobGroupName 任务组名
|
||||||
* @param jobGroupName
|
|
||||||
* 任务组名
|
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void deleteJob(String jobName, String jobGroupName) {
|
public void deleteJob(String jobName, String jobGroupName) {
|
||||||
|
|||||||
Reference in New Issue
Block a user