From fd5f2607b9b412d3649791f27144a41782607602 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Fri, 3 Feb 2023 11:30:20 +0800 Subject: [PATCH] [ server ] [ status ]- update some code - 1 --- .github/workflows/build-push-docker.yml | 32 +-- .../beans/status/AgentHealthyStatusEnum.java | 2 - .../beans/status/OctopusStatusMessage.java | 2 +- .../rpc/controller/SchedulerController.java | 18 +- .../wdd/rpc/controller/StatusController.java | 13 +- .../service/CoreExecutionServiceImpl.java | 13 +- .../service/ExecutionResultDaemonHandler.java | 40 +++- .../wdd/rpc/init/ServerBootUpEnvironment.java | 196 +++++++++++++++++ .../status/AgentRuntimeMetricStatus.java | 21 +- .../service/status/MonitorAllAgentStatus.java | 208 ++++-------------- .../io/wdd/server/ServerApplicationTests.java | 4 - .../test/java/io/wdd/server/SimpleTest.java | 36 +++ 12 files changed, 362 insertions(+), 223 deletions(-) create mode 100644 server/src/main/java/io/wdd/rpc/init/ServerBootUpEnvironment.java diff --git a/.github/workflows/build-push-docker.yml b/.github/workflows/build-push-docker.yml index 3d3b7c8..69e3b68 100644 --- a/.github/workflows/build-push-docker.yml +++ b/.github/workflows/build-push-docker.yml @@ -188,20 +188,20 @@ jobs: push: true tags: ${{ steps.docker_tags_server.outputs.tags }} - - name: Build and push Docker images - [ Agent ] - id: docker_build_agent - # You may pin to the exact commit or the version. - # uses: docker/build-push-action@c56af957549030174b10d6867f20e78cfd7debc5 - uses: docker/build-push-action@v3.2.0 - with: - context: ./ - # Path to the Dockerfile - file: ./agent/Dockerfile - # List of target platforms for build - platforms: linux/amd64,linux/arm64 - # Always attempt to pull all referenced images - pull: false - # Push is a shorthand for --output=type=registry - push: true - tags: ${{ steps.docker_tags_agent.outputs.tags }} +# - name: Build and push Docker images - [ Agent ] +# id: docker_build_agent +# # You may pin to the exact commit or the version. +# # uses: docker/build-push-action@c56af957549030174b10d6867f20e78cfd7debc5 +# uses: docker/build-push-action@v3.2.0 +# with: +# context: ./ +# # Path to the Dockerfile +# file: ./agent/Dockerfile +# # List of target platforms for build +# platforms: linux/amd64,linux/arm64 +# # Always attempt to pull all referenced images +# pull: false +# # Push is a shorthand for --output=type=registry +# push: true +# tags: ${{ steps.docker_tags_agent.outputs.tags }} diff --git a/common/src/main/java/io/wdd/common/beans/status/AgentHealthyStatusEnum.java b/common/src/main/java/io/wdd/common/beans/status/AgentHealthyStatusEnum.java index 2a1cd98..0494909 100644 --- a/common/src/main/java/io/wdd/common/beans/status/AgentHealthyStatusEnum.java +++ b/common/src/main/java/io/wdd/common/beans/status/AgentHealthyStatusEnum.java @@ -18,11 +18,9 @@ public enum AgentHealthyStatusEnum { String status; - String description; - AgentHealthyStatusEnum(String status, String description) { this.description = description; this.status = status; diff --git a/common/src/main/java/io/wdd/common/beans/status/OctopusStatusMessage.java b/common/src/main/java/io/wdd/common/beans/status/OctopusStatusMessage.java index bc4cf45..fb1c677 100644 --- a/common/src/main/java/io/wdd/common/beans/status/OctopusStatusMessage.java +++ b/common/src/main/java/io/wdd/common/beans/status/OctopusStatusMessage.java @@ -12,7 +12,7 @@ import lombok.experimental.SuperBuilder; public class OctopusStatusMessage { // below two will be used by both server and agent - public static final String ALL_AGENT_STATUS_REDIS_KEY = "ALL_AGENT_STATUS"; + public static final String ALL_AGENT_STATUS_REDIS_KEY = "ALL_AGENT_HEALTHY_STATUS"; public static final String HEALTHY_STATUS_MESSAGE_TYPE = "ping"; public static final String ALL_STATUS_MESSAGE_TYPE = "all"; public static final String METRIC_STATUS_MESSAGE_TYPE = "metric"; diff --git a/server/src/main/java/io/wdd/rpc/controller/SchedulerController.java b/server/src/main/java/io/wdd/rpc/controller/SchedulerController.java index d98523e..15cccfe 100644 --- a/server/src/main/java/io/wdd/rpc/controller/SchedulerController.java +++ b/server/src/main/java/io/wdd/rpc/controller/SchedulerController.java @@ -15,8 +15,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import static io.wdd.rpc.scheduler.service.status.MonitorAllAgentStatus.ALL_AGENT_TOPIC_NAME_SET; - @RestController @Api(value = "定时任务控制中心的Controller") @RequestMapping(value = "/octopus/server/scheduler") @@ -38,18 +36,27 @@ public class SchedulerController { @ApiParam(name = "scheduleScript") @RequestBody() ScriptSchedulerVO scriptSchedulerVO ) { - ALL_AGENT_TOPIC_NAME_SET.add("Chengdu-amd64-98-98066f"); - HashMap map = octopusQuartzService.createScriptScheduledMission(scriptSchedulerVO); return R.ok(map); } + @ApiOperation(value = "查询所有的定时脚本任务") + @GetMapping(value = "/script/getAll") + public R> getAllScriptScheduler( + + ) { + + + return R.ok(null); + + } + /** * -------------------------------------------------------------- * 普通的定时任务查询功能 - * */ + */ @ApiOperation(value = "查询所有mission") @GetMapping(value = "/queryAllMission") @@ -68,7 +75,6 @@ public class SchedulerController { } - @ApiOperation(value = "删除一个mission") @PostMapping(value = "/deleteMission/") public R deleteMission( diff --git a/server/src/main/java/io/wdd/rpc/controller/StatusController.java b/server/src/main/java/io/wdd/rpc/controller/StatusController.java index 4379964..a84bd9e 100644 --- a/server/src/main/java/io/wdd/rpc/controller/StatusController.java +++ b/server/src/main/java/io/wdd/rpc/controller/StatusController.java @@ -10,8 +10,9 @@ import org.springframework.web.bind.annotation.RestController; import java.util.Map; -import static io.wdd.rpc.scheduler.service.status.MonitorAllAgentStatus.ALL_AGENT_HEALTHY_STATUS_MAP; -import static io.wdd.rpc.scheduler.service.status.MonitorAllAgentStatus.HEALTHY_STATUS_AGENT_LIST_MAP; +import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_STATUS_MAP; +import static io.wdd.rpc.init.ServerBootUpEnvironment.STATUS_AGENT_LIST_MAP; + @RestController @Api("Agent运行状态Controller") @@ -21,16 +22,16 @@ public class StatusController { @ApiOperation("获取所有Agent的运行状态") @GetMapping("/agentStatus") - public R GetAllAgentHealthyStatus(){ + public R GetAllAgentHealthyStatus() { - return R.ok(ALL_AGENT_HEALTHY_STATUS_MAP); + return R.ok(ALL_AGENT_STATUS_MAP); } @ApiOperation("获取 状态-Agent Map") @GetMapping("/statusAgent") - public R GetHealthyStatusAgentList(){ + public R GetHealthyStatusAgentList() { - return R.ok(HEALTHY_STATUS_AGENT_LIST_MAP); + return R.ok(STATUS_AGENT_LIST_MAP); } } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java index 9e3a30c..c15d8ea 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java @@ -10,7 +10,6 @@ import io.wdd.rpc.execute.config.ExecutionLog; import io.wdd.rpc.execute.result.BuildStreamReader; import io.wdd.rpc.message.sender.ToAgentMessageSender; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; @@ -21,7 +20,7 @@ import java.util.List; import java.util.stream.Collectors; import static io.wdd.rpc.execute.service.ExecutionResultDaemonHandler.WAIT_EXECUTION_RESULT_LIST; -import static io.wdd.rpc.scheduler.service.status.MonitorAllAgentStatus.ALL_AGENT_TOPIC_NAME_SET; +import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_TOPIC_NAME_SET; @Service @Slf4j @@ -71,7 +70,13 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { @Override public String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete) { - return this.SendCommandToAgent(agentTopicName, type, commandList, commandListComplete, null); + return this.SendCommandToAgent( + agentTopicName, + type, + commandList, + commandListComplete, + null + ); } @Override @@ -94,7 +99,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { // 构造 Execution Command对应的消息体 ExecutionMessage executionMessage = this .generateExecutionMessage( - agentTopicName, + type, commandList, resultKey, commandListComplete diff --git a/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java b/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java index a12eaf3..0d221f3 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java @@ -75,8 +75,10 @@ public class ExecutionResultDaemonHandler { .keys() .nextElement(); - log.info("current result key is [{}]", - resultKey); + log.debug( + "current result key is [{}]", + resultKey + ); CompletableFuture> executionResultFuture = @@ -93,6 +95,7 @@ public class ExecutionResultDaemonHandler { // 构造 resultKey对应的 Redis Stream Listener Container buildStreamReader .buildStreamReader(commandReaderConfig); + // 获得结果 ArrayList s = new ArrayList<>( List.of("no no no") @@ -116,8 +119,11 @@ public class ExecutionResultDaemonHandler { } } ) - .get(MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT, - TimeUnit.SECONDS); + // 获取相应的结果 + .get( + MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT, + TimeUnit.SECONDS + ); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { @@ -145,15 +151,19 @@ public class ExecutionResultDaemonHandler { // 获取结果,然后销毁Stream Listener Container CompletableFuture complete = CompletableFuture - .anyOf(falloutTimeFuture, - executionResultFuture); + .anyOf( + falloutTimeFuture, + executionResultFuture + ); complete .whenComplete( (result, e) -> { - log.info("execution result are => {}", - result); + log.debug( + "execution result are => {}", + result + ); // 持久化存储对应的结果 ExecutionLog executionLog = WAIT_EXECUTION_RESULT_LIST.get(resultKey); @@ -163,18 +173,24 @@ public class ExecutionResultDaemonHandler { CollectionUtils.isEmpty((Collection) result) ? 1 : 0 ); executionLog.setRecordId(commandReaderConfig.getRecordId()); - executionLogService.save(executionLog); + // 保存操作 + executionLogService.save(executionLog); + // 清除此次任务的内容 WAIT_EXECUTION_RESULT_LIST.remove(resultKey); - - log.info("[Execution] - whole process are complete !"); + log.info( + "[Execution] - command {} result are {} result code is {} ,whole process are complete !", + executionLog.getCommandList(), + executionLog.getResultContent(), + executionLog.getResultCode() + ); } ); // very important - // stuck the main thread , otherwise it will create a dead loop , really bad + // stuck the main thread , otherwise it will create a dead loop complete.join(); } diff --git a/server/src/main/java/io/wdd/rpc/init/ServerBootUpEnvironment.java b/server/src/main/java/io/wdd/rpc/init/ServerBootUpEnvironment.java new file mode 100644 index 0000000..cd66ccd --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/init/ServerBootUpEnvironment.java @@ -0,0 +1,196 @@ +package io.wdd.rpc.init; + + +import io.wdd.common.beans.status.AgentHealthyStatusEnum; +import io.wdd.common.utils.TimeUtils; +import io.wdd.server.beans.vo.ServerInfoVO; +import io.wdd.server.coreService.CoreServerService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Service; +import org.springframework.util.Assert; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import java.util.*; +import java.util.stream.Collectors; + +import static io.wdd.common.beans.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY; + +/** + * Server启动的时候,需要初始化一系列的信息 + *

