diff --git a/common/src/main/java/io/wdd/common/beans/status/OctopusStatusMessage.java b/common/src/main/java/io/wdd/common/beans/status/OctopusStatusMessage.java
index 00c918a..bc4cf45 100644
--- a/common/src/main/java/io/wdd/common/beans/status/OctopusStatusMessage.java
+++ b/common/src/main/java/io/wdd/common/beans/status/OctopusStatusMessage.java
@@ -28,5 +28,8 @@ public class OctopusStatusMessage {
     String agentTopicName;
+    // Number of metric reports requested from the agent (presumably per message — confirm on the agent side)
+    int metricRepeatCount;
+
+    // Pause between two metric reports (presumably seconds — confirm on the agent side)
+    int metricRepeatPinch;
 }
diff --git a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentRunMetricStatusJob.java b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentRunMetricStatusJob.java
new file mode 100644
index 0000000..3ab9659
--- /dev/null
+++ b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentRunMetricStatusJob.java
@@ -0,0 +1,34 @@
+package io.wdd.rpc.scheduler.job;
+
+import io.wdd.rpc.status.AgentRuntimeMetricStatus;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.springframework.scheduling.quartz.QuartzJobBean;
+
+import javax.annotation.Resource;
+
+/**
+ * Quartz job that asks {@link AgentRuntimeMetricStatus} to run one round of
+ * agent metric status collection.
+ */
+public class AgentRunMetricStatusJob extends QuartzJobBean {
+
+    @Resource
+    AgentRuntimeMetricStatus agentRuntimeMetricStatus;
+
+
+    /**
+     * Reads the job data map of the fired job, prints it, and starts the
+     * metric collection with a fixed repeat count and interval.
+     *
+     * @param jobExecutionContext context of the current job execution
+     * @throws JobExecutionException declared by the overridden method
+     */
+    @Override
+    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
+
+        // Fetch the data attached to the JobDetail.
+        // NOTE(review): the map is only printed to stdout and not used yet — confirm whether
+        // the repeat count / interval below are meant to come from it.
+        JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
+        System.out.println("jobDataMap = " + jobDataMap);
+
+        int metricRepeatCount = 10;
+        int metricRepeatPinch = 5;
+        // Run the agent metric status collection task
+        agentRuntimeMetricStatus.collect(metricRepeatCount, metricRepeatPinch);
+
+        // todo the collected status is planned to be stored in a Redis Stream key
+        // AgentTopicName-Metric
+
+    }
+
+}
diff --git a/server/src/main/java/io/wdd/rpc/scheduler/job/MonitorAllAgentStatusJob.java b/server/src/main/java/io/wdd/rpc/scheduler/job/MonitorAllAgentStatusJob.java
index cf2f2fd..3279ca4 100644
--- 
a/server/src/main/java/io/wdd/rpc/scheduler/job/MonitorAllAgentStatusJob.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/job/MonitorAllAgentStatusJob.java @@ -2,13 +2,11 @@ package io.wdd.rpc.scheduler.job; import io.wdd.rpc.scheduler.config.QuartzLogOperator; import io.wdd.rpc.status.MonitorAllAgentStatus; -import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.springframework.scheduling.quartz.QuartzJobBean; import javax.annotation.Resource; -import java.nio.file.AccessMode; public class MonitorAllAgentStatusJob extends QuartzJobBean { @@ -22,7 +20,7 @@ public class MonitorAllAgentStatusJob extends QuartzJobBean { protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { // get the jobMetaMap - JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap(); + //JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap(); // actually execute the monitor service monitorAllAgentStatus.go(); diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java b/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java index 82536d2..2ebb36b 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java @@ -1,14 +1,17 @@ package io.wdd.rpc.scheduler.service; -import com.alibaba.nacos.api.config.annotation.NacosValue; +import io.wdd.rpc.scheduler.job.AgentRunMetricStatusJob; import io.wdd.rpc.scheduler.job.MonitorAllAgentStatusJob; import lombok.extern.slf4j.Slf4j; +import org.quartz.CronExpression; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; +import java.text.ParseException; +import java.util.Date; @Component @Slf4j @@ -23,28 +26,80 @@ 
public class BuildStatusScheduleTask {
     @Value(value = "${octopus.status.healthy.start-delay}")
     int healthyCheckStartDelaySeconds;
+    // Interval between two metric reports, in seconds (passed to addJob as the job interval)
+    @Value(value = "${octopus.status.metric.pinch}")
+    int metricReportTimePinch;
+
+    // Quartz group shared by all agent status jobs and triggers
+    public static final String JOB_GROUP_NAME = "OctopusAgent";
     @PostConstruct
-    public void buildAll(){
+    private void buildAll() {
+        // Agent liveness / health status check
         buildMonitorAllAgentStatusScheduleTask();
+        // Agent runtime information check (Metric)
+
+        // Agent full information check (All)
+
     }
-    public void buildMonitorAllAgentStatusScheduleTask(){
+    /**
+     * Schedules the agent runtime metric job for the time left until the next health check.
+     * Intended to be called once a health check has produced its result, so that all agents
+     * found alive report their metrics in between two health checks.
+     * The job fires every metricReportTimePinch seconds; the number of firings is derived
+     * from the time remaining until healthyCronTimeExpress fires next.
+     * Nothing is scheduled when the interval is not positive, when the cron expression has
+     * no further fire time, or when not even one report fits into the remaining time.
+     *
+     * @throws RuntimeException if healthyCronTimeExpress is not a valid cron expression
+     */
+    public void buildAgentMetricScheduleTask() {
+
+        // The interval is used as a divisor below, so it has to be positive
+        if (metricReportTimePinch <= 0) {
+            log.error("Metric schedule skipped, octopus.status.metric.pinch must be positive but is {}", metricReportTimePinch);
+            return;
+        }
+
+        // Work out how many metric reports fit in before the next health check
+        int metricReportTimesCount;
+        try {
+            CronExpression cronExpression = new CronExpression(healthyCronTimeExpress);
+
+            Date now = new Date();
+            Date nextValidTime = cronExpression.getNextValidTimeAfter(now);
+            if (nextValidTime == null) {
+                log.warn("Metric schedule skipped, cron expression {} has no next fire time", healthyCronTimeExpress);
+                return;
+            }
+            long totalSeconds = (nextValidTime.getTime() - now.getTime()) / 1000;
+            metricReportTimesCount = (int) (totalSeconds / metricReportTimePinch) - 1;
+
+            log.debug("totalSeconds = {}, metricReportTimesCount = {}", totalSeconds, metricReportTimesCount);
+
+        } catch (ParseException e) {
+            throw new RuntimeException(e);
+        }
+
+        // Too close to the next health check: not a single report fits in
+        if (metricReportTimesCount < 1) {
+            log.warn("Metric schedule skipped, no metric report fits before the next health check");
+            return;
+        }
+
+        // build the Job
+        // todo avoid creating too many objects here, the job related content should be cached
+        octopusQuartzService.addJob(
+                AgentRunMetricStatusJob.class,
+                "agentRunMetricStatusJob",
+                JOB_GROUP_NAME,
+                metricReportTimePinch,
+                metricReportTimesCount,
+                null
+        );
+
+    }
+
+    /**
+     * Schedules the agent liveness / health status check.
+     *
+     * Cron based task whose settings are read from configuration:
+     * start delay healthyCheckStartDelaySeconds,
+     * schedule healthyCronTimeExpress.
+     */
+    private void buildMonitorAllAgentStatusScheduleTask() {
         // build the Job
         octopusQuartzService.addJob(
                 MonitorAllAgentStatusJob.class,
                 "monitorAllAgentStatusJob",
-                "monitorAllAgentStatusJob",
+                JOB_GROUP_NAME,
                 healthyCheckStartDelaySeconds,
                 healthyCronTimeExpress,
                 null
         );
-
-
     }
+
+
 }
diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzService.java b/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzService.java
index 441ab38..2d0955d 100644
--- a/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzService.java
+++ b/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzService.java
@@ -1,6 +1,7 @@
 package io.wdd.rpc.scheduler.service;
 import io.wdd.rpc.scheduler.beans.OctopusQuartzJob;
+import org.quartz.Trigger;
 import org.springframework.scheduling.quartz.QuartzJobBean;
 import java.util.List;
 import java.util.Map;
@@ -15,11 +16,11 @@ public interface OctopusQuartzService {
      * @param jobClass 任务job实现类
      * @param jobName 任务job名称(保证唯一性)
      * @param jobGroupName 任务job组名
-     * @param jobTime 任务时间间隔(秒)
-     * @param jobTimes 任务运行次数(若<0,则不限次数)
+     * @param jobRunTimePinch interval between two runs, in seconds
+     * @param jobRunRepeatTimes number of runs (if <0, the number of runs is unlimited)
      * @param jobData 任务参数
      */
-    void addJob(Class jobClass, String jobName, String jobGroupName, int jobTime, int jobTimes, Map jobData);
+    void addJob(Class jobClass, String jobName, String jobGroupName, int jobRunTimePinch, int jobRunRepeatTimes, Map jobData);
     /**
      * 增加一个任务job
@@ -84,4 +85,7 @@ public interface OctopusQuartzService {
     List> queryRunJob();
+    /**
+     * Returns the triggers currently known to the scheduler for the agent job group.
+     */
+    List queryAllTrigger();
+
+
 }
diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzServiceImpl.java b/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzServiceImpl.java
index 635ea9b..4b46df0 100644
--- a/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzServiceImpl.java
+++ 
b/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzServiceImpl.java
@@ -13,7 +13,9 @@ import org.springframework.stereotype.Service;
 import javax.annotation.PostConstruct;
 import java.util.*;
+import java.util.stream.Collectors;
+import static io.wdd.rpc.scheduler.service.BuildStatusScheduleTask.JOB_GROUP_NAME;
 import static org.quartz.TriggerBuilder.newTrigger;
 /**
@@ -49,15 +51,18 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
      * @param jobClass 任务实现类
      * @param jobName 任务名称
      * @param jobGroupName 任务组名
-     * @param jobTime 时间表达式 (这是每隔多少秒为一次任务)
-     * @param jobTimes 运行的次数 (<0:表示不限次数)
+     * @param jobRunTimePinch interval between two runs of the job, in seconds
+     * @param jobRunRepeatTimes total number of runs (<0: unlimited number of runs)
      * @param jobData 参数
      */
     @Override
-    public void addJob(Class jobClass, String jobName, String jobGroupName, int jobTime, int jobTimes, Map jobData) {
+    public void addJob(Class jobClass, String jobName, String jobGroupName, int jobRunTimePinch, int jobRunRepeatTimes, Map jobData) {
         try {
             // 任务名称和组构成任务key
-            JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build();
+            JobDetail jobDetail = JobBuilder
+                    .newJob(jobClass)
+                    .withIdentity(jobName, jobGroupName)
+                    .build();
             // 设置job参数
             if (jobData != null && jobData.size() > 0) {
@@ -65,14 +70,22 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
             }
             // 使用simpleTrigger规则
-            Trigger trigger = null;
-            if (jobTimes < 0) {
-                trigger = newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime)).startNow().build();
-            } else {
-                trigger = newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime).withRepeatCount(jobTimes)).startNow().build();
-            }
-            log.info("jobDataMap: {}", jobDetail.getJobDataMap().getWrappedMap());
+            // A negative run count keeps its documented meaning "repeat forever";
+            // repeatSecondlyForTotalCount only accepts a total count of at least one.
+            SimpleScheduleBuilder simpleSchedule = jobRunRepeatTimes < 0
+                    ? SimpleScheduleBuilder.repeatSecondlyForever(jobRunTimePinch)
+                    : SimpleScheduleBuilder.repeatSecondlyForTotalCount(jobRunRepeatTimes, jobRunTimePinch);
+
+            Trigger trigger = newTrigger()
+                    .withIdentity(jobName, jobGroupName)
+                    .withSchedule(simpleSchedule)
+                    .startNow()
+                    .build();
+
+            log.debug("jobDataMap: {}", jobDetail.getJobDataMap().getWrappedMap());
+
             scheduler.scheduleJob(jobDetail, trigger);
+
         } catch (SchedulerException e) {
             e.printStackTrace();
             throw new MyRuntimeException("add job error!");
@@ -110,7 +123,16 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
                 startTime = 1;
             }
-            Trigger trigger = newTrigger().withIdentity(jobName, jobGroupName).startAt(DateBuilder.futureDate(startTime, IntervalUnit.SECOND)).withSchedule(CronScheduleBuilder.cronSchedule(cronJobExpression)).startNow().build();
+            // Do not call startNow() here: it would overwrite the start time set by
+            // startAt() and the configured start delay would be ignored.
+            Trigger trigger = newTrigger()
+                    .withIdentity(jobName, jobGroupName)
+                    .startAt(
+                            DateBuilder.futureDate(startTime, IntervalUnit.SECOND)
+                    )
+                    .withSchedule(
+                            CronScheduleBuilder.cronSchedule(cronJobExpression)
+                    )
+                    .build();
             // 把作业和触发器注册到任务调度中
             scheduler.scheduleJob(jobDetail, trigger);
@@ -283,4 +305,26 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
         return jobList;
     }
+    /**
+     * Looks up every trigger registered under the agent job group.
+     * Triggers that disappear between listing the keys and fetching them are left out.
+     *
+     * @return the triggers of group JOB_GROUP_NAME
+     * @throws RuntimeException wrapping the SchedulerException if the scheduler cannot be queried
+     */
+    @Override
+    public List queryAllTrigger() {
+
+        try {
+
+            return scheduler.getTriggerKeys(
+                    GroupMatcher.groupEquals(JOB_GROUP_NAME)
+            ).stream().map(
+                    triggerKey -> {
+                        try {
+                            return scheduler.getTrigger(triggerKey);
+                        } catch (SchedulerException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+            ).filter(Objects::nonNull).collect(Collectors.toList());
+
+        } catch (SchedulerException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
 }
diff --git a/server/src/main/java/io/wdd/rpc/status/AgentRuntimeMetricStatus.java b/server/src/main/java/io/wdd/rpc/status/AgentRuntimeMetricStatus.java
new file mode 100644
index 0000000..e1b09b6
--- /dev/null
+++ b/server/src/main/java/io/wdd/rpc/status/AgentRuntimeMetricStatus.java
@@ -0,0 +1,61 @@
+package io.wdd.rpc.status;
+
+
+import io.wdd.common.beans.status.OctopusStatusMessage;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import 
org.springframework.util.CollectionUtils;
+
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static io.wdd.common.beans.status.OctopusStatusMessage.METRIC_STATUS_MESSAGE_TYPE;
+
+/**
+ * Collects the runtime metric information of the Octopus agents,
+ * i.e. the volatile values such as CPU, memory and application status.
+ */
+@Service
+@Slf4j
+public class AgentRuntimeMetricStatus {
+
+    // Topic names of the agents currently considered healthy; assigned by the health check
+    public static List<String> ALL_HEALTHY_AGENT_TOPIC_NAMES;
+
+    @Resource
+    CollectAgentStatus collectAgentStatus;
+
+    /**
+     * Sends one metric status request to every healthy agent.
+     * Only a single message is sent per agent; the agent is expected to repeat the
+     * metric task on its own according to the two values below.
+     * Nothing is sent when no healthy agent is known.
+     *
+     * @param metricRepeatCount value placed in the message field metricRepeatCount
+     * @param metricRepeatPinch value placed in the message field metricRepeatPinch
+     */
+    public void collect(int metricRepeatCount, int metricRepeatPinch) {
+
+        // Without any healthy agent there is nobody to send to (the list may still be null)
+        if (CollectionUtils.isEmpty(ALL_HEALTHY_AGENT_TOPIC_NAMES)) {
+            log.error("Metric Status Collect Failed ! no ALL_HEALTHY_AGENT_TOPIC_NAMES");
+            return;
+        }
+        // Build the OctopusMessage
+        // The message is sent only once, the agent loops over the task by itself
+        buildMetricStatusMessageAndSend(metricRepeatCount, metricRepeatPinch);
+
+    }
+
+    /**
+     * Builds one metric status message per healthy agent and hands the batch to CollectAgentStatus.
+     */
+    private void buildMetricStatusMessageAndSend(int metricRepeatCount, int metricRepeatPinch) {
+
+        List<OctopusStatusMessage> collect = ALL_HEALTHY_AGENT_TOPIC_NAMES.stream()
+                .map(
+                        agentTopicName -> OctopusStatusMessage.builder()
+                                .type(METRIC_STATUS_MESSAGE_TYPE)
+                                .metricRepeatCount(metricRepeatCount)
+                                .metricRepeatPinch(metricRepeatPinch)
+                                .agentTopicName(agentTopicName)
+                                .build()
+                ).collect(Collectors.toList());
+
+        // send to the next level
+        collectAgentStatus.statusMessageToAgent(collect);
+
+    }
+
+}
diff --git a/server/src/main/java/io/wdd/rpc/status/CollectAgentStatus.java b/server/src/main/java/io/wdd/rpc/status/CollectAgentStatus.java
index 565b9be..50b6321 100644
--- a/server/src/main/java/io/wdd/rpc/status/CollectAgentStatus.java
+++ b/server/src/main/java/io/wdd/rpc/status/CollectAgentStatus.java
@@ -1,5 +1,7 @@
 package io.wdd.rpc.status;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import io.wdd.common.beans.rabbitmq.OctopusMessage;
 import io.wdd.common.beans.rabbitmq.OctopusMessageType;
 import io.wdd.common.beans.status.OctopusStatusMessage;
@@ -22,14 +24,17 @@ public class CollectAgentStatus {
     @Resource
     ToAgentMessageSender toAgentMessageSender;
+    @Resource
+    ObjectMapper objectMapper;
+
     public void collectAgentStatus(OctopusStatusMessage statusMessage) {
-        this.collectAgentStatusList(List.of(statusMessage));
+        
this.statusMessageToAgent(List.of(statusMessage));
     }
-    public void collectAgentStatusList(List statusMessageList) {
+    // Wraps every status message into an OctopusMessage and sends the batch to the agents
+    public void statusMessageToAgent(List statusMessageList) {
         // build all the OctopusMessage
         List octopusMessageList = statusMessageList.stream().map(
@@ -43,15 +48,23 @@ public class CollectAgentStatus {
         toAgentMessageSender.send(octopusMessageList);
         // todo how to get result ?
-
     }
     private OctopusMessage buildOctopusMessageStatus(OctopusStatusMessage octopusStatusMessage) {
+
+        // The status message is put into the content as a JSON string;
+        // passed as an object it would be deserialized as a LinkedHashMap
+        String s;
+        try {
+            s = objectMapper.writeValueAsString(octopusStatusMessage);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+
         return OctopusMessage.builder()
                 .uuid(octopusStatusMessage.getAgentTopicName())
                 .type(OctopusMessageType.STATUS)
                 .init_time(TimeUtils.currentTime())
-                .content(octopusStatusMessage)
+                .content(s)
                 .build();
     }
diff --git a/server/src/main/java/io/wdd/rpc/status/MonitorAllAgentStatus.java b/server/src/main/java/io/wdd/rpc/status/MonitorAllAgentStatus.java
index b209638..e4d3ba5 100644
--- a/server/src/main/java/io/wdd/rpc/status/MonitorAllAgentStatus.java
+++ b/server/src/main/java/io/wdd/rpc/status/MonitorAllAgentStatus.java
@@ -1,22 +1,29 @@
 package io.wdd.rpc.status;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import io.wdd.common.beans.status.OctopusStatusMessage;
 import io.wdd.common.utils.TimeUtils;
+import io.wdd.rpc.scheduler.service.BuildStatusScheduleTask;
+import io.wdd.rpc.scheduler.service.OctopusQuartzService;
 import io.wdd.server.beans.vo.ServerInfoVO;
 import io.wdd.server.coreService.CoreServerService;
 import lombok.extern.slf4j.Slf4j;
+import org.quartz.Trigger;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
 import javax.annotation.Resource;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import static io.wdd.common.beans.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY;
 import static io.wdd.common.beans.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE;
+import static io.wdd.rpc.status.AgentRuntimeMetricStatus.ALL_HEALTHY_AGENT_TOPIC_NAMES;
 /**
  * 获取所有注册的Agent
@@ -44,6 +51,8 @@ public class MonitorAllAgentStatus {
     CollectAgentStatus collectAgentStatus;
     @Resource
     CoreServerService coreServerService;
+    @Resource
+    BuildStatusScheduleTask buildStatusScheduleTask;
     public void go() {
@@ -75,7 +84,8 @@ public class MonitorAllAgentStatus {
     private void checkOrCreateRedisHealthyKey() {
-        if (!redisTemplate.hasKey(ALL_AGENT_STATUS_REDIS_KEY)) {
+        // (Re)build when the cached init map is missing or the redis key does not exist
+        if (null == AGENT_HEALTHY_INIT_MAP || !redisTemplate.hasKey(ALL_AGENT_STATUS_REDIS_KEY)) {
             log.info("ALL_AGENT_STATUS_REDIS_KEY not existed , start to create");
             // build the redis all agent healthy map struct
@@ -95,13 +105,15 @@ public class MonitorAllAgentStatus {
     }
     private void buildAndSendAgentHealthMessage() {
+
         List collect = ALL_AGENT_TOPICNAME_LIST.stream().map(
                 agentTopicName -> OctopusStatusMessage.builder()
                         .agentTopicName(agentTopicName)
                         .type(HEALTHY_STATUS_MESSAGE_TYPE)
                         .build()
         ).collect(Collectors.toList());
-        collectAgentStatus.collectAgentStatusList(collect);
+
+        collectAgentStatus.statusMessageToAgent(collect);
     }
     private void updateAllAgentHealthyStatus() {
@@ -124,6 +136,17 @@ public class MonitorAllAgentStatus {
         // help gc
         tmp = null;
+        // Collect the agents whose status is "1" and publish them for the Agent Metric task
+        // NOTE(review): assumes statusList is index-aligned with ALL_AGENT_TOPICNAME_LIST
+        // and holds no null entries — confirm against the code that fills statusList
+        ArrayList allHealthyAgentTopicNames = new ArrayList<>(32);
+        for (int i = 0; i < statusList.size(); i++) {
+            if (statusList.get(i).equals("1")) {
+                allHealthyAgentTopicNames.add(ALL_AGENT_TOPICNAME_LIST.get(i));
+            }
+        }
+        ALL_HEALTHY_AGENT_TOPIC_NAMES = allHealthyAgentTopicNames;
+        // Schedule the metric reporting task
+        buildStatusScheduleTask.buildAgentMetricScheduleTask();
+
         // update time
         AGENT_HEALTHY_INIT_MAP.put("updateTime", currentTimeString);
         // init the healthy map