diff --git a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerStatus.java b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerStatus.java index 8807dc2..5f5dbd8 100644 --- a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerStatus.java +++ b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerStatus.java @@ -49,6 +49,12 @@ public class OMHandlerStatus extends AbstractOctopusMessageHandler { agentStatusCollector.sendAgentStatusToRedis(); } else if (statusType.equals(METRIC_STATUS_MESSAGE_TYPE)) { // metric status + + + agentStatusCollector.collect( + statusMessage.getMetricRepeatCount(), + statusMessage.getMetricRepeatPinch() + ); } else if (statusType.equals(APP_STATUS_MESSAGE_TYPE)) { // app status report } else { diff --git a/agent/src/main/java/io/wdd/agent/status/AgentStatusCollector.java b/agent/src/main/java/io/wdd/agent/status/AgentStatusCollector.java index 6873d5a..1a4a440 100644 --- a/agent/src/main/java/io/wdd/agent/status/AgentStatusCollector.java +++ b/agent/src/main/java/io/wdd/agent/status/AgentStatusCollector.java @@ -3,6 +3,7 @@ package io.wdd.agent.status; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.wdd.agent.config.beans.init.AgentServerInfo; +import io.wdd.agent.config.utils.AgentCommonThreadPool; import io.wdd.agent.executor.AppStatusExecutor; import io.wdd.common.beans.status.*; import io.wdd.common.utils.TimeUtils; @@ -10,14 +11,16 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.stream.StreamRecords; import org.springframework.data.redis.connection.stream.StringRecord; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import oshi.SystemInfo; import oshi.hardware.HardwareAbstractionLayer; import oshi.software.os.OperatingSystem; +import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; @Service @Slf4j @@ -36,12 +39,21 @@ public class AgentStatusCollector { private static final long ReportInitDelay = 60000; private static final long ReportFixedRate = 15000; + private String statusRedisStreamKey; + + static { systemInfo = new SystemInfo(); hardware = systemInfo.getHardware(); os = systemInfo.getOperatingSystem(); } + + @PostConstruct + private void generateStatusRedisStreamKey() { + statusRedisStreamKey = AgentStatus.getRedisStatusKey( agentServerInfo.getAgentTopicName()); + } + @Resource RedisTemplate redisTemplate; @Resource @@ -114,11 +126,9 @@ public class AgentStatusCollector { try { - String statusStreamKey = agentServerInfo.getServerName() + "-status"; - Map map = Map.of(TimeUtils.currentTimeString(), objectMapper.writeValueAsString(collect())); - StringRecord stringRecord = StreamRecords.string(map).withStreamKey(statusStreamKey); + StringRecord stringRecord = StreamRecords.string(map).withStreamKey(statusRedisStreamKey); log.debug("Agent Status is ==> {}", map); redisTemplate.opsForStream().add(stringRecord); @@ -129,4 +139,32 @@ public class AgentStatusCollector { } + /** + * 接收来自 OMHandlerStatus 的调用 + * 汇报服务器的状态信息 + * + * @param metricRepeatCount 需要重复的次数 + * @param metricRepeatPinch 重复时间间隔 + */ + public void collect(int metricRepeatCount, int metricRepeatPinch) { + + for (int count = 0; count < metricRepeatCount; count++) { + + try { + + // use async thread pool to call the status collect method + AgentCommonThreadPool.pool.submit( + () -> this.sendAgentStatusToRedis() + ); + + // main thread sleep for metricRepeatPinch + TimeUnit.SECONDS.sleep(metricRepeatPinch); + + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + } } diff --git a/common/src/main/java/io/wdd/common/beans/status/AgentStatus.java b/common/src/main/java/io/wdd/common/beans/status/AgentStatus.java index 006bcb4..489a48d 100644 --- a/common/src/main/java/io/wdd/common/beans/status/AgentStatus.java +++ b/common/src/main/java/io/wdd/common/beans/status/AgentStatus.java @@ -14,6 +14,12 @@ import java.util.List; @SuperBuilder(toBuilder = true) public class AgentStatus { + private static final String AGENT_STATUS_KEY_SUFFIX = "-Status"; + + public static String getRedisStatusKey(String agentTopicName) { + return agentTopicName+AGENT_STATUS_KEY_SUFFIX; + } + String time; String agentName; diff --git a/common/src/main/java/io/wdd/common/beans/status/MetricStatus.java b/common/src/main/java/io/wdd/common/beans/status/MetricStatus.java new file mode 100644 index 0000000..7b980fd --- /dev/null +++ b/common/src/main/java/io/wdd/common/beans/status/MetricStatus.java @@ -0,0 +1,18 @@ +package io.wdd.common.beans.status; + +import lombok.Data; + +/** + * 没时间整这些,反正大一点数据也无所谓 不是吗 + */ +@Deprecated +@Data +public class MetricStatus { + + CpuInfo cpuInfo; + + MemoryInfo memoryInfo; + + AppStatusInfo appStatus; + +} diff --git a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentRunMetricStatusJob.java b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentRunMetricStatusJob.java index 3ab9659..4a4ecbc 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentRunMetricStatusJob.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentRunMetricStatusJob.java @@ -8,6 +8,9 @@ import org.springframework.scheduling.quartz.QuartzJobBean; import javax.annotation.Resource; +import static io.wdd.rpc.status.AgentRuntimeMetricStatus.METRIC_REPORT_TIMES_COUNT; +import static io.wdd.rpc.status.AgentRuntimeMetricStatus.METRIC_REPORT_TIME_PINCH; + public class AgentRunMetricStatusJob extends QuartzJobBean { @Resource @@ -19,12 +22,9 @@ public class AgentRunMetricStatusJob extends QuartzJobBean { // 从JobDetailContext中获取相应的信息 JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap(); - System.out.println("jobDataMap = " + jobDataMap); - int metricRepeatCount = 10; - int metricRepeatPinch = 5; // 执行Agent Metric 状态收集任务 - agentRuntimeMetricStatus.collect(metricRepeatCount, metricRepeatPinch); + agentRuntimeMetricStatus.collect((Integer) jobDataMap.get(METRIC_REPORT_TIMES_COUNT), (Integer) jobDataMap.get(METRIC_REPORT_TIME_PINCH)); // todo 机构设计状态会被存储至 Redis Stream Key 中 // AgentTopicName-Metric diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java b/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java index 2ebb36b..8568830 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java @@ -12,6 +12,10 @@ import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.text.ParseException; import java.util.Date; +import java.util.HashMap; + +import static io.wdd.rpc.status.AgentRuntimeMetricStatus.METRIC_REPORT_TIMES_COUNT; +import static io.wdd.rpc.status.AgentRuntimeMetricStatus.METRIC_REPORT_TIME_PINCH; @Component @Slf4j @@ -67,15 +71,19 @@ public class BuildStatusScheduleTask { throw new RuntimeException(e); } - // build the Job + HashMap map = new HashMap(); + map.put(METRIC_REPORT_TIME_PINCH,metricReportTimePinch); + map.put(METRIC_REPORT_TIMES_COUNT,metricReportTimesCount); + + // build the Job 只发送一次消息,然后让Agent获取消息 (重复间隔,重复次数) 进行相应的处理! // todo 解决创建太多对象的问题,需要缓存相应的内容 octopusQuartzService.addJob( AgentRunMetricStatusJob.class, "agentRunMetricStatusJob", JOB_GROUP_NAME, metricReportTimePinch, - metricReportTimesCount, - null + 1, + map ); } diff --git a/server/src/main/java/io/wdd/rpc/status/AgentRuntimeMetricStatus.java b/server/src/main/java/io/wdd/rpc/status/AgentRuntimeMetricStatus.java index e1b09b6..c064825 100644 --- a/server/src/main/java/io/wdd/rpc/status/AgentRuntimeMetricStatus.java +++ b/server/src/main/java/io/wdd/rpc/status/AgentRuntimeMetricStatus.java @@ -23,6 +23,9 @@ public class AgentRuntimeMetricStatus { public static List ALL_HEALTHY_AGENT_TOPIC_NAMES; + public static final String METRIC_REPORT_TIME_PINCH = "metricRepeatPinch"; + public static final String METRIC_REPORT_TIMES_COUNT = "metricRepeatCount"; + @Resource CollectAgentStatus collectAgentStatus; @@ -47,7 +50,7 @@ public class AgentRuntimeMetricStatus { return OctopusStatusMessage.builder() .type(METRIC_STATUS_MESSAGE_TYPE) .metricRepeatCount(metricRepeatCount) - .metricRepeatCount(metricRepeatCount) + .metricRepeatPinch(metricRepeatPinch) .agentTopicName(agentTopicName) .build(); }