+ * 所有Agent的TopicName ALL_AGENT_TOPIC_NAME_SET + *

+ * Agent状态信息的两个Map STATUS_AGENT_LIST_MAP ALL_AGENT_STATUS_MAP + */ +@Service +@Slf4j +public class ServerBootUpEnvironment { + + /** + * 存储所有的AgentTopicName的缓存 + */ + public static final Set ALL_AGENT_TOPIC_NAME_SET = new HashSet<>(); + + /** + * 存储所有的AgentTopicName的缓存 + */ + public static final List ALL_AGENT_TOPIC_NAME_LIST = new ArrayList<>(); + + + /** + * 存储 状态对应Agent列表的Map + * Agent的状态描述为 AgentHealthyStatusEnum + * HEALTHY -> ["agentTopicName-1", "agentTopicName-2"] + * FAILED -> ["agentTopicName-1", "agentTopicName-2"] + */ + public static final Map> STATUS_AGENT_LIST_MAP = new HashMap<>(); + /** + * 存储所有Agent状态的Map + *

+ * 内容为 agentTopicName-健康状态 + */ + public static final Map ALL_AGENT_STATUS_MAP = new HashMap<>(); + + /** + * 保存所有健康运行的Agent Topic Name + */ + public static List ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; + + @Resource + CoreServerService coreServerService; + + @Resource + RedisTemplate redisTemplate; + + @PostConstruct + public void GenerateAllCache() { + + //所有Agent的TopicName ALL_AGENT_TOPIC_NAME_SET + updateAllAgentTopicNameSetCache(); + + // Agent状态信息的两个Map + updateAgentStatusMapCache(); + + } + + public void updateAllAgentTopicNameSetCache() { + + List allAgentInfo = coreServerService.serverGetAll(); + + Assert.notEmpty( + allAgentInfo, + "not agent registered ! skip the agent healthy status check !" + ); + + ALL_AGENT_TOPIC_NAME_LIST.clear(); + ALL_AGENT_TOPIC_NAME_SET.clear(); + + List collect = allAgentInfo + .stream() + .map(ServerInfoVO::getTopicName) + .collect(Collectors.toList()); + + + ALL_AGENT_TOPIC_NAME_LIST.addAll(collect); + ALL_AGENT_TOPIC_NAME_SET.addAll(collect); + + } + + + public void updateAgentStatusMapCache() { + + List statusList = redisTemplate + .opsForHash() + .multiGet( + ALL_AGENT_STATUS_REDIS_KEY, + ALL_AGENT_TOPIC_NAME_LIST + ); + // current log to console is ok + + // 结构保存为agentStatusMap ==> agent-topic-name : STATUS(healthy, failed, unknown) + HashMap agentStatusMap = new HashMap<>(32); + for (int i = 0; i < ALL_AGENT_TOPIC_NAME_LIST.size(); i++) { + agentStatusMap.put( + ALL_AGENT_TOPIC_NAME_LIST.get(i), + uniformHealthyStatus(String.valueOf(statusList.get(i))) + ); + } + + String currentTimeString = TimeUtils.currentTimeString(); + log.info( + "[ AGENT HEALTHY CHECK ] time is {} , result are => {}", + currentTimeString, + agentStatusMap + ); + + // 2023-01-16 + ALL_AGENT_STATUS_MAP.clear(); + ALL_AGENT_STATUS_MAP.putAll(agentStatusMap); + + // 2023-01-16 + // 更新 状态-Agent容器 内容为 + // HEALTHY -> ["agentTopicName-1", "agentTopicName-2"] + // FAILED -> ["agentTopicName-1", "agentTopicName-2"] + Map> statusAgentListMap = agentStatusMap + .entrySet() + .stream() + .collect( + Collectors.groupingBy( + Map.Entry::getValue + ) + ) + .entrySet() + .stream() + .collect( + Collectors.toMap( + entry -> entry.getKey(), + entry -> entry + .getValue() + .stream() + .map( + Map.Entry::getKey + ) + .collect(Collectors.toList()) + ) + ); + + // 2023-2-3 bug fix + STATUS_AGENT_LIST_MAP.clear(); + STATUS_AGENT_LIST_MAP.putAll(statusAgentListMap); + log.debug("Agent存活状态 状态-Agent名称-Map 已经更新了"); + + + // Trigger调用Agent Metric 任务 + ArrayList allHealthyAgentTopicNames = new ArrayList<>(32); + for (int i = 0; i < statusList.size(); i++) { + if (statusList + .get(i) + .equals("1")) { + allHealthyAgentTopicNames.add(ALL_AGENT_TOPIC_NAME_LIST.get(i)); + } + } + // 缓存相应的存活Agent + ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.clear(); + ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.addAll(allHealthyAgentTopicNames); + + // help gc + agentStatusMap = null; + statusAgentListMap = null; + allHealthyAgentTopicNames = null; + } + + private String uniformHealthyStatus(String agentStatus) { + switch (agentStatus) { + case "0": + return AgentHealthyStatusEnum.FAILED.getStatus(); + case "1": + return AgentHealthyStatusEnum.HEALTHY.getStatus(); + default: + return AgentHealthyStatusEnum.UNKNOWN.getStatus(); + } + } + + +} diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentRuntimeMetricStatus.java b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentRuntimeMetricStatus.java index 444e616..b9cb421 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentRuntimeMetricStatus.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentRuntimeMetricStatus.java @@ -11,6 +11,7 @@ import java.util.List; import java.util.stream.Collectors; import static io.wdd.common.beans.status.OctopusStatusMessage.METRIC_STATUS_MESSAGE_TYPE; +import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; /** * 收集OctopusAgent的运行Metric信息 @@ -21,8 +22,6 @@ import static io.wdd.common.beans.status.OctopusStatusMessage.METRIC_STATUS_MESS @Slf4j public class AgentRuntimeMetricStatus { - public static List ALL_HEALTHY_AGENT_TOPIC_NAMES; - public static final String METRIC_REPORT_TIME_PINCH = "metricRepeatPinch"; public static final String METRIC_REPORT_TIMES_COUNT = "metricRepeatCount"; @@ -32,29 +31,35 @@ public class AgentRuntimeMetricStatus { public void collect(int metricRepeatCount, int metricRepeatPinch) { // 检查基础信息 - if (CollectionUtils.isEmpty(ALL_HEALTHY_AGENT_TOPIC_NAMES)) { + if (CollectionUtils.isEmpty(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST)) { log.error("Metric Status Collect Failed ! no ALL_HEALTHY_AGENT_TOPIC_NAMES"); } // 构建 OctopusMessage - // 只发送一次消息,让Agent循环定时执行任务 - buildMetricStatusMessageAndSend(metricRepeatCount, metricRepeatPinch); + // 只发送一次消息,让Agent循环定时执行任务 + buildMetricStatusMessageAndSend( + metricRepeatCount, + metricRepeatPinch + ); // } private void buildMetricStatusMessageAndSend(int metricRepeatCount, int metricRepeatPinch) { - List collect = ALL_HEALTHY_AGENT_TOPIC_NAMES.stream() + List collect = ALL_HEALTHY_AGENT_TOPIC_NAME_LIST + .stream() .map( agentTopicName -> { - return OctopusStatusMessage.builder() + return OctopusStatusMessage + .builder() .type(METRIC_STATUS_MESSAGE_TYPE) .metricRepeatCount(metricRepeatCount) .metricRepeatPinch(metricRepeatPinch) .agentTopicName(agentTopicName) .build(); } - ).collect(Collectors.toList()); + ) + .collect(Collectors.toList()); // send to the next level collectAgentStatus.statusMessageToAgent(collect); diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/status/MonitorAllAgentStatus.java b/server/src/main/java/io/wdd/rpc/scheduler/service/status/MonitorAllAgentStatus.java index 222db4d..4af407d 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/status/MonitorAllAgentStatus.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/status/MonitorAllAgentStatus.java @@ -3,22 +3,24 @@ package io.wdd.rpc.scheduler.service.status; import io.wdd.common.beans.status.AgentHealthyStatusEnum; import io.wdd.common.beans.status.OctopusStatusMessage; import io.wdd.common.utils.TimeUtils; +import io.wdd.rpc.init.ServerBootUpEnvironment; import io.wdd.rpc.scheduler.service.BuildStatusScheduleTask; -import io.wdd.server.beans.vo.ServerInfoVO; -import io.wdd.server.coreService.CoreServerService; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Lazy; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; -import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; import javax.annotation.Resource; -import java.util.*; +import java.util.HashMap; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static io.wdd.common.beans.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY; import static io.wdd.common.beans.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE; -import static io.wdd.rpc.scheduler.service.status.AgentRuntimeMetricStatus.ALL_HEALTHY_AGENT_TOPIC_NAMES; +import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_TOPIC_NAME_LIST; +import static io.wdd.rpc.init.ServerBootUpEnvironment.STATUS_AGENT_LIST_MAP; /** * 更新频率被类 BuildStatusScheduleTask.class控制 @@ -36,58 +38,27 @@ import static io.wdd.rpc.scheduler.service.status.AgentRuntimeMetricStatus.ALL_H */ @Service @Slf4j +@Lazy public class MonitorAllAgentStatus { - /** - * 存储 状态对应Agent列表的Map - * Agent的状态描述为 AgentHealthyStatusEnum - * HEALTHY -> ["agentTopicName-1", "agentTopicName-2"] - * FAILED -> ["agentTopicName-1", "agentTopicName-2"] - */ - public static final Map> HEALTHY_STATUS_AGENT_LIST_MAP = new HashMap<>(); - /** - * 存储所有Agent状态的Map - *

- * 内容为 agentTopicName-健康状态 - */ - public static final Map ALL_AGENT_HEALTHY_STATUS_MAP = new HashMap<>(); - /** - * 存储所有的AgentTopicName的缓存 - */ - public static final Set ALL_AGENT_TOPIC_NAME_SET = new HashSet<>(); private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5; @Resource RedisTemplate redisTemplate; @Resource CollectAgentStatus collectAgentStatus; + @Resource - CoreServerService coreServerService; + ServerBootUpEnvironment serverBootUpEnvironment; + @Resource BuildStatusScheduleTask buildStatusScheduleTask; - private List ALL_AGENT_TOPIC_NAME_LIST; + private HashMap AGENT_HEALTHY_INIT_MAP; public void go() { try { - - // 1. 获取所有注册的Agent - // todo need to cache this - List allAgentInfo = coreServerService.serverGetAll(); - Assert.notEmpty( - allAgentInfo, - "not agent registered ! skip the agent healthy status check !" - ); - - ALL_AGENT_TOPIC_NAME_LIST = allAgentInfo - .stream() - .map(ServerInfoVO::getTopicName) - .collect(Collectors.toList()); - - // 2023-01-16 - ALL_AGENT_TOPIC_NAME_SET.clear(); - ALL_AGENT_TOPIC_NAME_SET.addAll(ALL_AGENT_TOPIC_NAME_LIST); - + // 1. 获取所有注册的Agent 手动更新 // 1.1 检查 Agent状态保存数据结构是否正常 checkOrCreateRedisHealthyKey(); @@ -108,39 +79,38 @@ public class MonitorAllAgentStatus { private void checkOrCreateRedisHealthyKey() { - // must init the cached map && make sure the redis key existed! - if (null == AGENT_HEALTHY_INIT_MAP || !redisTemplate.hasKey(ALL_AGENT_STATUS_REDIS_KEY)) { - log.info("ALL_AGENT_STATUS_REDIS_KEY not existed , start to create"); + // 检查开始的时候 需要手动将所有Agent的状态置为0 + // Agent如果存活,那么就可以将其自身状态修改为1 - // build the redis all agent healthy map struct - HashMap initMap = new HashMap<>(32); - ALL_AGENT_TOPIC_NAME_LIST - .stream() - .forEach( - agentTopicName -> { - initMap.put( - agentTopicName, - "0" - ); - } - ); + // build the redis all agent healthy map struct + HashMap initMap = new HashMap<>(32); + ALL_AGENT_TOPIC_NAME_LIST + .stream() + .forEach( + agentTopicName -> { + initMap.put( + agentTopicName, + "0" + ); + } + ); - initMap.put( - "updateTime", - TimeUtils.currentTimeString() - ); + initMap.put( + "updateTime", + TimeUtils.currentTimeString() + ); - // cache this map struct - AGENT_HEALTHY_INIT_MAP = initMap; + // cache this map struct + AGENT_HEALTHY_INIT_MAP = initMap; + + // create the healthy redis structure + redisTemplate + .opsForHash() + .putAll( + ALL_AGENT_STATUS_REDIS_KEY, + initMap + ); - // create the healthy redis structure - redisTemplate - .opsForHash() - .putAll( - ALL_AGENT_STATUS_REDIS_KEY, - initMap - ); - } } private void buildAndSendAgentHealthMessage() { @@ -161,94 +131,14 @@ public class MonitorAllAgentStatus { private void updateAllAgentHealthyStatus() { - - List statusList = redisTemplate - .opsForHash() - .multiGet( - ALL_AGENT_STATUS_REDIS_KEY, - ALL_AGENT_TOPIC_NAME_LIST - ); - // current log to console is ok - - - // 结构保存为agentStatusMap ==> agent-topic-name : STATUS(healthy, failed, unknown) - HashMap agentStatusMap = new HashMap<>(32); - for (int i = 0; i < ALL_AGENT_TOPIC_NAME_LIST.size(); i++) { - agentStatusMap.put( - ALL_AGENT_TOPIC_NAME_LIST.get(i), - uniformHealthyStatus(String.valueOf(statusList.get(i))) - ); - } String currentTimeString = TimeUtils.currentTimeString(); - log.info( - "[ AGENT HEALTHY CHECK ] time is {} , result are => {}", - currentTimeString, - agentStatusMap - ); - // 2023-01-16 - ALL_AGENT_HEALTHY_STATUS_MAP.clear(); - ALL_AGENT_HEALTHY_STATUS_MAP.putAll(agentStatusMap); + // 更新所有的缓存状态 + serverBootUpEnvironment.updateAgentStatusMapCache(); - // 2023-01-16 - Map> statusAgentListMap = agentStatusMap - .entrySet() - .stream() - .collect( - Collectors.groupingBy( - Map.Entry::getValue - ) - ) - .entrySet() - .stream() - .collect( - Collectors.toMap( - entry -> entry.getKey(), - entry -> entry - .getValue() - .stream() - .map( - Map.Entry::getKey - ) - .collect(Collectors.toList()) - ) - ); - HEALTHY_STATUS_AGENT_LIST_MAP.putAll(statusAgentListMap); - log.debug("Agent存活状态 状态-Agent名称-Map 已经更新了"); - - - // help gc - agentStatusMap = null; - - // Trigger调用Agent Metric 任务 - ArrayList allHealthyAgentTopicNames = new ArrayList<>(32); - for (int i = 0; i < statusList.size(); i++) { - if (statusList - .get(i) - .equals("1")) { - allHealthyAgentTopicNames.add(ALL_AGENT_TOPIC_NAME_LIST.get(i)); - } - } - // 缓存相应的存活Agent - ALL_HEALTHY_AGENT_TOPIC_NAMES = allHealthyAgentTopicNames; - // 执行Metric上报任务 + // 执行Metric上报定时任务 buildStatusScheduleTask.buildAgentMetricScheduleTask(); - - - // init the healthy map - // 需要将所有的Agent的状态置为 "0" - ALL_AGENT_TOPIC_NAME_LIST - .stream() - .forEach( - agentTopicName -> { - AGENT_HEALTHY_INIT_MAP.put( - agentTopicName, - "0" - ); - } - ); - // update time AGENT_HEALTHY_INIT_MAP.put( "updateTime", @@ -261,17 +151,7 @@ public class MonitorAllAgentStatus { ALL_AGENT_STATUS_REDIS_KEY, AGENT_HEALTHY_INIT_MAP ); - } - private String uniformHealthyStatus(String agentStatus) { - switch (agentStatus) { - case "0": - return AgentHealthyStatusEnum.FAILED.getStatus(); - case "1": - return AgentHealthyStatusEnum.HEALTHY.getStatus(); - default: - return AgentHealthyStatusEnum.UNKNOWN.getStatus(); - } } diff --git a/server/src/test/java/io/wdd/server/ServerApplicationTests.java b/server/src/test/java/io/wdd/server/ServerApplicationTests.java index 6da1fa5..e78fc65 100644 --- a/server/src/test/java/io/wdd/server/ServerApplicationTests.java +++ b/server/src/test/java/io/wdd/server/ServerApplicationTests.java @@ -8,8 +8,6 @@ import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; -import static io.wdd.rpc.scheduler.service.status.MonitorAllAgentStatus.ALL_AGENT_TOPIC_NAME_SET; - @SpringBootTest class ServerApplicationTests { @@ -57,8 +55,6 @@ class ServerApplicationTests { completeScript.add(command4); - ALL_AGENT_TOPIC_NAME_SET.add("Chengdu-amd64-98-98066f"); - ArrayList targetMachineList = new ArrayList<>( List.of( "Chengdu-amd64-98-98066f" diff --git a/server/src/test/java/io/wdd/server/SimpleTest.java b/server/src/test/java/io/wdd/server/SimpleTest.java index 76a9bd6..383d551 100644 --- a/server/src/test/java/io/wdd/server/SimpleTest.java +++ b/server/src/test/java/io/wdd/server/SimpleTest.java @@ -1,12 +1,48 @@ package io.wdd.server; +import io.wdd.common.beans.status.AgentHealthyStatusEnum; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.concurrent.CompletableFuture; + public class SimpleTest { public static void main(String[] args) { + HashMap map = new HashMap<>(); + + HashMap> hashMap = new HashMap<>(); + + hashMap.put( + "HEALTHY", + new ArrayList<>( + List.of( + "Tokyo-amd64-07-f66a41", + "Tokyo-amd64-03-dc543f" + ) + ) + ); + + hashMap + .get(AgentHealthyStatusEnum.FAILED.getStatus()) + .stream() + .forEach( + agentTopicName -> { + map.put( + agentTopicName, + "0" + ); + } + ); + + + } + + private void CompletableFutureTest() { //任务1:洗水壶->烧开水 CompletableFuture f1 = CompletableFuture.supplyAsync(() -> { try {