[ server ] - monitor all agent status - 2

This commit is contained in:
zeaslity
2023-01-11 10:52:01 +08:00
parent b7958f5c78
commit 1b17a5cf44
10 changed files with 167 additions and 60 deletions

View File

@@ -1,11 +1,27 @@
package io.wdd.agent.config.message.handler; package io.wdd.agent.config.message.handler;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.agent.status.HealthyReporter;
import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessageType; import io.wdd.common.beans.rabbitmq.OctopusMessageType;
import io.wdd.common.beans.status.OctopusStatusMessage;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import static io.wdd.common.beans.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE;
@Component @Component
public class OMHandlerStatus extends AbstractOctopusMessageHandler { public class OMHandlerStatus extends AbstractOctopusMessageHandler {
@Resource
ObjectMapper objectMapper;
@Resource
HealthyReporter healthyReporter;
@Override @Override
public boolean handle(OctopusMessage octopusMessage) { public boolean handle(OctopusMessage octopusMessage) {
@@ -13,6 +29,25 @@ public class OMHandlerStatus extends AbstractOctopusMessageHandler {
return next.handle(octopusMessage); return next.handle(octopusMessage);
} }
// handle about the status kind
try {
OctopusStatusMessage statusMessage = objectMapper.readValue((String) octopusMessage.getContent(), new TypeReference<OctopusStatusMessage>() {
});
String statusType = statusMessage.getType();
if (statusType.equals(HEALTHY_STATUS_MESSAGE_TYPE)) {
// healthy check
healthyReporter.report();
}
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return true; return true;
} }
} }

View File

@@ -0,0 +1,40 @@
package io.wdd.agent.status;
import io.wdd.agent.config.beans.init.AgentServerInfo;
import io.wdd.common.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import static io.wdd.common.beans.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY;
/**
* 1. modify the redis key => ALL_AGENT_STATUS_REDIS_KEY
* 2. the hashmap struct key => agentTopicName
* 3. modify the key to "1"
*/
@Service
@Slf4j
public class HealthyReporter {
@Resource
RedisTemplate redisTemplate;
@Resource
AgentServerInfo agentServerInfo;
public void report(){
redisTemplate.opsForHash().put(
ALL_AGENT_STATUS_REDIS_KEY,
agentServerInfo.getAgentTopicName(),
"1"
);
log.debug("Agent Healthy Check Complete ! Time is => {}", TimeUtils.currentTimeString());
}
}

View File

