[ Status ] 完成Agent Metric部分

This commit is contained in:
zeaslity
2023-07-10 16:24:36 +08:00
parent f5a3db2f56
commit 2f477fd1cc
43 changed files with 491 additions and 381 deletions

View File

@@ -1,4 +1,4 @@
package io.wdd.rpc.scheduler.job;
package io.wdd.rpc.scheduler.dto;
import org.quartz.JobExecutionContext;

View File

@@ -23,7 +23,7 @@ public class AgentAliveStatusMonitorJob extends QuartzJobBean {
//JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
// actually execute the monitor service
agentAliveStatusMonitorService.go();
agentAliveStatusMonitorService.collectAllAgentAliveStatus();
// log to somewhere
quartzLogOperator.save();

View File

@@ -0,0 +1,29 @@
package io.wdd.rpc.scheduler.job;
import io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;
import javax.annotation.Resource;
public class AgentMetricStatusJob extends QuartzJobBean {
@Resource
AgentMetricStatusCollectService agentMetricStatusCollectService;
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
// 从JobDetailContext中获取相应的信息
// JobDataMap jobDataMap = jobExecutionContext
// .getJobDetail()
// .getJobDataMap();
// 执行Agent Metric 状态收集任务
agentMetricStatusCollectService.collectHealthyAgentMetric();
}
}

View File

@@ -1,38 +0,0 @@
package io.wdd.rpc.scheduler.job;
import io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;
import javax.annotation.Resource;
import static io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService.METRIC_REPORT_TIMES_COUNT;
import static io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService.METRIC_REPORT_TIME_PINCH;
public class AgentRunMetricStatusJob extends QuartzJobBean {
@Resource
AgentMetricStatusCollectService agentMetricStatusCollectService;
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
// 从JobDetailContext中获取相应的信息
JobDataMap jobDataMap = jobExecutionContext
.getJobDetail()
.getJobDataMap();
// 执行Agent Metric 状态收集任务
agentMetricStatusCollectService.collect(
(Integer) jobDataMap.get(METRIC_REPORT_TIMES_COUNT),
(Integer) jobDataMap.get(METRIC_REPORT_TIME_PINCH)
);
// todo 机构设计状态会被存储至 Redis Stream Key 中
// AgentTopicName-Metric
}
}

View File

