[ server ] - agent runtime metric status - 1

This commit is contained in:
zeaslity
2023-01-11 16:02:08 +08:00
parent 1498b3cc0e
commit 060d8dced8
9 changed files with 265 additions and 30 deletions

View File

@@ -28,5 +28,8 @@ public class OctopusStatusMessage {
String agentTopicName;
int metricRepeatCount;
int metricRepeatPinch;
}

View File

@@ -0,0 +1,34 @@
package io.wdd.rpc.scheduler.job;
import io.wdd.rpc.status.AgentRuntimeMetricStatus;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;
import javax.annotation.Resource;
/**
 * Quartz job that kicks off one round of agent runtime-metric collection.
 * <p>
 * On each execution it prints the job's data map and then delegates to
 * {@link AgentRuntimeMetricStatus#collect(int, int)} with a fixed repeat
 * count and a fixed interval value.
 */
public class AgentRunMetricStatusJob extends QuartzJobBean {

    /** Repeat count handed to the metric collector on every run. */
    private static final int METRIC_REPEAT_COUNT = 10;

    /** Interval value handed to the metric collector on every run (presumably seconds; confirm on the agent side). */
    private static final int METRIC_REPEAT_PINCH = 5;

    @Resource
    AgentRuntimeMetricStatus agentRuntimeMetricStatus;

    /**
     * Runs one metric collection round.
     *
     * @param jobExecutionContext Quartz execution context; only its job data map is read (and printed)
     * @throws JobExecutionException declared by the Quartz contract
     */
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        // Fetch the data attached to the JobDetail; for now it is only printed,
        // the collection parameters below are fixed constants.
        JobDataMap detailData = jobExecutionContext.getJobDetail().getJobDataMap();
        System.out.println("jobDataMap = " + detailData);

        // Run the agent Metric status collection task.
        agentRuntimeMetricStatus.collect(METRIC_REPEAT_COUNT, METRIC_REPEAT_PINCH);

        // TODO: by design the collected status is to be stored in a Redis Stream key
        //       named "AgentTopicName-Metric".
    }
}

View File

