diff --git a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java index 271231a..e7b8485 100644 --- a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java @@ -6,8 +6,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.wdd.common.utils.TimeUtils; import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessageType; -import io.wdd.rpc.message.handler.async.AsyncWaitOMResultService; -import io.wdd.rpc.message.handler.async.OMAsyncReplayContend; +import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService; +import io.wdd.rpc.message.handler.async.OctopusMessageAsyncReplayContend; import io.wdd.rpc.message.sender.OMessageToAgentSender; import io.wdd.server.beans.vo.ServerInfoVO; import io.wdd.server.config.ServerCommonPool; @@ -45,7 +45,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { RedisTemplate redisTemplate; @Resource - AsyncWaitOMResultService asyncWaitOMResultService; + AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService; @Override public Map getAllAgentVersion() { @@ -70,17 +70,17 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { ); // 构造 异步结果监听内容 - OMAsyncReplayContend OMAsyncReplayContend = OMAsyncReplayContend.build( + OctopusMessageAsyncReplayContend agentReplayContend = OctopusMessageAsyncReplayContend.build( ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(), CurrentAppOctopusMessageType, currentTime ); - CountDownLatch countDownLatch = OMAsyncReplayContend.getCountDownLatch(); + CountDownLatch countDownLatch = agentReplayContend.getCountDownLatch(); // 调用后台接收处理所有的Replay信息 - asyncWaitOMResultService.waitFor(OMAsyncReplayContend); + asyncWaitOctopusMessageResultService.waitFor(agentReplayContend); //此处存在重大bug,会导致CPU占用飙升 /*CompletableFuture getAllAgentVersionInfoFuture = waitCollectAllAgentVersionInfo( @@ -106,10 +106,10 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { } // 此处调用,即可中断 异步任务的收集工作 - asyncWaitOMResultService.stopWaiting(OMAsyncReplayContend); + asyncWaitOctopusMessageResultService.stopWaiting(agentReplayContend); // 处理结果 - OMAsyncReplayContend + agentReplayContend .getReplayOMList() .stream() .forEach( @@ -122,7 +122,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { ); // help gc - OMAsyncReplayContend = null; + agentReplayContend = null; } return result; @@ -156,16 +156,16 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { ); // 构造结果 - OMAsyncReplayContend OMAsyncReplayContend = OMAsyncReplayContend.build( + OctopusMessageAsyncReplayContend OctopusMessageAsyncReplayContend = io.wdd.rpc.message.handler.async.OctopusMessageAsyncReplayContend.build( ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(), CurrentAppOctopusMessageType, currentTime ); - CountDownLatch countDownLatch = OMAsyncReplayContend.getCountDownLatch(); + CountDownLatch countDownLatch = OctopusMessageAsyncReplayContend.getCountDownLatch(); // 调用后台接收处理所有的Replay信息 - asyncWaitOMResultService.waitFor(OMAsyncReplayContend); + asyncWaitOctopusMessageResultService.waitFor(OctopusMessageAsyncReplayContend); /* CompletableFuture getAllAgentCoreInfoFuture = waitCollectAllAgentCoreInfo( result, @@ -185,10 +185,10 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { // 超时,或者 全部信息已经收集 // 此处调用,即可中断 异步任务的收集工作 - asyncWaitOMResultService.stopWaiting(OMAsyncReplayContend); + asyncWaitOctopusMessageResultService.stopWaiting(OctopusMessageAsyncReplayContend); // 处理结果 - OMAsyncReplayContend + OctopusMessageAsyncReplayContend .getReplayOMList() .stream() .forEach( @@ -216,7 +216,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { ); // help gc - OMAsyncReplayContend = null; + OctopusMessageAsyncReplayContend = null; } return result; diff --git a/server/src/main/java/io/wdd/rpc/controller/StatusController.java b/server/src/main/java/io/wdd/rpc/controller/StatusController.java index cffe3df..410e8bd 100644 --- a/server/src/main/java/io/wdd/rpc/controller/StatusController.java +++ b/server/src/main/java/io/wdd/rpc/controller/StatusController.java @@ -76,7 +76,7 @@ public class StatusController { public R>> ManualUpdateAgentStatus() { // 手动调用更新 - agentStatusCacheService.updateAgentStatusMapCache(); + agentStatusCacheService.updateAgentStatusMapCache(agentAliveStatusMap); return R.ok(STATUS_AGENT_LIST_MAP); } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java index 8a567c3..17c8018 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java @@ -2,8 +2,8 @@ package io.wdd.rpc.execute.service; import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessageType; -import io.wdd.rpc.message.handler.async.AsyncWaitOMResultService; -import io.wdd.rpc.message.handler.async.OMAsyncReplayContend; +import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService; +import io.wdd.rpc.message.handler.async.OctopusMessageAsyncReplayContend; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -24,7 +24,7 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService { private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.EXECUTOR; @Resource - AsyncWaitOMResultService asyncWaitOMResultService; + AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService; @Resource SyncExecutionService asyncExecutionService; @@ -205,15 +205,15 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService { } // 构造回复信息的内容 - OMAsyncReplayContend OMAsyncReplayContend = OMAsyncReplayContend.build( + OctopusMessageAsyncReplayContend OctopusMessageAsyncReplayContend = OctopusMessageAsyncReplayContend.build( commandCount, CurrentAppOctopusMessageType, initTime ); - CountDownLatch countDownLatch = OMAsyncReplayContend.getCountDownLatch(); + CountDownLatch countDownLatch = OctopusMessageAsyncReplayContend.getCountDownLatch(); // 开始等待结果 - asyncWaitOMResultService.waitFor(OMAsyncReplayContend); + asyncWaitOctopusMessageResultService.waitFor(OctopusMessageAsyncReplayContend); // 监听结果 try { @@ -228,10 +228,10 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService { // 等待所有的结果返回 // 停止等待结果 - asyncWaitOMResultService.stopWaiting(OMAsyncReplayContend); + asyncWaitOctopusMessageResultService.stopWaiting(OctopusMessageAsyncReplayContend); // 解析结果 - OMAsyncReplayContend + OctopusMessageAsyncReplayContend .getReplayOMList() .stream() .map( diff --git a/server/src/main/java/io/wdd/rpc/init/AgentStatusCacheService.java b/server/src/main/java/io/wdd/rpc/init/AgentStatusCacheService.java index 7cf3039..c4e5b09 100644 --- a/server/src/main/java/io/wdd/rpc/init/AgentStatusCacheService.java +++ b/server/src/main/java/io/wdd/rpc/init/AgentStatusCacheService.java @@ -7,7 +7,6 @@ import io.wdd.server.beans.vo.ServerInfoVO; import io.wdd.server.coreService.CoreServerService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; @@ -15,7 +14,6 @@ import javax.annotation.Resource; import java.util.*; import java.util.stream.Collectors; -import static io.wdd.rpc.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY; /** @@ -50,9 +48,9 @@ public class AgentStatusCacheService { /** * 存储所有Agent状态的Map *

- * 内容为 agentTopicName-健康状态 + * 内容为 agentTopicName- True代表健康 False代表不健康 */ - public static final Map ALL_AGENT_STATUS_MAP = new HashMap<>(); + public static final Map ALL_AGENT_STATUS_MAP = new HashMap<>(); /** * 保存所有健康运行的Agent Topic Name @@ -67,8 +65,6 @@ public class AgentStatusCacheService { @Resource CoreServerService coreServerService; - @Resource - RedisTemplate redisTemplate; @PostConstruct public void GenerateAllCache() { @@ -77,7 +73,7 @@ public class AgentStatusCacheService { updateAllAgentTopicNameCache(); // Agent状态信息的两个Map - updateAgentStatusMapCache(); + // updateAgentStatusMapCache(agentAliveStatusMap); } @@ -118,7 +114,7 @@ public class AgentStatusCacheService { * 由定时任务或者初始化服务触发 * 2023-02-21 前端接口,手动更新 */ - public void updateAgentStatusMapCache() { + public void updateAgentStatusMapCache(Map agentAliveStatusMap) { // 检查,排除没有节点的情况 if (CollectionUtils.isEmpty(ALL_AGENT_TOPIC_NAME_LIST)) { @@ -126,47 +122,17 @@ public class AgentStatusCacheService { return; } - // 从redis中获取所有节点的当前状态 - List statusList = redisTemplate - .opsForHash() - .multiGet( - ALL_AGENT_STATUS_REDIS_KEY, - ALL_AGENT_TOPIC_NAME_LIST - ); - // 初始话 还没有状态的情况,直接return - if (CollectionUtils.isEmpty(statusList)) { - log.warn("agent status from redis is empty !"); - return; - } - - // 增加更新时间 2023年2月21日 - String timeString = TimeUtils.currentTimeString(); - - // 结构保存为agentStatusMap ==> agent-topic-name : STATUS(healthy, failed, unknown) - HashMap agentStatusMap = new HashMap<>(32); - for (int i = 0; i < ALL_AGENT_TOPIC_NAME_LIST.size(); i++) { - agentStatusMap.put( - ALL_AGENT_TOPIC_NAME_LIST.get(i), - uniformHealthyStatus(String.valueOf(statusList.get(i))) - ); - } - - - // 2023-01-16 + // 2023年6月15日 更新状态缓存 ALL_AGENT_STATUS_MAP.clear(); - ALL_AGENT_STATUS_MAP.putAll(agentStatusMap); - ALL_AGENT_STATUS_MAP.put( - STATUS_UPDATE_TIME_KEY, - timeString - ); + ALL_AGENT_STATUS_MAP.putAll(agentAliveStatusMap); // 2023-01-16 // 更新 状态-Agent容器 内容为 // HEALTHY -> ["agentTopicName-1", "agentTopicName-2"] // FAILED -> ["agentTopicName-1", "agentTopicName-2"] - Map> statusAgentListMap = agentStatusMap + Map> statusAgentListMap = agentAliveStatusMap .entrySet() .stream() .collect( @@ -178,7 +144,7 @@ public class AgentStatusCacheService { .stream() .collect( Collectors.toMap( - entry -> entry.getKey(), + entry -> entry.getKey() ? "HEALTHY" : "FAILED", entry -> entry .getValue() .stream() @@ -192,7 +158,9 @@ public class AgentStatusCacheService { // 2023-2-3 bug fix STATUS_AGENT_LIST_MAP.clear(); STATUS_AGENT_LIST_MAP.putAll(statusAgentListMap); + // 2023年2月21日,更新时间 + String timeString = TimeUtils.currentFormatTimeString(); STATUS_AGENT_LIST_MAP.put( STATUS_UPDATE_TIME_KEY, Collections.singletonList(timeString) @@ -200,22 +168,26 @@ public class AgentStatusCacheService { log.debug("Agent存活状态 状态-Agent名称-Map 已经更新了"); - - // Trigger调用Agent Metric 任务 - ArrayList allHealthyAgentTopicNames = new ArrayList<>(32); - for (int i = 0; i < statusList.size(); i++) { - if (null !=statusList.get(i) && statusList - .get(i) - .equals("1")) { - allHealthyAgentTopicNames.add(ALL_AGENT_TOPIC_NAME_LIST.get(i)); - } - } // 缓存相应的存活Agent + List allHealthyAgentTopicNames = agentAliveStatusMap + .entrySet() + .stream() + .filter( + entry -> entry + .getKey() + .equals(Boolean.TRUE) + ) + .map( + Map.Entry::getKey + ) + .collect(Collectors.toList()); + ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.clear(); ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.addAll(allHealthyAgentTopicNames); + // help gc - agentStatusMap = null; + agentAliveStatusMap = null; statusAgentListMap = null; allHealthyAgentTopicNames = null; } diff --git a/server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOMResultService.java b/server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOctopusMessageResultService.java similarity index 74% rename from server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOMResultService.java rename to server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOctopusMessageResultService.java index 171b847..5fdd822 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOMResultService.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOctopusMessageResultService.java @@ -20,31 +20,37 @@ import static io.wdd.rpc.message.handler.sync.OMessageHandlerServer.OCTOPUS_MESS */ @Service @Slf4j -public class AsyncWaitOMResultService { +public class AsyncWaitOctopusMessageResultService { /** * 为了避免线程不安全的问题,增加一层缓存,仅仅由当前类操作此部分 * KEY -> replayMatchKey - * VALUE -> OMAsyncReplayContend - 包含countDownLatch 和 result + * VALUE -> OctopusMessageAsyncReplayContend - 包含countDownLatch 和 result */ - private static final HashMap OM_REPLAY_WAITING_TARGET_MAP = new HashMap<>(); + private static final HashMap OM_REPLAY_WAITING_TARGET_MAP = new HashMap<>(); - public void waitFor(OMAsyncReplayContend OMAsyncReplayContend) { + public void waitFor(OctopusMessageAsyncReplayContend OctopusMessageAsyncReplayContend) { // 向 REPLAY_CACHE_MAP中写入 Key OM_REPLAY_WAITING_TARGET_MAP.put( - OMAsyncReplayContend.getReplayMatchKey(), - OMAsyncReplayContend + OctopusMessageAsyncReplayContend.getReplayMatchKey(), + OctopusMessageAsyncReplayContend ); // 在调用线程的countDownLunch结束之后,关闭 // 清除 REPLAY_CACHE_MAP 中的队列 } - public void stopWaiting(OMAsyncReplayContend OMAsyncReplayContend) { + public void stopWaiting(OctopusMessageAsyncReplayContend OctopusMessageAsyncReplayContend) { // 在调用线程的countDownLunch结束之后,关闭 清除 REPLAY_CACHE_MAP 中的队列 - OM_REPLAY_WAITING_TARGET_MAP.remove(OMAsyncReplayContend.getReplayMatchKey()); + OctopusMessageAsyncReplayContend contend = OM_REPLAY_WAITING_TARGET_MAP.get(OctopusMessageAsyncReplayContend.getReplayMatchKey()); + + // 移除该内容 + OM_REPLAY_WAITING_TARGET_MAP.remove(OctopusMessageAsyncReplayContend.getReplayMatchKey()); + + // help gc + contend = null; } @@ -82,7 +88,7 @@ public class AsyncWaitOMResultService { OctopusMessage replayOMessage = OCTOPUS_MESSAGE_FROM_AGENT.poll(); // 构造 replayMatchKey - String matchKey = OMAsyncReplayContend.generateMatchKey( + String matchKey = OctopusMessageAsyncReplayContend.generateMatchKey( replayOMessage.getType(), replayOMessage.getInit_time() ); @@ -99,11 +105,12 @@ public class AsyncWaitOMResultService { } // Map中包含有Key,那么放置进去 - OMAsyncReplayContend replayContend = OM_REPLAY_WAITING_TARGET_MAP.get(matchKey); + OctopusMessageAsyncReplayContend replayContend = OM_REPLAY_WAITING_TARGET_MAP.get(matchKey); replayContend .getReplayOMList() .add(replayOMessage); + // 需要操作countDown replayContend .getCountDownLatch() diff --git a/server/src/main/java/io/wdd/rpc/message/handler/async/OMAsyncReplayContend.java b/server/src/main/java/io/wdd/rpc/message/handler/async/OctopusMessageAsyncReplayContend.java similarity index 73% rename from server/src/main/java/io/wdd/rpc/message/handler/async/OMAsyncReplayContend.java rename to server/src/main/java/io/wdd/rpc/message/handler/async/OctopusMessageAsyncReplayContend.java index abf42e0..5864190 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/async/OMAsyncReplayContend.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/async/OctopusMessageAsyncReplayContend.java @@ -19,7 +19,7 @@ import java.util.concurrent.CountDownLatch; @NoArgsConstructor @SuperBuilder(toBuilder = true) @ApiModel("众多业务调用RPC,异步等待需要确定返回消息是谁的") -public class OMAsyncReplayContend { +public class OctopusMessageAsyncReplayContend { @ApiModelProperty("rpc消息的类型") OctopusMessageType type; @@ -37,17 +37,6 @@ public class OMAsyncReplayContend { @ApiModelProperty("回复的结果列表, 临时保存") ArrayList replayOMList; - protected static String generateMatchKey(OMAsyncReplayContend replayIdentifier) { - - String relayMatchKey = replayIdentifier - .getType() - .toString() + replayIdentifier - .getInitTime() - .toString(); - - return relayMatchKey; - } - /** * @param messageType * @param messageInitTime 必须使用 TimeUtils.currentFormatTime(); @@ -61,21 +50,26 @@ public class OMAsyncReplayContend { } /** - * 方便使用的一个构造方法 + * Execution模块使用的模板 * * @return */ - public static OMAsyncReplayContend build(int waitForReplayNum, OctopusMessageType currentOMType, LocalDateTime currentTime) { + public static OctopusMessageAsyncReplayContend build(int waitForReplayNum, OctopusMessageType currentOMType, LocalDateTime currentTime) { - return new OMAsyncReplayContend( + CountDownLatch latch = null; + if (waitForReplayNum != 0) { + latch = new CountDownLatch(waitForReplayNum); + } + + return new OctopusMessageAsyncReplayContend( currentOMType, currentTime, generateMatchKey( currentOMType, currentTime ), - new CountDownLatch(waitForReplayNum), - new ArrayList<>() + latch, + new ArrayList(16) ); } diff --git a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentStatusMonitorJob.java b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentAliveStatusMonitorJob.java similarity index 73% rename from server/src/main/java/io/wdd/rpc/scheduler/job/AgentStatusMonitorJob.java rename to server/src/main/java/io/wdd/rpc/scheduler/job/AgentAliveStatusMonitorJob.java index c20a183..a72a37b 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentStatusMonitorJob.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentAliveStatusMonitorJob.java @@ -1,17 +1,17 @@ package io.wdd.rpc.scheduler.job; import io.wdd.rpc.scheduler.config.QuartzLogOperator; -import io.wdd.rpc.scheduler.service.status.CheckAgentAliveStatus; +import io.wdd.rpc.scheduler.service.status.AgentAliveStatusMonitorService; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.springframework.scheduling.quartz.QuartzJobBean; import javax.annotation.Resource; -public class AgentStatusMonitorJob extends QuartzJobBean { +public class AgentAliveStatusMonitorJob extends QuartzJobBean { @Resource - CheckAgentAliveStatus checkAgentAliveStatus; + AgentAliveStatusMonitorService agentAliveStatusMonitorService; @Resource QuartzLogOperator quartzLogOperator; @@ -23,7 +23,7 @@ public class AgentStatusMonitorJob extends QuartzJobBean { //JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap(); // actually execute the monitor service - checkAgentAliveStatus.go(); + agentAliveStatusMonitorService.go(); // log to somewhere quartzLogOperator.save(); diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java b/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java index f8cc610..755e25d 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java @@ -1,8 +1,8 @@ package io.wdd.rpc.scheduler.service; +import io.wdd.rpc.scheduler.job.AgentAliveStatusMonitorJob; import io.wdd.rpc.scheduler.job.AgentRunMetricStatusJob; -import io.wdd.rpc.scheduler.job.AgentStatusMonitorJob; import lombok.extern.slf4j.Slf4j; import org.quartz.CronExpression; import org.springframework.beans.factory.annotation.Value; @@ -103,7 +103,7 @@ public class BuildStatusScheduleTask { // build the Job octopusQuartzService.addMission( - AgentStatusMonitorJob.class, + AgentAliveStatusMonitorJob.class, "monitorAllAgentStatusJob", JOB_GROUP_NAME, healthyCheckStartDelaySeconds, diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/status/CheckAgentAliveStatus.java b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentAliveStatusMonitorService.java similarity index 69% rename from server/src/main/java/io/wdd/rpc/scheduler/service/status/CheckAgentAliveStatus.java rename to server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentAliveStatusMonitorService.java index ebe797e..d63721f 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/status/CheckAgentAliveStatus.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentAliveStatusMonitorService.java @@ -4,6 +4,7 @@ import io.wdd.common.utils.TimeUtils; import io.wdd.rpc.init.AgentStatusCacheService; import io.wdd.rpc.scheduler.service.BuildStatusScheduleTask; import io.wdd.rpc.status.OctopusStatusMessage; +import io.wdd.rpc.status.service.AsyncStatusService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.context.annotation.Lazy; @@ -13,8 +14,7 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.HashMap; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.Map; import java.util.stream.Collectors; import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_LIST; @@ -38,7 +38,7 @@ import static io.wdd.rpc.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE @Service @Slf4j @Lazy -public class CheckAgentAliveStatus { +public class AgentAliveStatusMonitorService { private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5; @Resource @@ -52,38 +52,35 @@ public class CheckAgentAliveStatus { @Resource BuildStatusScheduleTask buildStatusScheduleTask; + @Resource + AsyncStatusService asyncStatusService; + + private HashMap AGENT_HEALTHY_INIT_MAP; public void go() { - try { - // 1. 获取所有注册的Agent 手动更新 - agentStatusCacheService.updateAllAgentTopicNameCache(); - if (CollectionUtils.isEmpty(ALL_AGENT_TOPIC_NAME_LIST)) { - log.warn("[Scheduler] No Agent Registered ! End Up Status Monitor !"); - return; - } - - // 1.1 检查 Agent状态保存数据结构是否正常 - checkOrCreateRedisHealthyKey(); - - // 2.发送状态检查信息, agent需要update相应的HashMap的值 - // 2023年6月14日 2. 发送ping等待所有的Agent返回PONG, 然后进行redis的状态修改 - CountDownLatch aliveStatusCDL = new CountDownLatch(ALL_AGENT_TOPIC_NAME_LIST.size()); - - - buildAndSendAgentHealthMessage(); - - // 3. 休眠 MAX_WAIT_AGENT_REPORT_STATUS_TIME 秒 等待agent的状态上报 - TimeUnit.SECONDS.sleep(MAX_WAIT_AGENT_REPORT_STATUS_TIME); - - // 4.检查相应的 状态HashMap,然后全部置为零 - // todo 存储到某个地方,目前只是打印日志 - updateAllAgentHealthyStatus(); - - } catch (InterruptedException e) { - throw new RuntimeException(e); + // 1. 获取所有注册的Agent 手动更新 + agentStatusCacheService.updateAllAgentTopicNameCache(); + if (CollectionUtils.isEmpty(ALL_AGENT_TOPIC_NAME_LIST)) { + log.warn("[Scheduler] No Agent Registered ! End Up Status Monitor !"); + return; } + + // 1.1 检查 Agent状态保存数据结构是否正常 + checkOrCreateRedisHealthyKey(); + + // 2.发送状态检查信息, agent需要update相应的HashMap的值 + // 2023年6月14日 2. 发送ping等待所有的Agent返回PONG, 然后进行redis的状态修改 + + // 使用同步更新的策略 + Map agentAliveStatusMap = asyncStatusService.AsyncCollectAgentAliveStatus( + ALL_AGENT_TOPIC_NAME_LIST, + 5 + ); + + // 更新Agent的状态 + updateAllAgentHealthyStatus(agentAliveStatusMap); } private void checkOrCreateRedisHealthyKey() { @@ -129,8 +126,7 @@ public class CheckAgentAliveStatus { .map( agentTopicName -> OctopusStatusMessage .builder() - .agentTopicName(agentTopicName) - .type(HEALTHY_STATUS_MESSAGE_TYPE) + .statusType(HEALTHY_STATUS_MESSAGE_TYPE) .build() ) .collect(Collectors.toList()); @@ -139,15 +135,15 @@ public class CheckAgentAliveStatus { collectAgentStatus.statusMessageToAgent(collect); } - private void updateAllAgentHealthyStatus() { + private void updateAllAgentHealthyStatus(Map agentAliveStatusMap) { String currentTimeString = TimeUtils.currentTimeString(); // 更新所有的缓存状态 - agentStatusCacheService.updateAgentStatusMapCache(); + agentStatusCacheService.updateAgentStatusMapCache(agentAliveStatusMap); // 执行Metric上报定时任务 - buildStatusScheduleTask.buildAgentMetricScheduleTask(); +// buildStatusScheduleTask.buildAgentMetricScheduleTask(); // 这里仅仅是更新时间 redisTemplate @@ -158,6 +154,14 @@ public class CheckAgentAliveStatus { currentTimeString ); + // 更新所有的Agent状态 + redisTemplate + .opsForHash() + .putAll( + ALL_AGENT_STATUS_REDIS_KEY, + agentAliveStatusMap + ); + } diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentRuntimeMetricStatus.java b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentRuntimeMetricStatus.java index 40e13e0..26b5a75 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentRuntimeMetricStatus.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentRuntimeMetricStatus.java @@ -52,7 +52,7 @@ public class AgentRuntimeMetricStatus { agentTopicName -> { return OctopusStatusMessage .builder() - .type(METRIC_STATUS_MESSAGE_TYPE) + .statusType(METRIC_STATUS_MESSAGE_TYPE) .metricRepeatCount(metricRepeatCount) .metricRepeatPinch(metricRepeatPinch) .agentTopicName(agentTopicName) diff --git a/server/src/main/java/io/wdd/rpc/status/OctopusStatusMessage.java b/server/src/main/java/io/wdd/rpc/status/OctopusStatusMessage.java index 37f62a7..49c5098 100644 --- a/server/src/main/java/io/wdd/rpc/status/OctopusStatusMessage.java +++ b/server/src/main/java/io/wdd/rpc/status/OctopusStatusMessage.java @@ -14,20 +14,18 @@ public class OctopusStatusMessage { // below two will be used by both server and agent // 存储所有Agent的实时健康状态, 1代表健康 0代表失败 public static final String ALL_AGENT_STATUS_REDIS_KEY = "ALL_AGENT_HEALTHY_STATUS"; - public static final String HEALTHY_STATUS_MESSAGE_TYPE = "ping"; - public static final String ALL_STATUS_MESSAGE_TYPE = "all"; - public static final String METRIC_STATUS_MESSAGE_TYPE = "metric"; - public static final String APP_STATUS_MESSAGE_TYPE = "app"; + public static final String HEALTHY_STATUS_MESSAGE_TYPE = "PING"; + public static final String ALL_STATUS_MESSAGE_TYPE = "ALL"; + public static final String METRIC_STATUS_MESSAGE_TYPE = "METRIC"; + public static final String APP_STATUS_MESSAGE_TYPE = "APP"; /** - * which kind of status should be return + * which kind of status should be return * metric => short time message * all => all agent status message * healthy => check for healthy - * */ - String type; - - String agentTopicName; + */ + String statusType; int metricRepeatCount; diff --git a/server/src/main/java/io/wdd/rpc/status/service/AsyncStatusService.java b/server/src/main/java/io/wdd/rpc/status/service/AsyncStatusService.java new file mode 100644 index 0000000..844035b --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/status/service/AsyncStatusService.java @@ -0,0 +1,16 @@ +package io.wdd.rpc.status.service; + +import java.util.List; +import java.util.Map; + +public interface AsyncStatusService { + + /** + * 应该是同步收集 agentTopicNameList 的节点的存活状态,并返回所有的状态存活结果 + * + * @param agentTopicNameList + * @param aliveStatusWaitMaxTime + * @return + */ + Map AsyncCollectAgentAliveStatus(List agentTopicNameList, int aliveStatusWaitMaxTime); +} diff --git a/server/src/main/java/io/wdd/rpc/status/service/AsyncStatusServiceImpl.java b/server/src/main/java/io/wdd/rpc/status/service/AsyncStatusServiceImpl.java new file mode 100644 index 0000000..7286c6d --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/status/service/AsyncStatusServiceImpl.java @@ -0,0 +1,146 @@ +package io.wdd.rpc.status.service; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.wdd.common.utils.TimeUtils; +import io.wdd.rpc.message.OctopusMessage; +import io.wdd.rpc.message.OctopusMessageType; +import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService; +import io.wdd.rpc.message.handler.async.OctopusMessageAsyncReplayContend; +import io.wdd.rpc.message.sender.OMessageToAgentSender; +import io.wdd.rpc.status.OctopusStatusMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_LIST; +import static io.wdd.rpc.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE; + +@Slf4j +@Service +public class AsyncStatusServiceImpl implements AsyncStatusService { + + private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.STATUS; + + @Resource + OMessageToAgentSender oMessageToAgentSender; + + @Resource + ObjectMapper objectMapper; + + @Resource + AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService; + + @Override + public Map AsyncCollectAgentAliveStatus(List agentTopicNameList, int aliveStatusWaitMaxTime) { + + // 构造最后的结果Map + Map agentAliveStatusMap = agentTopicNameList + .stream() + .collect( + Collectors.toMap( + agentTopicName -> agentTopicName, + agentTopicName -> Boolean.FALSE + )); + + LocalDateTime currentTime = TimeUtils.currentFormatTime(); + // 构造OctopusMessage - StatusMessage结构体, 下发所有的消息 + buildAndSendAgentAliveOctopusMessage(currentTime); + + // 异步收集消息 + OctopusMessageAsyncReplayContend statusAsyncReplayContend = OctopusMessageAsyncReplayContend.build( + agentTopicNameList.size(), + CurrentAppOctopusMessageType, + currentTime + ); + asyncWaitOctopusMessageResultService.waitFor(statusAsyncReplayContend); + + // 解析结果 + CountDownLatch countDownLatch = statusAsyncReplayContend.getCountDownLatch(); + + // 等待状态返回的结果 + boolean agentAliveStatusCollectResult = false; + try { + agentAliveStatusCollectResult = countDownLatch.await( + aliveStatusWaitMaxTime, + TimeUnit.SECONDS + ); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + if (!agentAliveStatusCollectResult) { + log.debug("Agent存活状态检查,没有检查到全部的Agent!"); + } + + // 移除等待队列 + asyncWaitOctopusMessageResultService.stopWaiting(statusAsyncReplayContend); + + // 处理结果 + statusAsyncReplayContend + .getReplayOMList() + .stream() + .forEach( + statusOMessage -> { + if (statusOMessage.getResult() != null) { + agentAliveStatusMap.put( + statusOMessage.getUuid(), + Boolean.TRUE + ); + } + } + ); + } + + // 返回Agent的存活状态内容 + return agentAliveStatusMap; + } + + private void buildAndSendAgentAliveOctopusMessage(LocalDateTime currentTime) { + + List octopusStatusMessageList = ALL_AGENT_TOPIC_NAME_LIST + .stream() + .map( + agentTopicName -> ConstructAgentStatusMessage( + HEALTHY_STATUS_MESSAGE_TYPE, + agentTopicName, + currentTime + ) + ) + .collect(Collectors.toList()); + + // 发送信息 + oMessageToAgentSender.send(octopusStatusMessageList); + + } + + private OctopusMessage ConstructAgentStatusMessage(String statusType, String agentTopicName, LocalDateTime currentTime) { + + OctopusStatusMessage statusMessage = OctopusStatusMessage + .builder() + .statusType(statusType) + .build(); + + String ops; + try { + ops = objectMapper.writeValueAsString(statusMessage); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + return OctopusMessage + .builder() + .type(CurrentAppOctopusMessageType) + .uuid(agentTopicName) + .init_time(currentTime) + .content(ops) + .build(); + + } +}