From fbd8348e15f33988132bb7db28c84dc329f640fd Mon Sep 17 00:00:00 2001 From: zeaslity Date: Thu, 10 Aug 2023 17:09:51 +0800 Subject: [PATCH] =?UTF-8?q?[=20Service=20]=20[=20Executor=20]=20=E5=88=9D?= =?UTF-8?q?=E6=AD=A5=E9=87=8D=E6=9E=84Executor=E9=83=A8=E5=88=86=E7=9A=84?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent-go/executor/AppFunction.go | 8 +- agent-go/executor/CommandExecutor.go | 13 +- .../wdd/func/script/service/FuncService.java | 15 + .../func/script/service/FuncServiceImpl.java | 67 +++ .../wdd/func/xray/service/XrayCallAgent.java | 9 +- .../io/wdd/rpc/agent/OctopusAgentService.java | 13 +- .../rpc/agent/OctopusAgentServiceImpl.java | 29 +- .../config/RedisConfiguration.java | 2 +- .../wdd/rpc/controller/AgentController.java | 8 +- .../rpc/controller/ExecutionController.java | 251 +++++------ .../wdd/rpc/controller/StatusController.java | 8 +- .../io/wdd/rpc/execute/ExecutionMessage.java | 7 +- .../wdd/rpc/execute/ExecutionMessageType.java | 13 + .../io/wdd/rpc/execute/ExecutionService.java | 21 + .../wdd/rpc/execute/ExecutionServiceImpl.java | 125 ++++++ .../execute/config/CommandReaderConfig.java | 38 -- .../config/CommandReaderConfigBean.java | 26 -- .../wdd/rpc/execute/config/ExecutionLog.java | 16 - .../ExecutionResultStringDeserializer.java | 33 -- .../rpc/execute/result/BuildStreamReader.java | 189 -------- .../execute/result/CommandResultReader.java | 87 ---- .../result/RedisStreamReaderConfig.java | 121 ----- .../service/AsyncExecutionService.java | 114 ++--- .../service/AsyncExecutionServiceImpl.java | 326 +++++--------- .../execute/service/SyncExecutionService.java | 89 +--- .../service/SyncExecutionServiceImpl.java | 421 ++++-------------- .../io/wdd/rpc/init/AcceptAgentInitInfo.java | 2 +- .../io/wdd/rpc/message/OctopusMessage.java | 24 +- .../AsyncWaitOctopusMessageResultService.java | 78 ++-- ...a => OctopusMessageSyncReplayContend.java} | 21 +- .../message/handler/sync/OMessageHandler.java | 124 ++++++ ...ver.java => OMessageToServerListener.java} | 32 +- .../message/sender/OMessageToAgentSender.java | 41 +- .../job/AgentAliveStatusMonitorJob.java | 64 +-- .../scheduler/job/AgentMetricStatusJob.java | 46 +- .../service/BuildStatusScheduleTask.java | 85 ++-- .../script/AgentApplyScheduledScript.java | 46 +- .../AgentMetricStatusCollectService.java | 3 +- .../wdd/rpc/status/OctopusStatusMessage.java | 2 +- .../status/service/SyncStatusServiceImpl.java | 9 +- server/src/main/resources/application.yml | 8 +- .../io/wdd/server/ServerApplicationTests.java | 4 +- .../test/java/io/wdd/server/SimpleTest.java | 32 -- 43 files changed, 1020 insertions(+), 1650 deletions(-) create mode 100644 server/src/main/java/io/wdd/func/script/service/FuncService.java create mode 100644 server/src/main/java/io/wdd/func/script/service/FuncServiceImpl.java rename server/src/main/java/io/wdd/rpc/{execute => }/config/RedisConfiguration.java (96%) create mode 100644 server/src/main/java/io/wdd/rpc/execute/ExecutionMessageType.java create mode 100644 server/src/main/java/io/wdd/rpc/execute/ExecutionService.java create mode 100644 server/src/main/java/io/wdd/rpc/execute/ExecutionServiceImpl.java delete mode 100644 server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfig.java delete mode 100644 server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfigBean.java delete mode 100644 server/src/main/java/io/wdd/rpc/execute/config/ExecutionLog.java delete mode 100644 server/src/main/java/io/wdd/rpc/execute/config/ExecutionResultStringDeserializer.java delete mode 100644 server/src/main/java/io/wdd/rpc/execute/result/BuildStreamReader.java delete mode 100644 server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java delete mode 100644 server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java rename server/src/main/java/io/wdd/rpc/message/handler/async/{OctopusMessageSynScReplayContend.java => OctopusMessageSyncReplayContend.java} (72%) create mode 100644 server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageHandler.java rename server/src/main/java/io/wdd/rpc/message/handler/sync/{OMessageHandlerServer.java => OMessageToServerListener.java} (77%) diff --git a/agent-go/executor/AppFunction.go b/agent-go/executor/AppFunction.go index 03c1482..12cb157 100644 --- a/agent-go/executor/AppFunction.go +++ b/agent-go/executor/AppFunction.go @@ -5,8 +5,14 @@ import ( "time" ) +type OctopusFunc interface { + Exec(baseFuncName string, funcArgs ...string) []string + + Deploy(appFuncName string, funcArgs ...string) (bool, []string) +} + type AppFunc interface { - Deploy(appFuncName string, funcArgs ...string) []string + Deploy(appFuncName string, funcArgs ...string) (bool, []string) } var AppExecuteErrorLogPrefix = []string{"App指令执行错误! => "} diff --git a/agent-go/executor/CommandExecutor.go b/agent-go/executor/CommandExecutor.go index 9de4b34..42426bd 100644 --- a/agent-go/executor/CommandExecutor.go +++ b/agent-go/executor/CommandExecutor.go @@ -13,7 +13,7 @@ type ExecutionMessage struct { NeedResultReplay bool `json:"needResultReplay"` DurationTask bool `json:"durationTask,default:false"` Type string `json:"type"` - BaseFuncContent []string `json:"baseFuncContent"` + FuncContent []string `json:"funcContent"` SingleLineCommand []string `json:"singleLineCommand"` MultiLineCommand [][]string `json:"multiLineCommand"` PipeLineCommand [][]string `json:"pipeLineCommand"` @@ -32,9 +32,18 @@ func Execute(em *ExecutionMessage) ([]string, error) { if strings.HasPrefix(em.Type, "BASE") { // base function - resultLog = AgentOsOperatorCache.Exec(em.BaseFuncContent[0], em.BaseFuncContent[1:]...) + resultLog = AgentOsOperatorCache.Exec(em.FuncContent[0], em.FuncContent[1:]...) err = nil + } else if strings.HasPrefix(em.Type, "APP") { + // app function + ok, resultLog := AgentOsOperatorCache.Deploy(em.FuncContent[0], em.FuncContent[1:]...) + if ok { + return resultLog, nil + } else { + return resultLog, nil + } + } else { // shell command diff --git a/server/src/main/java/io/wdd/func/script/service/FuncService.java b/server/src/main/java/io/wdd/func/script/service/FuncService.java new file mode 100644 index 0000000..38cdf50 --- /dev/null +++ b/server/src/main/java/io/wdd/func/script/service/FuncService.java @@ -0,0 +1,15 @@ +package io.wdd.func.script.service; + + +import java.util.List; + + +public interface FuncService { + + List callBaseFuncService(String agentTopicName, String baseFunctionName, List funcArgs); + + + List callAppFuncService(String agentTopicName, String appFunctionName, List funcArgs); + + +} diff --git a/server/src/main/java/io/wdd/func/script/service/FuncServiceImpl.java b/server/src/main/java/io/wdd/func/script/service/FuncServiceImpl.java new file mode 100644 index 0000000..2207f28 --- /dev/null +++ b/server/src/main/java/io/wdd/func/script/service/FuncServiceImpl.java @@ -0,0 +1,67 @@ +package io.wdd.func.script.service; + +import io.wdd.rpc.execute.ExecutionMessageType; +import io.wdd.rpc.execute.ExecutionService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; + +@Service +@Slf4j +public class FuncServiceImpl implements FuncService { + + @Resource + ExecutionService executionService; + + + @Override + public List callBaseFuncService(String agentTopicName, String baseFunctionName, List funcArgs) { + + return this.syncCallFunction( + agentTopicName, + ExecutionMessageType.BASE, + baseFunctionName, + funcArgs + ); + } + + @Override + public List callAppFuncService(String agentTopicName, String appFunctionName, List funcArgs) { + + return this.syncCallFunction( + agentTopicName, + ExecutionMessageType.APP, + appFunctionName, + funcArgs + ); + } + + + private List syncCallFunction(String agentTopicName, ExecutionMessageType emType, String funcName, List funcArgs) { + + // 重新构造内容 + funcArgs.add( + 0, + funcName + ); + + // 调用 + ArrayList resultLog = executionService.SendCommandToAgent( + agentTopicName, + emType.toString(), + funcArgs, + null, + null, + true, + "", + false + ); + + + return resultLog; + } + +} diff --git a/server/src/main/java/io/wdd/func/xray/service/XrayCallAgent.java b/server/src/main/java/io/wdd/func/xray/service/XrayCallAgent.java index 90fece5..0d4a80c 100644 --- a/server/src/main/java/io/wdd/func/xray/service/XrayCallAgent.java +++ b/server/src/main/java/io/wdd/func/xray/service/XrayCallAgent.java @@ -3,7 +3,7 @@ package io.wdd.func.xray.service; import io.wdd.common.utils.TimeUtils; import io.wdd.func.oss.config.OctopusObjectSummary; import io.wdd.func.xray.beans.node.ProxyNode; -import io.wdd.rpc.execute.service.SyncExecutionService; +import io.wdd.rpc.execute.ExecutionService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; @@ -83,7 +83,7 @@ public class XrayCallAgent { } @Resource - SyncExecutionService executionService; + ExecutionService executionService; /** * 为代理链的每一个节点 构建Xray配置更新命令,然后发送至对应的Agent中 @@ -131,17 +131,18 @@ public class XrayCallAgent { ); // 向Agent发送命令,执行更新操作! - String resultKey = executionService.SyncSendCommandToAgent( + executionService.SendCommandToAgent( proxyNode.getAgentTopicName(), updateCommandType, null, + null, updateXrayCommandList, false, null, false ); - return resultKey; + return ""; } ) .collect(Collectors.toList()); diff --git a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentService.java b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentService.java index c0c31d0..368a299 100644 --- a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentService.java +++ b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentService.java @@ -8,15 +8,16 @@ public interface OctopusAgentService { /** - * 获取所有Agent的版本信息,附带最新的版本信息 - * 超时时间为 5s + * 获取所有Agent的版本信息,附带最新的版本信息 + * 超时时间为 5s + * * @return key - AgentTopicName value - version Info */ Map getAllAgentVersion(); /** - * 获取所有Agent的核心信息,方便更新系统信息 + * 获取所有Agent的核心信息,方便更新系统信息 * * @return agent-topic-name value -agentServerInfo */ @@ -24,9 +25,9 @@ public interface OctopusAgentService { /** - * 执行 Agent 关键操作的接口 - * 关机 - * */ + * 执行 Agent 关键操作的接口 + * 关机 + */ String shutdownAgentDanger(String agentTopicName); } diff --git a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java index c065b35..de2de01 100644 --- a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java @@ -7,13 +7,12 @@ import io.wdd.common.utils.TimeUtils; import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessageType; import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService; -import io.wdd.rpc.message.handler.async.OctopusMessageSynScReplayContend; +import io.wdd.rpc.message.handler.async.OctopusMessageSyncReplayContend; import io.wdd.rpc.message.sender.OMessageToAgentSender; import io.wdd.server.beans.vo.ServerInfoVO; import io.wdd.server.config.ServerCommonPool; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.time.LocalDateTime; @@ -26,12 +25,12 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static io.wdd.rpc.message.handler.sync.OMessageHandlerServer.LATEST_VERSION; -import static io.wdd.rpc.message.handler.sync.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT; +import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.LATEST_VERSION; +import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.OCTOPUS_MESSAGE_FROM_AGENT; import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_SET; import static io.wdd.rpc.status.CommonAndStatusCache.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; -@Service +//@Service @Slf4j public class OctopusAgentServiceImpl implements OctopusAgentService { @@ -70,7 +69,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { ); // 构造 异步结果监听内容 - OctopusMessageSynScReplayContend agentReplayContend = OctopusMessageSynScReplayContend.build( + OctopusMessageSyncReplayContend agentReplayContend = OctopusMessageSyncReplayContend.build( ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(), CurrentAppOctopusMessageType, currentTime @@ -147,16 +146,16 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { ); // 构造结果 - OctopusMessageSynScReplayContend octopusMessageSynScReplayContend = OctopusMessageSynScReplayContend.build( + OctopusMessageSyncReplayContend octopusMessageSyncReplayContend = OctopusMessageSyncReplayContend.build( ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(), CurrentAppOctopusMessageType, currentTime ); - CountDownLatch countDownLatch = octopusMessageSynScReplayContend.getCountDownLatch(); + CountDownLatch countDownLatch = octopusMessageSyncReplayContend.getCountDownLatch(); // 调用后台接收处理所有的Replay信息 - asyncWaitOctopusMessageResultService.waitFor(octopusMessageSynScReplayContend); + asyncWaitOctopusMessageResultService.waitFor(octopusMessageSyncReplayContend); /* CompletableFuture getAllAgentCoreInfoFuture = waitCollectAllAgentCoreInfo( result, @@ -176,10 +175,10 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { // 超时,或者 全部信息已经收集 // 此处调用,即可中断 异步任务的收集工作 - asyncWaitOctopusMessageResultService.stopWaiting(octopusMessageSynScReplayContend); + asyncWaitOctopusMessageResultService.stopWaiting(octopusMessageSyncReplayContend); // 处理结果 - octopusMessageSynScReplayContend + octopusMessageSyncReplayContend .getReplayOMList() .stream() .forEach( @@ -207,7 +206,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { ); // help gc - octopusMessageSynScReplayContend = null; + octopusMessageSyncReplayContend = null; } return result; @@ -265,7 +264,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { // 不是当前应用需要的的OM,将信息放置与Cache队列的末尾 - OCTOPUS_MESSAGE_FROM_AGENT.offer(message); + OCTOPUS_MESSAGE_FROM_AGENT.add(message); // 返回,继续死循环 continue; @@ -362,7 +361,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { // OctopusMessageType判断 boolean OMTypeEqual = message - .getType() + .getOctopusMessageType() .equals(CurrentAppOctopusMessageType); return startTimeEqual && OMTypeEqual; @@ -411,7 +410,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { return OctopusMessage .builder() - .type(CurrentAppOctopusMessageType) + .octopusMessageType(CurrentAppOctopusMessageType) .uuid(agentTopicName) .init_time(currentTime) .content(ops) diff --git a/server/src/main/java/io/wdd/rpc/execute/config/RedisConfiguration.java b/server/src/main/java/io/wdd/rpc/config/RedisConfiguration.java similarity index 96% rename from server/src/main/java/io/wdd/rpc/execute/config/RedisConfiguration.java rename to server/src/main/java/io/wdd/rpc/config/RedisConfiguration.java index 985b36a..9fb5417 100644 --- a/server/src/main/java/io/wdd/rpc/execute/config/RedisConfiguration.java +++ b/server/src/main/java/io/wdd/rpc/config/RedisConfiguration.java @@ -1,4 +1,4 @@ -package io.wdd.rpc.execute.config; +package io.wdd.rpc.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; diff --git a/server/src/main/java/io/wdd/rpc/controller/AgentController.java b/server/src/main/java/io/wdd/rpc/controller/AgentController.java index ddcea7d..a14d4a2 100644 --- a/server/src/main/java/io/wdd/rpc/controller/AgentController.java +++ b/server/src/main/java/io/wdd/rpc/controller/AgentController.java @@ -6,14 +6,12 @@ import io.wdd.common.response.R; import io.wdd.rpc.agent.OctopusAgentService; import io.wdd.server.beans.vo.ServerInfoVO; import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.Map; -@RestController -@RequestMapping("/octopus/server/agent") +//@RestController +//@RequestMapping("/octopus/server/agent") @Api(value = "处理Agent核心内容的Controller", tags = "Agent") public class AgentController { @@ -22,7 +20,7 @@ public class AgentController { @GetMapping("/version") @ApiOperation("[版本] - 所有OctopusAgent") - public R> getAllAgentVersion(){ + public R> getAllAgentVersion() { return R.ok(octopusAgentService.getAllAgentVersion()); } diff --git a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java index 622050a..3d20036 100644 --- a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java +++ b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java @@ -4,8 +4,7 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import io.wdd.common.response.R; -import io.wdd.rpc.execute.service.AsyncExecutionService; -import io.wdd.rpc.execute.service.SyncExecutionService; +import io.wdd.rpc.execute.ExecutionService; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -14,25 +13,19 @@ import org.springframework.web.bind.annotation.RestController; import javax.annotation.Nullable; import javax.annotation.Resource; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_LIST; -import static io.wdd.rpc.status.CommonAndStatusCache.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; - @RestController @RequestMapping("/octopus/server/executor") @Api(value = "Agent执行命令的Controller", tags = "Execution") public class ExecutionController { @Resource - SyncExecutionService syncExecutionService; - @Resource - AsyncExecutionService asyncExecutionService; + ExecutionService executionService; @PostMapping("/command/one") @ApiOperation("[命令] [异步]- 单台主机") - public R patchCommandToAgent( + public R> patchCommandToAgent( @RequestParam(value = "topicName") @ApiParam(name = "topicName", value = "目标主机名称") String topicName, @RequestParam(value = "commandList", required = false) @Nullable List commandList, @RequestParam(value = "completeCommandList", required = false) @@ -41,19 +34,7 @@ public class ExecutionController { @ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask ) { - ArrayList streamKeyList = asyncExecutionService - .AsyncSendCommandToAgentComplete( - topicName, - type, - commandList, - completeCommandList, - false, - null, - isDurationTask - ); - - - return R.ok(streamKeyList.toString()); + return R.ok(null); } @PostMapping("/command/batch") @@ -68,15 +49,7 @@ public class ExecutionController { @RequestParam(value = "type", required = false) @Nullable String type, @ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask ) { - - List> arrayListList = asyncExecutionService.AsyncSendCommandToAgentComplete( - topicNameList, - type, - commandList, - completeCommandList, - isDurationTask - ); - return R.ok(arrayListList); + return R.ok(null); } @@ -91,13 +64,7 @@ public class ExecutionController { @ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask ) { - return R.ok(asyncExecutionService.AsyncSendCommandToAgentComplete( - ALL_AGENT_TOPIC_NAME_LIST, - type, - commandList, - completeCommandList, - isDurationTask - )); + return R.ok(null); } @PostMapping("/command/healthy") @@ -111,15 +78,26 @@ public class ExecutionController { @ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask ) { - return R.ok(asyncExecutionService.AsyncSendCommandToAgentComplete( - ALL_HEALTHY_AGENT_TOPIC_NAME_LIST, - type, - commandList, - completeCommandList, - isDurationTask - )); +// List> pathResult = syncExecutionService +// .SyncSendCommandToAgentComplete( +// ALL_HEALTHY_AGENT_TOPIC_NAME_LIST, +// type, +// null, +// commandList, +// completeCommandList, +// false, +// null, +// isDurationTask +// ); +// +// +// return R.ok(pathResult); + return R.ok(null); + + } + @PostMapping("/command/sync/one") @ApiOperation("[命令] [同步] - 单机-等待命令结果") public R> SyncPatchCommandToAgent( @@ -131,19 +109,25 @@ public class ExecutionController { @RequestParam(value = "type", required = false) @ApiParam(name = "type", value = "执行命令类型") @Nullable String type ) { - return R.ok( - Collections.singletonList(syncExecutionService.SyncSendCommandToAgentComplete( + ArrayList resultLog = executionService + .SendCommandToAgent( topicName, type, + null, commandList, - completeCommandList - )) - ); + completeCommandList, + false, + null, + false + ); + + return R.ok(resultLog); + } @PostMapping("/command/sync/batch") @ApiOperation("[命令] [同步] - 批量-等待命令结果") - public R> SyncPatchCommandToAgentBatch( + public R>> SyncPatchCommandToAgentBatch( @RequestParam(value = "topicNameList") @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList, @RequestParam(value = "commandList", required = false) @@ -154,20 +138,28 @@ public class ExecutionController { @ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask ) { - return R.ok( - syncExecutionService.SyncSendCommandToAgentComplete( - topicNameList, - type, - commandList, - completeCommandList, - isDurationTask - ) - ); +// List> pathResult = syncExecutionService +// .SyncSendCommandToAgentComplete( +// topicNameList, +// type, +// null, +// commandList, +// completeCommandList, +// false, +// null, +// isDurationTask +// ); +// +// +// return R.ok(pathResult); + return R.ok(null); + + } @PostMapping("/command/sync/all") @ApiOperation("[命令] [同步] - 全部-同步等待命令结果") - public R> SyncPatchCommandToAgentAll( + public R>> SyncPatchCommandToAgentAll( @RequestParam(value = "commandList", required = false) @ApiParam(name = "commandList", value = "命令行") @Nullable List commandList, @RequestParam(value = "completeCommandList", required = false) @@ -176,20 +168,59 @@ public class ExecutionController { @ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask ) { - return R.ok( - syncExecutionService.SyncSendCommandToAgentComplete( - ALL_AGENT_TOPIC_NAME_LIST, - type, - commandList, - completeCommandList, - isDurationTask - ) - ); +// List> pathResult = syncExecutionService +// .SyncSendCommandToAgentComplete( +// ALL_AGENT_TOPIC_NAME_LIST, +// type, +// null, +// commandList, +// completeCommandList, +// false, +// null, +// isDurationTask +// ); +// +// +// return R.ok(pathResult); + return R.ok(null); + + + } + + @PostMapping("/command/sync/healthy") + @ApiOperation("[命令] [同步] - 健康的主机") + public R>> SyncPatchCommandToHealthyAgent( + @RequestParam(value = "commandList", required = false) + @ApiParam(name = "commandList", value = "命令行") @Nullable List commandList, + @RequestParam(value = "completeCommandList", required = false) + @ApiParam(name = "completeCommandList", value = "完整命令行,优先,可为空") @Nullable List> completeCommandList, + @RequestParam(value = "type", required = false) @Nullable String type, + @ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask + ) { + +// List> pathResult = syncExecutionService +// .SyncSendCommandToAgentComplete( +// ALL_HEALTHY_AGENT_TOPIC_NAME_LIST, +// type, +// null, +// commandList, +// completeCommandList, +// false, +// null, +// isDurationTask +// ); +// +// +// return R.ok(pathResult); + return R.ok(null); + + } @PostMapping("/agentStatusStream") @ApiOperation("切换Console查看Agent状态日志") + @Deprecated public R getAgentStatusStrem( @RequestParam(value = "streamKey") @ApiParam(value = "status的Stream Key") String streamKey ) { @@ -199,82 +230,4 @@ public class ExecutionController { } - // auth required -// @PostMapping("/function/update") -// @ApiOperation("升级") -// public R> AgentUpdate( -// @RequestParam(value = "topicNameList") -// @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList -// ) { -// -// return R.ok( -// syncExecutionService -// .SyncSendCommandToAgent( -// topicNameList, -// "AgentUpdate", -// null, -// false, -// null, -// true -// )); -// } -// -// @PostMapping("/function/reboot") -// @ApiOperation("重启") -// public R> AgentReboot( -// @RequestParam(value = "topicNameList") -// @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList -// ) { -// -// return R.ok( -// asyncExecutionService -// .SyncSendCommandToAgent( -// topicNameList, -// "AgentReboot", -// null, -// false, -// null, -// true -// )); -// } -// -// @PostMapping("/function/shutdown") -// @ApiOperation("关闭") -// public R> AgentShutdown( -// @RequestParam(value = "topicNameList") -// @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList -// ) { -// -// return R.ok( -// syncExecutionService -// .SyncSendCommandToAgent( -// topicNameList, -// "AgentShutdown", -// null, -// false, -// null, -// true -// )); -// } -// -// @PostMapping("/function/bootUp") -// @ApiOperation("重新部署") -// public R> AgentBootUp( -// @RequestParam(value = "topicNameList") -// @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList -// ) { -// -// return R.ok( -// asyncExecutionService -// .SyncSendCommandToAgent( -// topicNameList, -// "AgentBootUp", -// null, -// false, -// null, -// true -// )); -// } - - } diff --git a/server/src/main/java/io/wdd/rpc/controller/StatusController.java b/server/src/main/java/io/wdd/rpc/controller/StatusController.java index 208d7cf..6362e5b 100644 --- a/server/src/main/java/io/wdd/rpc/controller/StatusController.java +++ b/server/src/main/java/io/wdd/rpc/controller/StatusController.java @@ -8,7 +8,9 @@ import io.wdd.rpc.beans.request.MetricQueryEntity; import io.wdd.rpc.scheduler.service.status.AgentAliveStatusMonitorService; import io.wdd.rpc.status.beans.AgentStatus; import io.wdd.rpc.status.service.SyncStatusService; -import org.springframework.web.bind.annotation.*; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; import javax.annotation.Resource; import java.util.ArrayList; @@ -18,9 +20,9 @@ import java.util.Map; import static io.wdd.rpc.status.CommonAndStatusCache.*; -@RestController +//@RestController @Api(value = "Agent运行状态Controller", tags = "Status") -@RequestMapping("/octopus/server/status") +//@RequestMapping("/octopus/server/status") public class StatusController { @Resource diff --git a/server/src/main/java/io/wdd/rpc/execute/ExecutionMessage.java b/server/src/main/java/io/wdd/rpc/execute/ExecutionMessage.java index 7b0e3e4..adfb66f 100644 --- a/server/src/main/java/io/wdd/rpc/execute/ExecutionMessage.java +++ b/server/src/main/java/io/wdd/rpc/execute/ExecutionMessage.java @@ -33,10 +33,15 @@ public class ExecutionMessage { /** * 用于区分 ExecutionMessage的类型 - * 直接执行预定函数,则为 Nacos配置中的 方法名称,例如 AgentUpdate AgentReboot + * BASE APP */ private String type; + /** + * 执行功能脚本时需要的参数 + */ + private List funcContent; + /** * 只有一行的命令行 */ diff --git a/server/src/main/java/io/wdd/rpc/execute/ExecutionMessageType.java b/server/src/main/java/io/wdd/rpc/execute/ExecutionMessageType.java new file mode 100644 index 0000000..966166b --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/execute/ExecutionMessageType.java @@ -0,0 +1,13 @@ +package io.wdd.rpc.execute; + +public enum ExecutionMessageType { + + + // 基础类型,执行基础脚本类 + BASE, + + // 应用类,执行特定功能 + APP, + + +} diff --git a/server/src/main/java/io/wdd/rpc/execute/ExecutionService.java b/server/src/main/java/io/wdd/rpc/execute/ExecutionService.java new file mode 100644 index 0000000..78af7da --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/execute/ExecutionService.java @@ -0,0 +1,21 @@ +package io.wdd.rpc.execute; + +import java.util.ArrayList; +import java.util.List; + +public interface ExecutionService { + + + ArrayList SendCommandToAgent( + String agentTopicName, + String type, + List funcContent, + List commandList, + List> commandListComplete, + boolean needResultReplay, + String resultKey, + boolean durationTask + ); + + +} diff --git a/server/src/main/java/io/wdd/rpc/execute/ExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/ExecutionServiceImpl.java new file mode 100644 index 0000000..b80ba42 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/execute/ExecutionServiceImpl.java @@ -0,0 +1,125 @@ +package io.wdd.rpc.execute; + +import io.wdd.common.utils.TimeUtils; +import io.wdd.rpc.message.OctopusMessage; +import io.wdd.rpc.message.OctopusMessageType; +import io.wdd.rpc.message.sender.OMessageToAgentSender; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; + +import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_SET; + +@Service +@Slf4j +public class ExecutionServiceImpl implements ExecutionService { + + private static final String MANUAL_COMMAND_TYPE = "manual-command"; + + @Resource + OMessageToAgentSender oMessageToAgentSender; + + @Override + public ArrayList SendCommandToAgent(String agentTopicName, String type, List funcContent, List commandList, List> commandListComplete, boolean needResultReplay, String resultKey, boolean durationTask) { + + ArrayList commandResultLog = null; + + // 归一化type + if (StringUtils.isEmpty(type)) { + type = MANUAL_COMMAND_TYPE; + } + + // 构造 Execution Command对应的消息体 + ExecutionMessage executionMessage = this + .generateExecutionMessage( + type, + commandList, + resultKey, + commandListComplete, + needResultReplay, + durationTask + ); + OctopusMessage octopusMessage = this.generateOctopusMessage( + agentTopicName, + executionMessage + ); + + // send the message + oMessageToAgentSender.send(octopusMessage); + + + // 需要返回结果 + if (!durationTask) { + + synchronized (octopusMessage) { + try { + octopusMessage.wait(10000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + // 转换结果 + commandResultLog = (ArrayList) octopusMessage.getResult(); + + // debug + log.debug( + "执行命令 {} 的结果为 {} 内容为 {}", + executionMessage.getSingleLineCommand() == null ? executionMessage.getMultiLineCommand() : executionMessage.getSingleLineCommand(), + octopusMessage.getResultCode(), + octopusMessage.getResult() + ); + + } + + return commandResultLog; + } + + + private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) { + + return OctopusMessage + .builder() + .octopusMessageType(OctopusMessageType.EXECUTOR) + .init_time(TimeUtils.currentFormatTime()) + .uuid(agentTopicName) + .content( + executionMessage + ) + .build(); + } + + private ExecutionMessage generateExecutionMessage(String type, List commandList, String resultKey, List> commandListComplete, boolean needResultReplay, boolean durationTask) { + + return ExecutionMessage + .builder() + .resultKey(resultKey) + .type(type) + .singleLineCommand(commandList) + .multiLineCommand(commandListComplete) + .needResultReplay(needResultReplay) + .durationTask(durationTask) + .build(); + } + + + private boolean validateCommandInfo(String agentTopicName, String type) { + + // 检查agentTopicName是否存在 + if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { + log.error( + "agentTopicName异常! 输入为 => {}", + agentTopicName + ); + return false; + //throw new MyRuntimeException("agentTopicName异常!" + agentTopicName); + } + + return true; + } + +} diff --git a/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfig.java b/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfig.java deleted file mode 100644 index 28da813..0000000 --- a/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfig.java +++ /dev/null @@ -1,38 +0,0 @@ -package io.wdd.rpc.execute.config; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; -import lombok.experimental.SuperBuilder; - -import java.util.ArrayList; - -@Data -@NoArgsConstructor -@AllArgsConstructor -@SuperBuilder(toBuilder = true) -public class CommandReaderConfig { - - /** - * 消费者类型:独立消费、消费组消费 - */ - private String consumerType; - /** - * 消费组 - */ - private String group; - /** - * 消费组中的某个消费者 - */ - private String consumerName; - - private String streamKey; - - private String recordId; - - /** - * 执行的结果对象,保存在此处 - */ - private ArrayList ExecutionResult; - -} diff --git a/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfigBean.java b/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfigBean.java deleted file mode 100644 index ac3ae3f..0000000 --- a/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfigBean.java +++ /dev/null @@ -1,26 +0,0 @@ -//package io.wdd.rpc.execute.config; -// -//import org.springframework.context.annotation.Bean; -//import org.springframework.context.annotation.Configuration; -// -// -//@Configuration -//public class CommandReaderConfigBean { -// -// // todo must support for multi thread -// // its not thread safe now -// @Bean -// public CommandReaderConfig commandReaderConfig() { -// -// return CommandReaderConfig -// .builder() -// .consumerName(REDIS_STREAM_LISTENER_CONSUMER_NAME) -// .streamKey("ccc") -// .consumerType(REDIS_STREAM_LISTENER_CONSUMER_NAME) -// .group("ccc") -// .ExecutionResult(null) -// .build(); -// } -// -// -//} diff --git a/server/src/main/java/io/wdd/rpc/execute/config/ExecutionLog.java b/server/src/main/java/io/wdd/rpc/execute/config/ExecutionLog.java deleted file mode 100644 index ccbfad4..0000000 --- a/server/src/main/java/io/wdd/rpc/execute/config/ExecutionLog.java +++ /dev/null @@ -1,16 +0,0 @@ -package io.wdd.rpc.execute.config; - -import io.swagger.annotations.ApiModel; -import io.wdd.server.beans.po.ExecutionLogPO; -import lombok.AllArgsConstructor; -import lombok.Data; - -import java.time.LocalDateTime; - -@Data -@AllArgsConstructor -@ApiModel("Execution模快持久化Bean对象") -public class ExecutionLog extends ExecutionLogPO { - - -} diff --git a/server/src/main/java/io/wdd/rpc/execute/config/ExecutionResultStringDeserializer.java b/server/src/main/java/io/wdd/rpc/execute/config/ExecutionResultStringDeserializer.java deleted file mode 100644 index c47248e..0000000 --- a/server/src/main/java/io/wdd/rpc/execute/config/ExecutionResultStringDeserializer.java +++ /dev/null @@ -1,33 +0,0 @@ -package io.wdd.rpc.execute.config; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.util.ArrayList; - -public class ExecutionResultStringDeserializer { - - public static ArrayList format(String executionResultString) { - - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, - true); - - try { - - String tmp = objectMapper.readValue(executionResultString, - new TypeReference() { - }); - - return objectMapper.readValue(tmp, - new TypeReference>() { - }); - - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - - } -} diff --git a/server/src/main/java/io/wdd/rpc/execute/result/BuildStreamReader.java b/server/src/main/java/io/wdd/rpc/execute/result/BuildStreamReader.java deleted file mode 100644 index c6168a7..0000000 --- a/server/src/main/java/io/wdd/rpc/execute/result/BuildStreamReader.java +++ /dev/null @@ -1,189 +0,0 @@ -//package io.wdd.rpc.execute.result; -// -//import io.wdd.rpc.execute.config.CommandReaderConfig; -//import io.wdd.server.utils.SpringUtils; -//import lombok.SneakyThrows; -//import lombok.extern.slf4j.Slf4j; -//import org.springframework.data.redis.connection.stream.ReadOffset; -//import org.springframework.data.redis.connection.stream.StreamOffset; -//import org.springframework.data.redis.stream.StreamMessageListenerContainer; -//import org.springframework.stereotype.Component; -// -//import java.util.ArrayList; -//import java.util.HashMap; -//import java.util.concurrent.TimeUnit; -// -//import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER; -// -// -//@Component -//@Slf4j -//public class BuildStreamReader { -// -// private final HashMap REDIS_STREAM_LISTENER_CONTAINER_CACHE = new HashMap<>(16); -// private RedisStreamReaderConfig redisStreamReaderConfig; -// -// private StreamMessageListenerContainer streamMessageListenerContainer; -// -// private CommandReaderConfig commandReaderConfig; -// -// public void buildStreamReader(CommandReaderConfig commandReaderConfig) { -// -// // prepare the environment -// prepareExecutionEnv(); -// -// -// // just modify the redis listener container and it's ok -// modifyExecutionStreamReader(commandReaderConfig); -// -// } -// -// @SneakyThrows -// private void modifyExecutionStreamReader(CommandReaderConfig commandReaderConfig) { -// -// // stop the old stream listener container -// if (this.streamMessageListenerContainer.isRunning()) { -// this.streamMessageListenerContainer.stop(); -// } -// -// // modify container -// this.streamMessageListenerContainer.receive( -// StreamOffset.create( -// commandReaderConfig.getStreamKey(), -// ReadOffset.lastConsumed()), -// -// new CommandResultReader( -// commandReaderConfig -// ) -// ); -// -// -// // very important -// TimeUnit.MILLISECONDS.sleep(500); -// this.streamMessageListenerContainer.start(); -// } -// -// private void prepareExecutionEnv() { -// -// getRedisStreamListenerContainer(); -// -// getRedisStreamReaderConfig(); -// -// } -// -// private void getRedisStreamReaderConfig() { -// -// this.commandReaderConfig = SpringUtils.getBean("commandReaderConfig", -// CommandReaderConfig.class); -// } -// -// private void getRedisStreamListenerContainer() { -// -// this.streamMessageListenerContainer = SpringUtils.getBean( -// EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER, -// StreamMessageListenerContainer.class -// ); -// } -// -// public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey) { -// registerStreamReader(redisStreamListenerContainerBeanName, -// streamKey, -// null); -// } -// -// public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey, ArrayList ExecutionResult) { -// -// // prepare the environment -// prepareEnv(); -// -// // oldStreamKey equals streamKey don't need to do anything , just return -// if (redisStreamReaderConfig.getStreamKey() -// .equals(streamKey)) { -// log.debug("redis listener container not change !"); -// return; -// } -// -// // destroy the old REDIS_STREAM_LISTENER_CONTAINER -// destroyStreamReader(streamKey); -// -// // modify the configuration ==> streamKey -// modifyStreamReader(streamKey, -// ExecutionResult); -// -// // re-create the REDIS_STREAM_LISTENER_CONTAINER -// createStreamReader(redisStreamListenerContainerBeanName, -// streamKey); -// -// } -// -// private void prepareEnv() { -// -// getRedisStreamConfig(); -// -// } -// -// private void getRedisStreamConfig() { -// -// this.redisStreamReaderConfig = SpringUtils.getBean("redisStreamReaderConfig", -// RedisStreamReaderConfig.class); -// } -// -// -// private void createStreamReader(String redisStreamListenerContainerBeanName, String streamKey) { -// -// log.debug("start to create the redis stream listener container"); -// // create the lazy bean -// -// StreamMessageListenerContainer streamMessageListenerContainer = SpringUtils.getBean(redisStreamListenerContainerBeanName, -// StreamMessageListenerContainer.class); -// -// REDIS_STREAM_LISTENER_CONTAINER_CACHE.put(streamKey, -// streamMessageListenerContainer); -// -// // very important -// log.debug("start the listener container"); -// streamMessageListenerContainer.start(); -// -// -// } -// -// private void modifyStreamReader(String streamKey, ArrayList executionResult) { -// -// log.debug("start to modify the redis stream listener container stream key"); -// String oldStreamKey = redisStreamReaderConfig.getStreamKey(); -// -// log.debug("change stream key from [{}] to [{}]", -// oldStreamKey, -// streamKey); -// -// log.debug("start to set the Redis Stream Reader key"); -// redisStreamReaderConfig.setStreamKey(streamKey); -// -// log.debug("start to set the Redis Stream Execution Result Container"); -// redisStreamReaderConfig.setExecutionResult(executionResult); -// -// } -// -// -// private void destroyStreamReader(String streamKey) { -// -// String oldStreamKey = redisStreamReaderConfig.getStreamKey(); -// -// if (REDIS_STREAM_LISTENER_CONTAINER_CACHE.containsKey(oldStreamKey)) { -// -// StreamMessageListenerContainer streamMessageListenerContainer = REDIS_STREAM_LISTENER_CONTAINER_CACHE.get(oldStreamKey); -// -// log.debug("destroyed old redis stream listener container is [ {} ]", -// streamMessageListenerContainer); -// -// -// // double destroy -// SpringUtils.destroyBean(streamMessageListenerContainer); -// streamMessageListenerContainer.stop(); -// // help gc -// streamMessageListenerContainer = null; -// } -// -// -// } -//} diff --git a/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java b/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java deleted file mode 100644 index a1ba83d..0000000 --- a/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java +++ /dev/null @@ -1,87 +0,0 @@ -package io.wdd.rpc.execute.result; - -import io.wdd.rpc.execute.config.CommandReaderConfig; -import io.wdd.rpc.execute.config.ExecutionResultStringDeserializer; -import lombok.Getter; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; -import org.springframework.data.redis.connection.stream.MapRecord; -import org.springframework.data.redis.connection.stream.RecordId; -import org.springframework.data.redis.stream.StreamListener; - -import java.util.ArrayList; - -@Getter -@Setter -@Slf4j -public class CommandResultReader implements StreamListener> { - - // https://medium.com/nerd-for-tech/event-driven-architecture-with-redis-streams-using-spring-boot-a81a1c9a4cde - - //https://segmentfault.com/a/1190000040946712 - - //https://docs.spring.io/spring-data/redis/docs/2.5.5/reference/html/#redis.streams.receive.containers - - - private CommandReaderConfig commandReaderConfig; - - public CommandResultReader(String consumerType, String group, String consumerName) { - new CommandResultReader(consumerType, - group, - consumerName, - null); - } - - public CommandResultReader(String consumerType, String group, String consumerName, ArrayList executionResult) { - this.commandReaderConfig = CommandReaderConfig - .builder() - .consumerName(consumerName) - .group(group) - .consumerType(consumerType) - .ExecutionResult(executionResult) - .build(); - - } - - - public CommandResultReader(CommandReaderConfig commandReaderConfig) { - - this.commandReaderConfig = commandReaderConfig; - - } - - @Override - public void onMessage(MapRecord message) { - - String streamKey = message.getStream(); - RecordId messageId = message.getId(); - String key = (String) message.getValue() - .keySet() - .toArray()[0]; - String value = message.getValue() - .get(key); - - - ArrayList executionResultFormat = ExecutionResultStringDeserializer.format(value); - - // 赋值给外部的结果,是的执行的结果可以被拿到 - this.commandReaderConfig.setExecutionResult(executionResultFormat); - this.commandReaderConfig.setRecordId(String.valueOf(messageId)); - - log.info("Octopus Agent [ {} ] execution of [ {} ] Time is [ {} ] stream recordId is [{}]", - streamKey, - executionResultFormat.get(1), - key, - messageId); - // print to console - executionResultFormat - .stream() - .forEach( - System.out::println - ); - - - } - - -} diff --git a/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java b/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java deleted file mode 100644 index 8d51e9c..0000000 --- a/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java +++ /dev/null @@ -1,121 +0,0 @@ -//package io.wdd.rpc.execute.result; -// -// -//import io.wdd.rpc.scheduler.service.status.AgentStatusStreamReader; -//import lombok.Getter; -//import lombok.Setter; -//import lombok.extern.slf4j.Slf4j; -//import org.springframework.context.annotation.Bean; -//import org.springframework.context.annotation.Configuration; -//import org.springframework.context.annotation.Lazy; -//import org.springframework.context.annotation.Scope; -//import org.springframework.data.redis.connection.RedisConnectionFactory; -//import org.springframework.data.redis.connection.stream.MapRecord; -//import org.springframework.data.redis.connection.stream.ReadOffset; -//import org.springframework.data.redis.connection.stream.StreamOffset; -//import org.springframework.data.redis.stream.StreamMessageListenerContainer; -// -//import javax.annotation.Resource; -//import java.time.Duration; -//import java.util.ArrayList; -// -//@Configuration -//@Slf4j -//@Getter -//@Setter -//public class RedisStreamReaderConfig { -// -// @Resource -// private RedisConnectionFactory redisConnectionFactory; -// -// public static final String COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER = "commandResultRedisStreamListenerContainer"; -// -// public static final String EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER = "executionResultRedisStreamListenerContainer"; -// -// public static final String AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER = "agentStatusRedisStreamListenerContainer"; -// -// public static final String REDIS_STREAM_LISTENER_CONSUMER_NAME = "OctopusServer"; -// -// /** -// * used in old model -// */ -// private String streamKey = "cccc"; -// -// /** -// * no use -// */ -// private ArrayList executionResult = null; -// -// -// @Bean(value = EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER) -// @Lazy -// public StreamMessageListenerContainer> executionResultRedisStreamListenerContainer(){ -// -// StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions -// .builder() -// .pollTimeout(Duration.ofSeconds(2)) -// .build(); -// -// StreamMessageListenerContainer> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); -// -// return listenerContainer; -// } -// -// -// @Bean(value = COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER) -// @Scope("prototype") -// @Lazy -// public StreamMessageListenerContainer> commandResultRedisStreamListenerContainer(){ -// -// StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions -// .builder() -// .pollTimeout(Duration.ofSeconds(2)) -// .build(); -// -// StreamMessageListenerContainer> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); -// -// // todo 此部分可以被移出到另外的位置,会更加方便,就不需要对此Bean进行创建和销毁了 -// listenerContainer.receive( -// -// StreamOffset.create(streamKey, ReadOffset.lastConsumed()), -// -// new CommandResultReader( -// REDIS_STREAM_LISTENER_CONSUMER_NAME, -// streamKey, -// REDIS_STREAM_LISTENER_CONSUMER_NAME, -// executionResult -// ) -// -// ); -// -// return listenerContainer; -// } -// -// @Bean(value = AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER) -// @Scope("prototype") -// @Lazy -// public StreamMessageListenerContainer> agentStatusRedisStreamListenerContainer(){ -// -// StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions -// .builder() -// .pollTimeout(Duration.ofSeconds(2)) -// .build(); -// -// StreamMessageListenerContainer> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); -// -// listenerContainer.receive( -// -// StreamOffset.create(streamKey, ReadOffset.lastConsumed()), -// -// new AgentStatusStreamReader( -// REDIS_STREAM_LISTENER_CONSUMER_NAME, -// REDIS_STREAM_LISTENER_CONSUMER_NAME, -// REDIS_STREAM_LISTENER_CONSUMER_NAME) -// -// ); -// -// return listenerContainer; -// } -// -// -//} diff --git a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java index 48dbf4c..038a36a 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java @@ -1,92 +1,16 @@ package io.wdd.rpc.execute.service; -import java.util.ArrayList; -import java.util.HashMap; +import io.wdd.rpc.message.OctopusMessage; + import java.util.List; -/** - * 同步命令执行的核心类 - * 需要等待命令执行完毕,完后返回相应的结果 - */ + public interface AsyncExecutionService { - /** - * ------------------------ Sync Command Executor ------------------------------ - */ - ArrayList AsyncSendCommandToAgent(String agentTopicName, List commandList); - - ArrayList AsyncSendCommandToAgent(String agentTopicName, String type, List commandList); - - List> AsyncSendCommandToAgent(List agentTopicNameList, String type, List commandList); - - /** - * 调用 单行命令脚本的 最底层函数 - * - * @param agentTopicName - * @param type - * @param commandList - * @param needResultReplay - * @param futureKey - * @param durationTask - * @return - */ - ArrayList AsyncSendCommandToAgent( - String agentTopicName, - String type, - List commandList, - boolean needResultReplay, - String futureKey, - boolean durationTask - ); - - - /** - * ------------------------------------------------- - */ - - ArrayList AsyncSendCommandToAgentComplete(String agentTopicName, String type, List commandList, List> completeCommandList); - - List> AsyncSendCommandToAgentComplete(List agentTopicNameList, String type, List commandList, List> completeCommandList, boolean isDurationTask); - - /** - * 通常为 页面定时脚本任务调用 - * - * @param agentTopicNameList 目标Agent的TopicName列表 - * @param type 任务类型 - * @param completeCommandList 完整的类型 - * @return 每个Agent只返回一个 ResultKey(Script脚本的结果全部拼接到一起),全部的resultKey - */ - List> AsyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList); - - - /** - * 通常为 页面定时脚本任务调用 - * - * @param agentTopicNameList 目标Agent的TopicName列表 - * @param type 任务类型 - * @param completeCommandList 完整的类型 - * @param atnFutureKey 由于脚本任务为延迟调用,故需要提前生成未来的ResultKey - * @return 每个Agent只返回一个 ResultKey(Script脚本的结果全部拼接到一起),全部的resultKey - */ - List> AsyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList, HashMap atnFutureKey); - - - ArrayList AsyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey); - - /** - * 调用 完整脚本的 最底层函数 - * - * @param agentTopicName - * @param type - * @param commandList - * @param commandListComplete - * @param futureKey - * @param durationTask - * @return resultKey 本次操作在Redis中记录的结果Key - */ - ArrayList AsyncSendCommandToAgentComplete( - String agentTopicName, + List AsyncCallSendCommandToAgent( + List agentTopicNameList, String type, + List funcContent, List commandList, List> commandListComplete, boolean needResultReplay, @@ -94,4 +18,30 @@ public interface AsyncExecutionService { boolean durationTask ); + + /** + * 同步命令调用的方法 + * + * @param agentTopicName + * @param type + * @param funcContent + * @param commandList + * @param commandListComplete + * @param needResultReplay + * @param futureKey + * @param durationTask + * @return + */ + OctopusMessage AsyncCallSendCommandToAgent( + String agentTopicName, + String type, + List funcContent, + List commandList, + List> commandListComplete, + boolean needResultReplay, + String futureKey, + boolean durationTask + ); + + } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java index 9044bd2..1a31ca6 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java @@ -1,256 +1,152 @@ package io.wdd.rpc.execute.service; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.wdd.common.utils.TimeUtils; +import io.wdd.rpc.execute.ExecutionMessage; import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessageType; -import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService; -import io.wdd.rpc.message.handler.async.OctopusMessageSynScReplayContend; +import io.wdd.rpc.message.sender.OMessageToAgentSender; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; +import org.apache.commons.lang3.StringUtils; +import org.springframework.data.redis.core.RedisTemplate; import javax.annotation.Resource; -import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -@Service +import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_SET; + + +//@Service @Slf4j public class AsyncExecutionServiceImpl implements AsyncExecutionService { - private static final boolean COMMAND_EXEC_NEED_REPLAY = true; - - private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.EXECUTOR; + private static final String MANUAL_COMMAND_TYPE = "manual-command"; @Resource - AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService; + OMessageToAgentSender oMessageToAgentSender; @Resource - SyncExecutionService asyncExecutionService; + ObjectMapper objectMapper; + @Resource + RedisTemplate redisTemplate; - /** - * 一个命令执行的最长等待时间 - */ - int processMaxWaitSeconds = 10; @Override - public ArrayList AsyncSendCommandToAgent(String agentTopicName, List commandList) { - - return this.AsyncSendCommandToAgentComplete( - agentTopicName, - null, - commandList, - null, - COMMAND_EXEC_NEED_REPLAY, - null, - false - ); - } - - @Override - public ArrayList AsyncSendCommandToAgent(String agentTopicName, String type, List commandList) { - - - return this.AsyncSendCommandToAgentComplete( - agentTopicName, - type, - commandList, - null, - COMMAND_EXEC_NEED_REPLAY, - null, - false - ); - } - - @Override - public List> AsyncSendCommandToAgent(List agentTopicNameList, String type, List commandList) { - + public List AsyncCallSendCommandToAgent(List agentTopicNameList, String type, List funcContent, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { return agentTopicNameList .stream() .map( - agentTopicName -> this.AsyncSendCommandToAgentComplete( - agentTopicName, - type, - commandList, - null, - COMMAND_EXEC_NEED_REPLAY, - null, - false - ) + agentTopicName -> { + return this.AsyncCallSendCommandToAgent( + agentTopicName, + type, + funcContent, + commandList, + commandListComplete, + needResultReplay, + futureKey, + durationTask + + ); + } ) .collect(Collectors.toList()); } @Override - public ArrayList AsyncSendCommandToAgent(String agentTopicName, String type, List commandList, boolean needResultReplay, String futureKey, boolean durationTask) { + public OctopusMessage AsyncCallSendCommandToAgent(String agentTopicName, String type, List funcContent, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { - return this.AsyncSendCommandToAgentComplete( - agentTopicName, - type, - commandList, - null, - COMMAND_EXEC_NEED_REPLAY, - futureKey, - false - ); - } - - @Override - public ArrayList AsyncSendCommandToAgentComplete(String agentTopicName, String type, List commandList, List> completeCommandList) { - return this.AsyncSendCommandToAgentComplete( - agentTopicName, - type, - commandList, - completeCommandList, - COMMAND_EXEC_NEED_REPLAY, - null, - false - ); - } - - @Override - public List> AsyncSendCommandToAgentComplete(List agentTopicNameList, String type, List commandList, List> completeCommandList, boolean isDurationTask) { - return agentTopicNameList - .stream() - .map( - agentTopicName -> this.AsyncSendCommandToAgentComplete( - agentTopicName, - type, - commandList, - completeCommandList, - COMMAND_EXEC_NEED_REPLAY, - null, - isDurationTask - ) - ) - .collect(Collectors.toList()); - } - - @Override - public List> AsyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList) { - - return agentTopicNameList - .stream() - .map( - agentTopicName -> this.AsyncSendCommandToAgentComplete( - agentTopicName, - type, - null, - completeCommandList, - COMMAND_EXEC_NEED_REPLAY, - null, - false - ) - ) - .collect(Collectors.toList()); - - } - - @Override - public List> AsyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList, HashMap atnFutureKey) { - return agentTopicNameList - .stream() - .map( - agentTopicName -> this.AsyncSendCommandToAgentComplete( - agentTopicName, - type, - null, - completeCommandList, - COMMAND_EXEC_NEED_REPLAY, - atnFutureKey.get(agentTopicName), - false - ) - ) - .collect(Collectors.toList()); - } - - @Override - public ArrayList AsyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey) { - return this.AsyncSendCommandToAgentComplete( - agentTopicName, - type, - commandList, - commandListComplete, - COMMAND_EXEC_NEED_REPLAY, - futureKey, - false - ); - } - - @Override - public ArrayList AsyncSendCommandToAgentComplete(String agentTopicName, String type, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { - - OctopusMessage octopusMessage = asyncExecutionService.AsyncCallSendCommandToAgent( - agentTopicName, - type, - commandList, - commandListComplete, - needResultReplay, - futureKey, - durationTask - ); - - LocalDateTime initTime = octopusMessage.getInit_time(); - - // OM 中的result保存 - ArrayList result = new ArrayList<>(); - - // 构造消息等待对象 - int commandCount = 1; - if (null != commandListComplete) { - commandCount = Math.max( - commandListComplete.size(), - 1 + // 检查agentTopicName是否存在 + if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { + log.error( + "agentTopicName异常! 输入为 => {}", + agentTopicName ); + return null; + //throw new MyRuntimeException("agentTopicName异常!" + agentTopicName); } - // 构造回复信息的内容 - OctopusMessageSynScReplayContend executionReplayContent = OctopusMessageSynScReplayContend.build( - commandCount, - CurrentAppOctopusMessageType, - initTime + // 归一化type + if (StringUtils.isEmpty(type)) { + type = MANUAL_COMMAND_TYPE; + } + + String resultKey = futureKey; + // 判定是否是 FutureKey + if (null == futureKey) { + resultKey = ExecutionMessage.GetResultKey(agentTopicName); + } + + // 构造 Execution Command对应的消息体 + ExecutionMessage executionMessage = this + .generateExecutionMessage( + type, + commandList, + resultKey, + commandListComplete, + needResultReplay, + durationTask + ); + OctopusMessage octopusMessage = this.generateOctopusMessage( + agentTopicName, + executionMessage ); - CountDownLatch countDownLatch = executionReplayContent.getCountDownLatch(); - // 开始等待结果 - asyncWaitOctopusMessageResultService.waitFor(executionReplayContent); + // send the message + oMessageToAgentSender.send(octopusMessage); + + // set up the stream read group + String group = redisTemplate + .opsForStream() + .createGroup( + resultKey, + resultKey + ); + + log.debug( + "set consumer group [{}] for the stream key with => [ {} ]", + group, + resultKey + ); + + // help gc + executionMessage = null; + + return octopusMessage; + } + + private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) { - // 监听结果 try { - boolean await = countDownLatch.await( - processMaxWaitSeconds, - TimeUnit.SECONDS - ); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } finally { - - // 等待所有的结果返回 - // 停止等待结果 - asyncWaitOctopusMessageResultService.stopWaiting(executionReplayContent); - - // 解析结果 - executionReplayContent - .getReplayOMList() - .stream() - .map( - om -> { - log.debug( - "replay message is => {}", - om - ); - - return (ArrayList) om.getResult(); - } + return OctopusMessage + .builder() + .octopusMessageType(OctopusMessageType.EXECUTOR) + .init_time(TimeUtils.currentFormatTime()) + .uuid(agentTopicName) + .content( + objectMapper.writeValueAsString(executionMessage) ) - .forEachOrdered( - singleResult -> result.addAll(singleResult) - ); + .build(); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); } - // 返回 执行的结果 - return result; } + + private ExecutionMessage generateExecutionMessage(String type, List commandList, String resultKey, List> commandListComplete, boolean needResultReplay, boolean durationTask) { + + return ExecutionMessage + .builder() + .resultKey(resultKey) + .type(type) + .singleLineCommand(commandList) + .multiLineCommand(commandListComplete) + .needResultReplay(needResultReplay) + .durationTask(durationTask) + .build(); + } + + } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java index fc52137..526015b 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java @@ -1,82 +1,18 @@ package io.wdd.rpc.execute.service; -import io.wdd.rpc.message.OctopusMessage; - -import java.util.HashMap; +import java.util.ArrayList; import java.util.List; - +/** + * 同步命令执行的核心类 + * 需要等待命令执行完毕,完后返回相应的结果 + */ public interface SyncExecutionService { - String SyncSendCommandToAgent(String agentTopicName, String command); - - String SyncSendCommandToAgent(String agentTopicName, List commandList); - - String SyncSendCommandToAgent(String agentTopicName, String type, List commandList); - - List SyncSendCommandToAgent(List agentTopicNameList, String type, List commandList, boolean needResultReplay, String futureKey, boolean durationTask); - - /** - * 调用 单行命令脚本的 最底层函数 - * - * @param agentTopicName - * @param type - * @param commandList - * @param needResultReplay - * @param futureKey - * @param durationTask - * @return - */ - String SyncSendCommandToAgent( - String agentTopicName, - String type, - List commandList, - boolean needResultReplay, - String futureKey, - boolean durationTask - ); - - - /** - * ------------------------------------------------- - */ - - String SyncSendCommandToAgentComplete(String agentTopicName, String type, List commandList, List> commandListComplete); - - - List SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List commandList, List> commandListComplete, boolean isDurationTask); - - - List SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList); - - /** - * 通常为 页面定时脚本任务调用 - * - * @param agentTopicNameList 目标Agent的TopicName列表 - * @param type 任务类型 - * @param completeCommandList 完整的类型 - * @param atnFutureKey 由于脚本任务为延迟调用,故需要提前生成未来的ResultKey - * @return 每个Agent只返回一个 ResultKey(Script脚本的结果全部拼接到一起),全部的resultKey - */ - List SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList, HashMap atnFutureKey); - - - String SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey); - - /** - * 调用 完整脚本的 最底层函数 - * - * @param agentTopicName - * @param type - * @param commandList - * @param commandListComplete - * @param futureKey - * @param durationTask - * @return resultKey 本次操作在Redis中记录的结果Key - */ - String SyncSendCommandToAgent( - String agentTopicName, + List> SyncSendCommandToAgentComplete( + List agentTopicNameList, String type, + List funcContent, List commandList, List> commandListComplete, boolean needResultReplay, @@ -86,20 +22,21 @@ public interface SyncExecutionService { /** - * 同步命令调用的方法 + * 调用 完整脚本的 最底层函数 * * @param agentTopicName * @param type + * @param funcContent * @param commandList * @param commandListComplete - * @param needResultReplay * @param futureKey * @param durationTask - * @return + * @return resultKey 本次操作在Redis中记录的结果Key */ - OctopusMessage AsyncCallSendCommandToAgent( + ArrayList SyncSendCommandToAgentComplete( String agentTopicName, String type, + List funcContent, List commandList, List> commandListComplete, boolean needResultReplay, diff --git a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java index 23ccbd4..2ebaf43 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java @@ -1,370 +1,135 @@ package io.wdd.rpc.execute.service; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.wdd.common.utils.TimeUtils; -import io.wdd.rpc.execute.ExecutionMessage; -import io.wdd.rpc.execute.config.ExecutionLog; import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessageType; -import io.wdd.rpc.message.sender.OMessageToAgentSender; +import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService; +import io.wdd.rpc.message.handler.async.OctopusMessageSyncReplayContend; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.time.LocalDateTime; -import java.util.HashMap; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_SET; - - -@Service +//@Service @Slf4j public class SyncExecutionServiceImpl implements SyncExecutionService { - private static final String MANUAL_COMMAND_TYPE = "manual-command"; + private static final boolean COMMAND_EXEC_NEED_REPLAY = true; + + private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.EXECUTOR; @Resource - OMessageToAgentSender oMessageToAgentSender; + AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService; @Resource - ObjectMapper objectMapper; - @Resource - RedisTemplate redisTemplate; - - @Override - public String SyncSendCommandToAgent(String agentTopicName, String command) { - return this.SyncSendCommandToAgent( - agentTopicName, - List.of(command) - ); - } - - @Override - public String SyncSendCommandToAgent(String agentTopicName, List commandList) { - return this.SyncSendCommandToAgent( - agentTopicName, - MANUAL_COMMAND_TYPE, - commandList - ); - } - - @Override - public String SyncSendCommandToAgent(String agentTopicName, String type, List commandList) { - - return SyncSendCommandToAgent( - agentTopicName, - type, - commandList, - false, - null, - false - ); - - } - - @Override - public String SyncSendCommandToAgent(String agentTopicName, String type, List commandList, boolean needResultReplay, String futureKey, boolean durationTask) { - - return this.SyncSendCommandToAgent( - agentTopicName, - type, - commandList, - null, - needResultReplay, - futureKey, - durationTask - ); - } - - @Override - public String SyncSendCommandToAgentComplete(String agentTopicName, String type, List commandList, List> commandListComplete) { - - return this.SyncSendCommandToAgent( - agentTopicName, - type, - commandList, - commandListComplete, - false, - null, - false - ); - } - - @Override - public List SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List commandList, List> commandListComplete, boolean isDurationTask) { - return agentTopicNameList - .stream() - .map( - agentTopicName -> this.SyncSendCommandToAgent( - agentTopicName, - type, - commandList, - commandListComplete, - false, - null, - isDurationTask - ) - ) - .collect(Collectors.toList()); - } - - @Override - public String SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey) { - - return this.SyncSendCommandToAgent( - agentTopicName, - type, - commandList, - commandListComplete, - false, - futureKey, - false - ); - - } - - @Override - public String SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { - - String resultKey = futureKey; - // 判定是否是 FutureKey - if (null == futureKey) { - resultKey = ExecutionMessage.GetResultKey(agentTopicName); - } - - // 调用最底层的方法 - this.AsyncCallSendCommandToAgent( - agentTopicName, - type, - commandList, - commandListComplete, - needResultReplay, - futureKey, - durationTask - ); - - return resultKey; - } - - @Override - public OctopusMessage AsyncCallSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { - - // 检查agentTopicName是否存在 - if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { - log.error( - "agentTopicName异常! 输入为 => {}", - agentTopicName - ); - return null; - //throw new MyRuntimeException("agentTopicName异常!" + agentTopicName); - } - - // 归一化type - if (StringUtils.isEmpty(type)) { - type = MANUAL_COMMAND_TYPE; - } - - String resultKey = futureKey; - // 判定是否是 FutureKey - if (null == futureKey) { - resultKey = ExecutionMessage.GetResultKey(agentTopicName); - } - - // 构造 Execution Command对应的消息体 - ExecutionMessage executionMessage = this - .generateExecutionMessage( - type, - commandList, - resultKey, - commandListComplete, - needResultReplay, - durationTask - ); - OctopusMessage octopusMessage = this.generateOctopusMessage( - agentTopicName, - executionMessage - ); - - // send the message - oMessageToAgentSender.send(octopusMessage); - - // set up the stream read group - String group = redisTemplate - .opsForStream() - .createGroup( - resultKey, - resultKey - ); - - log.debug( - "set consumer group [{}] for the stream key with => [ {} ]", - group, - resultKey - ); - - // change the redis stream listener container - // createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey); - - // construct the persistent Bean - /*ExecutionLog executionLog = buildPersistentLogBeanFromOctopusMessage( - octopusMessage, - executionMessage - );*/ - // send resultKey to ExecutionResultDaemonHandler - // 当批量执行,产生大量的resultKey的时候,会出现线程爆炸,导致所有的全部失效 - /*WAIT_EXECUTION_RESULT_LIST.put( - resultKey, - executionLog - );*/ - - // help gc - executionMessage = null; - - return octopusMessage; - } - - private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) { - - try { - - return OctopusMessage - .builder() - .type(OctopusMessageType.EXECUTOR) - .init_time(TimeUtils.currentFormatTime()) - .uuid(agentTopicName) - .content( - objectMapper.writeValueAsString(executionMessage) - ) - .build(); - - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - - } - - private ExecutionLog buildPersistentLogBeanFromOctopusMessage(OctopusMessage octopusMessage, ExecutionMessage executionMessage) { - ExecutionLog executionLog = new ExecutionLog(); - executionLog.setAgentTopicName(octopusMessage.getUuid()); - executionLog.setResultKey((String) octopusMessage.getContent()); - executionLog.setCommandList(String.valueOf(executionMessage.getSingleLineCommand())); - executionLog.setType(executionMessage.getType()); - executionLog.setResultKey(executionMessage.getResultKey()); - return executionLog; - } - - - @Override - public List SyncSendCommandToAgent(List agentagentTopicNameList, String type, List commandList, boolean needResultReplay, String futureKey, boolean durationTask) { - - return agentagentTopicNameList - .stream() - .map( - agentTopicName -> this - .SyncSendCommandToAgent - ( - agentTopicName, - type, - commandList, - null, - needResultReplay, - futureKey, - durationTask - ) - ) - .collect(Collectors.toList()); - } + AsyncExecutionService asyncExecutionService; /** - * @param agentTopicNameList 目标Agent的TopicName列表 - * @param type 任务类型 - * @param completeCommandList 完整的类型 - * @return + * 一个命令执行的最长等待时间 */ + int processMaxWaitSeconds = 10; + @Override - public List SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList) { + public List> SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List funcContent, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { return agentTopicNameList .stream() .map( - agentTopicName -> this.SyncSendCommandToAgentComplete( - agentTopicName, - type, - null, - completeCommandList - ) + agentTopicName -> { + return this.SyncSendCommandToAgentComplete( + agentTopicName, + type, + null, + commandList, + commandListComplete, + needResultReplay, + futureKey, + durationTask + ); + } ) .collect(Collectors.toList()); } @Override - public List SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList, HashMap atnFutureKey) { + public ArrayList SyncSendCommandToAgentComplete(String agentTopicName, String type, List funcContent, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { - return agentTopicNameList - .stream() - .map( - agentTopicName -> this.SyncSendCommandToAgent( - agentTopicName, - type, - null, - completeCommandList, - atnFutureKey.getOrDefault( - agentTopicName, - null - ) - ) - ) - .collect(Collectors.toList()); - } - - - @Deprecated - private OctopusMessage generateOctopusMessage(String agentTopicName, String resultKey, String type, List commandList, List> commandListComplete) { - - - ExecutionMessage executionMessage = this.generateExecutionMessage( + // 异步访问 + OctopusMessage octopusMessage = asyncExecutionService.AsyncCallSendCommandToAgent( + agentTopicName, type, + null, commandList, - resultKey, commandListComplete, - false, - false + needResultReplay, + futureKey, + durationTask ); - String executionMessageString; + LocalDateTime initTime = octopusMessage.getInit_time(); - try { - executionMessageString = objectMapper.writeValueAsString(executionMessage); + // OM 中的result保存 + ArrayList result = new ArrayList<>(); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); + // 构造消息等待对象 + int commandCount = 1; + if (null != commandListComplete) { + commandCount = Math.max( + commandListComplete.size(), + 1 + ); } - return OctopusMessage - .builder() - .type(OctopusMessageType.EXECUTOR) - .init_time(LocalDateTime.now()) - .content(executionMessageString) - .uuid(agentTopicName) - .build(); + // 构造回复信息的内容 + OctopusMessageSyncReplayContend executionReplayContent = OctopusMessageSyncReplayContend.build( + commandCount, + CurrentAppOctopusMessageType, + initTime + ); + CountDownLatch countDownLatch = executionReplayContent.getCountDownLatch(); + + // 开始等待结果 + asyncWaitOctopusMessageResultService.waitFor(executionReplayContent); + + // 监听结果 + try { + boolean await = countDownLatch.await( + processMaxWaitSeconds, + TimeUnit.SECONDS + ); + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + + // 等待所有的结果返回 + // 停止等待结果 + asyncWaitOctopusMessageResultService.stopWaiting(executionReplayContent); + + // 解析结果 + executionReplayContent + .getReplayOMList() + .stream() + .map( + om -> { + log.debug( + "replay message is => {}", + om + ); + + return (ArrayList) om.getResult(); + } + ) + .forEachOrdered( + singleResult -> result.addAll(singleResult) + ); + + } + + // 返回 执行的结果 + return result; } - - private ExecutionMessage generateExecutionMessage(String type, List commandList, String resultKey, List> commandListComplete, boolean needResultReplay, boolean durationTask) { - - return ExecutionMessage - .builder() - .resultKey(resultKey) - .type(type) - .singleLineCommand(commandList) - .multiLineCommand(commandListComplete) - .needResultReplay(needResultReplay) - .durationTask(durationTask) - .build(); - } - - } diff --git a/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java b/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java index bb8241e..22f7cce 100644 --- a/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java +++ b/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java @@ -244,7 +244,7 @@ public class AcceptAgentInitInfo { OctopusMessage octopusMessage = OctopusMessage .builder() - .type(OctopusMessageType.INIT) + .octopusMessageType(OctopusMessageType.INIT) // should be the OctopusExchange Name .content(serverInfoContent) .init_time(TimeUtils.currentFormatTime()) diff --git a/server/src/main/java/io/wdd/rpc/message/OctopusMessage.java b/server/src/main/java/io/wdd/rpc/message/OctopusMessage.java index 3e7d7e6..393ab1f 100644 --- a/server/src/main/java/io/wdd/rpc/message/OctopusMessage.java +++ b/server/src/main/java/io/wdd/rpc/message/OctopusMessage.java @@ -15,32 +15,36 @@ import java.time.LocalDateTime; @SuperBuilder(toBuilder = true) public class OctopusMessage { + /** + * 应该为64位的UUID + */ String uuid; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") LocalDateTime init_time; /** * 执行操作的类型 */ - OctopusMessageType type; + OctopusMessageType octopusMessageType; + // server send message content Object content; - // agent reply message content - Object result; - - - /** - * 执行结果的状态Code - */ - String ResultCode; - /** * Agent 完成操作的时间 */ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") LocalDateTime ac_time; + + // agent reply message content + Object result; + + /** + * 执行结果的状态Code + */ + String resultCode; } diff --git a/server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOctopusMessageResultService.java b/server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOctopusMessageResultService.java index 1be1284..d516908 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOctopusMessageResultService.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOctopusMessageResultService.java @@ -3,14 +3,14 @@ package io.wdd.rpc.message.handler.async; import io.wdd.rpc.message.OctopusMessage; import io.wdd.server.config.ServerCommonPool; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.HashMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import static io.wdd.rpc.message.handler.sync.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT; +import static io.wdd.rpc.message.handler.sync.OMessageHandler.GenerateOMessageMatchKey; +import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.OCTOPUS_MESSAGE_FROM_AGENT; /** * 从Agent收集返回信息的统一处理地点 @@ -18,41 +18,10 @@ import static io.wdd.rpc.message.handler.sync.OMessageHandlerServer.OCTOPUS_MESS *

* 调用结束之后,需要从 REPLAY_WAITING_TARGET 中移除此部分内容 */ -@Service +//@Service @Slf4j public class AsyncWaitOctopusMessageResultService { - /** - * 为了避免线程不安全的问题,增加一层缓存,仅仅由当前类操作此部分 - * KEY -> replayMatchKey - * VALUE -> OctopusMessageSynScReplayContend - 包含countDownLatch 和 result - */ - private static final HashMap OM_REPLAY_WAITING_TARGET_MAP = new HashMap<>(); - - public void waitFor(OctopusMessageSynScReplayContend OctopusMessageSynScReplayContend) { - - // 向 REPLAY_CACHE_MAP中写入 Key - OM_REPLAY_WAITING_TARGET_MAP.put( - OctopusMessageSynScReplayContend.getReplayMatchKey(), - OctopusMessageSynScReplayContend - ); - - // 在调用线程的countDownLunch结束之后,关闭 - // 清除 REPLAY_CACHE_MAP 中的队列 - } - - public void stopWaiting(OctopusMessageSynScReplayContend OctopusMessageSynScReplayContend) { - - // 在调用线程的countDownLunch结束之后,关闭 清除 REPLAY_CACHE_MAP 中的队列 - OctopusMessageSynScReplayContend contend = OM_REPLAY_WAITING_TARGET_MAP.get(OctopusMessageSynScReplayContend.getReplayMatchKey()); - - // 移除该内容 - OM_REPLAY_WAITING_TARGET_MAP.remove(OctopusMessageSynScReplayContend.getReplayMatchKey()); - - // help gc - contend = null; - - } @PostConstruct public void daemonHandleReplayOMFromAgent() { @@ -65,6 +34,13 @@ public class AsyncWaitOctopusMessageResultService { } + /** + * 为了避免线程不安全的问题,增加一层缓存,仅仅由当前类操作此部分 + * KEY -> replayMatchKey + * VALUE -> OctopusMessageSyncReplayContend - 包含countDownLatch 和 result + */ + private static final HashMap OM_REPLAY_WAITING_TARGET_MAP = new HashMap<>(); + /** * 操作 OCTOPUS_MESSAGE_FROM_AGENT 获取相应的Message放入内容中 */ @@ -88,8 +64,8 @@ public class AsyncWaitOctopusMessageResultService { OctopusMessage replayOMessage = OCTOPUS_MESSAGE_FROM_AGENT.poll(); // 构造 replayMatchKey - String matchKey = OctopusMessageSynScReplayContend.generateMatchKey( - replayOMessage.getType(), + String matchKey = GenerateOMessageMatchKey( + replayOMessage.getOctopusMessageType(), replayOMessage.getInit_time() ); if (!OM_REPLAY_WAITING_TARGET_MAP.containsKey(matchKey)) { @@ -97,7 +73,7 @@ public class AsyncWaitOctopusMessageResultService { // todo 错误的数据需要放置于某处 log.debug( - "等待队列力没有该回复的结果key =>", + "等待队列里面没有该回复的结果key =>", matchKey ); @@ -105,12 +81,12 @@ public class AsyncWaitOctopusMessageResultService { } // Map中包含有Key,那么放置进去 - OctopusMessageSynScReplayContend replayContend = OM_REPLAY_WAITING_TARGET_MAP.get(matchKey); + OctopusMessageSyncReplayContend replayContend = OM_REPLAY_WAITING_TARGET_MAP.get(matchKey); + replayContend .getReplayOMList() .add(replayOMessage); - // 需要操作countDown replayContend .getCountDownLatch() @@ -119,6 +95,30 @@ public class AsyncWaitOctopusMessageResultService { // 结束操作,继续循环 } + } + + public void waitFor(OctopusMessageSyncReplayContend OctopusMessageSyncReplayContend) { + + // 向 REPLAY_CACHE_MAP中写入 Key + OM_REPLAY_WAITING_TARGET_MAP.put( + OctopusMessageSyncReplayContend.getReplayMatchKey(), + OctopusMessageSyncReplayContend + ); + + // 在调用线程的countDownLunch结束之后,关闭 + // 清除 REPLAY_CACHE_MAP 中的队列 + } + + public void stopWaiting(OctopusMessageSyncReplayContend OctopusMessageSyncReplayContend) { + + // 在调用线程的countDownLunch结束之后,关闭 清除 REPLAY_CACHE_MAP 中的队列 + OctopusMessageSyncReplayContend contend = OM_REPLAY_WAITING_TARGET_MAP.get(OctopusMessageSyncReplayContend.getReplayMatchKey()); + + // 移除该内容 + OM_REPLAY_WAITING_TARGET_MAP.remove(OctopusMessageSyncReplayContend.getReplayMatchKey()); + + // help gc + contend = null; } diff --git a/server/src/main/java/io/wdd/rpc/message/handler/async/OctopusMessageSynScReplayContend.java b/server/src/main/java/io/wdd/rpc/message/handler/async/OctopusMessageSyncReplayContend.java similarity index 72% rename from server/src/main/java/io/wdd/rpc/message/handler/async/OctopusMessageSynScReplayContend.java rename to server/src/main/java/io/wdd/rpc/message/handler/async/OctopusMessageSyncReplayContend.java index 25ddc4e..f2ad53f 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/async/OctopusMessageSynScReplayContend.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/async/OctopusMessageSyncReplayContend.java @@ -14,12 +14,14 @@ import java.time.LocalDateTime; import java.util.ArrayList; import java.util.concurrent.CountDownLatch; +import static io.wdd.rpc.message.handler.sync.OMessageHandler.GenerateOMessageMatchKey; + @Data @AllArgsConstructor @NoArgsConstructor @SuperBuilder(toBuilder = true) @ApiModel("众多业务调用RPC,异步等待需要确定返回消息是谁的") -public class OctopusMessageSynScReplayContend { +public class OctopusMessageSyncReplayContend { @ApiModelProperty("rpc消息的类型") OctopusMessageType type; @@ -37,34 +39,23 @@ public class OctopusMessageSynScReplayContend { @ApiModelProperty("回复的结果列表, 临时保存") ArrayList replayOMList; - /** - * @param messageType - * @param messageInitTime 必须使用 TimeUtils.currentFormatTime(); - * @return - */ - public static String generateMatchKey(OctopusMessageType messageType, LocalDateTime messageInitTime) { - - String relayMatchKey = messageType.toString() + messageInitTime.toString(); - - return relayMatchKey; - } /** * Execution模块使用的模板 * * @return */ - public static OctopusMessageSynScReplayContend build(int waitForReplayNum, OctopusMessageType currentOMType, LocalDateTime currentTime) { + public static OctopusMessageSyncReplayContend build(int waitForReplayNum, OctopusMessageType currentOMType, LocalDateTime currentTime) { CountDownLatch latch = null; if (waitForReplayNum != 0) { latch = new CountDownLatch(waitForReplayNum); } - return new OctopusMessageSynScReplayContend( + return new OctopusMessageSyncReplayContend( currentOMType, currentTime, - generateMatchKey( + GenerateOMessageMatchKey( currentOMType, currentTime ), diff --git a/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageHandler.java b/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageHandler.java new file mode 100644 index 0000000..5168053 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageHandler.java @@ -0,0 +1,124 @@ +package io.wdd.rpc.message.handler.sync; + +import io.wdd.rpc.message.OctopusMessage; +import io.wdd.rpc.message.OctopusMessageType; +import io.wdd.server.config.ServerCommonPool; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.time.LocalDateTime; +import java.util.concurrent.CompletableFuture; + +import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.FROM_AGENT_MATCH_TO_AGENT_MAP; +import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.OCTOPUS_MESSAGE_FROM_AGENT; + +@Service +@Slf4j(topic = "Octopus Message Handler") +public class OMessageHandler { + + + /** + * 创建 发送和接收 OctopusMessage之间的比对关系 + * + * @param messageType + * @param messageInitTime 必须使用 TimeUtils.currentFormatTime(); + * @return + */ + public static String GenerateOMessageMatchKey(OctopusMessageType messageType, LocalDateTime messageInitTime) { + + String relayMatchKey = messageType.toString() + messageInitTime.toString(); + + return relayMatchKey; + } + + @PostConstruct + public void daemonHandleReplayOMFromAgent() { + + // 异步任务启动 + CompletableFuture.runAsync( + () -> doHandleOMessageFromAgent(), + ServerCommonPool.pool + ); + + } + + /** + * 解析所有从Agent传回的消息,中央集中化处理 + */ + private void doHandleOMessageFromAgent() { + + // 死循环,不断的轮询 OCTOPUS_MESSAGE_FROM_AGENT + while (true) { + + if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) { + + try { + OCTOPUS_MESSAGE_FROM_AGENT.wait(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + // 返回,继续死循环 + continue; + } + + // 拿到消息 + OctopusMessage replayOMessage = OCTOPUS_MESSAGE_FROM_AGENT.poll(); + + // 构造 replayMatchKey + String matchKey = GenerateOMessageMatchKey( + replayOMessage.getOctopusMessageType(), + replayOMessage.getInit_time() + ); + if (!FROM_AGENT_MATCH_TO_AGENT_MAP.containsKey(matchKey)) { + // 没有这个Key,说明等待结果已经超时了,直接丢弃,然后继续循环 + // todo 错误的数据需要放置于某处 + log.debug( + "等待队列里面没有该回复的结果key =>", + matchKey + ); + + continue; + } + + // 归还信息 + // 拿到原始信息 + OctopusMessage originOMessage = FROM_AGENT_MATCH_TO_AGENT_MAP.get(matchKey); + originOMessage.setResultCode(replayOMessage.getResultCode()); + originOMessage.setResult(replayOMessage.getResult()); + + + // 通知等待线程 + originOMessage.notify(); + } + + } + + protected void waitFor(OctopusMessage octopusMessage) { + + // 构建 MatchKey + String matchKey = GenerateOMessageMatchKey( + octopusMessage.getOctopusMessageType(), + octopusMessage.getInit_time() + ); + + // 开始等待 + FROM_AGENT_MATCH_TO_AGENT_MAP.put( + matchKey, + octopusMessage + ); + } + + public void stopWaiting(OctopusMessage octopusMessage) { + + // 构建 MatchKey + String matchKey = GenerateOMessageMatchKey( + octopusMessage.getOctopusMessageType(), + octopusMessage.getInit_time() + ); + + // 开始等待 + FROM_AGENT_MATCH_TO_AGENT_MAP.remove(matchKey); + + } +} diff --git a/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageHandlerServer.java b/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageToServerListener.java similarity index 77% rename from server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageHandlerServer.java rename to server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageToServerListener.java index 2fc6fde..b681e99 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageHandlerServer.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageToServerListener.java @@ -1,7 +1,6 @@ package io.wdd.rpc.message.handler.sync; -import com.fasterxml.jackson.databind.ObjectMapper; import io.wdd.common.handler.MyRuntimeException; import io.wdd.rpc.message.OctopusMessage; import lombok.extern.slf4j.Slf4j; @@ -14,16 +13,20 @@ import org.springframework.data.redis.core.RedisTemplate; import javax.annotation.Resource; import java.io.IOException; import java.util.ArrayDeque; +import java.util.HashMap; + +import static io.wdd.common.utils.OctopusObjectMapperConfig.OctopusObjectMapper; @Configuration -@Slf4j(topic = "Octopus Message Handler") -public class OMessageHandlerServer { +@Slf4j(topic = "Octopus Message Listener") +public class OMessageToServerListener { /** * Redis Key 用于保存Agent的最新版本 * 由 GitHubAction发送至 RabbitMQ中,然后此处获取处理,发送至Redis中 */ public static final String LATEST_VERSION = "LATEST_VERSION"; + /** * 存储所有的从 Agent过来的 OctopusMessage * 各个业务模块需要自己手动去获取自己需要的内容 @@ -32,10 +35,20 @@ public class OMessageHandlerServer { public static ArrayDeque OCTOPUS_MESSAGE_FROM_AGENT = new ArrayDeque<>( 128 ); + + /** + * 发送出去的OctopusMessage需要和返回回来的内容对比 + * 返回来的OM反序列化之后就不是原对象,需要进行 通过MatchKey比较 + *

+ * omMatchKey -- OctopusMessage + */ + public static HashMap FROM_AGENT_MATCH_TO_AGENT_MAP = new HashMap<>(); + @Resource RedisTemplate redisTemplate; + @Resource - ObjectMapper objectMapper; + OMessageHandler oMessageHandler; @RabbitHandler @RabbitListener(queues = "${octopus.message.octopus_to_server}" @@ -45,7 +58,7 @@ public class OMessageHandlerServer { OctopusMessage octopusMessage; try { - octopusMessage = objectMapper.readValue( + octopusMessage = OctopusObjectMapper.readValue( message.getBody(), OctopusMessage.class ); @@ -70,6 +83,7 @@ public class OMessageHandlerServer { "开始向Redis中缓存Agent的最新版本 => {}", latestVersion ); + redisTemplate .opsForValue() .set( @@ -82,11 +96,9 @@ public class OMessageHandlerServer { // 将收到的消息,直接存储到 缓存队列中 log.debug("cache the octopus message to inner cache list !"); OCTOPUS_MESSAGE_FROM_AGENT.offer(octopusMessage); + oMessageHandler.waitFor(octopusMessage); - // collect all message from agent and log to somewhere - - // 1. send some info to the specific topic name - // 2. judge from which agent the message are - // + // 唤醒等待线程 + OCTOPUS_MESSAGE_FROM_AGENT.notify(); } } diff --git a/server/src/main/java/io/wdd/rpc/message/sender/OMessageToAgentSender.java b/server/src/main/java/io/wdd/rpc/message/sender/OMessageToAgentSender.java index 56dd6e6..aa5c86a 100644 --- a/server/src/main/java/io/wdd/rpc/message/sender/OMessageToAgentSender.java +++ b/server/src/main/java/io/wdd/rpc/message/sender/OMessageToAgentSender.java @@ -1,6 +1,7 @@ package io.wdd.rpc.message.sender; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.wdd.common.handler.MyRuntimeException; import io.wdd.rpc.init.InitRabbitMQConfig; @@ -14,6 +15,8 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.List; +import static io.wdd.common.utils.OctopusObjectMapperConfig.OctopusObjectMapper; + /** * adaptor * provide override method to convert Object and send to rabbitmq @@ -34,25 +37,38 @@ public class OMessageToAgentSender { /** * send to Queue -- InitFromServer * - * @param message octopus message + * @param octopusMessage octopus octopusMessage */ - public void sendINIT(OctopusMessage message) { + public void sendINIT(OctopusMessage octopusMessage) { - // only accept INIT type message - if (!OctopusMessageType.INIT.equals(message.getType())) { + // only accept INIT type octopusMessage + if (!OctopusMessageType.INIT.equals(octopusMessage.getOctopusMessageType())) { throw new MyRuntimeException("To Agent Order method usage wrong !"); } // send to Queue -- InitFromServer log.info( "send INIT OrderCommand to Agent = {}", - message + octopusMessage ); + // 统一处理Content + if (octopusMessage.getContent() instanceof String) { + try { + + String contendString = OctopusObjectMapper.writeValueAsString(octopusMessage.getContent()); + + octopusMessage.setContent(contendString); + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + rabbitTemplate.convertAndSend( initRabbitMQConfig.INIT_EXCHANGE, initRabbitMQConfig.INIT_FROM_SERVER_KEY, - writeData(message) + writeData(octopusMessage) ); } @@ -66,6 +82,19 @@ public class OMessageToAgentSender { octopusMessage.getUuid() ); + // 统一处理Content + if (!(octopusMessage.getContent() instanceof String)) { + try { + + String contendString = OctopusObjectMapper.writeValueAsString(octopusMessage.getContent()); + + octopusMessage.setContent(contendString); + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + rabbitTemplate.convertAndSend( initRabbitMQConfig.OCTOPUS_EXCHANGE, octopusMessage.getUuid() + "*", diff --git a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentAliveStatusMonitorJob.java b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentAliveStatusMonitorJob.java index fba9ab6..53d9a8d 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentAliveStatusMonitorJob.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentAliveStatusMonitorJob.java @@ -1,32 +1,32 @@ -package io.wdd.rpc.scheduler.job; - -import io.wdd.rpc.scheduler.config.QuartzLogOperator; -import io.wdd.rpc.scheduler.service.status.AgentAliveStatusMonitorService; -import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; -import org.springframework.scheduling.quartz.QuartzJobBean; - -import javax.annotation.Resource; - -public class AgentAliveStatusMonitorJob extends QuartzJobBean { - - @Resource - AgentAliveStatusMonitorService agentAliveStatusMonitorService; - - @Resource - QuartzLogOperator quartzLogOperator; - - @Override - protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { - - // get the jobMetaMap - //JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap(); - - // actually execute the monitor service - agentAliveStatusMonitorService.collectAllAgentAliveStatus(); - - // log to somewhere - quartzLogOperator.save(); - - } -} +//package io.wdd.rpc.scheduler.job; +// +//import io.wdd.rpc.scheduler.config.QuartzLogOperator; +//import io.wdd.rpc.scheduler.service.status.AgentAliveStatusMonitorService; +//import org.quartz.JobExecutionContext; +//import org.quartz.JobExecutionException; +//import org.springframework.scheduling.quartz.QuartzJobBean; +// +//import javax.annotation.Resource; +// +//public class AgentAliveStatusMonitorJob extends QuartzJobBean { +// +// @Resource +// AgentAliveStatusMonitorService agentAliveStatusMonitorService; +// +// @Resource +// QuartzLogOperator quartzLogOperator; +// +// @Override +// protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { +// +// // get the jobMetaMap +// //JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap(); +// +// // actually execute the monitor service +// agentAliveStatusMonitorService.collectAllAgentAliveStatus(); +// +// // log to somewhere +// quartzLogOperator.save(); +// +// } +//} diff --git a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentMetricStatusJob.java b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentMetricStatusJob.java index 1c89f42..8bb8e05 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentMetricStatusJob.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentMetricStatusJob.java @@ -1,29 +1,21 @@ package io.wdd.rpc.scheduler.job; -import io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService; -import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; -import org.springframework.scheduling.quartz.QuartzJobBean; - -import javax.annotation.Resource; - - -public class AgentMetricStatusJob extends QuartzJobBean { - - @Resource - AgentMetricStatusCollectService agentMetricStatusCollectService; - - @Override - protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { - - // 从JobDetailContext中获取相应的信息 -// JobDataMap jobDataMap = jobExecutionContext -// .getJobDetail() -// .getJobDataMap(); - - // 执行Agent Metric 状态收集任务 - agentMetricStatusCollectService.collectHealthyAgentMetric(); - - } - -} +//public class AgentMetricStatusJob extends QuartzJobBean { +// +// @Resource +// AgentMetricStatusCollectService agentMetricStatusCollectService; +// +// @Override +// protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { +// +// // 从JobDetailContext中获取相应的信息 +//// JobDataMap jobDataMap = jobExecutionContext +//// .getJobDetail() +//// .getJobDataMap(); +// +// // 执行Agent Metric 状态收集任务 +// agentMetricStatusCollectService.collectHealthyAgentMetric(); +// +// } +// +//} diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java b/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java index d4c1a4e..513341c 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java @@ -1,17 +1,14 @@ package io.wdd.rpc.scheduler.service; -import io.wdd.rpc.scheduler.job.AgentAliveStatusMonitorJob; -import io.wdd.rpc.scheduler.job.AgentMetricStatusJob; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; -@Component +//@Component @Slf4j public class BuildStatusScheduleTask { @@ -31,10 +28,10 @@ public class BuildStatusScheduleTask { private void buildAllPreScheduledTask() { // Agent存活健康状态检查 - buildMonitorAllAgentAliveStatusScheduleTask(); +// buildMonitorAllAgentAliveStatusScheduleTask(); // Agent运行信息检查 Metric - buildAgentMetricScheduleTask(); +// buildAgentMetricScheduleTask(); } @@ -45,43 +42,19 @@ public class BuildStatusScheduleTask { *

* 2023年7月10日 更改为按照cron表达式进行执行 */ - public void buildAgentMetricScheduleTask() { - - // 2023年7月10日 更改为按照cron表达式进行执行 - octopusQuartzService.addMission( - AgentMetricStatusJob.class, - "agentRunMetricStatusJob", - JOB_GROUP_NAME, - metricReportStartDelaySeconds, - metricReportCronExpress, - null - ); - - // 计算 Metric检测的时间间隔 - /*int metricReportTimesCount = 19; - try { - CronExpression cronExpression = new CronExpression(healthyCronTimeExpress); - - Date now = new Date(); - Date nextValidTime = cronExpression.getNextValidTimeAfter(now); - long totalSeconds = (nextValidTime.getTime() - now.getTime()) / 1000; - metricReportTimesCount = (int) (totalSeconds / metricReportTimePinch) - 1; - - *//*System.out.println("totalSeconds = " + totalSeconds); - System.out.println("metricReportTimesCount = " + metricReportTimesCount);*//* - - } catch (ParseException e) { - throw new RuntimeException(e); - } - - HashMap map = new HashMap(); - map.put(METRIC_REPORT_TIME_PINCH, metricReportTimePinch); - map.put(METRIC_REPORT_TIMES_COUNT, metricReportTimesCount);*/ - - // - - - } +// public void buildAgentMetricScheduleTask() { +// +// // 2023年7月10日 更改为按照cron表达式进行执行 +// octopusQuartzService.addMission( +// AgentMetricStatusJob.class, +// "agentRunMetricStatusJob", +// JOB_GROUP_NAME, +// metricReportStartDelaySeconds, +// metricReportCronExpress, +// null +// ); +// +// } /** * Agent存活健康状态检查 @@ -90,19 +63,19 @@ public class BuildStatusScheduleTask { * 延迟触发时间 healthyCheckStartDelaySeconds * 定时任务间隔 healthyCronTimeExpress */ - private void buildMonitorAllAgentAliveStatusScheduleTask() { - - // build the Job - octopusQuartzService.addMission( - AgentAliveStatusMonitorJob.class, - "monitorAllAgentAliveStatusJob", - JOB_GROUP_NAME, - healthyCheckStartDelaySeconds, - healthyCronTimeExpress, - null - ); - - } +// private void buildMonitorAllAgentAliveStatusScheduleTask() { +// +// // build the Job +// octopusQuartzService.addMission( +// AgentAliveStatusMonitorJob.class, +// "monitorAllAgentAliveStatusJob", +// JOB_GROUP_NAME, +// healthyCheckStartDelaySeconds, +// healthyCronTimeExpress, +// null +// ); +// +// } } diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java b/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java index 4e084ac..5d38ee5 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java @@ -1,27 +1,29 @@ package io.wdd.rpc.scheduler.service.script; -import io.wdd.rpc.execute.service.SyncExecutionService; +import io.wdd.rpc.execute.ExecutionMessage; +import io.wdd.rpc.execute.service.AsyncExecutionService; +import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.scheduler.beans.ScriptSchedulerDTO; import io.wdd.rpc.scheduler.config.QuartzSchedulerUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.HashMap; import java.util.List; +import java.util.stream.Collectors; /** * 1. */ -@Service +//@Service @Slf4j @Deprecated public class AgentApplyScheduledScript { @Resource - SyncExecutionService asyncExecutionService; + AsyncExecutionService asyncExecutionService; @Resource QuartzSchedulerUtils quartzSchedulerUtils; @@ -46,13 +48,35 @@ public class AgentApplyScheduledScript { } // 发送命令到Agent中 - List resultKeyList = asyncExecutionService - .SyncSendCommandToAgentComplete( - targetMachineList, - scriptType, - completeCommandList, - futureResultKeyMap - ); + String finalScriptType = scriptType; + List resultKeyList = targetMachineList + .stream() + .map( + targetMachine -> { + OctopusMessage octopusMessage = asyncExecutionService + .AsyncCallSendCommandToAgent( + targetMachine, + finalScriptType, + null, + null, + completeCommandList, + false, + null, + false + ); + + String resultKey = ((ExecutionMessage) octopusMessage.getContent()).getResultKey(); + + // 构建Map + futureResultKeyMap.put( + targetMachine, + resultKey + ); + + return resultKey; + } + ) + .collect(Collectors.toList()); // 将 resultKeyList 放入这个DTO中 scriptSchedulerDTO.setResultKeyList(resultKeyList); diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentMetricStatusCollectService.java b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentMetricStatusCollectService.java index 1b32a5f..7640181 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentMetricStatusCollectService.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentMetricStatusCollectService.java @@ -7,7 +7,6 @@ import io.wdd.rpc.status.beans.AgentStatus; import io.wdd.rpc.status.service.SyncStatusService; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; @@ -19,7 +18,7 @@ import static io.wdd.rpc.status.CommonAndStatusCache.ALL_HEALTHY_AGENT_TOPIC_NAM /** * 定时任务 收集Agent的运行Metric的实际执行类 */ -@Service +//@Service @Slf4j public class AgentMetricStatusCollectService { diff --git a/server/src/main/java/io/wdd/rpc/status/OctopusStatusMessage.java b/server/src/main/java/io/wdd/rpc/status/OctopusStatusMessage.java index 70e9d8f..3d034b8 100644 --- a/server/src/main/java/io/wdd/rpc/status/OctopusStatusMessage.java +++ b/server/src/main/java/io/wdd/rpc/status/OctopusStatusMessage.java @@ -51,7 +51,7 @@ public class OctopusStatusMessage { return OctopusMessage .builder() - .type(OctopusMessageType.STATUS) + .octopusMessageType(OctopusMessageType.STATUS) .uuid(agentTopicName) .init_time(currentTime) .content(ops) diff --git a/server/src/main/java/io/wdd/rpc/status/service/SyncStatusServiceImpl.java b/server/src/main/java/io/wdd/rpc/status/service/SyncStatusServiceImpl.java index c80cfa2..e93840b 100644 --- a/server/src/main/java/io/wdd/rpc/status/service/SyncStatusServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/status/service/SyncStatusServiceImpl.java @@ -6,12 +6,11 @@ import io.wdd.rpc.beans.request.MetricQueryEntity; import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessageType; import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService; -import io.wdd.rpc.message.handler.async.OctopusMessageSynScReplayContend; +import io.wdd.rpc.message.handler.async.OctopusMessageSyncReplayContend; import io.wdd.rpc.message.sender.OMessageToAgentSender; import io.wdd.rpc.status.beans.AgentStatus; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.time.LocalDateTime; @@ -27,7 +26,7 @@ import static io.wdd.common.utils.OctopusObjectMapperConfig.OctopusObjectMapper; import static io.wdd.rpc.status.OctopusStatusMessage.*; @Slf4j -@Service +//@Service public class SyncStatusServiceImpl implements SyncStatusService { private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.STATUS; @@ -64,7 +63,7 @@ public class SyncStatusServiceImpl implements SyncStatusService { ); // 同步收集消息 - OctopusMessageSynScReplayContend statusSyncReplayContend = OctopusMessageSynScReplayContend.build( + OctopusMessageSyncReplayContend statusSyncReplayContend = OctopusMessageSyncReplayContend.build( agentTopicNameList.size(), CurrentAppOctopusMessageType, currentTime @@ -128,7 +127,7 @@ public class SyncStatusServiceImpl implements SyncStatusService { ); // 同步等待结果, 并且解析结果 - OctopusMessageSynScReplayContend metricSyncReplayContend = OctopusMessageSynScReplayContend.build( + OctopusMessageSyncReplayContend metricSyncReplayContend = OctopusMessageSyncReplayContend.build( agentTopicNameList.size(), CurrentAppOctopusMessageType, currentTime diff --git a/server/src/main/resources/application.yml b/server/src/main/resources/application.yml index 6a58e50..ad8ab21 100644 --- a/server/src/main/resources/application.yml +++ b/server/src/main/resources/application.yml @@ -83,14 +83,16 @@ mybatis-plus: banner: false configuration: # 希望知道所有的sql是怎么执行的, 配置输出日志 - #log-impl: org.apache.ibatis.logging.stdout.StdOutImpl - log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl + log-impl: org.apache.ibatis.logging.stdout.StdOutImpl + # log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl # 数据库下划线--实体类也是下划线 需要为false map-underscore-to-camel-case: true # 一级缓存的 缓存级别默认为 session,如果要关闭一级缓存可以设置为 statement local-cache-scope: session # 是否开启二级缓存 cache-enabled: false + # 分页插件配置 + interceptor: com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor # 默认地址为 classpath*:/mapper/**/*.xml # mapper-locations: classpath*:/real-mappers/**/*.xml @@ -162,6 +164,6 @@ oss: # 开启debug模式 logging: level: - io.wdd: debug + io.wdd.rpc.execute: debug debug: true diff --git a/server/src/test/java/io/wdd/server/ServerApplicationTests.java b/server/src/test/java/io/wdd/server/ServerApplicationTests.java index bfdd25a..8851cba 100644 --- a/server/src/test/java/io/wdd/server/ServerApplicationTests.java +++ b/server/src/test/java/io/wdd/server/ServerApplicationTests.java @@ -1,6 +1,6 @@ package io.wdd.server; -import io.wdd.rpc.execute.service.SyncExecutionService; +import io.wdd.rpc.execute.service.AsyncExecutionService; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; @@ -13,7 +13,7 @@ class ServerApplicationTests { @Resource - SyncExecutionService asyncExecutionService; + AsyncExecutionService asyncExecutionService; @Test void testCoreExecutionCompleteScript() { diff --git a/server/src/test/java/io/wdd/server/SimpleTest.java b/server/src/test/java/io/wdd/server/SimpleTest.java index 2955b56..0535af5 100644 --- a/server/src/test/java/io/wdd/server/SimpleTest.java +++ b/server/src/test/java/io/wdd/server/SimpleTest.java @@ -1,10 +1,5 @@ package io.wdd.server; -import io.wdd.rpc.status.AgentHealthyStatusEnum; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; import java.util.concurrent.CompletableFuture; @@ -13,33 +8,6 @@ public class SimpleTest { public static void main(String[] args) { - HashMap map = new HashMap<>(); - - HashMap> hashMap = new HashMap<>(); - - hashMap.put( - "HEALTHY", - new ArrayList<>( - List.of( - "Tokyo-amd64-07-f66a41", - "Tokyo-amd64-03-dc543f" - ) - ) - ); - - hashMap - .get(AgentHealthyStatusEnum.FAILED.getStatus()) - .stream() - .forEach( - agentTopicName -> { - map.put( - agentTopicName, - "0" - ); - } - ); - - } private void CompletableFutureTest() {