@@ -2,13 +2,11 @@ package io.wdd.rpc.scheduler.job;
import io.wdd.rpc.scheduler.config.QuartzLogOperator;
import io.wdd.rpc.status.MonitorAllAgentStatus;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;
import javax.annotation.Resource;
import java.nio.file.AccessMode;
public class MonitorAllAgentStatusJob extends QuartzJobBean {
@@ -22,7 +20,7 @@ public class MonitorAllAgentStatusJob extends QuartzJobBean {
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
// get the jobMetaMap
JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
//JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
// actually execute the monitor service
monitorAllAgentStatus.go();

View File

@@ -1,14 +1,17 @@
package io.wdd.rpc.scheduler.service;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import io.wdd.rpc.scheduler.job.AgentRunMetricStatusJob;
import io.wdd.rpc.scheduler.job.MonitorAllAgentStatusJob;
import lombok.extern.slf4j.Slf4j;
import org.quartz.CronExpression;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.text.ParseException;
import java.util.Date;
@Component
@Slf4j
@@ -23,28 +26,80 @@ public class BuildStatusScheduleTask {
@Value(value = "${octopus.status.healthy.start-delay}")
int healthyCheckStartDelaySeconds;
@Value(value = "${octopus.status.metric.pinch}")
int metricReportTimePinch;
public static final String JOB_GROUP_NAME = "OctopusAgent";
/**
 * Registers the status schedule tasks; annotated with {@code @PostConstruct}
 * so it runs once after this bean has been constructed.
 * Only the agent health check is registered here; the Metric and "All"
 * entries below are placeholders without a call.
 */
// NOTE(review): two signature lines follow (public and private) — looks like
// residue of a visibility change; confirm which one is current.
@PostConstruct
public void buildAll(){
private void buildAll() {
// Agent liveness / health status check
buildMonitorAllAgentStatusScheduleTask();
// Agent runtime information check (Metric)
// Agent full information check (All)
}
public void buildMonitorAllAgentStatusScheduleTask(){
/**
 * Schedules the agent runtime Metric collection job.
 * <p>
 * [Caller] Meant to be invoked from the health-check result handling, so that
 * all live agents report their Metric information.
 * [Interval] Within the window until the next health check, a Metric report
 * is requested every {@code metricReportTimePinch} seconds.
 *
 * @throws IllegalStateException if the configured metric interval is not positive
 *                               or the health-check cron expression cannot be parsed
 */
public void buildAgentMetricScheduleTask() {

    // A zero interval would divide by zero below and a negative one would yield a
    // negative fire count; fail early with a message that names the property.
    if (metricReportTimePinch <= 0) {
        throw new IllegalStateException(
                "octopus.status.metric.pinch must be a positive number of seconds, but was "
                        + metricReportTimePinch);
    }

    // Fallback fire count, used when the cron expression has no further fire time.
    int metricReportTimesCount = 19;

    try {
        CronExpression cronExpression = new CronExpression(healthyCronTimeExpress);
        Date now = new Date();
        Date nextValidTime = cronExpression.getNextValidTimeAfter(now);

        // getNextValidTimeAfter returns null when the expression never fires again.
        if (nextValidTime != null) {
            long totalSeconds = (nextValidTime.getTime() - now.getTime()) / 1000;
            long reportsInWindow = totalSeconds / metricReportTimePinch - 1;

            // The simple trigger needs a total fire count of at least one, so a window
            // shorter than two intervals still gets a single report.
            metricReportTimesCount = (int) Math.min(Integer.MAX_VALUE, Math.max(1L, reportsInWindow));
        }
    } catch (ParseException e) {
        throw new IllegalStateException(
                "Invalid health-check cron expression: " + healthyCronTimeExpress, e);
    }

    // build the Job
    // todo: avoid creating too many objects here; the related content should be cached
    octopusQuartzService.addJob(
            AgentRunMetricStatusJob.class,
            "agentRunMetricStatusJob",
            JOB_GROUP_NAME,
            metricReportTimePinch,
            metricReportTimesCount,
            null
    );
}
/**
 * Agent liveness / health status check.
 * <p>
 * The scheduled task takes its settings from the Nacos configuration:
 * start delay of the trigger: healthyCheckStartDelaySeconds
 * schedule of the task (cron): healthyCronTimeExpress
 */
// NOTE(review): the job-name literal appears twice in the argument list below —
// looks like residue of replacing the group argument with JOB_GROUP_NAME; confirm.
private void buildMonitorAllAgentStatusScheduleTask() {
// build the Job
octopusQuartzService.addJob(
MonitorAllAgentStatusJob.class,
"monitorAllAgentStatusJob",
"monitorAllAgentStatusJob",
JOB_GROUP_NAME,
healthyCheckStartDelaySeconds,
healthyCronTimeExpress,
null
);
}
}

View File

@@ -1,6 +1,7 @@
package io.wdd.rpc.scheduler.service;
import io.wdd.rpc.scheduler.beans.OctopusQuartzJob;
import org.quartz.Trigger;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.util.List;
import java.util.Map;
@@ -15,11 +16,11 @@ public interface OctopusQuartzService {
* @param jobClass 任务job实现类
* @param jobName 任务job名称保证唯一性
* @param jobGroupName 任务job组名
* @param jobTime 任务时间间隔(秒)
* @param jobTimes 任务运行次数(若<0则不限次数
* @param jobRunTimePinch 任务时间间隔(秒)
* @param jobRunRepeatTimes 任务运行次数(若<0则不限次数
* @param jobData 任务参数
*/
void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobTime, int jobTimes, Map jobData);
void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobRunTimePinch, int jobRunRepeatTimes, Map jobData);
/**
* 增加一个任务job
@@ -84,4 +85,7 @@ public interface OctopusQuartzService {
List<Map<String, Object>> queryRunJob();
List<Trigger> queryAllTrigger();
}

View File

@@ -13,7 +13,9 @@ import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.stream.Collectors;
import static io.wdd.rpc.scheduler.service.BuildStatusScheduleTask.JOB_GROUP_NAME;
import static org.quartz.TriggerBuilder.newTrigger;
/**
@@ -49,15 +51,18 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
* @param jobClass 任务实现类
* @param jobName 任务名称
* @param jobGroupName 任务组名
* @param jobTime 时间表达式 (这是每隔多少秒为一次任务)
* @param jobTimes 运行的次数 <0:表示不限次数)
* @param jobRunTimePinch 时间表达式 (这是每隔多少秒为一次任务)
* @param jobRunRepeatTimes 运行的次数 <0:表示不限次数)
* @param jobData 参数
*/
@Override
public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobTime, int jobTimes, Map jobData) {
public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobRunTimePinch, int jobRunRepeatTimes, Map jobData) {
try {
// 任务名称和组构成任务key
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build();
JobDetail jobDetail = JobBuilder
.newJob(jobClass)
.withIdentity(jobName, jobGroupName)
.build();
// 设置job参数
if (jobData != null && jobData.size() > 0) {
@@ -65,14 +70,22 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
}
// 使用simpleTrigger规则
Trigger trigger = null;
if (jobTimes < 0) {
trigger = newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime)).startNow().build();
} else {
trigger = newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime).withRepeatCount(jobTimes)).startNow().build();
}
log.info("jobDataMap: {}", jobDetail.getJobDataMap().getWrappedMap());
Trigger trigger = newTrigger()
.withIdentity(jobName, jobGroupName)
.withSchedule(
SimpleScheduleBuilder
.repeatSecondlyForTotalCount(
jobRunRepeatTimes,
jobRunTimePinch
)
)
.startNow()
.build();
log.debug("jobDataMap: {}", jobDetail.getJobDataMap().getWrappedMap());
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
e.printStackTrace();
throw new MyRuntimeException("add job error!");
@@ -110,7 +123,16 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
startTime = 1;
}
Trigger trigger = newTrigger().withIdentity(jobName, jobGroupName).startAt(DateBuilder.futureDate(startTime, IntervalUnit.SECOND)).withSchedule(CronScheduleBuilder.cronSchedule(cronJobExpression)).startNow().build();
Trigger trigger = newTrigger()
.withIdentity(jobName, jobGroupName)
.startAt(
DateBuilder.futureDate(startTime, IntervalUnit.SECOND)
)
.withSchedule(
CronScheduleBuilder.cronSchedule(cronJobExpression)
)
.startNow()
.build();
// 把作业和触发器注册到任务调度中
scheduler.scheduleJob(jobDetail, trigger);
@@ -283,4 +305,26 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
return jobList;
}
/**
 * Lists the triggers currently registered in the {@code JOB_GROUP_NAME} group.
 * <p>
 * A trigger can be removed between listing its key and loading it (for example
 * a finite trigger that has just completed); such entries are skipped so the
 * result never contains {@code null}.
 *
 * @return the triggers of the group, possibly empty
 * @throws RuntimeException wrapping the {@link SchedulerException} raised while
 *                          listing the keys or loading a trigger
 */
