[ server ] - agent runtime metric status - both agent side
This commit is contained in:
@@ -49,6 +49,12 @@ public class OMHandlerStatus extends AbstractOctopusMessageHandler {
|
||||
agentStatusCollector.sendAgentStatusToRedis();
|
||||
} else if (statusType.equals(METRIC_STATUS_MESSAGE_TYPE)) {
|
||||
// metric status
|
||||
|
||||
|
||||
agentStatusCollector.collect(
|
||||
statusMessage.getMetricRepeatCount(),
|
||||
statusMessage.getMetricRepeatPinch()
|
||||
);
|
||||
} else if (statusType.equals(APP_STATUS_MESSAGE_TYPE)) {
|
||||
// app status report
|
||||
} else {
|
||||
|
||||
@@ -3,6 +3,7 @@ package io.wdd.agent.status;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.wdd.agent.config.beans.init.AgentServerInfo;
|
||||
import io.wdd.agent.config.utils.AgentCommonThreadPool;
|
||||
import io.wdd.agent.executor.AppStatusExecutor;
|
||||
import io.wdd.common.beans.status.*;
|
||||
import io.wdd.common.utils.TimeUtils;
|
||||
@@ -10,14 +11,16 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.redis.connection.stream.StreamRecords;
|
||||
import org.springframework.data.redis.connection.stream.StringRecord;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
import oshi.SystemInfo;
|
||||
import oshi.hardware.HardwareAbstractionLayer;
|
||||
import oshi.software.os.OperatingSystem;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@@ -36,12 +39,21 @@ public class AgentStatusCollector {
|
||||
private static final long ReportInitDelay = 60000;
|
||||
private static final long ReportFixedRate = 15000;
|
||||
|
||||
private String statusRedisStreamKey;
|
||||
|
||||
|
||||
static {
|
||||
systemInfo = new SystemInfo();
|
||||
hardware = systemInfo.getHardware();
|
||||
os = systemInfo.getOperatingSystem();
|
||||
}
|
||||
|
||||
|
||||
@PostConstruct
|
||||
private void generateStatusRedisStreamKey() {
|
||||
statusRedisStreamKey = AgentStatus.getRedisStatusKey( agentServerInfo.getAgentTopicName());
|
||||
}
|
||||
|
||||
@Resource
|
||||
RedisTemplate redisTemplate;
|
||||
@Resource
|
||||
@@ -114,11 +126,9 @@ public class AgentStatusCollector {
|
||||
|
||||
try {
|
||||
|
||||
String statusStreamKey = agentServerInfo.getServerName() + "-status";
|
||||
|
||||
Map<String, String> map = Map.of(TimeUtils.currentTimeString(), objectMapper.writeValueAsString(collect()));
|
||||
|
||||
StringRecord stringRecord = StreamRecords.string(map).withStreamKey(statusStreamKey);
|
||||
StringRecord stringRecord = StreamRecords.string(map).withStreamKey(statusRedisStreamKey);
|
||||
|
||||
log.debug("Agent Status is ==> {}", map);
|
||||
redisTemplate.opsForStream().add(stringRecord);
|
||||
@@ -129,4 +139,32 @@ public class AgentStatusCollector {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 接收来自 OMHandlerStatus 的调用
|
||||
* 汇报服务器的状态信息
|
||||
*
|
||||
* @param metricRepeatCount 需要重复的次数
|
||||
* @param metricRepeatPinch 重复时间间隔
|
||||
*/
|
||||
public void collect(int metricRepeatCount, int metricRepeatPinch) {
|
||||
|
||||
for (int count = 0; count < metricRepeatCount; count++) {
|
||||
|
||||
try {
|
||||
|
||||
// use async thread pool to call the status collect method
|
||||
AgentCommonThreadPool.pool.submit(
|
||||
() -> this.sendAgentStatusToRedis()
|
||||
);
|
||||
|
||||
// main thread sleep for metricRepeatPinch
|
||||
TimeUnit.SECONDS.sleep(metricRepeatPinch);
|
||||
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,6 +14,12 @@ import java.util.List;
|
||||
@SuperBuilder(toBuilder = true)
|
||||
public class AgentStatus {
|
||||
|
||||
private static final String AGENT_STATUS_KEY_SUFFIX = "-Status";
|
||||
|
||||
public static String getRedisStatusKey(String agentTopicName) {
|
||||
return agentTopicName+AGENT_STATUS_KEY_SUFFIX;
|
||||
}
|
||||
|
||||
String time;
|
||||
|
||||
String agentName;
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
package io.wdd.common.beans.status;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* 没时间整这些,反正大一点数据也无所谓 不是吗
|
||||
*/
|
||||
@Deprecated
|
||||
@Data
|
||||
public class MetricStatus {
|
||||
|
||||
CpuInfo cpuInfo;
|
||||
|
||||
MemoryInfo memoryInfo;
|
||||
|
||||
AppStatusInfo appStatus;
|
||||
|
||||
}
|
||||
@@ -8,6 +8,9 @@ import org.springframework.scheduling.quartz.QuartzJobBean;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
import static io.wdd.rpc.status.AgentRuntimeMetricStatus.METRIC_REPORT_TIMES_COUNT;
|
||||
import static io.wdd.rpc.status.AgentRuntimeMetricStatus.METRIC_REPORT_TIME_PINCH;
|
||||
|
||||
public class AgentRunMetricStatusJob extends QuartzJobBean {
|
||||
|
||||
@Resource
|
||||
@@ -19,12 +22,9 @@ public class AgentRunMetricStatusJob extends QuartzJobBean {
|
||||
|
||||
// 从JobDetailContext中获取相应的信息
|
||||
JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
|
||||
System.out.println("jobDataMap = " + jobDataMap);
|
||||
|
||||
int metricRepeatCount = 10;
|
||||
int metricRepeatPinch = 5;
|
||||
// 执行Agent Metric 状态收集任务
|
||||
agentRuntimeMetricStatus.collect(metricRepeatCount, metricRepeatPinch);
|
||||
agentRuntimeMetricStatus.collect((Integer) jobDataMap.get(METRIC_REPORT_TIMES_COUNT), (Integer) jobDataMap.get(METRIC_REPORT_TIME_PINCH));
|
||||
|
||||
// todo 机构设计状态会被存储至 Redis Stream Key 中
|
||||
// AgentTopicName-Metric
|
||||
|
||||
@@ -12,6 +12,10 @@ import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
import java.text.ParseException;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
|
||||
import static io.wdd.rpc.status.AgentRuntimeMetricStatus.METRIC_REPORT_TIMES_COUNT;
|
||||
import static io.wdd.rpc.status.AgentRuntimeMetricStatus.METRIC_REPORT_TIME_PINCH;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@@ -67,15 +71,19 @@ public class BuildStatusScheduleTask {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
// build the Job
|
||||
HashMap<String, Integer> map = new HashMap<String, Integer>();
|
||||
map.put(METRIC_REPORT_TIME_PINCH,metricReportTimePinch);
|
||||
map.put(METRIC_REPORT_TIMES_COUNT,metricReportTimesCount);
|
||||
|
||||
// build the Job 只发送一次消息,然后让Agent获取消息 (重复间隔,重复次数) 进行相应的处理!
|
||||
// todo 解决创建太多对象的问题,需要缓存相应的内容
|
||||
octopusQuartzService.addJob(
|
||||
AgentRunMetricStatusJob.class,
|
||||
"agentRunMetricStatusJob",
|
||||
JOB_GROUP_NAME,
|
||||
metricReportTimePinch,
|
||||
metricReportTimesCount,
|
||||
null
|
||||
1,
|
||||
map
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
@@ -23,6 +23,9 @@ public class AgentRuntimeMetricStatus {
|
||||
|
||||
public static List<String> ALL_HEALTHY_AGENT_TOPIC_NAMES;
|
||||
|
||||
public static final String METRIC_REPORT_TIME_PINCH = "metricRepeatPinch";
|
||||
public static final String METRIC_REPORT_TIMES_COUNT = "metricRepeatCount";
|
||||
|
||||
@Resource
|
||||
CollectAgentStatus collectAgentStatus;
|
||||
|
||||
@@ -47,7 +50,7 @@ public class AgentRuntimeMetricStatus {
|
||||
return OctopusStatusMessage.builder()
|
||||
.type(METRIC_STATUS_MESSAGE_TYPE)
|
||||
.metricRepeatCount(metricRepeatCount)
|
||||
.metricRepeatCount(metricRepeatCount)
|
||||
.metricRepeatPinch(metricRepeatPinch)
|
||||
.agentTopicName(agentTopicName)
|
||||
.build();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user