@@ -18,6 +18,7 @@ import static io.wdd.rpc.scheduler.service.QuartzSchedulerServiceImpl.SCRIPT_SCH
* 定时脚本任务核心类Quartz框架定时调用该类
*/
@Slf4j
@Deprecated
public class AgentScriptSchedulerJob extends QuartzJobBean {
@Resource

View File

@@ -2,52 +2,39 @@ package io.wdd.rpc.scheduler.service;
import io.wdd.rpc.scheduler.job.AgentAliveStatusMonitorJob;
import io.wdd.rpc.scheduler.job.AgentRunMetricStatusJob;
import io.wdd.rpc.scheduler.job.AgentMetricStatusJob;
import lombok.extern.slf4j.Slf4j;
import org.quartz.CronExpression;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.text.ParseException;
import java.util.Date;
import java.util.HashMap;
import static io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService.METRIC_REPORT_TIMES_COUNT;
import static io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService.METRIC_REPORT_TIME_PINCH;
@Component
@Slf4j
public class BuildStatusScheduleTask {
public static final String JOB_GROUP_NAME = "OctopusAgent";
@Resource
QuartzSchedulerService octopusQuartzService;
@Value(value = "${octopus.status.healthy.cron}")
String healthyCronTimeExpress;
// todo 此时间可以更新Nacos配置更新 自动进行任务更新
@Value(value = "${octopus.status.healthy.start-delay}")
int healthyCheckStartDelaySeconds;
// @Value(value = "${octopus.status.metric.cron}")
// int metricReportCronExpress;
@Value(value = "${octopus.status.metric.pinch}")
int metricReportTimePinch;
public static final String JOB_GROUP_NAME = "OctopusAgent";
@Value(value = "${octopus.status.metric.cron}")
int metricReportCronExpress;
@Value(value = "${octopus.status.metric.start-delay}")
int metricReportStartDelaySeconds;
@PostConstruct
private void buildAll() {
private void buildAllPreScheduledTask() {
// Agent存活健康状态检查
buildMonitorAllAgentAliveStatusScheduleTask();
// Agent运行信息检查 Metric
// Agent全部信息检查 All
buildAgentMetricScheduleTask();
}
@@ -55,11 +42,23 @@ public class BuildStatusScheduleTask {
* Agent运行信息检查 Metric
* 【调用】应该由 健康状态检查结果 调用 ==> 所有存活节点需要进行Metric信息汇报
* 【间隔】存活间隔内间隔一定的时间汇报Metric
* <p>
* 2023年7月10日 更改为按照cron表达式进行执行
*/
public void buildAgentMetricScheduleTask() {
// 2023年7月10日 更改为按照cron表达式进行执行
octopusQuartzService.addMission(
AgentMetricStatusJob.class,
"agentRunMetricStatusJob",
JOB_GROUP_NAME,
metricReportStartDelaySeconds,
metricReportCronExpress,
null
);
// 计算 Metric检测的时间间隔
int metricReportTimesCount = 19;
/*int metricReportTimesCount = 19;
try {
CronExpression cronExpression = new CronExpression(healthyCronTimeExpress);
@@ -68,8 +67,8 @@ public class BuildStatusScheduleTask {
long totalSeconds = (nextValidTime.getTime() - now.getTime()) / 1000;
metricReportTimesCount = (int) (totalSeconds / metricReportTimePinch) - 1;
/*System.out.println("totalSeconds = " + totalSeconds);
System.out.println("metricReportTimesCount = " + metricReportTimesCount);*/
*//*System.out.println("totalSeconds = " + totalSeconds);
System.out.println("metricReportTimesCount = " + metricReportTimesCount);*//*
} catch (ParseException e) {
throw new RuntimeException(e);
@@ -77,18 +76,10 @@ public class BuildStatusScheduleTask {
HashMap<String, Integer> map = new HashMap<String, Integer>();
map.put(METRIC_REPORT_TIME_PINCH, metricReportTimePinch);
map.put(METRIC_REPORT_TIMES_COUNT, metricReportTimesCount);
map.put(METRIC_REPORT_TIMES_COUNT, metricReportTimesCount);*/
//
// build the Job 只发送一次消息然后让Agent获取消息 (重复间隔,重复次数) 进行相应的处理!
// todo 解决创建太多对象的问题,需要缓存相应的内容
octopusQuartzService.addMission(
AgentRunMetricStatusJob.class,
"agentRunMetricStatusJob",
JOB_GROUP_NAME,
metricReportTimePinch,
1,
map
);
}

View File

@@ -17,6 +17,7 @@ import java.util.List;
*/
@Service
@Slf4j
@Deprecated
public class AgentApplyScheduledScript {
@Resource

View File

@@ -1,9 +1,8 @@
package io.wdd.rpc.scheduler.service.status;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.init.AgentStatusCacheService;
import io.wdd.rpc.scheduler.service.BuildStatusScheduleTask;
import io.wdd.rpc.status.service.AsyncStatusService;
import io.wdd.rpc.status.CommonAndStatusCache;
import io.wdd.rpc.status.service.SyncStatusService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.context.annotation.Lazy;
@@ -14,47 +13,36 @@ import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_LIST;
import static io.wdd.rpc.status.CommonAndStatusCache.*;
import static io.wdd.rpc.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY;
/**
* 更新频率被类 BuildStatusScheduleTask.class控制
* <p>
* <p>
* 获取所有注册的Agent
* <p>
* 发送状态检查信息, agent需要update相应的HashMap的值
* redis --> all-agent-health-map agent-topic-name : 1
* todo 分布式问题弱网环境多线程操作同一个hashMap会不会出现冲突
* <p>
* 休眠 MAX_WAIT_AGENT_REPORT_STATUS_TIME 秒 等待agent的状态上报
* <p>
* 检查相应的 状态HashMap然后全部置为零
* 定时任务 检测所有的Agent的 存活状态 的实际执行类
*/
@Service
@Slf4j
@Lazy
public class AgentAliveStatusMonitorService {
private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5;
@Resource
RedisTemplate redisTemplate;
@Resource
AgentStatusCacheService agentStatusCacheService;
CommonAndStatusCache commonAndStatusCache;
@Resource
BuildStatusScheduleTask buildStatusScheduleTask;
@Resource
AsyncStatusService asyncStatusService;
SyncStatusService syncStatusService;
private HashMap<String, Boolean> AGENT_HEALTHY_INIT_MAP;
public void go() {
/**
* 收集所有Agent的存活状态
* 实际的定时任务的执行类,
*/
public void collectAllAgentAliveStatus() {
// 1. 获取所有注册的Agent 手动更新
agentStatusCacheService.updateAllAgentTopicNameCache();
commonAndStatusCache.updateAllAgentTopicNameCache();
if (CollectionUtils.isEmpty(ALL_AGENT_TOPIC_NAME_LIST)) {
log.warn("[Scheduler] No Agent Registered ! End Up Status Monitor !");
return;
@@ -63,17 +51,16 @@ public class AgentAliveStatusMonitorService {
// 1.1 检查 Agent状态保存数据结构是否正常
checkOrCreateRedisHealthyKey();
// 2.发送状态检查信息, agent需要update相应的HashMap的值
// 2023年6月14日 2. 发送ping等待所有的Agent返回PONG, 然后进行redis的状态修改
// 使用同步更新的策略
Map<String, Boolean> agentAliveStatusMap = asyncStatusService.AsyncCollectAgentAliveStatus(
// 同步的方法, 超时等待所有主机的存活状态
Map<String, Boolean> agentAliveStatusMap = syncStatusService.SyncCollectAgentAliveStatus(
ALL_AGENT_TOPIC_NAME_LIST,
5
);
// 更新Agent的状态
updateAllAgentHealthyStatus(agentAliveStatusMap);
// 更新Agent的状态 一级和二级缓存 同时更新 write-through的方式
updateAllAgentStatusCache(agentAliveStatusMap);
}
/**
@@ -112,21 +99,18 @@ public class AgentAliveStatusMonitorService {
.opsForHash()
.put(
ALL_AGENT_STATUS_REDIS_KEY,
"initTime",
STATUS_INIT_TIME_KEY,
TimeUtils.currentTimeString()
);
}
public void updateAllAgentHealthyStatus(Map<String, Boolean> agentAliveStatusMap) {
public void updateAllAgentStatusCache(Map<String, Boolean> agentAliveStatusMap) {
String currentTimeString = TimeUtils.currentTimeString();
// 更新所有的缓存状态
agentStatusCacheService.updateAgentStatusMapCache(agentAliveStatusMap);
// 执行Metric上报定时任务
// buildStatusScheduleTask.buildAgentMetricScheduleTask();
// 更新 二级缓存
commonAndStatusCache.updateAgentStatusCache(agentAliveStatusMap);
log.debug(
"[存活状态] - 当前时间为 [ %s ] , 所有的Agent存活状态为=> %s",
@@ -134,12 +118,13 @@ public class AgentAliveStatusMonitorService {
agentAliveStatusMap
);
// 更新 一级缓存
// 这里仅仅是更新时间
redisTemplate
.opsForHash()
.put(
ALL_AGENT_STATUS_REDIS_KEY,
"updateTime",
STATUS_UPDATE_TIME_KEY,
currentTimeString
);

View File

@@ -1,74 +1,55 @@
package io.wdd.rpc.scheduler.service.status;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.sender.OMessageToAgentSender;
import io.wdd.rpc.status.beans.AgentStatus;
import io.wdd.rpc.status.service.SyncStatusService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
import java.util.Map;
import static io.wdd.rpc.init.AgentStatusCacheService.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
import static io.wdd.rpc.status.OctopusStatusMessage.ConstructAgentStatusMessage;
import static io.wdd.rpc.status.OctopusStatusMessage.METRIC_STATUS_MESSAGE_TYPE;
import static io.wdd.rpc.status.CommonAndStatusCache.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
/**
* 收集OctopusAgent的运行Metric信息
* <p>
* CPU Memory AppStatus易变信息
* 定时任务 收集Agent的运行Metric的实际执行类
*/
@Service
@Slf4j
public class AgentMetricStatusCollectService {
public static final String METRIC_REPORT_TIME_PINCH = "metricRepeatPinch";
public static final String METRIC_REPORT_TIMES_COUNT = "metricRepeatCount";
@Resource
OMessageToAgentSender oMessageToAgentSender;
@Resource
ObjectMapper objectMapper;
SyncStatusService syncStatusService;
public void collect(int metricRepeatCount, int metricRepeatPinch) {
/**
* 收集所有健康主机的运行数据
*/
public void collectHealthyAgentMetric() {
// 检查基础信息
// 检查是否存在健康的主机
if (CollectionUtils.isEmpty(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST)) {
log.error("Metric Status Collect Failed ! no ALL_HEALTHY_AGENT_TOPIC_NAMES");
return;
}
buildMetricStatusMessageAndSend(
metricRepeatCount,
metricRepeatPinch
// 调用核心的服务
Map<String, AgentStatus> agentMetricStatusMap = syncStatusService.SyncCollectAgentMetricStatus(
ALL_HEALTHY_AGENT_TOPIC_NAME_LIST,
10
);
}
// todo 需要进行存储或者咋滴
log.info(
"[Agent Metric] - 所有主机的状态为 => %s",
agentMetricStatusMap
);
private void buildMetricStatusMessageAndSend(int metricRepeatCount, int metricRepeatPinch) {
LocalDateTime currentTime = TimeUtils.currentFormatTime();
List<OctopusMessage> octopusStatusMessageList = ALL_HEALTHY_AGENT_TOPIC_NAME_LIST
.stream()
.map(
agentTopicName -> ConstructAgentStatusMessage(
METRIC_STATUS_MESSAGE_TYPE,
agentTopicName,
currentTime
)
)
.collect(Collectors.toList());
// batch send all messages to RabbitMQ
oMessageToAgentSender.send(octopusStatusMessageList);
}

View File

@@ -4,7 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.rpc.status.AgentStatus;
import io.wdd.rpc.status.deprecate.AgentStatus;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@@ -15,6 +15,7 @@ import org.springframework.data.redis.stream.StreamListener;
@Getter
@Setter
@Slf4j
@Deprecated
public class AgentStatusStreamReader implements StreamListener<String, MapRecord<String,String, String >> {
// https://medium.com/nerd-for-tech/event-driven-architecture-with-redis-streams-using-spring-boot-a81a1c9a4cde

View File

@@ -0,0 +1,17 @@
# 定时框架Quartz的说明
# 核心为 QuartzSchedulerService
1. addMission()方法
2. Scheduler.scheduleJob()方法就可以设置一个定时任务
# 项目创建 固定-定时任务的入口为 BuildStatusScheduleTask
1. 创建的方法为 buildAllPreScheduledTask()
# 需要将定时任务包装为一个个的Job
1. 需要继承 QuartzJobBean
2. 然后调用实际定时任务的Service
1. AgentAliveStatusMonitorJob extends QuartzJobBean
2.