@Override
public List<Trigger> queryAllTrigger() {
    try {
        return scheduler
                .getTriggerKeys(GroupMatcher.groupEquals(JOB_GROUP_NAME))
                .stream()
                .map(triggerKey -> {
                    try {
                        return scheduler.getTrigger(triggerKey);
                    } catch (SchedulerException e) {
                        throw new RuntimeException("failed to load trigger " + triggerKey, e);
                    }
                })
                // the scheduler yields null for a key whose trigger no longer exists
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    } catch (SchedulerException e) {
        throw new RuntimeException("failed to list triggers of group " + JOB_GROUP_NAME, e);
    }
}
}

View File

@@ -0,0 +1,61 @@
package io.wdd.rpc.status;
import io.wdd.common.beans.status.OctopusStatusMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;
import static io.wdd.common.beans.status.OctopusStatusMessage.METRIC_STATUS_MESSAGE_TYPE;
/**
 * Collects the runtime Metric information of OctopusAgents.
 * <p>
 * Covers volatile data such as CPU, memory and application status.
 */
@Service
@Slf4j
public class AgentRuntimeMetricStatus {

    /**
     * Topic names of the agents that metric requests are sent to.
     * Not populated inside this class; it has to be assigned from outside
     * before {@link #collect(int, int)} can do any work.
     */
    public static List<String> ALL_HEALTHY_AGENT_TOPIC_NAMES;