@@ -11,6 +11,10 @@ import lombok.experimental.SuperBuilder;
@SuperBuilder(toBuilder = true) @SuperBuilder(toBuilder = true)
public class OctopusStatusMessage { public class OctopusStatusMessage {
// below two will be used by both server and agent
public static final String ALL_AGENT_STATUS_REDIS_KEY = "ALL_AGENT_STATUS";
public static final String HEALTHY_STATUS_MESSAGE_TYPE = "ping";
/** /**
* which kind of status should be return * which kind of status should be return
* short => short time message * short => short time message

View File

@@ -1,18 +0,0 @@
package io.wdd.rpc.scheduler.call;
import io.wdd.common.beans.status.OctopusStatusMessage;
import io.wdd.rpc.status.CollectAgentStatus;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class CallAgentQuartzService {
@Resource
CollectAgentStatus collectAgentStatus;
}

View File

@@ -13,7 +13,7 @@ import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method; import java.lang.reflect.Method;
@Async @Deprecated
public class ExecutionJob extends QuartzJobBean { public class ExecutionJob extends QuartzJobBean {
@Override @Override

View File

@@ -0,0 +1,22 @@
package io.wdd.rpc.scheduler.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* save the octopus quartz log to database
*/
@Service
@Slf4j
public class QuartzLogOperator {
public boolean save(){
log.info("QuartzLogOperator pretend to have saved the log !");
return true;
}
}

View File

@@ -7,6 +7,7 @@ import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@Deprecated
public class QuartzRunnable implements Callable<Object> { public class QuartzRunnable implements Callable<Object> {
private final Object target; private final Object target;

View File

@@ -0,0 +1,34 @@
package io.wdd.rpc.scheduler.job;
import io.wdd.rpc.scheduler.config.QuartzLogOperator;
import io.wdd.rpc.status.MonitorAllAgentStatus;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;
import javax.annotation.Resource;
import java.nio.file.AccessMode;
public class MonitorAllAgentStatusJob extends QuartzJobBean {
@Resource
MonitorAllAgentStatus monitorAllAgentStatus;
@Resource
QuartzLogOperator quartzLogOperator;
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
// get the jobMetaMap
//JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
// actually execute the monitor service
monitorAllAgentStatus.go();
// log to somewhere
quartzLogOperator.save();
}
}

View File

@@ -38,35 +38,9 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
@Override @Override
public boolean addJob(OctopusQuartzJob quartzJob) { public boolean addJob(OctopusQuartzJob quartzJob) {
try {
// 构建jobDetail,并与PrintHelloJob类绑定(Job执行内容)
JobDetail jobDetail = JobBuilder
.newJob(PrintHelloJob.class)
.withIdentity(quartzJob.getUuid())
.build();
//通过触发器名和cron表达式创建Trigger
Trigger cronTrigger = newTrigger()
.withIdentity(quartzJob.getUuid())
.startNow()
.withSchedule(CronScheduleBuilder.cronSchedule(quartzJob.getCronExpression()))
.build();
//把job信息放入jobDataMap中 job_key为标识 return false;
cronTrigger.getJobDataMap().put(OctopusQuartzJob.JOB_KEY, quartzJob);
//重置启动时间
((CronTriggerImpl)cronTrigger).setStartTime(new Date());
//执行定时任务
scheduler.scheduleJob(jobDetail, cronTrigger);
} catch (Exception e) {
log.error("【创建定时任务失败】 定时任务id{}", quartzJob.getId());
return false;
}
return true;
} }
/** /**
@@ -86,16 +60,19 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
* 参数 * 参数
*/ */
@Override @Override
public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobTime, public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobTime, int jobTimes, Map jobData) {
int jobTimes, Map jobData) {
try { try {
// 任务名称和组构成任务key // 任务名称和组构成任务key
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName) JobDetail jobDetail = JobBuilder
.newJob(jobClass)
.withIdentity(jobName, jobGroupName)
.build(); .build();
// 设置job参数 // 设置job参数
if(jobData!= null && jobData.size()>0){ if(jobData!= null && jobData.size()>0){
jobDetail.getJobDataMap().putAll(jobData); jobDetail.getJobDataMap().putAll(jobData);
} }
// 使用simpleTrigger规则 // 使用simpleTrigger规则
Trigger trigger = null; Trigger trigger = null;
if (jobTimes < 0) { if (jobTimes < 0) {
@@ -104,7 +81,8 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
.startNow().build(); .startNow().build();
} else { } else {
trigger = newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder trigger = newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder
.repeatSecondlyForever(1).withIntervalInSeconds(jobTime).withRepeatCount(jobTimes)) .repeatSecondlyForever(1)
.withIntervalInSeconds(jobTime).withRepeatCount(jobTimes))
.startNow().build(); .startNow().build();
} }
log.info("jobDataMap: {}", jobDetail.getJobDataMap().getWrappedMap()); log.info("jobDataMap: {}", jobDetail.getJobDataMap().getWrappedMap());
@@ -124,13 +102,13 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
* 任务名称(建议唯一) * 任务名称(建议唯一)
* @param jobGroupName * @param jobGroupName
* 任务组名 * 任务组名
* @param jobTime * @param cronJobExpression
* 时间表达式 0/5 * * * * ? * 时间表达式 0/5 * * * * ?
* @param jobData * @param jobData
* 参数 * 参数
*/ */
@Override @Override
public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, String jobTime, Map jobData) { public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, String cronJobExpression, Map jobData) {
try { try {
// 创建jobDetail实例绑定Job实现类 // 创建jobDetail实例绑定Job实现类
// 指明job的名称所在组的名称以及绑定job类 // 指明job的名称所在组的名称以及绑定job类
@@ -141,15 +119,21 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
if(jobData!= null && jobData.size()>0){ if(jobData!= null && jobData.size()>0){
jobDetail.getJobDataMap().putAll(jobData); jobDetail.getJobDataMap().putAll(jobData);
} }
// 定义调度触发规则 // 定义调度触发规则
// 使用cornTrigger规则 // 使用cornTrigger规则
// 触发器key // 触发器key
Trigger trigger = newTrigger().withIdentity(jobName, jobGroupName) Trigger trigger = newTrigger()
.withIdentity(jobName, jobGroupName)
.startAt(DateBuilder.futureDate(1, IntervalUnit.SECOND)) .startAt(DateBuilder.futureDate(1, IntervalUnit.SECOND))
.withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).startNow().build(); .withSchedule(CronScheduleBuilder.cronSchedule(cronJobExpression))
.startNow()
.build();
// 把作业和触发器注册到任务调度中 // 把作业和触发器注册到任务调度中
scheduler.scheduleJob(jobDetail, trigger); scheduler.scheduleJob(jobDetail, trigger);
log.info("jobDataMap: {}", jobDetail.getJobDataMap()); log.info("jobDataMap: {}", jobDetail.getJobDataMap());
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
throw new MyRuntimeException("add job error!"); throw new MyRuntimeException("add job error!");

View File

@@ -15,6 +15,9 @@ import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static io.wdd.common.beans.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY;
import static io.wdd.common.beans.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE;
/** /**
* 获取所有注册的Agent * 获取所有注册的Agent
* <p> * <p>
@@ -31,8 +34,6 @@ import java.util.stream.Collectors;
public class MonitorAllAgentStatus { public class MonitorAllAgentStatus {
private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5; private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5;
private static final String ALL_AGENT_STATUS_REDIS_KEY = "ALL_AGENT_STATUS";
private static final String STATUS_MESSAGE_TYPE = "ping";
private HashMap<String, String> AGENT_HEALTHY_INIT_MAP; private HashMap<String, String> AGENT_HEALTHY_INIT_MAP;
private List<String> ALL_AGENT_TOPICNAME_LIST; private List<String> ALL_AGENT_TOPICNAME_LIST;
@@ -97,7 +98,7 @@ public class MonitorAllAgentStatus {
List<OctopusStatusMessage> collect = ALL_AGENT_TOPICNAME_LIST.stream().map( List<OctopusStatusMessage> collect = ALL_AGENT_TOPICNAME_LIST.stream().map(
agentTopicName -> OctopusStatusMessage.builder() agentTopicName -> OctopusStatusMessage.builder()
.agentTopicName(agentTopicName) .agentTopicName(agentTopicName)
.type(STATUS_MESSAGE_TYPE) .type(HEALTHY_STATUS_MESSAGE_TYPE)
.build() .build()
).collect(Collectors.toList()); ).collect(Collectors.toList());
collectAgentStatus.collectAgentStatusList(collect); collectAgentStatus.collectAgentStatusList(collect);
@@ -108,17 +109,21 @@ public class MonitorAllAgentStatus {
ALL_AGENT_STATUS_REDIS_KEY, ALL_AGENT_STATUS_REDIS_KEY,
ALL_AGENT_TOPICNAME_LIST); ALL_AGENT_TOPICNAME_LIST);
// current log to console is ok
HashMap<String, String> tmp = new HashMap<>(32); HashMap<String, String> tmp = new HashMap<>(32);
for (int i = 0; i < ALL_AGENT_TOPICNAME_LIST.size(); i++) { for (int i = 0; i < ALL_AGENT_TOPICNAME_LIST.size(); i++) {
tmp.put(ALL_AGENT_TOPICNAME_LIST.get(i), tmp.put(
ALL_AGENT_TOPICNAME_LIST.get(i),
uniformHealthyStatus(String.valueOf(statusList.get(i))) uniformHealthyStatus(String.valueOf(statusList.get(i)))
); );
} }
// current log to console is ok
String currentTimeString = TimeUtils.currentTimeString(); String currentTimeString = TimeUtils.currentTimeString();
log.info("[ AGENT HEALTHY CHECK ] time is {} , result are => {}", currentTimeString, tmp); log.info("[ AGENT HEALTHY CHECK ] time is {} , result are => {}", currentTimeString, tmp);
// help gc
tmp = null;
// update time // update time
AGENT_HEALTHY_INIT_MAP.put("updateTime", currentTimeString); AGENT_HEALTHY_INIT_MAP.put("updateTime", currentTimeString);
// init the healthy map // init the healthy map