From c36721eada83a521fdbf96a68911f6aa6297874f Mon Sep 17 00:00:00 2001 From: zeaslity Date: Tue, 28 Feb 2023 15:36:53 +0800 Subject: [PATCH] =?UTF-8?q?[server][=20executor]-=20=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E8=B0=83=E7=94=A8=E5=91=BD=E4=BB=A4=E7=9A=84?= =?UTF-8?q?=E9=83=A8=E5=88=86=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/io/wdd/common/utils/TimeUtils.java | 2 +- .../xray/service/XrayConfigDistribute.java | 4 +- .../rpc/agent/OctopusAgentServiceImpl.java | 67 ++--- .../wdd/rpc/controller/AgentController.java | 4 +- .../rpc/controller/ExecutionController.java | 76 ++++-- ...ervice.java => AsyncExecutionService.java} | 33 ++- ...pl.java => AsyncExecutionServiceImpl.java} | 43 +++- .../execute/service/SyncExecutionService.java | 95 +++++++ .../service/SyncExecutionServiceImpl.java | 236 ++++++++++++++++++ .../message/handler/AsyncWaitOMResult.java | 25 +- .../rpc/message/handler/OMReplayContend.java | 29 ++- .../script/AgentApplyScheduledScript.java | 6 +- .../io/wdd/server/ServerApplicationTests.java | 6 +- 13 files changed, 535 insertions(+), 91 deletions(-) rename server/src/main/java/io/wdd/rpc/execute/service/{CoreExecutionService.java => AsyncExecutionService.java} (79%) rename server/src/main/java/io/wdd/rpc/execute/service/{CoreExecutionServiceImpl.java => AsyncExecutionServiceImpl.java} (91%) create mode 100644 server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java create mode 100644 server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java diff --git a/common/src/main/java/io/wdd/common/utils/TimeUtils.java b/common/src/main/java/io/wdd/common/utils/TimeUtils.java index f85627b..38d5e53 100644 --- a/common/src/main/java/io/wdd/common/utils/TimeUtils.java +++ b/common/src/main/java/io/wdd/common/utils/TimeUtils.java @@ -117,7 +117,7 @@ public class TimeUtils { } /** - * @return UTC+8 [ yyyy-MM-dd HH:mm:ss ] Time String + * @return UTC+8 [ yyyy-MM-dd-HH-mm-ss ] Time String */ public static String currentTimeStringFullSplit() { diff --git a/server/src/main/java/io/wdd/func/xray/service/XrayConfigDistribute.java b/server/src/main/java/io/wdd/func/xray/service/XrayConfigDistribute.java index c8bf578..2c1a25f 100644 --- a/server/src/main/java/io/wdd/func/xray/service/XrayConfigDistribute.java +++ b/server/src/main/java/io/wdd/func/xray/service/XrayConfigDistribute.java @@ -7,7 +7,7 @@ import io.wdd.func.oss.service.OSSCoreService; import io.wdd.func.oss.service.OssBackendSelect; import io.wdd.func.xray.beans.node.ProxyNode; import io.wdd.func.xray.beans.node.XrayConfigInfo; -import io.wdd.rpc.execute.service.CoreExecutionService; +import io.wdd.rpc.execute.service.AsyncExecutionService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; @@ -112,7 +112,7 @@ public class XrayConfigDistribute { OSSCoreService ossCoreService; @Resource - CoreExecutionService executionService; + AsyncExecutionService executionService; public void uploadXrayConfigToOSS(ArrayList networkPathList) { diff --git a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java index 6de5c49..408d47a 100644 --- a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java @@ -19,7 +19,6 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.time.LocalDateTime; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,7 +30,6 @@ import java.util.stream.Collectors; import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_SET; import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; -import static io.wdd.rpc.message.handler.AsyncWaitOMResult.REPLAY_CACHE_MAP; import static io.wdd.rpc.message.handler.OMessageHandlerServer.AGENT_LATEST_VERSION; import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT; @@ -67,7 +65,6 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { // 组装信息至集合中 LocalDateTime currentTime = TimeUtils.currentFormatTime(); - // 发送OctopusMessage-Agent buildOMessageAndSendToAllHealthyAgent( AgentOperationType.VERSION, @@ -75,21 +72,14 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { ); // 构造 异步结果监听内容 - CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size()); - ArrayList replayOMList = new ArrayList<>(); - OMReplayContend omReplayContend = OMReplayContend - .builder() - .initTime(currentTime) - .countDownLatch(countDownLatch) - .replayOMList(replayOMList) - .replayMatchKey( - OMReplayContend.generateMatchKey( - CurrentAppOctopusMessageType, - currentTime - ) - ) - .type(CurrentAppOctopusMessageType) - .build(); + OMReplayContend omReplayContend = OMReplayContend.build( + ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(), + CurrentAppOctopusMessageType, + currentTime + ); + + CountDownLatch countDownLatch = omReplayContend.getCountDownLatch(); + // 调用后台接收处理所有的Replay信息 asyncWaitOMResult.waitFor(omReplayContend); @@ -101,21 +91,24 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { countDownLatch );*/ + + boolean isAllHealthyAgentReport = false; try { // 超时等待5秒钟, 或者所有的Agent均已经完成上报 - countDownLatch.await( + isAllHealthyAgentReport = countDownLatch.await( 5, TimeUnit.SECONDS ); } catch (InterruptedException e) { - log.warn("存在部分Agent没有上报 版本信息!"); + } finally { // 超时,或者 全部信息已经收集 + if (!isAllHealthyAgentReport) { + log.warn("存在部分Agent没有上报 版本信息!"); + } // 此处调用,即可中断 异步任务的收集工作 - REPLAY_CACHE_MAP.remove( - omReplayContend.getReplayMatchKey() - ); + asyncWaitOMResult.stopWaiting(omReplayContend); // 处理结果 omReplayContend @@ -132,7 +125,6 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { // help gc omReplayContend = null; - } return result; @@ -165,21 +157,14 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { currentTime ); - CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size()); - ArrayList replayOMList = new ArrayList<>(); - OMReplayContend omReplayContend = OMReplayContend - .builder() - .initTime(currentTime) - .countDownLatch(countDownLatch) - .replayOMList(replayOMList) - .replayMatchKey( - OMReplayContend.generateMatchKey( - CurrentAppOctopusMessageType, - currentTime - ) - ) - .type(CurrentAppOctopusMessageType) - .build(); + // 构造结果 + OMReplayContend omReplayContend = OMReplayContend.build( + ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(), + CurrentAppOctopusMessageType, + currentTime + ); + + CountDownLatch countDownLatch = omReplayContend.getCountDownLatch(); // 调用后台接收处理所有的Replay信息 asyncWaitOMResult.waitFor(omReplayContend); @@ -202,9 +187,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { // 超时,或者 全部信息已经收集 // 此处调用,即可中断 异步任务的收集工作 - REPLAY_CACHE_MAP.remove( - omReplayContend.getReplayMatchKey() - ); + asyncWaitOMResult.stopWaiting(omReplayContend); // 处理结果 omReplayContend diff --git a/server/src/main/java/io/wdd/rpc/controller/AgentController.java b/server/src/main/java/io/wdd/rpc/controller/AgentController.java index 4e946b4..8e5f110 100644 --- a/server/src/main/java/io/wdd/rpc/controller/AgentController.java +++ b/server/src/main/java/io/wdd/rpc/controller/AgentController.java @@ -21,14 +21,14 @@ public class AgentController { OctopusAgentService octopusAgentService; @GetMapping("/version") - @ApiOperation("[版本]-所有OctopusAgent") + @ApiOperation("[版本] - 所有OctopusAgent") public R> getAllAgentVersion(){ return R.ok(octopusAgentService.getAllAgentVersion()); } @GetMapping("/coreInfo") - @ApiOperation("[核心信息]-所有OctopusAgent") + @ApiOperation("[核心信息] - 所有OctopusAgent") public R> getAllAgentCoreInfo(){ return R.ok(octopusAgentService.getAllAgentCoreInfo()); diff --git a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java index dad3719..d907e5c 100644 --- a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java +++ b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java @@ -5,7 +5,7 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import io.wdd.common.beans.response.R; import io.wdd.rpc.execute.result.BuildStreamReader; -import io.wdd.rpc.execute.service.CoreExecutionService; +import io.wdd.rpc.execute.service.AsyncExecutionService; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -17,26 +17,27 @@ import java.util.List; import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER; import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_LIST; +import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; @RestController @RequestMapping("/octopus/server/executor") -@Api("Agent执行命令的Controller") +@Api(value = "Agent执行命令的Controller", tags = "Execution") public class ExecutionController { @Resource - CoreExecutionService coreExecutionService; + AsyncExecutionService asyncExecutionService; @Resource BuildStreamReader buildStreamReader; @PostMapping("/command/one") - @ApiOperation("[命令]-手动发送命令") + @ApiOperation("[命令] - 手动发送命令") public R patchCommandToAgent( - @RequestParam(value = "topicName") String topicName, + @RequestParam(value = "topicName") @ApiParam(name = "topicName", value = "目标主机名称") String topicName, @RequestParam(value = "commandList", required = false) @Nullable List commandList, @RequestParam(value = "type", required = false) @Nullable String type ) { - String streamKey = coreExecutionService + String streamKey = asyncExecutionService .SendCommandToAgent( topicName, type, @@ -47,7 +48,7 @@ public class ExecutionController { } @PostMapping("/command/batch") - @ApiOperation("[命令]- 批量发送命令") + @ApiOperation("[命令] - 批量发送命令") public R> patchCommandToAgentList( @RequestParam(value = "topicNameList") @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList, @@ -56,7 +57,7 @@ public class ExecutionController { @RequestParam(value = "type", required = false) @Nullable String type ) { - return R.ok(coreExecutionService.SendCommandToAgent( + return R.ok(asyncExecutionService.SendCommandToAgent( topicNameList, type, commandList @@ -65,20 +66,51 @@ public class ExecutionController { @PostMapping("/command/all") - @ApiOperation("[命令]- 发送命令至所有的主机") - public R> patchCommandToAgentAll( + @ApiOperation("[命令] - 发送命令至所有的主机") + public R> patchCommandToAllAgent( @RequestParam(value = "commandList", required = false) @ApiParam(name = "commandList", value = "命令行") @Nullable List commandList, @RequestParam(value = "type", required = false) @Nullable String type ) { - return R.ok(coreExecutionService.SendCommandToAgent( + return R.ok(asyncExecutionService.SendCommandToAgent( ALL_AGENT_TOPIC_NAME_LIST, type, commandList )); } + @PostMapping("/command/healthy") + @ApiOperation("[命令] - 发送命令至健康的主机") + public R> patchCommandToHealthyAgent( + @RequestParam(value = "commandList", required = false) + @ApiParam(name = "commandList", value = "命令行") @Nullable List commandList, + @RequestParam(value = "type", required = false) @Nullable String type + ) { + + return R.ok(asyncExecutionService.SendCommandToAgent( + ALL_HEALTHY_AGENT_TOPIC_NAME_LIST, + type, + commandList + )); + } + + @PostMapping("/command/sync/one") + @ApiOperation("[命令] [同步] - 同步等待命令结果") + public R> SyncPatchCommandToAgent( + @RequestParam(value = "topicName") @ApiParam(name = "topicName", value = "目标主机名称") String topicName, + @RequestParam(value = "commandList", required = false) + @ApiParam(name = "commandList", value = "命令行") @Nullable List commandList, + @RequestParam(value = "type", required = false) @ApiParam(name = "type", value = "执行命令类型") @Nullable String type + ) { + + return R.ok(asyncExecutionService.SendCommandToAgent( + ALL_HEALTHY_AGENT_TOPIC_NAME_LIST, + type, + commandList + )); + } + @PostMapping("/agentStatusStream") @ApiOperation("切换Console查看Agent状态日志") @@ -105,7 +137,7 @@ public class ExecutionController { ) { return R.ok( - coreExecutionService + asyncExecutionService .SendCommandToAgent( topicNameList, "AgentUpdate", @@ -121,7 +153,7 @@ public class ExecutionController { ) { return R.ok( - coreExecutionService + asyncExecutionService .SendCommandToAgent( topicNameList, "AgentReboot", @@ -137,7 +169,7 @@ public class ExecutionController { ) { return R.ok( - coreExecutionService + asyncExecutionService .SendCommandToAgent( topicNameList, "AgentShutdown", @@ -145,5 +177,21 @@ public class ExecutionController { )); } + @PostMapping("/function/bootUp") + @ApiOperation("重新部署") + public R> AgentBootUp( + @RequestParam(value = "topicNameList") + @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList + ) { + + return R.ok( + asyncExecutionService + .SendCommandToAgent( + topicNameList, + "AgentBootUp", + null + )); + } + } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java similarity index 79% rename from server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java rename to server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java index 100e84b..f068d07 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java @@ -1,19 +1,19 @@ package io.wdd.rpc.execute.service; +import io.wdd.common.beans.rabbitmq.OctopusMessage; + import java.util.HashMap; import java.util.List; -public interface CoreExecutionService { +public interface AsyncExecutionService { String SendCommandToAgent(String agentTopicName, String command); String SendCommandToAgent(String agentTopicName, List commandList); - String SendCommandToAgent(String agentTopicName, String type, List commandList); - List SendCommandToAgent(List agentTopicNameList, String type, List command); /** @@ -37,8 +37,9 @@ public interface CoreExecutionService { ); - /** ------------------------------------------------- */ - + /** + * ------------------------------------------------- + */ String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete); @@ -89,4 +90,26 @@ public interface CoreExecutionService { ); + /** + * 同步命令调用的方法 + * + * @param agentTopicName + * @param type + * @param commandList + * @param commandListComplete + * @param needResultReplay + * @param futureKey + * @param durationTask + * @return + */ + OctopusMessage SyncCallSendCommandToAgent( + String agentTopicName, + String type, + List commandList, + List> commandListComplete, + boolean needResultReplay, + String futureKey, + boolean durationTask + ); + } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java similarity index 91% rename from server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java rename to server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java index 8e4b928..fb70cf3 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.wdd.common.beans.executor.ExecutionMessage; import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessageType; +import io.wdd.common.utils.TimeUtils; import io.wdd.rpc.execute.config.ExecutionLog; import io.wdd.rpc.message.sender.OMessageToAgentSender; import lombok.extern.slf4j.Slf4j; @@ -22,7 +23,7 @@ import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_SET; @Service @Slf4j -public class CoreExecutionServiceImpl implements CoreExecutionService { +public class AsyncExecutionServiceImpl implements AsyncExecutionService { private static final String MANUAL_COMMAND_TYPE = "manual-command"; @Resource @@ -52,11 +53,6 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { @Override public String SendCommandToAgent(String agentTopicName, String type, List commandList) { - // 归一化type - if (StringUtils.isEmpty(type)) { - type = MANUAL_COMMAND_TYPE; - } - return SendCommandToAgent( agentTopicName, type, @@ -112,6 +108,29 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { @Override public String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { + String resultKey = futureKey; + // 判定是否是 FutureKey + if (null == futureKey) { + resultKey = ExecutionMessage.GetResultKey(agentTopicName); + } + + // 调用最底层的方法 + this.SyncCallSendCommandToAgent( + agentTopicName, + type, + commandList, + commandListComplete, + needResultReplay, + futureKey, + durationTask + ); + + return resultKey; + } + + @Override + public OctopusMessage SyncCallSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { + // 检查agentTopicName是否存在 if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { log.error( @@ -122,7 +141,11 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { //throw new MyRuntimeException("agentTopicName异常!" + agentTopicName); } - // 归一化type类型 不行 + // 归一化type + if (StringUtils.isEmpty(type)) { + type = MANUAL_COMMAND_TYPE; + } + String resultKey = futureKey; // 判定是否是 FutureKey if (null == futureKey) { @@ -178,10 +201,8 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { // help gc executionMessage = null; - octopusMessage = null; - - return resultKey; + return octopusMessage; } private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) { @@ -191,7 +212,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { return OctopusMessage .builder() .type(OctopusMessageType.EXECUTOR) - .init_time(LocalDateTime.now()) + .init_time(TimeUtils.currentFormatTime()) .uuid(agentTopicName) .content( objectMapper.writeValueAsString(executionMessage) diff --git a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java new file mode 100644 index 0000000..5bd7e39 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java @@ -0,0 +1,95 @@ +package io.wdd.rpc.execute.service; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** + * 同步命令执行的核心类 + * 需要等待命令执行完毕,完后返回相应的结果 + */ +public interface SyncExecutionService { + + /** + * ------------------------ Sync Command Executor ------------------------------ + */ + ArrayList SyncSendCommandToAgent(String agentTopicName, List commandList); + + ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList); + + List> SyncSendCommandToAgent(List agentTopicNameList, String type, List commandList); + + /** + * 调用 单行命令脚本的 最底层函数 + * + * @param agentTopicName + * @param type + * @param commandList + * @param needResultReplay + * @param futureKey + * @param durationTask + * @return + */ + ArrayList SyncSendCommandToAgent( + String agentTopicName, + String type, + List commandList, + boolean needResultReplay, + String futureKey, + boolean durationTask + ); + + + /** + * ------------------------------------------------- + */ + + ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete); + + /** + * 通常为 页面定时脚本任务调用 + * + * @param agentTopicNameList 目标Agent的TopicName列表 + * @param type 任务类型 + * @param completeCommandList 完整的类型 + * @return 每个Agent只返回一个 ResultKey(Script脚本的结果全部拼接到一起),全部的resultKey + */ + List> SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList); + + + /** + * 通常为 页面定时脚本任务调用 + * + * @param agentTopicNameList 目标Agent的TopicName列表 + * @param type 任务类型 + * @param completeCommandList 完整的类型 + * @param atnFutureKey 由于脚本任务为延迟调用,故需要提前生成未来的ResultKey + * @return 每个Agent只返回一个 ResultKey(Script脚本的结果全部拼接到一起),全部的resultKey + */ + List> SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList, HashMap atnFutureKey); + + + ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey); + + /** + * 调用 完整脚本的 最底层函数 + * + * @param agentTopicName + * @param type + * @param commandList + * @param commandListComplete + * @param futureKey + * @param durationTask + * @return resultKey 本次操作在Redis中记录的结果Key + */ + ArrayList SyncSendCommandToAgent( + String agentTopicName, + String type, + List commandList, + List> commandListComplete, + boolean needResultReplay, + String futureKey, + boolean durationTask + ); + +} diff --git a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java new file mode 100644 index 0000000..cfddef8 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java @@ -0,0 +1,236 @@ +package io.wdd.rpc.execute.service; + +import io.wdd.common.beans.rabbitmq.OctopusMessage; +import io.wdd.common.beans.rabbitmq.OctopusMessageType; +import io.wdd.rpc.message.handler.AsyncWaitOMResult; +import io.wdd.rpc.message.handler.OMReplayContend; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@Service +@Slf4j +public class SyncExecutionServiceImpl implements SyncExecutionService { + + private static final boolean COMMAND_EXEC_NEED_REPLAY = true; + + private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.EXECUTOR; + @Resource + AsyncWaitOMResult asyncWaitOMResult; + @Resource + AsyncExecutionService asyncExecutionService; + + /** + * 一个命令执行的最长等待时间 + */ + @Value("${octopus.agent.executor.processMaxTimeOut}") + Integer processMaxWaitSeconds; + + @Override + public ArrayList SyncSendCommandToAgent(String agentTopicName, List commandList) { + + return this.SyncSendCommandToAgent( + agentTopicName, + null, + commandList, + null, + COMMAND_EXEC_NEED_REPLAY, + null, + false + ); + } + + @Override + public ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList) { + + + return this.SyncSendCommandToAgent( + agentTopicName, + type, + commandList, + null, + COMMAND_EXEC_NEED_REPLAY, + null, + false + ); + } + + @Override + public List> SyncSendCommandToAgent(List agentTopicNameList, String type, List commandList) { + + return agentTopicNameList + .stream() + .map( + agentTopicName -> this.SyncSendCommandToAgent( + agentTopicName, + type, + commandList, + null, + COMMAND_EXEC_NEED_REPLAY, + null, + false + ) + ) + .collect(Collectors.toList()); + } + + @Override + public ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, boolean needResultReplay, String futureKey, boolean durationTask) { + + return this.SyncSendCommandToAgent( + agentTopicName, + type, + commandList, + null, + COMMAND_EXEC_NEED_REPLAY, + futureKey, + false + ); + } + + @Override + public ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete) { + return this.SyncSendCommandToAgent( + agentTopicName, + type, + commandList, + commandListComplete, + COMMAND_EXEC_NEED_REPLAY, + null, + false + ); + } + + @Override + public List> SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList) { + + return agentTopicNameList + .stream() + .map( + agentTopicName -> this.SyncSendCommandToAgent( + agentTopicName, + type, + null, + completeCommandList, + COMMAND_EXEC_NEED_REPLAY, + null, + false + ) + ) + .collect(Collectors.toList()); + + } + + @Override + public List> SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList, HashMap atnFutureKey) { + return agentTopicNameList + .stream() + .map( + agentTopicName -> this.SyncSendCommandToAgent( + agentTopicName, + type, + null, + completeCommandList, + COMMAND_EXEC_NEED_REPLAY, + atnFutureKey.get(agentTopicName), + false + ) + ) + .collect(Collectors.toList()); + } + + @Override + public ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey) { + return this.SyncSendCommandToAgent( + agentTopicName, + type, + commandList, + commandListComplete, + COMMAND_EXEC_NEED_REPLAY, + futureKey, + false + ); + } + + @Override + public ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { + + OctopusMessage octopusMessage = asyncExecutionService.SyncCallSendCommandToAgent( + agentTopicName, + type, + commandList, + commandListComplete, + needResultReplay, + futureKey, + durationTask + ); + + LocalDateTime initTime = octopusMessage.getInit_time(); + + ArrayList result = new ArrayList<>(); + + // 构造消息等待对象 + int commandCount = Math.max( + commandListComplete.size(), + 1 + ); + OMReplayContend omReplayContend = OMReplayContend.build( + commandCount, + CurrentAppOctopusMessageType, + initTime + ); + CountDownLatch countDownLatch = omReplayContend.getCountDownLatch(); + + // 开始等待结果 + asyncWaitOMResult.waitFor(omReplayContend); + + // 监听结果 + try { + boolean await = countDownLatch.await( + processMaxWaitSeconds, + TimeUnit.SECONDS + ); + + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + + // 等待所有的结果返回 + // 停止等待结果 + asyncWaitOMResult.stopWaiting(omReplayContend); + + + // 解析结果 + omReplayContend + .getReplayOMList() + .stream() + .map( + om -> { + log.debug( + "replay message is => {}", + om + ); + + return (ArrayList) om.getResult(); + } + ) + .forEachOrdered( + singleResult -> result.addAll(singleResult) + ); + + } + + // 返回 + return result; + } +} diff --git a/server/src/main/java/io/wdd/rpc/message/handler/AsyncWaitOMResult.java b/server/src/main/java/io/wdd/rpc/message/handler/AsyncWaitOMResult.java index de41ec3..0f29808 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/AsyncWaitOMResult.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/AsyncWaitOMResult.java @@ -12,6 +12,12 @@ import java.util.concurrent.TimeUnit; import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT; +/** + * 从Agent收集返回信息的统一处理地点 + * 使用方法: 业务类构造 OMReplayContend对象,调用AsyncWaitOMResult.waitFor()方法 + *

+ * 调用结束之后,需要从 REPLAY_WAITING_TARGET 中移除此部分内容 + */ @Service @Slf4j public class AsyncWaitOMResult { @@ -21,18 +27,27 @@ public class AsyncWaitOMResult { * KEY -> replayMatchKey * VALUE -> OMReplayContend - 包含countDownLatch 和 result */ - public static final HashMap REPLAY_CACHE_MAP = new HashMap<>(); + private static final HashMap REPLAY_WAITING_TARGET = new HashMap<>(); public void waitFor(OMReplayContend omReplayContend) { // 向 REPLAY_CACHE_MAP中写入 Key - REPLAY_CACHE_MAP.put(omReplayContend.getReplayMatchKey(), - omReplayContend); + REPLAY_WAITING_TARGET.put( + omReplayContend.getReplayMatchKey(), + omReplayContend + ); // 在调用线程的countDownLunch结束之后,关闭 // 清除 REPLAY_CACHE_MAP 中的队列 } + public void stopWaiting(OMReplayContend omReplayContend) { + + // 在调用线程的countDownLunch结束之后,关闭 清除 REPLAY_CACHE_MAP 中的队列 + REPLAY_WAITING_TARGET.remove(omReplayContend.getReplayMatchKey()); + + } + @PostConstruct public void daemonHandleReplayOMFromAgent() { @@ -71,7 +86,7 @@ public class AsyncWaitOMResult { replayOMessage.getType(), replayOMessage.getInit_time() ); - if (!REPLAY_CACHE_MAP.containsKey(matchKey)) { + if (!REPLAY_WAITING_TARGET.containsKey(matchKey)) { // 没有这个Key,说明等待结果已经超时了,直接丢弃,然后继续循环 // todo 错误的数据需要放置于某处 @@ -80,7 +95,7 @@ public class AsyncWaitOMResult { } // Map中包含有Key,那么放置进去 - OMReplayContend replayContend = REPLAY_CACHE_MAP.get(matchKey); + OMReplayContend replayContend = REPLAY_WAITING_TARGET.get(matchKey); replayContend .getReplayOMList() .add(replayOMessage); diff --git a/server/src/main/java/io/wdd/rpc/message/handler/OMReplayContend.java b/server/src/main/java/io/wdd/rpc/message/handler/OMReplayContend.java index 2f024e3..0abd5dc 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/OMReplayContend.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/OMReplayContend.java @@ -11,7 +11,7 @@ import lombok.NoArgsConstructor; import lombok.experimental.SuperBuilder; import java.time.LocalDateTime; -import java.util.List; +import java.util.ArrayList; import java.util.concurrent.CountDownLatch; @Data @@ -35,8 +35,7 @@ public class OMReplayContend { CountDownLatch countDownLatch; @ApiModelProperty("回复的结果列表, 临时保存") - List replayOMList; - + ArrayList replayOMList; protected static String generateMatchKey(OMReplayContend replayIdentifier) { @@ -49,6 +48,11 @@ public class OMReplayContend { return relayMatchKey; } + /** + * @param messageType + * @param messageInitTime 必须使用 TimeUtils.currentFormatTime(); + * @return + */ public static String generateMatchKey(OctopusMessageType messageType, LocalDateTime messageInitTime) { String relayMatchKey = messageType.toString() + messageInitTime.toString(); @@ -56,4 +60,23 @@ public class OMReplayContend { return relayMatchKey; } + /** + * 方便使用的一个构造方法 + * + * @return + */ + public static OMReplayContend build(int waitForReplayNum, OctopusMessageType currentOMType, LocalDateTime currentTime) { + + return new OMReplayContend( + currentOMType, + currentTime, + generateMatchKey( + currentOMType, + currentTime + ), + new CountDownLatch(waitForReplayNum), + new ArrayList<>() + ); + } + } diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java b/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java index 8ed6c39..3329193 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java @@ -1,7 +1,7 @@ package io.wdd.rpc.scheduler.service.script; -import io.wdd.rpc.execute.service.CoreExecutionService; +import io.wdd.rpc.execute.service.AsyncExecutionService; import io.wdd.rpc.scheduler.beans.ScriptSchedulerDTO; import io.wdd.rpc.scheduler.config.QuartzSchedulerUtils; import lombok.extern.slf4j.Slf4j; @@ -20,7 +20,7 @@ import java.util.List; public class AgentApplyScheduledScript { @Resource - CoreExecutionService coreExecutionService; + AsyncExecutionService asyncExecutionService; @Resource QuartzSchedulerUtils quartzSchedulerUtils; @@ -45,7 +45,7 @@ public class AgentApplyScheduledScript { } // 发送命令到Agent中 - List resultKeyList = coreExecutionService + List resultKeyList = asyncExecutionService .SendCommandToAgentComplete( targetMachineList, scriptType, diff --git a/server/src/test/java/io/wdd/server/ServerApplicationTests.java b/server/src/test/java/io/wdd/server/ServerApplicationTests.java index e78fc65..38cc180 100644 --- a/server/src/test/java/io/wdd/server/ServerApplicationTests.java +++ b/server/src/test/java/io/wdd/server/ServerApplicationTests.java @@ -1,6 +1,6 @@ package io.wdd.server; -import io.wdd.rpc.execute.service.CoreExecutionService; +import io.wdd.rpc.execute.service.AsyncExecutionService; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; @@ -13,7 +13,7 @@ class ServerApplicationTests { @Resource - CoreExecutionService coreExecutionService; + AsyncExecutionService asyncExecutionService; @Test void testCoreExecutionCompleteScript() { @@ -61,7 +61,7 @@ class ServerApplicationTests { ) ); - List resultList = coreExecutionService.SendCommandToAgentComplete( + List resultList = asyncExecutionService.SendCommandToAgentComplete( targetMachineList, "Scheduled Script", completeScript