From ec3d5bba1e569174c7547812232883ef6df5b9ee Mon Sep 17 00:00:00 2001 From: zeaslity Date: Thu, 15 Jun 2023 10:37:45 +0800 Subject: [PATCH] [Exec] modify sync and async execution function --- .../wdd/func/xray/service/XrayCallAgent.java | 6 +- .../rpc/agent/OctopusAgentServiceImpl.java | 4 +- .../rpc/controller/ExecutionController.java | 189 +++---- .../wdd/rpc/controller/StatusController.java | 8 +- .../service/AsyncExecutionService.java | 65 +-- .../service/AsyncExecutionServiceImpl.java | 466 +++++++----------- .../execute/service/SyncExecutionService.java | 65 ++- .../service/SyncExecutionServiceImpl.java | 448 ++++++++++------- ...atus.java => AgentStatusCacheService.java} | 2 +- .../message/handler/AsyncWaitOMResult.java | 14 +- .../scheduler/job/AgentStatusMonitorJob.java | 6 +- .../script/AgentApplyScheduledScript.java | 6 +- .../status/AgentRuntimeMetricStatus.java | 2 +- ...Status.java => CheckAgentAliveStatus.java} | 17 +- .../io/wdd/rpc/status/beans/AgentStatus.java | 8 +- .../beans/{CPUInfo.java => CPUStatus.java} | 4 +- .../beans/{DiskInfo.java => DiskStatus.java} | 2 +- .../{MemoryInfo.java => MemoryStatus.java} | 2 +- .../{NetworkInfo.java => NetworkStatus.java} | 2 +- .../io/wdd/server/ServerApplicationTests.java | 6 +- 20 files changed, 668 insertions(+), 654 deletions(-) rename server/src/main/java/io/wdd/rpc/init/{ServerCacheAgentStatus.java => AgentStatusCacheService.java} (99%) rename server/src/main/java/io/wdd/rpc/scheduler/service/status/{MonitorAllAgentStatus.java => CheckAgentAliveStatus.java} (88%) rename server/src/main/java/io/wdd/rpc/status/beans/{CPUInfo.java => CPUStatus.java} (97%) rename server/src/main/java/io/wdd/rpc/status/beans/{DiskInfo.java => DiskStatus.java} (96%) rename server/src/main/java/io/wdd/rpc/status/beans/{MemoryInfo.java => MemoryStatus.java} (94%) rename server/src/main/java/io/wdd/rpc/status/beans/{NetworkInfo.java => NetworkStatus.java} (96%) diff --git a/server/src/main/java/io/wdd/func/xray/service/XrayCallAgent.java b/server/src/main/java/io/wdd/func/xray/service/XrayCallAgent.java index 672c22f..90fece5 100644 --- a/server/src/main/java/io/wdd/func/xray/service/XrayCallAgent.java +++ b/server/src/main/java/io/wdd/func/xray/service/XrayCallAgent.java @@ -3,7 +3,7 @@ package io.wdd.func.xray.service; import io.wdd.common.utils.TimeUtils; import io.wdd.func.oss.config.OctopusObjectSummary; import io.wdd.func.xray.beans.node.ProxyNode; -import io.wdd.rpc.execute.service.AsyncExecutionService; +import io.wdd.rpc.execute.service.SyncExecutionService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; @@ -83,7 +83,7 @@ public class XrayCallAgent { } @Resource - AsyncExecutionService executionService; + SyncExecutionService executionService; /** * 为代理链的每一个节点 构建Xray配置更新命令,然后发送至对应的Agent中 @@ -131,7 +131,7 @@ public class XrayCallAgent { ); // 向Agent发送命令,执行更新操作! - String resultKey = executionService.SendCommandToAgent( + String resultKey = executionService.SyncSendCommandToAgent( proxyNode.getAgentTopicName(), updateCommandType, null, diff --git a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java index 59c0f25..beed801 100644 --- a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java @@ -26,8 +26,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_SET; -import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; +import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_SET; +import static io.wdd.rpc.init.AgentStatusCacheService.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.message.handler.OMessageHandlerServer.AGENT_LATEST_VERSION; import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT; diff --git a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java index 71e00c0..7d29abf 100644 --- a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java +++ b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java @@ -15,11 +15,12 @@ import org.springframework.web.bind.annotation.RestController; import javax.annotation.Nullable; import javax.annotation.Resource; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER; -import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_LIST; -import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; +import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_LIST; +import static io.wdd.rpc.init.AgentStatusCacheService.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; @RestController @RequestMapping("/octopus/server/executor") @@ -27,11 +28,11 @@ import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_HEALTHY_AGENT_TOPIC_NAM public class ExecutionController { @Resource - AsyncExecutionService asyncExecutionService; + SyncExecutionService syncExecutionService; @Resource BuildStreamReader buildStreamReader; @Resource - SyncExecutionService syncExecutionService; + AsyncExecutionService asyncExecutionService; @PostMapping("/command/one") @ApiOperation("[命令] [异步]- 单台主机") @@ -44,8 +45,8 @@ public class ExecutionController { @ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask ) { - String streamKey = asyncExecutionService - .SendCommandToAgent( + ArrayList streamKeyList = asyncExecutionService + .AsyncSendCommandToAgentComplete( topicName, type, commandList, @@ -55,12 +56,13 @@ public class ExecutionController { isDurationTask ); - return R.ok(streamKey); + + return R.ok(streamKeyList.toString()); } @PostMapping("/command/batch") @ApiOperation("[命令] [异步] - 批量主机") - public R> patchCommandToAgentList( + public R>> patchCommandToAgentList( @RequestParam(value = "topicNameList") @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList, @RequestParam(value = "commandList", required = false) @@ -71,19 +73,20 @@ public class ExecutionController { @ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask ) { - return R.ok(asyncExecutionService.SendCommandToAgentComplete( + List> arrayListList = asyncExecutionService.AsyncSendCommandToAgentComplete( topicNameList, type, commandList, completeCommandList, isDurationTask - )); + ); + return R.ok(arrayListList); } @PostMapping("/command/all") @ApiOperation("[命令] [异步] - 所有的主机") - public R> patchCommandToAllAgent( + public R>> patchCommandToAllAgent( @RequestParam(value = "commandList", required = false) @ApiParam(name = "commandList", value = "命令行") @Nullable List commandList, @RequestParam(value = "completeCommandList", required = false) @@ -92,7 +95,7 @@ public class ExecutionController { @ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask ) { - return R.ok(asyncExecutionService.SendCommandToAgentComplete( + return R.ok(asyncExecutionService.AsyncSendCommandToAgentComplete( ALL_AGENT_TOPIC_NAME_LIST, type, commandList, @@ -103,7 +106,7 @@ public class ExecutionController { @PostMapping("/command/healthy") @ApiOperation("[命令] [异步] - 健康的主机") - public R> patchCommandToHealthyAgent( + public R>> patchCommandToHealthyAgent( @RequestParam(value = "commandList", required = false) @ApiParam(name = "commandList", value = "命令行") @Nullable List commandList, @RequestParam(value = "completeCommandList", required = false) @@ -112,7 +115,7 @@ public class ExecutionController { @ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask ) { - return R.ok(asyncExecutionService.SendCommandToAgentComplete( + return R.ok(asyncExecutionService.AsyncSendCommandToAgentComplete( ALL_HEALTHY_AGENT_TOPIC_NAME_LIST, type, commandList, @@ -133,18 +136,18 @@ public class ExecutionController { ) { return R.ok( - syncExecutionService.SyncSendCommandToAgent( + Collections.singletonList(syncExecutionService.SyncSendCommandToAgentComplete( topicName, type, commandList, completeCommandList - ) + )) ); } @PostMapping("/command/sync/batch") @ApiOperation("[命令] [同步] - 批量-等待命令结果") - public R>> SyncPatchCommandToAgentBatch( + public R> SyncPatchCommandToAgentBatch( @RequestParam(value = "topicNameList") @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList, @RequestParam(value = "commandList", required = false) @@ -168,7 +171,7 @@ public class ExecutionController { @PostMapping("/command/sync/all") @ApiOperation("[命令] [同步] - 全部-同步等待命令结果") - public R>> SyncPatchCommandToAgentAll( + public R> SyncPatchCommandToAgentAll( @RequestParam(value = "commandList", required = false) @ApiParam(name = "commandList", value = "命令行") @Nullable List commandList, @RequestParam(value = "completeCommandList", required = false) @@ -206,81 +209,81 @@ public class ExecutionController { // auth required - @PostMapping("/function/update") - @ApiOperation("升级") - public R> AgentUpdate( - @RequestParam(value = "topicNameList") - @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList - ) { - - return R.ok( - asyncExecutionService - .SendCommandToAgent( - topicNameList, - "AgentUpdate", - null, - false, - null, - true - )); - } - - @PostMapping("/function/reboot") - @ApiOperation("重启") - public R> AgentReboot( - @RequestParam(value = "topicNameList") - @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList - ) { - - return R.ok( - asyncExecutionService - .SendCommandToAgent( - topicNameList, - "AgentReboot", - null, - false, - null, - true - )); - } - - @PostMapping("/function/shutdown") - @ApiOperation("关闭") - public R> AgentShutdown( - @RequestParam(value = "topicNameList") - @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList - ) { - - return R.ok( - asyncExecutionService - .SendCommandToAgent( - topicNameList, - "AgentShutdown", - null, - false, - null, - true - )); - } - - @PostMapping("/function/bootUp") - @ApiOperation("重新部署") - public R> AgentBootUp( - @RequestParam(value = "topicNameList") - @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList - ) { - - return R.ok( - asyncExecutionService - .SendCommandToAgent( - topicNameList, - "AgentBootUp", - null, - false, - null, - true - )); - } +// @PostMapping("/function/update") +// @ApiOperation("升级") +// public R> AgentUpdate( +// @RequestParam(value = "topicNameList") +// @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList +// ) { +// +// return R.ok( +// syncExecutionService +// .SyncSendCommandToAgent( +// topicNameList, +// "AgentUpdate", +// null, +// false, +// null, +// true +// )); +// } +// +// @PostMapping("/function/reboot") +// @ApiOperation("重启") +// public R> AgentReboot( +// @RequestParam(value = "topicNameList") +// @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList +// ) { +// +// return R.ok( +// asyncExecutionService +// .SyncSendCommandToAgent( +// topicNameList, +// "AgentReboot", +// null, +// false, +// null, +// true +// )); +// } +// +// @PostMapping("/function/shutdown") +// @ApiOperation("关闭") +// public R> AgentShutdown( +// @RequestParam(value = "topicNameList") +// @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList +// ) { +// +// return R.ok( +// syncExecutionService +// .SyncSendCommandToAgent( +// topicNameList, +// "AgentShutdown", +// null, +// false, +// null, +// true +// )); +// } +// +// @PostMapping("/function/bootUp") +// @ApiOperation("重新部署") +// public R> AgentBootUp( +// @RequestParam(value = "topicNameList") +// @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList +// ) { +// +// return R.ok( +// asyncExecutionService +// .SyncSendCommandToAgent( +// topicNameList, +// "AgentBootUp", +// null, +// false, +// null, +// true +// )); +// } } diff --git a/server/src/main/java/io/wdd/rpc/controller/StatusController.java b/server/src/main/java/io/wdd/rpc/controller/StatusController.java index d4190dc..cffe3df 100644 --- a/server/src/main/java/io/wdd/rpc/controller/StatusController.java +++ b/server/src/main/java/io/wdd/rpc/controller/StatusController.java @@ -4,7 +4,7 @@ package io.wdd.rpc.controller; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.wdd.common.response.R; -import io.wdd.rpc.init.ServerCacheAgentStatus; +import io.wdd.rpc.init.AgentStatusCacheService; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; @@ -14,7 +14,7 @@ import javax.annotation.Resource; import java.util.List; import java.util.Map; -import static io.wdd.rpc.init.ServerCacheAgentStatus.*; +import static io.wdd.rpc.init.AgentStatusCacheService.*; @RestController @@ -23,7 +23,7 @@ import static io.wdd.rpc.init.ServerCacheAgentStatus.*; public class StatusController { @Resource - ServerCacheAgentStatus serverCacheAgentStatus; + AgentStatusCacheService agentStatusCacheService; @ApiOperation("[ Agent-状态 ] Map") @GetMapping("/agent/status") @@ -76,7 +76,7 @@ public class StatusController { public R>> ManualUpdateAgentStatus() { // 手动调用更新 - serverCacheAgentStatus.updateAgentStatusMapCache(); + agentStatusCacheService.updateAgentStatusMapCache(); return R.ok(STATUS_AGENT_LIST_MAP); } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java index 5c831cc..48dbf4c 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java @@ -1,20 +1,23 @@ package io.wdd.rpc.execute.service; -import io.wdd.rpc.message.OctopusMessage; - +import java.util.ArrayList; import java.util.HashMap; import java.util.List; - +/** + * 同步命令执行的核心类 + * 需要等待命令执行完毕,完后返回相应的结果 + */ public interface AsyncExecutionService { - String SendCommandToAgent(String agentTopicName, String command); + /** + * ------------------------ Sync Command Executor ------------------------------ + */ + ArrayList AsyncSendCommandToAgent(String agentTopicName, List commandList); - String SendCommandToAgent(String agentTopicName, List commandList); + ArrayList AsyncSendCommandToAgent(String agentTopicName, String type, List commandList); - String SendCommandToAgent(String agentTopicName, String type, List commandList); - - List SendCommandToAgent(List agentTopicNameList, String type, List commandList, boolean needResultReplay, String futureKey, boolean durationTask); + List> AsyncSendCommandToAgent(List agentTopicNameList, String type, List commandList); /** * 调用 单行命令脚本的 最底层函数 @@ -27,7 +30,7 @@ public interface AsyncExecutionService { * @param durationTask * @return */ - String SendCommandToAgent( + ArrayList AsyncSendCommandToAgent( String agentTopicName, String type, List commandList, @@ -41,14 +44,21 @@ public interface AsyncExecutionService { * ------------------------------------------------- */ - String SendCommandToAgentComplete(String agentTopicName, String type, List commandList, List> commandListComplete); + ArrayList AsyncSendCommandToAgentComplete(String agentTopicName, String type, List commandList, List> completeCommandList); + List> AsyncSendCommandToAgentComplete(List agentTopicNameList, String type, List commandList, List> completeCommandList, boolean isDurationTask); - List SendCommandToAgentComplete(List agentTopicNameList, String type, List commandList, List> commandListComplete, boolean isDurationTask); + /** + * 通常为 页面定时脚本任务调用 + * + * @param agentTopicNameList 目标Agent的TopicName列表 + * @param type 任务类型 + * @param completeCommandList 完整的类型 + * @return 每个Agent只返回一个 ResultKey(Script脚本的结果全部拼接到一起),全部的resultKey + */ + List> AsyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList); - List SendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList); - /** * 通常为 页面定时脚本任务调用 * @@ -58,10 +68,10 @@ public interface AsyncExecutionService { * @param atnFutureKey 由于脚本任务为延迟调用,故需要提前生成未来的ResultKey * @return 每个Agent只返回一个 ResultKey(Script脚本的结果全部拼接到一起),全部的resultKey */ - List SendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList, HashMap atnFutureKey); + List> AsyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList, HashMap atnFutureKey); - String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey); + ArrayList AsyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey); /** * 调用 完整脚本的 最底层函数 @@ -74,30 +84,7 @@ public interface AsyncExecutionService { * @param durationTask * @return resultKey 本次操作在Redis中记录的结果Key */ - String SendCommandToAgent( - String agentTopicName, - String type, - List commandList, - List> commandListComplete, - boolean needResultReplay, - String futureKey, - boolean durationTask - ); - - - /** - * 同步命令调用的方法 - * - * @param agentTopicName - * @param type - * @param commandList - * @param commandListComplete - * @param needResultReplay - * @param futureKey - * @param durationTask - * @return - */ - OctopusMessage AsyncCallSendCommandToAgent( + ArrayList AsyncSendCommandToAgentComplete( String agentTopicName, String type, List commandList, diff --git a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java index f2cc84a..a3686f6 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java @@ -1,108 +1,124 @@ package io.wdd.rpc.execute.service; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.wdd.common.utils.TimeUtils; -import io.wdd.rpc.execute.ExecutionMessage; -import io.wdd.rpc.execute.config.ExecutionLog; import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessageType; -import io.wdd.rpc.message.sender.OMessageToAgentSender; +import io.wdd.rpc.message.handler.AsyncWaitOMResult; +import io.wdd.rpc.message.handler.OMReplayContend; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_SET; - @Service @Slf4j public class AsyncExecutionServiceImpl implements AsyncExecutionService { - private static final String MANUAL_COMMAND_TYPE = "manual-command"; + private static final boolean COMMAND_EXEC_NEED_REPLAY = true; + + private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.EXECUTOR; @Resource - OMessageToAgentSender oMessageToAgentSender; + AsyncWaitOMResult asyncWaitOMResult; @Resource - ObjectMapper objectMapper; - @Resource - RedisTemplate redisTemplate; + SyncExecutionService asyncExecutionService; + + /** + * 一个命令执行的最长等待时间 + */ + int processMaxWaitSeconds = 10; @Override - public String SendCommandToAgent(String agentTopicName, String command) { - return this.SendCommandToAgent( - agentTopicName, - List.of(command) - ); - } + public ArrayList AsyncSendCommandToAgent(String agentTopicName, List commandList) { - @Override - public String SendCommandToAgent(String agentTopicName, List commandList) { - return this.SendCommandToAgent( + return this.AsyncSendCommandToAgentComplete( agentTopicName, - MANUAL_COMMAND_TYPE, - commandList - ); - } - - @Override - public String SendCommandToAgent(String agentTopicName, String type, List commandList) { - - return SendCommandToAgent( - agentTopicName, - type, - commandList, - false, null, - false - ); - - } - - @Override - public String SendCommandToAgent(String agentTopicName, String type, List commandList, boolean needResultReplay, String futureKey, boolean durationTask) { - - return this.SendCommandToAgent( - agentTopicName, - type, commandList, null, - needResultReplay, - futureKey, - durationTask - ); - } - - @Override - public String SendCommandToAgentComplete(String agentTopicName, String type, List commandList, List> commandListComplete) { - - return this.SendCommandToAgent( - agentTopicName, - type, - commandList, - commandListComplete, - false, + COMMAND_EXEC_NEED_REPLAY, null, false ); } @Override - public List SendCommandToAgentComplete(List agentTopicNameList, String type, List commandList, List> commandListComplete, boolean isDurationTask) { + public ArrayList AsyncSendCommandToAgent(String agentTopicName, String type, List commandList) { + + + return this.AsyncSendCommandToAgentComplete( + agentTopicName, + type, + commandList, + null, + COMMAND_EXEC_NEED_REPLAY, + null, + false + ); + } + + @Override + public List> AsyncSendCommandToAgent(List agentTopicNameList, String type, List commandList) { + return agentTopicNameList .stream() .map( - agentTopicName -> this.SendCommandToAgent( + agentTopicName -> this.AsyncSendCommandToAgentComplete( agentTopicName, type, commandList, - commandListComplete, - false, + null, + COMMAND_EXEC_NEED_REPLAY, + null, + false + ) + ) + .collect(Collectors.toList()); + } + + @Override + public ArrayList AsyncSendCommandToAgent(String agentTopicName, String type, List commandList, boolean needResultReplay, String futureKey, boolean durationTask) { + + return this.AsyncSendCommandToAgentComplete( + agentTopicName, + type, + commandList, + null, + COMMAND_EXEC_NEED_REPLAY, + futureKey, + false + ); + } + + @Override + public ArrayList AsyncSendCommandToAgentComplete(String agentTopicName, String type, List commandList, List> completeCommandList) { + return this.AsyncSendCommandToAgentComplete( + agentTopicName, + type, + commandList, + completeCommandList, + COMMAND_EXEC_NEED_REPLAY, + null, + false + ); + } + + @Override + public List> AsyncSendCommandToAgentComplete(List agentTopicNameList, String type, List commandList, List> completeCommandList, boolean isDurationTask) { + return agentTopicNameList + .stream() + .map( + agentTopicName -> this.AsyncSendCommandToAgentComplete( + agentTopicName, + type, + commandList, + completeCommandList, + COMMAND_EXEC_NEED_REPLAY, null, isDurationTask ) @@ -111,31 +127,60 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService { } @Override - public String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey) { + public List> AsyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList) { - return this.SendCommandToAgent( - agentTopicName, - type, - commandList, - commandListComplete, - false, - futureKey, - false - ); + return agentTopicNameList + .stream() + .map( + agentTopicName -> this.AsyncSendCommandToAgentComplete( + agentTopicName, + type, + null, + completeCommandList, + COMMAND_EXEC_NEED_REPLAY, + null, + false + ) + ) + .collect(Collectors.toList()); } @Override - public String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { + public List> AsyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList, HashMap atnFutureKey) { + return agentTopicNameList + .stream() + .map( + agentTopicName -> this.AsyncSendCommandToAgentComplete( + agentTopicName, + type, + null, + completeCommandList, + COMMAND_EXEC_NEED_REPLAY, + atnFutureKey.get(agentTopicName), + false + ) + ) + .collect(Collectors.toList()); + } - String resultKey = futureKey; - // 判定是否是 FutureKey - if (null == futureKey) { - resultKey = ExecutionMessage.GetResultKey(agentTopicName); - } + @Override + public ArrayList AsyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey) { + return this.AsyncSendCommandToAgentComplete( + agentTopicName, + type, + commandList, + commandListComplete, + COMMAND_EXEC_NEED_REPLAY, + futureKey, + false + ); + } - // 调用最底层的方法 - this.AsyncCallSendCommandToAgent( + @Override + public ArrayList AsyncSendCommandToAgentComplete(String agentTopicName, String type, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { + + OctopusMessage octopusMessage = asyncExecutionService.AsyncCallSendCommandToAgent( agentTopicName, type, commandList, @@ -145,225 +190,66 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService { durationTask ); - return resultKey; - } + LocalDateTime initTime = octopusMessage.getInit_time(); - @Override - public OctopusMessage AsyncCallSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { + ArrayList result = new ArrayList<>(); - // 检查agentTopicName是否存在 - if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { - log.error( - "agentTopicName异常! 输入为 => {}", - agentTopicName + // 构造消息等待对象 + int commandCount = 1; + if (null != commandListComplete) { + commandCount = Math.max( + commandListComplete.size(), + 1 ); - return null; - //throw new MyRuntimeException("agentTopicName异常!" + agentTopicName); } - // 归一化type - if (StringUtils.isEmpty(type)) { - type = MANUAL_COMMAND_TYPE; - } - String resultKey = futureKey; - // 判定是否是 FutureKey - if (null == futureKey) { - resultKey = ExecutionMessage.GetResultKey(agentTopicName); - } - - // 构造 Execution Command对应的消息体 - ExecutionMessage executionMessage = this - .generateExecutionMessage( - type, - commandList, - resultKey, - commandListComplete, - needResultReplay, - durationTask - ); - OctopusMessage octopusMessage = this.generateOctopusMessage( - agentTopicName, - executionMessage + OMReplayContend omReplayContend = OMReplayContend.build( + commandCount, + CurrentAppOctopusMessageType, + initTime ); + CountDownLatch countDownLatch = omReplayContend.getCountDownLatch(); - // send the message - oMessageToAgentSender.send(octopusMessage); - - // set up the stream read group - String group = redisTemplate - .opsForStream() - .createGroup( - resultKey, - resultKey - ); - - log.debug( - "set consumer group [{}] for the stream key with => [ {} ]", - group, - resultKey - ); - - // change the redis stream listener container - // createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey); - - // construct the persistent Bean - /*ExecutionLog executionLog = buildPersistentLogBeanFromOctopusMessage( - octopusMessage, - executionMessage - );*/ - // send resultKey to ExecutionResultDaemonHandler - // 当批量执行,产生大量的resultKey的时候,会出现线程爆炸,导致所有的全部失效 - /*WAIT_EXECUTION_RESULT_LIST.put( - resultKey, - executionLog - );*/ - - // help gc - executionMessage = null; - - return octopusMessage; - } - - private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) { + // 开始等待结果 + asyncWaitOMResult.waitFor(omReplayContend); + // 监听结果 try { + boolean await = countDownLatch.await( + processMaxWaitSeconds, + TimeUnit.SECONDS + ); - return OctopusMessage - .builder() - .type(OctopusMessageType.EXECUTOR) - .init_time(TimeUtils.currentFormatTime()) - .uuid(agentTopicName) - .content( - objectMapper.writeValueAsString(executionMessage) + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + + // 等待所有的结果返回 + // 停止等待结果 + asyncWaitOMResult.stopWaiting(omReplayContend); + + // 解析结果 + omReplayContend + .getReplayOMList() + .stream() + .map( + om -> { + log.debug( + "replay message is => {}", + om + ); + + return (ArrayList) om.getResult(); + } ) - .build(); + .forEachOrdered( + singleResult -> result.addAll(singleResult) + ); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); } + // 返回 + return result; } - - private ExecutionLog buildPersistentLogBeanFromOctopusMessage(OctopusMessage octopusMessage, ExecutionMessage executionMessage) { - ExecutionLog executionLog = new ExecutionLog(); - executionLog.setAgentTopicName(octopusMessage.getUuid()); - executionLog.setResultKey((String) octopusMessage.getContent()); - executionLog.setCommandList(String.valueOf(executionMessage.getSingleLineCommand())); - executionLog.setType(executionMessage.getType()); - executionLog.setResultKey(executionMessage.getResultKey()); - return executionLog; - } - - - @Override - public List SendCommandToAgent(List agentagentTopicNameList, String type, List commandList, boolean needResultReplay, String futureKey, boolean durationTask) { - - return agentagentTopicNameList - .stream() - .map( - agentTopicName -> this - .SendCommandToAgent - ( - agentTopicName, - type, - commandList, - null, - needResultReplay, - futureKey, - durationTask - ) - ) - .collect(Collectors.toList()); - } - - /** - * @param agentTopicNameList 目标Agent的TopicName列表 - * @param type 任务类型 - * @param completeCommandList 完整的类型 - * @return - */ - @Override - public List SendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList) { - - return agentTopicNameList - .stream() - .map( - agentTopicName -> this.SendCommandToAgentComplete( - agentTopicName, - type, - null, - completeCommandList - ) - ) - .collect(Collectors.toList()); - - } - - @Override - public List SendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList, HashMap atnFutureKey) { - - return agentTopicNameList - .stream() - .map( - agentTopicName -> this.SendCommandToAgent( - agentTopicName, - type, - null, - completeCommandList, - atnFutureKey.getOrDefault( - agentTopicName, - null - ) - ) - ) - .collect(Collectors.toList()); - } - - - @Deprecated - private OctopusMessage generateOctopusMessage(String agentTopicName, String resultKey, String type, List commandList, List> commandListComplete) { - - - ExecutionMessage executionMessage = this.generateExecutionMessage( - type, - commandList, - resultKey, - commandListComplete, - false, - false - ); - - String executionMessageString; - - try { - executionMessageString = objectMapper.writeValueAsString(executionMessage); - - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - - return OctopusMessage - .builder() - .type(OctopusMessageType.EXECUTOR) - .init_time(LocalDateTime.now()) - .content(executionMessageString) - .uuid(agentTopicName) - .build(); - } - - private ExecutionMessage generateExecutionMessage(String type, List commandList, String resultKey, List> commandListComplete, boolean needResultReplay, boolean durationTask) { - - return ExecutionMessage - .builder() - .resultKey(resultKey) - .type(type) - .singleLineCommand(commandList) - .multiLineCommand(commandListComplete) - .needResultReplay(needResultReplay) - .durationTask(durationTask) - .build(); - } - - } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java index ed1548d..fc52137 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java @@ -1,23 +1,20 @@ package io.wdd.rpc.execute.service; -import java.util.ArrayList; +import io.wdd.rpc.message.OctopusMessage; + import java.util.HashMap; import java.util.List; -/** - * 同步命令执行的核心类 - * 需要等待命令执行完毕,完后返回相应的结果 - */ + public interface SyncExecutionService { - /** - * ------------------------ Sync Command Executor ------------------------------ - */ - ArrayList SyncSendCommandToAgent(String agentTopicName, List commandList); + String SyncSendCommandToAgent(String agentTopicName, String command); - ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList); + String SyncSendCommandToAgent(String agentTopicName, List commandList); - List> SyncSendCommandToAgent(List agentTopicNameList, String type, List commandList); + String SyncSendCommandToAgent(String agentTopicName, String type, List commandList); + + List SyncSendCommandToAgent(List agentTopicNameList, String type, List commandList, boolean needResultReplay, String futureKey, boolean durationTask); /** * 调用 单行命令脚本的 最底层函数 @@ -30,7 +27,7 @@ public interface SyncExecutionService { * @param durationTask * @return */ - ArrayList SyncSendCommandToAgent( + String SyncSendCommandToAgent( String agentTopicName, String type, List commandList, @@ -44,21 +41,14 @@ public interface SyncExecutionService { * ------------------------------------------------- */ - ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> completeCommandList); + String SyncSendCommandToAgentComplete(String agentTopicName, String type, List commandList, List> commandListComplete); - List> SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List commandList, List> completeCommandList, boolean isDurationTask); - /** - * 通常为 页面定时脚本任务调用 - * - * @param agentTopicNameList 目标Agent的TopicName列表 - * @param type 任务类型 - * @param completeCommandList 完整的类型 - * @return 每个Agent只返回一个 ResultKey(Script脚本的结果全部拼接到一起),全部的resultKey - */ - List> SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList); + List SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List commandList, List> commandListComplete, boolean isDurationTask); + List SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList); + /** * 通常为 页面定时脚本任务调用 * @@ -68,10 +58,10 @@ public interface SyncExecutionService { * @param atnFutureKey 由于脚本任务为延迟调用,故需要提前生成未来的ResultKey * @return 每个Agent只返回一个 ResultKey(Script脚本的结果全部拼接到一起),全部的resultKey */ - List> SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList, HashMap atnFutureKey); + List SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList, HashMap atnFutureKey); - ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey); + String SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey); /** * 调用 完整脚本的 最底层函数 @@ -84,7 +74,30 @@ public interface SyncExecutionService { * @param durationTask * @return resultKey 本次操作在Redis中记录的结果Key */ - ArrayList SyncSendCommandToAgent( + String SyncSendCommandToAgent( + String agentTopicName, + String type, + List commandList, + List> commandListComplete, + boolean needResultReplay, + String futureKey, + boolean durationTask + ); + + + /** + * 同步命令调用的方法 + * + * @param agentTopicName + * @param type + * @param commandList + * @param commandListComplete + * @param needResultReplay + * @param futureKey + * @param durationTask + * @return + */ + OctopusMessage AsyncCallSendCommandToAgent( String agentTopicName, String type, List commandList, diff --git a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java index 0ef3060..8b18c60 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java @@ -1,115 +1,100 @@ package io.wdd.rpc.execute.service; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.wdd.common.utils.TimeUtils; +import io.wdd.rpc.execute.ExecutionMessage; +import io.wdd.rpc.execute.config.ExecutionLog; import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessageType; -import io.wdd.rpc.message.handler.AsyncWaitOMResult; -import io.wdd.rpc.message.handler.OMReplayContend; +import io.wdd.rpc.message.sender.OMessageToAgentSender; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.time.LocalDateTime; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_SET; + + @Service @Slf4j public class SyncExecutionServiceImpl implements SyncExecutionService { - private static final boolean COMMAND_EXEC_NEED_REPLAY = true; - - private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.EXECUTOR; + private static final String MANUAL_COMMAND_TYPE = "manual-command"; @Resource - AsyncWaitOMResult asyncWaitOMResult; + OMessageToAgentSender oMessageToAgentSender; @Resource - AsyncExecutionService asyncExecutionService; - - /** - * 一个命令执行的最长等待时间 - */ - int processMaxWaitSeconds = 10; + ObjectMapper objectMapper; + @Resource + RedisTemplate redisTemplate; @Override - public ArrayList SyncSendCommandToAgent(String agentTopicName, List commandList) { - + public String SyncSendCommandToAgent(String agentTopicName, String command) { return this.SyncSendCommandToAgent( agentTopicName, - null, - commandList, - null, - COMMAND_EXEC_NEED_REPLAY, - null, - false + List.of(command) ); } @Override - public ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList) { + public String SyncSendCommandToAgent(String agentTopicName, List commandList) { + return this.SyncSendCommandToAgent( + agentTopicName, + MANUAL_COMMAND_TYPE, + commandList + ); + } + @Override + public String SyncSendCommandToAgent(String agentTopicName, String type, List commandList) { + + return SyncSendCommandToAgent( + agentTopicName, + type, + commandList, + false, + null, + false + ); + + } + + @Override + public String SyncSendCommandToAgent(String agentTopicName, String type, List commandList, boolean needResultReplay, String futureKey, boolean durationTask) { return this.SyncSendCommandToAgent( agentTopicName, type, commandList, null, - COMMAND_EXEC_NEED_REPLAY, - null, - false - ); - } - - @Override - public List> SyncSendCommandToAgent(List agentTopicNameList, String type, List commandList) { - - return agentTopicNameList - .stream() - .map( - agentTopicName -> this.SyncSendCommandToAgent( - agentTopicName, - type, - commandList, - null, - COMMAND_EXEC_NEED_REPLAY, - null, - false - ) - ) - .collect(Collectors.toList()); - } - - @Override - public ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, boolean needResultReplay, String futureKey, boolean durationTask) { - - return this.SyncSendCommandToAgent( - agentTopicName, - type, - commandList, - null, - COMMAND_EXEC_NEED_REPLAY, + needResultReplay, futureKey, - false + durationTask ); } @Override - public ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> completeCommandList) { + public String SyncSendCommandToAgentComplete(String agentTopicName, String type, List commandList, List> commandListComplete) { + return this.SyncSendCommandToAgent( agentTopicName, type, commandList, - completeCommandList, - COMMAND_EXEC_NEED_REPLAY, + commandListComplete, + false, null, false ); } @Override - public List> SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List commandList, List> completeCommandList, boolean isDurationTask) { + public List SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List commandList, List> commandListComplete, boolean isDurationTask) { return agentTopicNameList .stream() .map( @@ -117,8 +102,8 @@ public class SyncExecutionServiceImpl implements SyncExecutionService { agentTopicName, type, commandList, - completeCommandList, - COMMAND_EXEC_NEED_REPLAY, + commandListComplete, + false, null, isDurationTask ) @@ -127,60 +112,31 @@ public class SyncExecutionServiceImpl implements SyncExecutionService { } @Override - public List> SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList) { + public String SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey) { - return agentTopicNameList - .stream() - .map( - agentTopicName -> this.SyncSendCommandToAgent( - agentTopicName, - type, - null, - completeCommandList, - COMMAND_EXEC_NEED_REPLAY, - null, - false - ) - ) - .collect(Collectors.toList()); - - } - - @Override - public List> SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList, HashMap atnFutureKey) { - return agentTopicNameList - .stream() - .map( - agentTopicName -> this.SyncSendCommandToAgent( - agentTopicName, - type, - null, - completeCommandList, - COMMAND_EXEC_NEED_REPLAY, - atnFutureKey.get(agentTopicName), - false - ) - ) - .collect(Collectors.toList()); - } - - @Override - public ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey) { return this.SyncSendCommandToAgent( agentTopicName, type, commandList, commandListComplete, - COMMAND_EXEC_NEED_REPLAY, + false, futureKey, false ); + } @Override - public ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { + public String SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { - OctopusMessage octopusMessage = asyncExecutionService.AsyncCallSendCommandToAgent( + String resultKey = futureKey; + // 判定是否是 FutureKey + if (null == futureKey) { + resultKey = ExecutionMessage.GetResultKey(agentTopicName); + } + + // 调用最底层的方法 + this.AsyncCallSendCommandToAgent( agentTopicName, type, commandList, @@ -190,65 +146,225 @@ public class SyncExecutionServiceImpl implements SyncExecutionService { durationTask ); - LocalDateTime initTime = octopusMessage.getInit_time(); - - ArrayList result = new ArrayList<>(); - - // 构造消息等待对象 - int commandCount = 1; - if (null != commandListComplete) { - commandCount = Math.max( - commandListComplete.size(), - 1 - ); - } - - OMReplayContend omReplayContend = OMReplayContend.build( - commandCount, - CurrentAppOctopusMessageType, - initTime - ); - CountDownLatch countDownLatch = omReplayContend.getCountDownLatch(); - - // 开始等待结果 - asyncWaitOMResult.waitFor(omReplayContend); - - // 监听结果 - try { - boolean await = countDownLatch.await( - processMaxWaitSeconds, - TimeUnit.SECONDS - ); - - } catch (InterruptedException e) { - throw new RuntimeException(e); - } finally { - - // 等待所有的结果返回 - // 停止等待结果 - asyncWaitOMResult.stopWaiting(omReplayContend); - - // 解析结果 - omReplayContend - .getReplayOMList() - .stream() - .map( - om -> { - log.debug( - "replay message is => {}", - om - ); - - return (ArrayList) om.getResult(); - } - ) - .forEachOrdered( - singleResult -> result.addAll(singleResult) - ); - - } - - // 返回 - return result; + return resultKey; } + + @Override + public OctopusMessage AsyncCallSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { + + // 检查agentTopicName是否存在 + if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { + log.error( + "agentTopicName异常! 输入为 => {}", + agentTopicName + ); + return null; + //throw new MyRuntimeException("agentTopicName异常!" + agentTopicName); + } + + // 归一化type + if (StringUtils.isEmpty(type)) { + type = MANUAL_COMMAND_TYPE; + } + + String resultKey = futureKey; + // 判定是否是 FutureKey + if (null == futureKey) { + resultKey = ExecutionMessage.GetResultKey(agentTopicName); + } + + // 构造 Execution Command对应的消息体 + ExecutionMessage executionMessage = this + .generateExecutionMessage( + type, + commandList, + resultKey, + commandListComplete, + needResultReplay, + durationTask + ); + OctopusMessage octopusMessage = this.generateOctopusMessage( + agentTopicName, + executionMessage + ); + + // send the message + oMessageToAgentSender.send(octopusMessage); + + // set up the stream read group + String group = redisTemplate + .opsForStream() + .createGroup( + resultKey, + resultKey + ); + + log.debug( + "set consumer group [{}] for the stream key with => [ {} ]", + group, + resultKey + ); + + // change the redis stream listener container + // createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey); + + // construct the persistent Bean + /*ExecutionLog executionLog = buildPersistentLogBeanFromOctopusMessage( + octopusMessage, + executionMessage + );*/ + // send resultKey to ExecutionResultDaemonHandler + // 当批量执行,产生大量的resultKey的时候,会出现线程爆炸,导致所有的全部失效 + /*WAIT_EXECUTION_RESULT_LIST.put( + resultKey, + executionLog + );*/ + + // help gc + executionMessage = null; + + return octopusMessage; + } + + private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) { + + try { + + return OctopusMessage + .builder() + .type(OctopusMessageType.EXECUTOR) + .init_time(TimeUtils.currentFormatTime()) + .uuid(agentTopicName) + .content( + objectMapper.writeValueAsString(executionMessage) + ) + .build(); + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + } + + private ExecutionLog buildPersistentLogBeanFromOctopusMessage(OctopusMessage octopusMessage, ExecutionMessage executionMessage) { + ExecutionLog executionLog = new ExecutionLog(); + executionLog.setAgentTopicName(octopusMessage.getUuid()); + executionLog.setResultKey((String) octopusMessage.getContent()); + executionLog.setCommandList(String.valueOf(executionMessage.getSingleLineCommand())); + executionLog.setType(executionMessage.getType()); + executionLog.setResultKey(executionMessage.getResultKey()); + return executionLog; + } + + + @Override + public List SyncSendCommandToAgent(List agentagentTopicNameList, String type, List commandList, boolean needResultReplay, String futureKey, boolean durationTask) { + + return agentagentTopicNameList + .stream() + .map( + agentTopicName -> this + .SyncSendCommandToAgent + ( + agentTopicName, + type, + commandList, + null, + needResultReplay, + futureKey, + durationTask + ) + ) + .collect(Collectors.toList()); + } + + /** + * @param agentTopicNameList 目标Agent的TopicName列表 + * @param type 任务类型 + * @param completeCommandList 完整的类型 + * @return + */ + @Override + public List SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList) { + + return agentTopicNameList + .stream() + .map( + agentTopicName -> this.SyncSendCommandToAgentComplete( + agentTopicName, + type, + null, + completeCommandList + ) + ) + .collect(Collectors.toList()); + + } + + @Override + public List SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList, HashMap atnFutureKey) { + + return agentTopicNameList + .stream() + .map( + agentTopicName -> this.SyncSendCommandToAgent( + agentTopicName, + type, + null, + completeCommandList, + atnFutureKey.getOrDefault( + agentTopicName, + null + ) + ) + ) + .collect(Collectors.toList()); + } + + + @Deprecated + private OctopusMessage generateOctopusMessage(String agentTopicName, String resultKey, String type, List commandList, List> commandListComplete) { + + + ExecutionMessage executionMessage = this.generateExecutionMessage( + type, + commandList, + resultKey, + commandListComplete, + false, + false + ); + + String executionMessageString; + + try { + executionMessageString = objectMapper.writeValueAsString(executionMessage); + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + return OctopusMessage + .builder() + .type(OctopusMessageType.EXECUTOR) + .init_time(LocalDateTime.now()) + .content(executionMessageString) + .uuid(agentTopicName) + .build(); + } + + private ExecutionMessage generateExecutionMessage(String type, List commandList, String resultKey, List> commandListComplete, boolean needResultReplay, boolean durationTask) { + + return ExecutionMessage + .builder() + .resultKey(resultKey) + .type(type) + .singleLineCommand(commandList) + .multiLineCommand(commandListComplete) + .needResultReplay(needResultReplay) + .durationTask(durationTask) + .build(); + } + + } diff --git a/server/src/main/java/io/wdd/rpc/init/ServerCacheAgentStatus.java b/server/src/main/java/io/wdd/rpc/init/AgentStatusCacheService.java similarity index 99% rename from server/src/main/java/io/wdd/rpc/init/ServerCacheAgentStatus.java rename to server/src/main/java/io/wdd/rpc/init/AgentStatusCacheService.java index fac1296..7cf3039 100644 --- a/server/src/main/java/io/wdd/rpc/init/ServerCacheAgentStatus.java +++ b/server/src/main/java/io/wdd/rpc/init/AgentStatusCacheService.java @@ -27,7 +27,7 @@ import static io.wdd.rpc.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY; */ @Service @Slf4j -public class ServerCacheAgentStatus { +public class AgentStatusCacheService { /** * 存储所有的AgentTopicName的缓存 diff --git a/server/src/main/java/io/wdd/rpc/message/handler/AsyncWaitOMResult.java b/server/src/main/java/io/wdd/rpc/message/handler/AsyncWaitOMResult.java index 7f88542..9ac3d53 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/AsyncWaitOMResult.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/AsyncWaitOMResult.java @@ -27,12 +27,12 @@ public class AsyncWaitOMResult { * KEY -> replayMatchKey * VALUE -> OMReplayContend - 包含countDownLatch 和 result */ - private static final HashMap REPLAY_WAITING_TARGET = new HashMap<>(); + private static final HashMap OM_REPLAY_WAITING_TARGET_MAP = new HashMap<>(); public void waitFor(OMReplayContend omReplayContend) { // 向 REPLAY_CACHE_MAP中写入 Key - REPLAY_WAITING_TARGET.put( + OM_REPLAY_WAITING_TARGET_MAP.put( omReplayContend.getReplayMatchKey(), omReplayContend ); @@ -44,7 +44,7 @@ public class AsyncWaitOMResult { public void stopWaiting(OMReplayContend omReplayContend) { // 在调用线程的countDownLunch结束之后,关闭 清除 REPLAY_CACHE_MAP 中的队列 - REPLAY_WAITING_TARGET.remove(omReplayContend.getReplayMatchKey()); + OM_REPLAY_WAITING_TARGET_MAP.remove(omReplayContend.getReplayMatchKey()); } @@ -86,16 +86,20 @@ public class AsyncWaitOMResult { replayOMessage.getType(), replayOMessage.getInit_time() ); - if (!REPLAY_WAITING_TARGET.containsKey(matchKey)) { + if (!OM_REPLAY_WAITING_TARGET_MAP.containsKey(matchKey)) { // 没有这个Key,说明等待结果已经超时了,直接丢弃,然后继续循环 // todo 错误的数据需要放置于某处 + log.debug( + "等待队列力没有该回复的结果key =>", + matchKey + ); continue; } // Map中包含有Key,那么放置进去 - OMReplayContend replayContend = REPLAY_WAITING_TARGET.get(matchKey); + OMReplayContend replayContend = OM_REPLAY_WAITING_TARGET_MAP.get(matchKey); replayContend .getReplayOMList() .add(replayOMessage); diff --git a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentStatusMonitorJob.java b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentStatusMonitorJob.java index efa60c1..c20a183 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentStatusMonitorJob.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentStatusMonitorJob.java @@ -1,7 +1,7 @@ package io.wdd.rpc.scheduler.job; import io.wdd.rpc.scheduler.config.QuartzLogOperator; -import io.wdd.rpc.scheduler.service.status.MonitorAllAgentStatus; +import io.wdd.rpc.scheduler.service.status.CheckAgentAliveStatus; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.springframework.scheduling.quartz.QuartzJobBean; @@ -11,7 +11,7 @@ import javax.annotation.Resource; public class AgentStatusMonitorJob extends QuartzJobBean { @Resource - MonitorAllAgentStatus monitorAllAgentStatus; + CheckAgentAliveStatus checkAgentAliveStatus; @Resource QuartzLogOperator quartzLogOperator; @@ -23,7 +23,7 @@ public class AgentStatusMonitorJob extends QuartzJobBean { //JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap(); // actually execute the monitor service - monitorAllAgentStatus.go(); + checkAgentAliveStatus.go(); // log to somewhere quartzLogOperator.save(); diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java b/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java index 3329193..48682bb 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java @@ -1,7 +1,7 @@ package io.wdd.rpc.scheduler.service.script; -import io.wdd.rpc.execute.service.AsyncExecutionService; +import io.wdd.rpc.execute.service.SyncExecutionService; import io.wdd.rpc.scheduler.beans.ScriptSchedulerDTO; import io.wdd.rpc.scheduler.config.QuartzSchedulerUtils; import lombok.extern.slf4j.Slf4j; @@ -20,7 +20,7 @@ import java.util.List; public class AgentApplyScheduledScript { @Resource - AsyncExecutionService asyncExecutionService; + SyncExecutionService asyncExecutionService; @Resource QuartzSchedulerUtils quartzSchedulerUtils; @@ -46,7 +46,7 @@ public class AgentApplyScheduledScript { // 发送命令到Agent中 List resultKeyList = asyncExecutionService - .SendCommandToAgentComplete( + .SyncSendCommandToAgentComplete( targetMachineList, scriptType, completeCommandList, diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentRuntimeMetricStatus.java b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentRuntimeMetricStatus.java index 1ca87c6..40e13e0 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentRuntimeMetricStatus.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentRuntimeMetricStatus.java @@ -10,7 +10,7 @@ import javax.annotation.Resource; import java.util.List; import java.util.stream.Collectors; -import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; +import static io.wdd.rpc.init.AgentStatusCacheService.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.status.OctopusStatusMessage.METRIC_STATUS_MESSAGE_TYPE; /** diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/status/MonitorAllAgentStatus.java b/server/src/main/java/io/wdd/rpc/scheduler/service/status/CheckAgentAliveStatus.java similarity index 88% rename from server/src/main/java/io/wdd/rpc/scheduler/service/status/MonitorAllAgentStatus.java rename to server/src/main/java/io/wdd/rpc/scheduler/service/status/CheckAgentAliveStatus.java index 69385f6..ebe797e 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/status/MonitorAllAgentStatus.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/status/CheckAgentAliveStatus.java @@ -1,7 +1,7 @@ package io.wdd.rpc.scheduler.service.status; import io.wdd.common.utils.TimeUtils; -import io.wdd.rpc.init.ServerCacheAgentStatus; +import io.wdd.rpc.init.AgentStatusCacheService; import io.wdd.rpc.scheduler.service.BuildStatusScheduleTask; import io.wdd.rpc.status.OctopusStatusMessage; import lombok.extern.slf4j.Slf4j; @@ -13,10 +13,11 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.HashMap; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_LIST; +import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY; import static io.wdd.rpc.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE; @@ -37,7 +38,7 @@ import static io.wdd.rpc.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE @Service @Slf4j @Lazy -public class MonitorAllAgentStatus { +public class CheckAgentAliveStatus { private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5; @Resource @@ -46,7 +47,7 @@ public class MonitorAllAgentStatus { CollectAgentStatus collectAgentStatus; @Resource - ServerCacheAgentStatus serverCacheAgentStatus; + AgentStatusCacheService agentStatusCacheService; @Resource BuildStatusScheduleTask buildStatusScheduleTask; @@ -57,7 +58,7 @@ public class MonitorAllAgentStatus { try { // 1. 获取所有注册的Agent 手动更新 - serverCacheAgentStatus.updateAllAgentTopicNameCache(); + agentStatusCacheService.updateAllAgentTopicNameCache(); if (CollectionUtils.isEmpty(ALL_AGENT_TOPIC_NAME_LIST)) { log.warn("[Scheduler] No Agent Registered ! End Up Status Monitor !"); return; @@ -67,6 +68,10 @@ public class MonitorAllAgentStatus { checkOrCreateRedisHealthyKey(); // 2.发送状态检查信息, agent需要update相应的HashMap的值 + // 2023年6月14日 2. 发送ping等待所有的Agent返回PONG, 然后进行redis的状态修改 + CountDownLatch aliveStatusCDL = new CountDownLatch(ALL_AGENT_TOPIC_NAME_LIST.size()); + + buildAndSendAgentHealthMessage(); // 3. 休眠 MAX_WAIT_AGENT_REPORT_STATUS_TIME 秒 等待agent的状态上报 @@ -139,7 +144,7 @@ public class MonitorAllAgentStatus { String currentTimeString = TimeUtils.currentTimeString(); // 更新所有的缓存状态 - serverCacheAgentStatus.updateAgentStatusMapCache(); + agentStatusCacheService.updateAgentStatusMapCache(); // 执行Metric上报定时任务 buildStatusScheduleTask.buildAgentMetricScheduleTask(); diff --git a/server/src/main/java/io/wdd/rpc/status/beans/AgentStatus.java b/server/src/main/java/io/wdd/rpc/status/beans/AgentStatus.java index 4ab2767..c6bad73 100644 --- a/server/src/main/java/io/wdd/rpc/status/beans/AgentStatus.java +++ b/server/src/main/java/io/wdd/rpc/status/beans/AgentStatus.java @@ -9,15 +9,15 @@ import lombok.NoArgsConstructor; public class AgentStatus { @JsonProperty("CPUStatus") - private CPUInfo cPUStatus; + private CPUStatus cpuStatus; @JsonProperty("MemoryStatus") - private MemoryInfo memoryStatus; + private MemoryStatus memoryStatus; @JsonProperty("NetworkStatus") - private NetworkInfo networkStatus; + private NetworkStatus networkStatus; @JsonProperty("DiskStatus") - private DiskInfo diskStatus; + private DiskStatus diskStatus; } diff --git a/server/src/main/java/io/wdd/rpc/status/beans/CPUInfo.java b/server/src/main/java/io/wdd/rpc/status/beans/CPUStatus.java similarity index 97% rename from server/src/main/java/io/wdd/rpc/status/beans/CPUInfo.java rename to server/src/main/java/io/wdd/rpc/status/beans/CPUStatus.java index 018bd04..add5821 100644 --- a/server/src/main/java/io/wdd/rpc/status/beans/CPUInfo.java +++ b/server/src/main/java/io/wdd/rpc/status/beans/CPUStatus.java @@ -12,12 +12,12 @@ import java.util.List; @AllArgsConstructor @NoArgsConstructor @SuperBuilder(toBuilder = true) -public class CPUInfo { +public class CPUStatus { @JsonProperty("NumCores") private Integer numCores; - @JsonProperty("CPUInfo") + @JsonProperty("CPUStatus") private List cPUInfo; @JsonProperty("CPUPercent") private Double cPUPercent; diff --git a/server/src/main/java/io/wdd/rpc/status/beans/DiskInfo.java b/server/src/main/java/io/wdd/rpc/status/beans/DiskStatus.java similarity index 96% rename from server/src/main/java/io/wdd/rpc/status/beans/DiskInfo.java rename to server/src/main/java/io/wdd/rpc/status/beans/DiskStatus.java index 7b8c7de..2c728cf 100644 --- a/server/src/main/java/io/wdd/rpc/status/beans/DiskInfo.java +++ b/server/src/main/java/io/wdd/rpc/status/beans/DiskStatus.java @@ -8,7 +8,7 @@ import java.util.List; @NoArgsConstructor @Data -public class DiskInfo { +public class DiskStatus { @JsonProperty("Total") private Long total; diff --git a/server/src/main/java/io/wdd/rpc/status/beans/MemoryInfo.java b/server/src/main/java/io/wdd/rpc/status/beans/MemoryStatus.java similarity index 94% rename from server/src/main/java/io/wdd/rpc/status/beans/MemoryInfo.java rename to server/src/main/java/io/wdd/rpc/status/beans/MemoryStatus.java index 15971af..c16e8ba 100644 --- a/server/src/main/java/io/wdd/rpc/status/beans/MemoryInfo.java +++ b/server/src/main/java/io/wdd/rpc/status/beans/MemoryStatus.java @@ -6,7 +6,7 @@ import lombok.NoArgsConstructor; @NoArgsConstructor @Data -public class MemoryInfo { +public class MemoryStatus { @JsonProperty("TotalMemory") private Long totalMemory; diff --git a/server/src/main/java/io/wdd/rpc/status/beans/NetworkInfo.java b/server/src/main/java/io/wdd/rpc/status/beans/NetworkStatus.java similarity index 96% rename from server/src/main/java/io/wdd/rpc/status/beans/NetworkInfo.java rename to server/src/main/java/io/wdd/rpc/status/beans/NetworkStatus.java index 9bee6cd..c5a067d 100644 --- a/server/src/main/java/io/wdd/rpc/status/beans/NetworkInfo.java +++ b/server/src/main/java/io/wdd/rpc/status/beans/NetworkStatus.java @@ -12,7 +12,7 @@ import java.util.List; @Data @AllArgsConstructor @SuperBuilder(toBuilder = true) -public class NetworkInfo { +public class NetworkStatus { @JsonProperty("name") private String name; diff --git a/server/src/test/java/io/wdd/server/ServerApplicationTests.java b/server/src/test/java/io/wdd/server/ServerApplicationTests.java index 38cc180..bfdd25a 100644 --- a/server/src/test/java/io/wdd/server/ServerApplicationTests.java +++ b/server/src/test/java/io/wdd/server/ServerApplicationTests.java @@ -1,6 +1,6 @@ package io.wdd.server; -import io.wdd.rpc.execute.service.AsyncExecutionService; +import io.wdd.rpc.execute.service.SyncExecutionService; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; @@ -13,7 +13,7 @@ class ServerApplicationTests { @Resource - AsyncExecutionService asyncExecutionService; + SyncExecutionService asyncExecutionService; @Test void testCoreExecutionCompleteScript() { @@ -61,7 +61,7 @@ class ServerApplicationTests { ) ); - List resultList = asyncExecutionService.SendCommandToAgentComplete( + List resultList = asyncExecutionService.SyncSendCommandToAgentComplete( targetMachineList, "Scheduled Script", completeScript