    @Resource
    CollectAgentStatus collectAgentStatus;

    /**
     * Requests a Metric report from every agent in {@link #ALL_HEALTHY_AGENT_TOPIC_NAMES}.
     * <p>
     * Only one message is sent per agent; the agent is expected to repeat the
     * report on its own according to the two parameters.
     *
     * @param metricRepeatCount repeat count placed into each status message
     * @param metricRepeatPinch interval value placed into each status message
     */
    public void collect(int metricRepeatCount, int metricRepeatPinch) {

        // Without any known healthy agent there is nothing to send; stop here
        // instead of streaming over a null or empty list.
        if (CollectionUtils.isEmpty(ALL_HEALTHY_AGENT_TOPIC_NAMES)) {
            log.error("Metric Status Collect Failed ! no ALL_HEALTHY_AGENT_TOPIC_NAMES");
            return;
        }

        // Build the status messages; send them once and let the agent loop on its own.
        buildMetricStatusMessageAndSend(metricRepeatCount, metricRepeatPinch);
    }

    /**
     * Builds one METRIC status message per healthy agent topic and passes the
     * list on to {@link CollectAgentStatus}.
     */
    private void buildMetricStatusMessageAndSend(int metricRepeatCount, int metricRepeatPinch) {

        List<OctopusStatusMessage> collect = ALL_HEALTHY_AGENT_TOPIC_NAMES.stream()
                .map(agentTopicName -> OctopusStatusMessage.builder()
                        .type(METRIC_STATUS_MESSAGE_TYPE)
                        .metricRepeatCount(metricRepeatCount)
                        // carry the interval as well; it was previously never set
                        .metricRepeatPinch(metricRepeatPinch)
                        .agentTopicName(agentTopicName)
                        .build())
                .collect(Collectors.toList());

        // send to the next level
        collectAgentStatus.statusMessageToAgent(collect);
    }
}

View File

