diff --git a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerStatus.java b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerStatus.java index ef92986..31ea0b5 100644 --- a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerStatus.java +++ b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerStatus.java @@ -1,11 +1,27 @@ package io.wdd.agent.config.message.handler; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.wdd.agent.status.HealthyReporter; import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessageType; +import io.wdd.common.beans.status.OctopusStatusMessage; import org.springframework.stereotype.Component; +import javax.annotation.Resource; + +import static io.wdd.common.beans.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE; + @Component public class OMHandlerStatus extends AbstractOctopusMessageHandler { + + @Resource + ObjectMapper objectMapper; + + @Resource + HealthyReporter healthyReporter; + @Override public boolean handle(OctopusMessage octopusMessage) { @@ -13,6 +29,25 @@ public class OMHandlerStatus extends AbstractOctopusMessageHandler { return next.handle(octopusMessage); } + // handle about the status kind + try { + + OctopusStatusMessage statusMessage = objectMapper.readValue((String) octopusMessage.getContent(), new TypeReference() { + }); + + String statusType = statusMessage.getType(); + + if (statusType.equals(HEALTHY_STATUS_MESSAGE_TYPE)) { + // healthy check + healthyReporter.report(); + } + + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + return true; } } diff --git a/agent/src/main/java/io/wdd/agent/status/HealthyReporter.java b/agent/src/main/java/io/wdd/agent/status/HealthyReporter.java new file mode 100644 index 0000000..2da2ffd --- /dev/null +++ b/agent/src/main/java/io/wdd/agent/status/HealthyReporter.java @@ -0,0 +1,40 @@ +package io.wdd.agent.status; + +import io.wdd.agent.config.beans.init.AgentServerInfo; +import io.wdd.common.utils.TimeUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +import static io.wdd.common.beans.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY; + +/** + * 1. modify the redis key => ALL_AGENT_STATUS_REDIS_KEY + * 2. the hashmap struct key => agentTopicName + * 3. modify the key to "1" + */ +@Service +@Slf4j +public class HealthyReporter { + + @Resource + RedisTemplate redisTemplate; + + @Resource + AgentServerInfo agentServerInfo; + + public void report(){ + + redisTemplate.opsForHash().put( + ALL_AGENT_STATUS_REDIS_KEY, + agentServerInfo.getAgentTopicName(), + "1" + ); + + log.debug("Agent Healthy Check Complete ! Time is => {}", TimeUtils.currentTimeString()); + } + + +} diff --git a/common/src/main/java/io/wdd/common/beans/status/OctopusStatusMessage.java b/common/src/main/java/io/wdd/common/beans/status/OctopusStatusMessage.java index cbf730c..89539a2 100644 --- a/common/src/main/java/io/wdd/common/beans/status/OctopusStatusMessage.java +++ b/common/src/main/java/io/wdd/common/beans/status/OctopusStatusMessage.java @@ -11,6 +11,10 @@ import lombok.experimental.SuperBuilder; @SuperBuilder(toBuilder = true) public class OctopusStatusMessage { + // below two will be used by both server and agent + public static final String ALL_AGENT_STATUS_REDIS_KEY = "ALL_AGENT_STATUS"; + public static final String HEALTHY_STATUS_MESSAGE_TYPE = "ping"; + /** * which kind of status should be return * short => short time message diff --git a/server/src/main/java/io/wdd/rpc/scheduler/call/CallAgentQuartzService.java b/server/src/main/java/io/wdd/rpc/scheduler/call/CallAgentQuartzService.java deleted file mode 100644 index b93b84c..0000000 --- a/server/src/main/java/io/wdd/rpc/scheduler/call/CallAgentQuartzService.java +++ /dev/null @@ -1,18 +0,0 @@ -package io.wdd.rpc.scheduler.call; - -import io.wdd.common.beans.status.OctopusStatusMessage; -import io.wdd.rpc.status.CollectAgentStatus; -import org.springframework.stereotype.Service; - -import javax.annotation.Resource; - -@Service -public class CallAgentQuartzService { - - @Resource - CollectAgentStatus collectAgentStatus; - - - - -} diff --git a/server/src/main/java/io/wdd/rpc/scheduler/config/ExecutionJob.java b/server/src/main/java/io/wdd/rpc/scheduler/config/ExecutionJob.java index 635f127..e273ab4 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/config/ExecutionJob.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/config/ExecutionJob.java @@ -13,7 +13,7 @@ import org.springframework.util.ReflectionUtils; import java.lang.reflect.Method; -@Async +@Deprecated public class ExecutionJob extends QuartzJobBean { @Override diff --git a/server/src/main/java/io/wdd/rpc/scheduler/config/QuartzLogOperator.java b/server/src/main/java/io/wdd/rpc/scheduler/config/QuartzLogOperator.java new file mode 100644 index 0000000..95fdbdf --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/scheduler/config/QuartzLogOperator.java @@ -0,0 +1,22 @@ +package io.wdd.rpc.scheduler.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +/** + * save the octopus quartz log to database + */ +@Service +@Slf4j +public class QuartzLogOperator { + + public boolean save(){ + + log.info("QuartzLogOperator pretend to have saved the log !"); + + return true; + + } + + +} diff --git a/server/src/main/java/io/wdd/rpc/scheduler/config/QuartzRunnable.java b/server/src/main/java/io/wdd/rpc/scheduler/config/QuartzRunnable.java index 4ec96f4..afa0082 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/config/QuartzRunnable.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/config/QuartzRunnable.java @@ -7,6 +7,7 @@ import org.springframework.util.ReflectionUtils; import java.lang.reflect.Method; import java.util.concurrent.Callable; +@Deprecated public class QuartzRunnable implements Callable { private final Object target; diff --git a/server/src/main/java/io/wdd/rpc/scheduler/job/MonitorAllAgentStatusJob.java b/server/src/main/java/io/wdd/rpc/scheduler/job/MonitorAllAgentStatusJob.java new file mode 100644 index 0000000..fd1696b --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/scheduler/job/MonitorAllAgentStatusJob.java @@ -0,0 +1,34 @@ +package io.wdd.rpc.scheduler.job; + +import io.wdd.rpc.scheduler.config.QuartzLogOperator; +import io.wdd.rpc.status.MonitorAllAgentStatus; +import org.quartz.JobDataMap; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.springframework.scheduling.quartz.QuartzJobBean; + +import javax.annotation.Resource; +import java.nio.file.AccessMode; + +public class MonitorAllAgentStatusJob extends QuartzJobBean { + + @Resource + MonitorAllAgentStatus monitorAllAgentStatus; + + @Resource + QuartzLogOperator quartzLogOperator; + + @Override + protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { + + // get the jobMetaMap + //JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap(); + + // actually execute the monitor service + monitorAllAgentStatus.go(); + + // log to somewhere + quartzLogOperator.save(); + + } +} diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzServiceImpl.java b/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzServiceImpl.java index 3611034..e63d4a8 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzServiceImpl.java @@ -38,35 +38,9 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { @Override public boolean addJob(OctopusQuartzJob quartzJob) { - try { - // 构建jobDetail,并与PrintHelloJob类绑定(Job执行内容) - JobDetail jobDetail = JobBuilder - .newJob(PrintHelloJob.class) - .withIdentity(quartzJob.getUuid()) - .build(); - //通过触发器名和cron表达式创建Trigger - Trigger cronTrigger = newTrigger() - .withIdentity(quartzJob.getUuid()) - .startNow() - .withSchedule(CronScheduleBuilder.cronSchedule(quartzJob.getCronExpression())) - .build(); - //把job信息放入jobDataMap中 job_key为标识 - cronTrigger.getJobDataMap().put(OctopusQuartzJob.JOB_KEY, quartzJob); - - //重置启动时间 - ((CronTriggerImpl)cronTrigger).setStartTime(new Date()); - - //执行定时任务 - scheduler.scheduleJob(jobDetail, cronTrigger); - - } catch (Exception e) { - log.error("【创建定时任务失败】 定时任务id:{}", quartzJob.getId()); - return false; - } - - return true; + return false; } /** @@ -86,16 +60,19 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { * 参数 */ @Override - public void addJob(Class jobClass, String jobName, String jobGroupName, int jobTime, - int jobTimes, Map jobData) { + public void addJob(Class jobClass, String jobName, String jobGroupName, int jobTime, int jobTimes, Map jobData) { try { // 任务名称和组构成任务key - JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName) + JobDetail jobDetail = JobBuilder + .newJob(jobClass) + .withIdentity(jobName, jobGroupName) .build(); + // 设置job参数 if(jobData!= null && jobData.size()>0){ jobDetail.getJobDataMap().putAll(jobData); } + // 使用simpleTrigger规则 Trigger trigger = null; if (jobTimes < 0) { @@ -104,7 +81,8 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { .startNow().build(); } else { trigger = newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder - .repeatSecondlyForever(1).withIntervalInSeconds(jobTime).withRepeatCount(jobTimes)) + .repeatSecondlyForever(1) + .withIntervalInSeconds(jobTime).withRepeatCount(jobTimes)) .startNow().build(); } log.info("jobDataMap: {}", jobDetail.getJobDataMap().getWrappedMap()); @@ -124,13 +102,13 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { * 任务名称(建议唯一) * @param jobGroupName * 任务组名 - * @param jobTime + * @param cronJobExpression * 时间表达式 (如:0/5 * * * * ? ) * @param jobData * 参数 */ @Override - public void addJob(Class jobClass, String jobName, String jobGroupName, String jobTime, Map jobData) { + public void addJob(Class jobClass, String jobName, String jobGroupName, String cronJobExpression, Map jobData) { try { // 创建jobDetail实例,绑定Job实现类 // 指明job的名称,所在组的名称,以及绑定job类 @@ -141,15 +119,21 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { if(jobData!= null && jobData.size()>0){ jobDetail.getJobDataMap().putAll(jobData); } + // 定义调度触发规则 // 使用cornTrigger规则 // 触发器key - Trigger trigger = newTrigger().withIdentity(jobName, jobGroupName) + Trigger trigger = newTrigger() + .withIdentity(jobName, jobGroupName) .startAt(DateBuilder.futureDate(1, IntervalUnit.SECOND)) - .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).startNow().build(); + .withSchedule(CronScheduleBuilder.cronSchedule(cronJobExpression)) + .startNow() + .build(); + // 把作业和触发器注册到任务调度中 scheduler.scheduleJob(jobDetail, trigger); log.info("jobDataMap: {}", jobDetail.getJobDataMap()); + } catch (Exception e) { e.printStackTrace(); throw new MyRuntimeException("add job error!"); diff --git a/server/src/main/java/io/wdd/rpc/status/MonitorAllAgentStatus.java b/server/src/main/java/io/wdd/rpc/status/MonitorAllAgentStatus.java index efa54db..b209638 100644 --- a/server/src/main/java/io/wdd/rpc/status/MonitorAllAgentStatus.java +++ b/server/src/main/java/io/wdd/rpc/status/MonitorAllAgentStatus.java @@ -15,6 +15,9 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static io.wdd.common.beans.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY; +import static io.wdd.common.beans.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE; + /** * 获取所有注册的Agent *

@@ -31,8 +34,6 @@ import java.util.stream.Collectors; public class MonitorAllAgentStatus { private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5; - private static final String ALL_AGENT_STATUS_REDIS_KEY = "ALL_AGENT_STATUS"; - private static final String STATUS_MESSAGE_TYPE = "ping"; private HashMap AGENT_HEALTHY_INIT_MAP; private List ALL_AGENT_TOPICNAME_LIST; @@ -97,7 +98,7 @@ public class MonitorAllAgentStatus { List collect = ALL_AGENT_TOPICNAME_LIST.stream().map( agentTopicName -> OctopusStatusMessage.builder() .agentTopicName(agentTopicName) - .type(STATUS_MESSAGE_TYPE) + .type(HEALTHY_STATUS_MESSAGE_TYPE) .build() ).collect(Collectors.toList()); collectAgentStatus.collectAgentStatusList(collect); @@ -108,17 +109,21 @@ public class MonitorAllAgentStatus { ALL_AGENT_STATUS_REDIS_KEY, ALL_AGENT_TOPICNAME_LIST); + + // current log to console is ok HashMap tmp = new HashMap<>(32); for (int i = 0; i < ALL_AGENT_TOPICNAME_LIST.size(); i++) { - tmp.put(ALL_AGENT_TOPICNAME_LIST.get(i), + tmp.put( + ALL_AGENT_TOPICNAME_LIST.get(i), uniformHealthyStatus(String.valueOf(statusList.get(i))) ); } - // current log to console is ok - String currentTimeString = TimeUtils.currentTimeString(); log.info("[ AGENT HEALTHY CHECK ] time is {} , result are => {}", currentTimeString, tmp); + // help gc + tmp = null; + // update time AGENT_HEALTHY_INIT_MAP.put("updateTime", currentTimeString); // init the healthy map