@@ -1,5 +1,7 @@
package io.wdd.rpc.status;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
import io.wdd.common.beans.status.OctopusStatusMessage;
@@ -22,14 +24,17 @@ public class CollectAgentStatus {
@Resource
ToAgentMessageSender toAgentMessageSender;
@Resource
ObjectMapper objectMapper;
/**
 * Single-message convenience entry point: wraps the given status message in a
 * one-element list and forwards it to the list-based method.
 *
 * @param statusMessage the status message to forward
 */
// NOTE(review): both the old (collectAgentStatusList) and the renamed
// (statusMessageToAgent) call appear below — looks like rename residue; confirm.
public void collectAgentStatus(OctopusStatusMessage statusMessage) {
this.collectAgentStatusList(List.of(statusMessage));
this.statusMessageToAgent(List.of(statusMessage));
}
public void collectAgentStatusList(List<OctopusStatusMessage> statusMessageList) {
public void statusMessageToAgent(List<OctopusStatusMessage> statusMessageList) {
// build all the OctopusMessage
List<OctopusMessage> octopusMessageList = statusMessageList.stream().map(
@@ -43,15 +48,23 @@ public class CollectAgentStatus {
toAgentMessageSender.send(octopusMessageList);
// todo how to get result ?
}
/**
 * Wraps a status message into an {@link OctopusMessage} of type STATUS.
 * <p>
 * The status message is serialized to a string with the injected ObjectMapper,
 * the agent topic name is used as the message uuid and the current time as
 * the init time.
 *
 * @param octopusStatusMessage the status message to wrap
 * @return the built OctopusMessage
 * @throws RuntimeException wrapping the JsonProcessingException if serialization fails
 */
private OctopusMessage buildOctopusMessageStatus(OctopusStatusMessage octopusStatusMessage) {
// must be like this or it will be deserialized as LinkedHashMap
String s;
try {
s = objectMapper.writeValueAsString(octopusStatusMessage);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
// NOTE(review): content(...) is set twice below (object, then serialized string) —
// looks like residue of switching to the string form; confirm.
return OctopusMessage.builder()
.uuid(octopusStatusMessage.getAgentTopicName())
.type(OctopusMessageType.STATUS)
.init_time(TimeUtils.currentTime())
.content(octopusStatusMessage)
.content(s)
.build();
}

View File

@@ -1,22 +1,29 @@
package io.wdd.rpc.status;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.beans.status.OctopusStatusMessage;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.scheduler.service.BuildStatusScheduleTask;
import io.wdd.rpc.scheduler.service.OctopusQuartzService;
import io.wdd.server.beans.vo.ServerInfoVO;
import io.wdd.server.coreService.CoreServerService;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Trigger;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static io.wdd.common.beans.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY;
import static io.wdd.common.beans.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE;
import static io.wdd.rpc.status.AgentRuntimeMetricStatus.ALL_HEALTHY_AGENT_TOPIC_NAMES;
/**
* 获取所有注册的Agent
@@ -44,6 +51,8 @@ public class MonitorAllAgentStatus {
CollectAgentStatus collectAgentStatus;
@Resource
CoreServerService coreServerService;
@Resource
BuildStatusScheduleTask buildStatusScheduleTask;
public void go() {
@@ -75,7 +84,8 @@ public class MonitorAllAgentStatus {
private void checkOrCreateRedisHealthyKey() {
if (!redisTemplate.hasKey(ALL_AGENT_STATUS_REDIS_KEY)) {
// must init the cached map && make sure the redis key existed!
if (null == AGENT_HEALTHY_INIT_MAP || !redisTemplate.hasKey(ALL_AGENT_STATUS_REDIS_KEY)) {
log.info("ALL_AGENT_STATUS_REDIS_KEY not existed , start to create");
// build the redis all agent healthy map struct
@@ -95,13 +105,15 @@ public class MonitorAllAgentStatus {
}
/**
 * Builds one HEALTHY-type status message per entry of ALL_AGENT_TOPICNAME_LIST
 * and hands the resulting list to CollectAgentStatus.
 */
// NOTE(review): both the old (collectAgentStatusList) and the renamed
// (statusMessageToAgent) call appear at the end — looks like rename residue; confirm.
private void buildAndSendAgentHealthMessage() {
List<OctopusStatusMessage> collect = ALL_AGENT_TOPICNAME_LIST.stream().map(
agentTopicName -> OctopusStatusMessage.builder()
.agentTopicName(agentTopicName)
.type(HEALTHY_STATUS_MESSAGE_TYPE)
.build()
).collect(Collectors.toList());
collectAgentStatus.collectAgentStatusList(collect);
collectAgentStatus.statusMessageToAgent(collect);
}
private void updateAllAgentHealthyStatus() {
@@ -124,6 +136,17 @@ public class MonitorAllAgentStatus {
// help gc
tmp = null;
// Trigger调用Agent Metric 任务
ArrayList<String> allHealthyAgentTopicNames = new ArrayList<>(32);
for (int i = 0; i < statusList.size(); i++) {
if (statusList.get(i).equals("1")) {
allHealthyAgentTopicNames.add(ALL_AGENT_TOPICNAME_LIST.get(i));
}
}
ALL_HEALTHY_AGENT_TOPIC_NAMES = allHealthyAgentTopicNames;
// 执行Metric上报任务
buildStatusScheduleTask.buildAgentMetricScheduleTask();
// update time
AGENT_HEALTHY_INIT_MAP.put("updateTime", currentTimeString);
// init the healthy map