From f43d9f84a44c26e4c081f0031c0e35c56fe96c99 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Tue, 28 Feb 2023 10:34:48 +0800 Subject: [PATCH 01/16] [agent][init]- fix bugs --- .../bootup/OctopusAgentInitService.java | 8 ++++---- .../message/GenOctopusRabbitMQConnection.java | 7 +++++-- .../common/handler/GlobalExceptionHandler.java | 15 +++++++++++++++ 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/agent/src/main/java/io/wdd/agent/initialization/bootup/OctopusAgentInitService.java b/agent/src/main/java/io/wdd/agent/initialization/bootup/OctopusAgentInitService.java index 11d7e7b..985eed0 100644 --- a/agent/src/main/java/io/wdd/agent/initialization/bootup/OctopusAgentInitService.java +++ b/agent/src/main/java/io/wdd/agent/initialization/bootup/OctopusAgentInitService.java @@ -9,7 +9,6 @@ import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.handler.MyRuntimeException; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.checkerframework.checker.units.qual.K; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; @@ -88,7 +87,9 @@ public class OctopusAgentInitService { // response chain to handle all kind of type of octopus message if (!octopusMessageHandler.handle(octopusMessage)) { - throw new MyRuntimeException(" Handle Octopus Message Error !"); + String s = "Handle Octopus Message Error !"; + log.error(s); + throw new MyRuntimeException(s); } } catch (Exception e) { @@ -98,8 +99,7 @@ public class OctopusAgentInitService { // long deliveryTag, boolean requeue // channel.basicReject(deliveryTag,true); - log.error("Octopus Agent Initialization Error, please check !"); - log.info("waiting for 5 seconds"); + log.error("Octopus Agent Initialization Error, please check ! Waiting for 5 seconds"); // 这里只是便于出现死循环时查看 TimeUnit.SECONDS.sleep(5); diff --git a/agent/src/main/java/io/wdd/agent/initialization/message/GenOctopusRabbitMQConnection.java b/agent/src/main/java/io/wdd/agent/initialization/message/GenOctopusRabbitMQConnection.java index b5a74a5..6b8dbd0 100644 --- a/agent/src/main/java/io/wdd/agent/initialization/message/GenOctopusRabbitMQConnection.java +++ b/agent/src/main/java/io/wdd/agent/initialization/message/GenOctopusRabbitMQConnection.java @@ -49,8 +49,11 @@ public class GenOctopusRabbitMQConnection { // reboot judgement of existing exchange QueueInformation queueInfo = rabbitAdmin.getQueueInfo(agentTopicName); - if (ObjectUtils.isNotEmpty(queueInfo) && queueInfo.getConsumerCount() > 0 ) { - log.info("Octopus Agent Specific Topic Queue Already Existed ! == {}", agentTopicName); + if (ObjectUtils.isNotEmpty(queueInfo)) { + log.info( + "Octopus Agent Specific Topic Queue Already Existed ! == {}", + agentTopicName + ); return; } diff --git a/common/src/main/java/io/wdd/common/handler/GlobalExceptionHandler.java b/common/src/main/java/io/wdd/common/handler/GlobalExceptionHandler.java index 12fc79d..1ee6acc 100644 --- a/common/src/main/java/io/wdd/common/handler/GlobalExceptionHandler.java +++ b/common/src/main/java/io/wdd/common/handler/GlobalExceptionHandler.java @@ -10,6 +10,7 @@ import org.springframework.dao.DuplicateKeyException; import org.springframework.validation.BindException; import org.springframework.validation.FieldError; import org.springframework.validation.ObjectError; +import org.springframework.web.HttpRequestMethodNotSupportedException; import org.springframework.web.bind.MethodArgumentNotValidException; import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.RestControllerAdvice; @@ -91,6 +92,20 @@ public class GlobalExceptionHandler { return vo; } + /** + * 处理此种异常 + * + * @param httpRequestMethodNotSupportedException + */ + @ExceptionHandler(value = {HttpRequestMethodNotSupportedException.class}) + public void methodNotMatchHandler(HttpRequestMethodNotSupportedException httpRequestMethodNotSupportedException) { + + log.debug( + httpRequestMethodNotSupportedException.getMessage() + ); + + } + /** * 拦截数据库异常 * From dfb468c928c6d966943db643326d9e8bd9423ffc Mon Sep 17 00:00:00 2001 From: zeaslity Date: Tue, 28 Feb 2023 03:07:48 +0000 Subject: [PATCH 02/16] add oracle s5 run config --- .run/Agent-dev-oracle-s5.run.xml | 37 ++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 .run/Agent-dev-oracle-s5.run.xml diff --git a/.run/Agent-dev-oracle-s5.run.xml b/.run/Agent-dev-oracle-s5.run.xml new file mode 100644 index 0000000..98bef8c --- /dev/null +++ b/.run/Agent-dev-oracle-s5.run.xml @@ -0,0 +1,37 @@ + + + + \ No newline at end of file From b31480ec8cb9d1d4a45ef67e2b3136a24254ec0d Mon Sep 17 00:00:00 2001 From: zeaslity Date: Tue, 28 Feb 2023 11:27:22 +0800 Subject: [PATCH 03/16] [agent][executor]- fix bug --- .../src/main/java/io/wdd/agent/executor/CommandExecutor.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java index c40814e..b805cb4 100644 --- a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java @@ -8,6 +8,7 @@ import io.wdd.common.beans.executor.ExecutionMessage; import io.wdd.common.beans.rabbitmq.OctopusMessage; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; @@ -33,7 +34,8 @@ public class CommandExecutor { /** * 一个命令执行的最长等待时间 */ - int processMaxWaitSeconds = 10; + @Value("${octopus.agent.executor.processMaxTimeOut}") + Integer processMaxWaitSeconds; /** * 持久化命令执行的最长等待时间 From c51fdaad13130afc915ae76fb418e86927e8ac97 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Tue, 28 Feb 2023 11:32:36 +0800 Subject: [PATCH 04/16] [agent][ init]- fix bug --- .../message/GenOctopusRabbitMQConnection.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/agent/src/main/java/io/wdd/agent/initialization/message/GenOctopusRabbitMQConnection.java b/agent/src/main/java/io/wdd/agent/initialization/message/GenOctopusRabbitMQConnection.java index 6b8dbd0..341af25 100644 --- a/agent/src/main/java/io/wdd/agent/initialization/message/GenOctopusRabbitMQConnection.java +++ b/agent/src/main/java/io/wdd/agent/initialization/message/GenOctopusRabbitMQConnection.java @@ -48,20 +48,22 @@ public class GenOctopusRabbitMQConnection { // reboot judgement of existing exchange QueueInformation queueInfo = rabbitAdmin.getQueueInfo(agentTopicName); - - if (ObjectUtils.isNotEmpty(queueInfo)) { - log.info( - "Octopus Agent Specific Topic Queue Already Existed ! == {}", - agentTopicName - ); - return; + if (ObjectUtils.isEmpty(queueInfo)) { + log.debug("开始为Agent创建相形的消息队列!"); } - Queue queue = new Queue(agentTopicName, true, false, false); + Queue queue = new Queue( + agentTopicName, + true, + false, + false + ); Binding binding = new Binding( agentTopicName, Binding.DestinationType.QUEUE, - octopusMessage.getContent().toString(), + octopusMessage + .getContent() + .toString(), agentTopicName + "*", null ); @@ -77,8 +79,7 @@ public class GenOctopusRabbitMQConnection { listenerContainer.setMessageListener(this::AgentListenToSpecificTopicOctopusMessage); listenerContainer.start(); - - log.info("Specific Octopus Topic Queue Generate Successfully !"); + log.info("每个Agent特定的Octopus Topic Queue创建成功!"); messageListenerContainerList.add(listenerContainer); } From c36721eada83a521fdbf96a68911f6aa6297874f Mon Sep 17 00:00:00 2001 From: zeaslity Date: Tue, 28 Feb 2023 15:36:53 +0800 Subject: [PATCH 05/16] =?UTF-8?q?[server][=20executor]-=20=E5=AE=8C?= =?UTF-8?q?=E6=88=90=E5=90=8C=E6=AD=A5=E8=B0=83=E7=94=A8=E5=91=BD=E4=BB=A4?= =?UTF-8?q?=E7=9A=84=E9=83=A8=E5=88=86=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/io/wdd/common/utils/TimeUtils.java | 2 +- .../xray/service/XrayConfigDistribute.java | 4 +- .../rpc/agent/OctopusAgentServiceImpl.java | 67 ++--- .../wdd/rpc/controller/AgentController.java | 4 +- .../rpc/controller/ExecutionController.java | 76 ++++-- ...ervice.java => AsyncExecutionService.java} | 33 ++- ...pl.java => AsyncExecutionServiceImpl.java} | 43 +++- .../execute/service/SyncExecutionService.java | 95 +++++++ .../service/SyncExecutionServiceImpl.java | 236 ++++++++++++++++++ .../message/handler/AsyncWaitOMResult.java | 25 +- .../rpc/message/handler/OMReplayContend.java | 29 ++- .../script/AgentApplyScheduledScript.java | 6 +- .../io/wdd/server/ServerApplicationTests.java | 6 +- 13 files changed, 535 insertions(+), 91 deletions(-) rename server/src/main/java/io/wdd/rpc/execute/service/{CoreExecutionService.java => AsyncExecutionService.java} (79%) rename server/src/main/java/io/wdd/rpc/execute/service/{CoreExecutionServiceImpl.java => AsyncExecutionServiceImpl.java} (91%) create mode 100644 server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java create mode 100644 server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java diff --git a/common/src/main/java/io/wdd/common/utils/TimeUtils.java b/common/src/main/java/io/wdd/common/utils/TimeUtils.java index f85627b..38d5e53 100644 --- a/common/src/main/java/io/wdd/common/utils/TimeUtils.java +++ b/common/src/main/java/io/wdd/common/utils/TimeUtils.java @@ -117,7 +117,7 @@ public class TimeUtils { } /** - * @return UTC+8 [ yyyy-MM-dd HH:mm:ss ] Time String + * @return UTC+8 [ yyyy-MM-dd-HH-mm-ss ] Time String */ public static String currentTimeStringFullSplit() { diff --git a/server/src/main/java/io/wdd/func/xray/service/XrayConfigDistribute.java b/server/src/main/java/io/wdd/func/xray/service/XrayConfigDistribute.java index c8bf578..2c1a25f 100644 --- a/server/src/main/java/io/wdd/func/xray/service/XrayConfigDistribute.java +++ b/server/src/main/java/io/wdd/func/xray/service/XrayConfigDistribute.java @@ -7,7 +7,7 @@ import io.wdd.func.oss.service.OSSCoreService; import io.wdd.func.oss.service.OssBackendSelect; import io.wdd.func.xray.beans.node.ProxyNode; import io.wdd.func.xray.beans.node.XrayConfigInfo; -import io.wdd.rpc.execute.service.CoreExecutionService; +import io.wdd.rpc.execute.service.AsyncExecutionService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; @@ -112,7 +112,7 @@ public class XrayConfigDistribute { OSSCoreService ossCoreService; @Resource - CoreExecutionService executionService; + AsyncExecutionService executionService; public void uploadXrayConfigToOSS(ArrayList networkPathList) { diff --git a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java index 6de5c49..408d47a 100644 --- a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java @@ -19,7 +19,6 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.time.LocalDateTime; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,7 +30,6 @@ import java.util.stream.Collectors; import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_SET; import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; -import static io.wdd.rpc.message.handler.AsyncWaitOMResult.REPLAY_CACHE_MAP; import static io.wdd.rpc.message.handler.OMessageHandlerServer.AGENT_LATEST_VERSION; import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT; @@ -67,7 +65,6 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { // 组装信息至集合中 LocalDateTime currentTime = TimeUtils.currentFormatTime(); - // 发送OctopusMessage-Agent buildOMessageAndSendToAllHealthyAgent( AgentOperationType.VERSION, @@ -75,21 +72,14 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { ); // 构造 异步结果监听内容 - CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size()); - ArrayList replayOMList = new ArrayList<>(); - OMReplayContend omReplayContend = OMReplayContend - .builder() - .initTime(currentTime) - .countDownLatch(countDownLatch) - .replayOMList(replayOMList) - .replayMatchKey( - OMReplayContend.generateMatchKey( - CurrentAppOctopusMessageType, - currentTime - ) - ) - .type(CurrentAppOctopusMessageType) - .build(); + OMReplayContend omReplayContend = OMReplayContend.build( + ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(), + CurrentAppOctopusMessageType, + currentTime + ); + + CountDownLatch countDownLatch = omReplayContend.getCountDownLatch(); + // 调用后台接收处理所有的Replay信息 asyncWaitOMResult.waitFor(omReplayContend); @@ -101,21 +91,24 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { countDownLatch );*/ + + boolean isAllHealthyAgentReport = false; try { // 超时等待5秒钟, 或者所有的Agent均已经完成上报 - countDownLatch.await( + isAllHealthyAgentReport = countDownLatch.await( 5, TimeUnit.SECONDS ); } catch (InterruptedException e) { - log.warn("存在部分Agent没有上报 版本信息!"); + } finally { // 超时,或者 全部信息已经收集 + if (!isAllHealthyAgentReport) { + log.warn("存在部分Agent没有上报 版本信息!"); + } // 此处调用,即可中断 异步任务的收集工作 - REPLAY_CACHE_MAP.remove( - omReplayContend.getReplayMatchKey() - ); + asyncWaitOMResult.stopWaiting(omReplayContend); // 处理结果 omReplayContend @@ -132,7 +125,6 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { // help gc omReplayContend = null; - } return result; @@ -165,21 +157,14 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { currentTime ); - CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size()); - ArrayList replayOMList = new ArrayList<>(); - OMReplayContend omReplayContend = OMReplayContend - .builder() - .initTime(currentTime) - .countDownLatch(countDownLatch) - .replayOMList(replayOMList) - .replayMatchKey( - OMReplayContend.generateMatchKey( - CurrentAppOctopusMessageType, - currentTime - ) - ) - .type(CurrentAppOctopusMessageType) - .build(); + // 构造结果 + OMReplayContend omReplayContend = OMReplayContend.build( + ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(), + CurrentAppOctopusMessageType, + currentTime + ); + + CountDownLatch countDownLatch = omReplayContend.getCountDownLatch(); // 调用后台接收处理所有的Replay信息 asyncWaitOMResult.waitFor(omReplayContend); @@ -202,9 +187,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { // 超时,或者 全部信息已经收集 // 此处调用,即可中断 异步任务的收集工作 - REPLAY_CACHE_MAP.remove( - omReplayContend.getReplayMatchKey() - ); + asyncWaitOMResult.stopWaiting(omReplayContend); // 处理结果 omReplayContend diff --git a/server/src/main/java/io/wdd/rpc/controller/AgentController.java b/server/src/main/java/io/wdd/rpc/controller/AgentController.java index 4e946b4..8e5f110 100644 --- a/server/src/main/java/io/wdd/rpc/controller/AgentController.java +++ b/server/src/main/java/io/wdd/rpc/controller/AgentController.java @@ -21,14 +21,14 @@ public class AgentController { OctopusAgentService octopusAgentService; @GetMapping("/version") - @ApiOperation("[版本]-所有OctopusAgent") + @ApiOperation("[版本] - 所有OctopusAgent") public R> getAllAgentVersion(){ return R.ok(octopusAgentService.getAllAgentVersion()); } @GetMapping("/coreInfo") - @ApiOperation("[核心信息]-所有OctopusAgent") + @ApiOperation("[核心信息] - 所有OctopusAgent") public R> getAllAgentCoreInfo(){ return R.ok(octopusAgentService.getAllAgentCoreInfo()); diff --git a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java index dad3719..d907e5c 100644 --- a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java +++ b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java @@ -5,7 +5,7 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import io.wdd.common.beans.response.R; import io.wdd.rpc.execute.result.BuildStreamReader; -import io.wdd.rpc.execute.service.CoreExecutionService; +import io.wdd.rpc.execute.service.AsyncExecutionService; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -17,26 +17,27 @@ import java.util.List; import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER; import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_LIST; +import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; @RestController @RequestMapping("/octopus/server/executor") -@Api("Agent执行命令的Controller") +@Api(value = "Agent执行命令的Controller", tags = "Execution") public class ExecutionController { @Resource - CoreExecutionService coreExecutionService; + AsyncExecutionService asyncExecutionService; @Resource BuildStreamReader buildStreamReader; @PostMapping("/command/one") - @ApiOperation("[命令]-手动发送命令") + @ApiOperation("[命令] - 手动发送命令") public R patchCommandToAgent( - @RequestParam(value = "topicName") String topicName, + @RequestParam(value = "topicName") @ApiParam(name = "topicName", value = "目标主机名称") String topicName, @RequestParam(value = "commandList", required = false) @Nullable List commandList, @RequestParam(value = "type", required = false) @Nullable String type ) { - String streamKey = coreExecutionService + String streamKey = asyncExecutionService .SendCommandToAgent( topicName, type, @@ -47,7 +48,7 @@ public class ExecutionController { } @PostMapping("/command/batch") - @ApiOperation("[命令]- 批量发送命令") + @ApiOperation("[命令] - 批量发送命令") public R> patchCommandToAgentList( @RequestParam(value = "topicNameList") @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList, @@ -56,7 +57,7 @@ public class ExecutionController { @RequestParam(value = "type", required = false) @Nullable String type ) { - return R.ok(coreExecutionService.SendCommandToAgent( + return R.ok(asyncExecutionService.SendCommandToAgent( topicNameList, type, commandList @@ -65,20 +66,51 @@ public class ExecutionController { @PostMapping("/command/all") - @ApiOperation("[命令]- 发送命令至所有的主机") - public R> patchCommandToAgentAll( + @ApiOperation("[命令] - 发送命令至所有的主机") + public R> patchCommandToAllAgent( @RequestParam(value = "commandList", required = false) @ApiParam(name = "commandList", value = "命令行") @Nullable List commandList, @RequestParam(value = "type", required = false) @Nullable String type ) { - return R.ok(coreExecutionService.SendCommandToAgent( + return R.ok(asyncExecutionService.SendCommandToAgent( ALL_AGENT_TOPIC_NAME_LIST, type, commandList )); } + @PostMapping("/command/healthy") + @ApiOperation("[命令] - 发送命令至健康的主机") + public R> patchCommandToHealthyAgent( + @RequestParam(value = "commandList", required = false) + @ApiParam(name = "commandList", value = "命令行") @Nullable List commandList, + @RequestParam(value = "type", required = false) @Nullable String type + ) { + + return R.ok(asyncExecutionService.SendCommandToAgent( + ALL_HEALTHY_AGENT_TOPIC_NAME_LIST, + type, + commandList + )); + } + + @PostMapping("/command/sync/one") + @ApiOperation("[命令] [同步] - 同步等待命令结果") + public R> SyncPatchCommandToAgent( + @RequestParam(value = "topicName") @ApiParam(name = "topicName", value = "目标主机名称") String topicName, + @RequestParam(value = "commandList", required = false) + @ApiParam(name = "commandList", value = "命令行") @Nullable List commandList, + @RequestParam(value = "type", required = false) @ApiParam(name = "type", value = "执行命令类型") @Nullable String type + ) { + + return R.ok(asyncExecutionService.SendCommandToAgent( + ALL_HEALTHY_AGENT_TOPIC_NAME_LIST, + type, + commandList + )); + } + @PostMapping("/agentStatusStream") @ApiOperation("切换Console查看Agent状态日志") @@ -105,7 +137,7 @@ public class ExecutionController { ) { return R.ok( - coreExecutionService + asyncExecutionService .SendCommandToAgent( topicNameList, "AgentUpdate", @@ -121,7 +153,7 @@ public class ExecutionController { ) { return R.ok( - coreExecutionService + asyncExecutionService .SendCommandToAgent( topicNameList, "AgentReboot", @@ -137,7 +169,7 @@ public class ExecutionController { ) { return R.ok( - coreExecutionService + asyncExecutionService .SendCommandToAgent( topicNameList, "AgentShutdown", @@ -145,5 +177,21 @@ public class ExecutionController { )); } + @PostMapping("/function/bootUp") + @ApiOperation("重新部署") + public R> AgentBootUp( + @RequestParam(value = "topicNameList") + @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList + ) { + + return R.ok( + asyncExecutionService + .SendCommandToAgent( + topicNameList, + "AgentBootUp", + null + )); + } + } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java similarity index 79% rename from server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java rename to server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java index 100e84b..f068d07 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java @@ -1,19 +1,19 @@ package io.wdd.rpc.execute.service; +import io.wdd.common.beans.rabbitmq.OctopusMessage; + import java.util.HashMap; import java.util.List; -public interface CoreExecutionService { +public interface AsyncExecutionService { String SendCommandToAgent(String agentTopicName, String command); String SendCommandToAgent(String agentTopicName, List commandList); - String SendCommandToAgent(String agentTopicName, String type, List commandList); - List SendCommandToAgent(List agentTopicNameList, String type, List command); /** @@ -37,8 +37,9 @@ public interface CoreExecutionService { ); - /** ------------------------------------------------- */ - + /** + * ------------------------------------------------- + */ String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete); @@ -89,4 +90,26 @@ public interface CoreExecutionService { ); + /** + * 同步命令调用的方法 + * + * @param agentTopicName + * @param type + * @param commandList + * @param commandListComplete + * @param needResultReplay + * @param futureKey + * @param durationTask + * @return + */ + OctopusMessage SyncCallSendCommandToAgent( + String agentTopicName, + String type, + List commandList, + List> commandListComplete, + boolean needResultReplay, + String futureKey, + boolean durationTask + ); + } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java similarity index 91% rename from server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java rename to server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java index 8e4b928..fb70cf3 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.wdd.common.beans.executor.ExecutionMessage; import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessageType; +import io.wdd.common.utils.TimeUtils; import io.wdd.rpc.execute.config.ExecutionLog; import io.wdd.rpc.message.sender.OMessageToAgentSender; import lombok.extern.slf4j.Slf4j; @@ -22,7 +23,7 @@ import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_SET; @Service @Slf4j -public class CoreExecutionServiceImpl implements CoreExecutionService { +public class AsyncExecutionServiceImpl implements AsyncExecutionService { private static final String MANUAL_COMMAND_TYPE = "manual-command"; @Resource @@ -52,11 +53,6 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { @Override public String SendCommandToAgent(String agentTopicName, String type, List commandList) { - // 归一化type - if (StringUtils.isEmpty(type)) { - type = MANUAL_COMMAND_TYPE; - } - return SendCommandToAgent( agentTopicName, type, @@ -112,6 +108,29 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { @Override public String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { + String resultKey = futureKey; + // 判定是否是 FutureKey + if (null == futureKey) { + resultKey = ExecutionMessage.GetResultKey(agentTopicName); + } + + // 调用最底层的方法 + this.SyncCallSendCommandToAgent( + agentTopicName, + type, + commandList, + commandListComplete, + needResultReplay, + futureKey, + durationTask + ); + + return resultKey; + } + + @Override + public OctopusMessage SyncCallSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { + // 检查agentTopicName是否存在 if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { log.error( @@ -122,7 +141,11 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { //throw new MyRuntimeException("agentTopicName异常!" + agentTopicName); } - // 归一化type类型 不行 + // 归一化type + if (StringUtils.isEmpty(type)) { + type = MANUAL_COMMAND_TYPE; + } + String resultKey = futureKey; // 判定是否是 FutureKey if (null == futureKey) { @@ -178,10 +201,8 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { // help gc executionMessage = null; - octopusMessage = null; - - return resultKey; + return octopusMessage; } private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) { @@ -191,7 +212,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { return OctopusMessage .builder() .type(OctopusMessageType.EXECUTOR) - .init_time(LocalDateTime.now()) + .init_time(TimeUtils.currentFormatTime()) .uuid(agentTopicName) .content( objectMapper.writeValueAsString(executionMessage) diff --git a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java new file mode 100644 index 0000000..5bd7e39 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java @@ -0,0 +1,95 @@ +package io.wdd.rpc.execute.service; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** + * 同步命令执行的核心类 + * 需要等待命令执行完毕,完后返回相应的结果 + */ +public interface SyncExecutionService { + + /** + * ------------------------ Sync Command Executor ------------------------------ + */ + ArrayList SyncSendCommandToAgent(String agentTopicName, List commandList); + + ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList); + + List> SyncSendCommandToAgent(List agentTopicNameList, String type, List commandList); + + /** + * 调用 单行命令脚本的 最底层函数 + * + * @param agentTopicName + * @param type + * @param commandList + * @param needResultReplay + * @param futureKey + * @param durationTask + * @return + */ + ArrayList SyncSendCommandToAgent( + String agentTopicName, + String type, + List commandList, + boolean needResultReplay, + String futureKey, + boolean durationTask + ); + + + /** + * ------------------------------------------------- + */ + + ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete); + + /** + * 通常为 页面定时脚本任务调用 + * + * @param agentTopicNameList 目标Agent的TopicName列表 + * @param type 任务类型 + * @param completeCommandList 完整的类型 + * @return 每个Agent只返回一个 ResultKey(Script脚本的结果全部拼接到一起),全部的resultKey + */ + List> SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList); + + + /** + * 通常为 页面定时脚本任务调用 + * + * @param agentTopicNameList 目标Agent的TopicName列表 + * @param type 任务类型 + * @param completeCommandList 完整的类型 + * @param atnFutureKey 由于脚本任务为延迟调用,故需要提前生成未来的ResultKey + * @return 每个Agent只返回一个 ResultKey(Script脚本的结果全部拼接到一起),全部的resultKey + */ + List> SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList, HashMap atnFutureKey); + + + ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey); + + /** + * 调用 完整脚本的 最底层函数 + * + * @param agentTopicName + * @param type + * @param commandList + * @param commandListComplete + * @param futureKey + * @param durationTask + * @return resultKey 本次操作在Redis中记录的结果Key + */ + ArrayList SyncSendCommandToAgent( + String agentTopicName, + String type, + List commandList, + List> commandListComplete, + boolean needResultReplay, + String futureKey, + boolean durationTask + ); + +} diff --git a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java new file mode 100644 index 0000000..cfddef8 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java @@ -0,0 +1,236 @@ +package io.wdd.rpc.execute.service; + +import io.wdd.common.beans.rabbitmq.OctopusMessage; +import io.wdd.common.beans.rabbitmq.OctopusMessageType; +import io.wdd.rpc.message.handler.AsyncWaitOMResult; +import io.wdd.rpc.message.handler.OMReplayContend; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@Service +@Slf4j +public class SyncExecutionServiceImpl implements SyncExecutionService { + + private static final boolean COMMAND_EXEC_NEED_REPLAY = true; + + private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.EXECUTOR; + @Resource + AsyncWaitOMResult asyncWaitOMResult; + @Resource + AsyncExecutionService asyncExecutionService; + + /** + * 一个命令执行的最长等待时间 + */ + @Value("${octopus.agent.executor.processMaxTimeOut}") + Integer processMaxWaitSeconds; + + @Override + public ArrayList SyncSendCommandToAgent(String agentTopicName, List commandList) { + + return this.SyncSendCommandToAgent( + agentTopicName, + null, + commandList, + null, + COMMAND_EXEC_NEED_REPLAY, + null, + false + ); + } + + @Override + public ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList) { + + + return this.SyncSendCommandToAgent( + agentTopicName, + type, + commandList, + null, + COMMAND_EXEC_NEED_REPLAY, + null, + false + ); + } + + @Override + public List> SyncSendCommandToAgent(List agentTopicNameList, String type, List commandList) { + + return agentTopicNameList + .stream() + .map( + agentTopicName -> this.SyncSendCommandToAgent( + agentTopicName, + type, + commandList, + null, + COMMAND_EXEC_NEED_REPLAY, + null, + false + ) + ) + .collect(Collectors.toList()); + } + + @Override + public ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, boolean needResultReplay, String futureKey, boolean durationTask) { + + return this.SyncSendCommandToAgent( + agentTopicName, + type, + commandList, + null, + COMMAND_EXEC_NEED_REPLAY, + futureKey, + false + ); + } + + @Override + public ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete) { + return this.SyncSendCommandToAgent( + agentTopicName, + type, + commandList, + commandListComplete, + COMMAND_EXEC_NEED_REPLAY, + null, + false + ); + } + + @Override + public List> SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList) { + + return agentTopicNameList + .stream() + .map( + agentTopicName -> this.SyncSendCommandToAgent( + agentTopicName, + type, + null, + completeCommandList, + COMMAND_EXEC_NEED_REPLAY, + null, + false + ) + ) + .collect(Collectors.toList()); + + } + + @Override + public List> SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList, HashMap atnFutureKey) { + return agentTopicNameList + .stream() + .map( + agentTopicName -> this.SyncSendCommandToAgent( + agentTopicName, + type, + null, + completeCommandList, + COMMAND_EXEC_NEED_REPLAY, + atnFutureKey.get(agentTopicName), + false + ) + ) + .collect(Collectors.toList()); + } + + @Override + public ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey) { + return this.SyncSendCommandToAgent( + agentTopicName, + type, + commandList, + commandListComplete, + COMMAND_EXEC_NEED_REPLAY, + futureKey, + false + ); + } + + @Override + public ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { + + OctopusMessage octopusMessage = asyncExecutionService.SyncCallSendCommandToAgent( + agentTopicName, + type, + commandList, + commandListComplete, + needResultReplay, + futureKey, + durationTask + ); + + LocalDateTime initTime = octopusMessage.getInit_time(); + + ArrayList result = new ArrayList<>(); + + // 构造消息等待对象 + int commandCount = Math.max( + commandListComplete.size(), + 1 + ); + OMReplayContend omReplayContend = OMReplayContend.build( + commandCount, + CurrentAppOctopusMessageType, + initTime + ); + CountDownLatch countDownLatch = omReplayContend.getCountDownLatch(); + + // 开始等待结果 + asyncWaitOMResult.waitFor(omReplayContend); + + // 监听结果 + try { + boolean await = countDownLatch.await( + processMaxWaitSeconds, + TimeUnit.SECONDS + ); + + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + + // 等待所有的结果返回 + // 停止等待结果 + asyncWaitOMResult.stopWaiting(omReplayContend); + + + // 解析结果 + omReplayContend + .getReplayOMList() + .stream() + .map( + om -> { + log.debug( + "replay message is => {}", + om + ); + + return (ArrayList) om.getResult(); + } + ) + .forEachOrdered( + singleResult -> result.addAll(singleResult) + ); + + } + + // 返回 + return result; + } +} diff --git a/server/src/main/java/io/wdd/rpc/message/handler/AsyncWaitOMResult.java b/server/src/main/java/io/wdd/rpc/message/handler/AsyncWaitOMResult.java index de41ec3..0f29808 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/AsyncWaitOMResult.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/AsyncWaitOMResult.java @@ -12,6 +12,12 @@ import java.util.concurrent.TimeUnit; import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT; +/** + * 从Agent收集返回信息的统一处理地点 + * 使用方法: 业务类构造 OMReplayContend对象,调用AsyncWaitOMResult.waitFor()方法 + *

+ * 调用结束之后,需要从 REPLAY_WAITING_TARGET 中移除此部分内容 + */ @Service @Slf4j public class AsyncWaitOMResult { @@ -21,18 +27,27 @@ public class AsyncWaitOMResult { * KEY -> replayMatchKey * VALUE -> OMReplayContend - 包含countDownLatch 和 result */ - public static final HashMap REPLAY_CACHE_MAP = new HashMap<>(); + private static final HashMap REPLAY_WAITING_TARGET = new HashMap<>(); public void waitFor(OMReplayContend omReplayContend) { // 向 REPLAY_CACHE_MAP中写入 Key - REPLAY_CACHE_MAP.put(omReplayContend.getReplayMatchKey(), - omReplayContend); + REPLAY_WAITING_TARGET.put( + omReplayContend.getReplayMatchKey(), + omReplayContend + ); // 在调用线程的countDownLunch结束之后,关闭 // 清除 REPLAY_CACHE_MAP 中的队列 } + public void stopWaiting(OMReplayContend omReplayContend) { + + // 在调用线程的countDownLunch结束之后,关闭 清除 REPLAY_CACHE_MAP 中的队列 + REPLAY_WAITING_TARGET.remove(omReplayContend.getReplayMatchKey()); + + } + @PostConstruct public void daemonHandleReplayOMFromAgent() { @@ -71,7 +86,7 @@ public class AsyncWaitOMResult { replayOMessage.getType(), replayOMessage.getInit_time() ); - if (!REPLAY_CACHE_MAP.containsKey(matchKey)) { + if (!REPLAY_WAITING_TARGET.containsKey(matchKey)) { // 没有这个Key,说明等待结果已经超时了,直接丢弃,然后继续循环 // todo 错误的数据需要放置于某处 @@ -80,7 +95,7 @@ public class AsyncWaitOMResult { } // Map中包含有Key,那么放置进去 - OMReplayContend replayContend = REPLAY_CACHE_MAP.get(matchKey); + OMReplayContend replayContend = REPLAY_WAITING_TARGET.get(matchKey); replayContend .getReplayOMList() .add(replayOMessage); diff --git a/server/src/main/java/io/wdd/rpc/message/handler/OMReplayContend.java b/server/src/main/java/io/wdd/rpc/message/handler/OMReplayContend.java index 2f024e3..0abd5dc 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/OMReplayContend.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/OMReplayContend.java @@ -11,7 +11,7 @@ import lombok.NoArgsConstructor; import lombok.experimental.SuperBuilder; import java.time.LocalDateTime; -import java.util.List; +import java.util.ArrayList; import java.util.concurrent.CountDownLatch; @Data @@ -35,8 +35,7 @@ public class OMReplayContend { CountDownLatch countDownLatch; @ApiModelProperty("回复的结果列表, 临时保存") - List replayOMList; - + ArrayList replayOMList; protected static String generateMatchKey(OMReplayContend replayIdentifier) { @@ -49,6 +48,11 @@ public class OMReplayContend { return relayMatchKey; } + /** + * @param messageType + * @param messageInitTime 必须使用 TimeUtils.currentFormatTime(); + * @return + */ public static String generateMatchKey(OctopusMessageType messageType, LocalDateTime messageInitTime) { String relayMatchKey = messageType.toString() + messageInitTime.toString(); @@ -56,4 +60,23 @@ public class OMReplayContend { return relayMatchKey; } + /** + * 方便使用的一个构造方法 + * + * @return + */ + public static OMReplayContend build(int waitForReplayNum, OctopusMessageType currentOMType, LocalDateTime currentTime) { + + return new OMReplayContend( + currentOMType, + currentTime, + generateMatchKey( + currentOMType, + currentTime + ), + new CountDownLatch(waitForReplayNum), + new ArrayList<>() + ); + } + } diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java b/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java index 8ed6c39..3329193 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java @@ -1,7 +1,7 @@ package io.wdd.rpc.scheduler.service.script; -import io.wdd.rpc.execute.service.CoreExecutionService; +import io.wdd.rpc.execute.service.AsyncExecutionService; import io.wdd.rpc.scheduler.beans.ScriptSchedulerDTO; import io.wdd.rpc.scheduler.config.QuartzSchedulerUtils; import lombok.extern.slf4j.Slf4j; @@ -20,7 +20,7 @@ import java.util.List; public class AgentApplyScheduledScript { @Resource - CoreExecutionService coreExecutionService; + AsyncExecutionService asyncExecutionService; @Resource QuartzSchedulerUtils quartzSchedulerUtils; @@ -45,7 +45,7 @@ public class AgentApplyScheduledScript { } // 发送命令到Agent中 - List resultKeyList = coreExecutionService + List resultKeyList = asyncExecutionService .SendCommandToAgentComplete( targetMachineList, scriptType, diff --git a/server/src/test/java/io/wdd/server/ServerApplicationTests.java b/server/src/test/java/io/wdd/server/ServerApplicationTests.java index e78fc65..38cc180 100644 --- a/server/src/test/java/io/wdd/server/ServerApplicationTests.java +++ b/server/src/test/java/io/wdd/server/ServerApplicationTests.java @@ -1,6 +1,6 @@ package io.wdd.server; -import io.wdd.rpc.execute.service.CoreExecutionService; +import io.wdd.rpc.execute.service.AsyncExecutionService; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; @@ -13,7 +13,7 @@ class ServerApplicationTests { @Resource - CoreExecutionService coreExecutionService; + AsyncExecutionService asyncExecutionService; @Test void testCoreExecutionCompleteScript() { @@ -61,7 +61,7 @@ class ServerApplicationTests { ) ); - List resultList = coreExecutionService.SendCommandToAgentComplete( + List resultList = asyncExecutionService.SendCommandToAgentComplete( targetMachineList, "Scheduled Script", completeScript From 65ed1416971acd96eab5c5995d7ea0c331a0ca3d Mon Sep 17 00:00:00 2001 From: zeaslity Date: Tue, 28 Feb 2023 15:55:05 +0800 Subject: [PATCH 06/16] =?UTF-8?q?[server][=20executor]-=20=E5=AE=8C?= =?UTF-8?q?=E6=88=90=E5=90=8C=E6=AD=A5=E8=B0=83=E7=94=A8=E5=91=BD=E4=BB=A4?= =?UTF-8?q?=E7=9A=84=E9=83=A8=E5=88=86=E4=BB=A3=E7=A0=81=20-=201?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io/wdd/rpc/controller/AgentController.java | 2 +- .../wdd/rpc/controller/ExecutionController.java | 15 ++++++++++----- .../wdd/rpc/controller/SchedulerController.java | 2 +- .../io/wdd/rpc/controller/StatusController.java | 2 +- .../service/SyncExecutionServiceImpl.java | 16 +++++++++------- 5 files changed, 22 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/io/wdd/rpc/controller/AgentController.java b/server/src/main/java/io/wdd/rpc/controller/AgentController.java index 8e5f110..ed1c9c4 100644 --- a/server/src/main/java/io/wdd/rpc/controller/AgentController.java +++ b/server/src/main/java/io/wdd/rpc/controller/AgentController.java @@ -14,7 +14,7 @@ import java.util.Map; @RestController @RequestMapping("/octopus/server/agent") -@Api("处理Agent核心内容的Controller") +@Api(value = "处理Agent核心内容的Controller", tags = "Agent") public class AgentController { @Resource diff --git a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java index d907e5c..9a6156f 100644 --- a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java +++ b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java @@ -6,6 +6,7 @@ import io.swagger.annotations.ApiParam; import io.wdd.common.beans.response.R; import io.wdd.rpc.execute.result.BuildStreamReader; import io.wdd.rpc.execute.service.AsyncExecutionService; +import io.wdd.rpc.execute.service.SyncExecutionService; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -28,6 +29,8 @@ public class ExecutionController { AsyncExecutionService asyncExecutionService; @Resource BuildStreamReader buildStreamReader; + @Resource + SyncExecutionService syncExecutionService; @PostMapping("/command/one") @ApiOperation("[命令] - 手动发送命令") @@ -104,11 +107,13 @@ public class ExecutionController { @RequestParam(value = "type", required = false) @ApiParam(name = "type", value = "执行命令类型") @Nullable String type ) { - return R.ok(asyncExecutionService.SendCommandToAgent( - ALL_HEALTHY_AGENT_TOPIC_NAME_LIST, - type, - commandList - )); + return R.ok( + syncExecutionService.SyncSendCommandToAgent( + topicName, + type, + commandList + ) + ); } diff --git a/server/src/main/java/io/wdd/rpc/controller/SchedulerController.java b/server/src/main/java/io/wdd/rpc/controller/SchedulerController.java index e52195a..c06d1d1 100644 --- a/server/src/main/java/io/wdd/rpc/controller/SchedulerController.java +++ b/server/src/main/java/io/wdd/rpc/controller/SchedulerController.java @@ -17,7 +17,7 @@ import java.util.List; import java.util.Map; @RestController -@Api(value = "定时任务控制中心的Controller") +@Api(value = "定时任务控制中心的Controller", tags = "Scheduler") @RequestMapping(value = "/octopus/server/scheduler") public class SchedulerController { diff --git a/server/src/main/java/io/wdd/rpc/controller/StatusController.java b/server/src/main/java/io/wdd/rpc/controller/StatusController.java index 068e6f9..bcaf7b4 100644 --- a/server/src/main/java/io/wdd/rpc/controller/StatusController.java +++ b/server/src/main/java/io/wdd/rpc/controller/StatusController.java @@ -18,7 +18,7 @@ import static io.wdd.rpc.init.ServerCacheAgentStatus.*; @RestController -@Api("Agent运行状态Controller") +@Api(value = "Agent运行状态Controller", tags = "Status") @RequestMapping("/octopus/server/status") public class StatusController { diff --git a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java index cfddef8..5949c89 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java @@ -5,7 +5,6 @@ import io.wdd.common.beans.rabbitmq.OctopusMessageType; import io.wdd.rpc.message.handler.AsyncWaitOMResult; import io.wdd.rpc.message.handler.OMReplayContend; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.Resource; @@ -32,8 +31,7 @@ public class SyncExecutionServiceImpl implements SyncExecutionService { /** * 一个命令执行的最长等待时间 */ - @Value("${octopus.agent.executor.processMaxTimeOut}") - Integer processMaxWaitSeconds; + int processMaxWaitSeconds = 10; @Override public ArrayList SyncSendCommandToAgent(String agentTopicName, List commandList) { @@ -179,10 +177,14 @@ public class SyncExecutionServiceImpl implements SyncExecutionService { ArrayList result = new ArrayList<>(); // 构造消息等待对象 - int commandCount = Math.max( - commandListComplete.size(), - 1 - ); + int commandCount = 1; + if (null != commandListComplete) { + commandCount = Math.max( + commandListComplete.size(), + 1 + ); + } + OMReplayContend omReplayContend = OMReplayContend.build( commandCount, CurrentAppOctopusMessageType, From fbcff7ee42b4704b92d981df290675159862d37e Mon Sep 17 00:00:00 2001 From: zeaslity Date: Tue, 28 Feb 2023 16:59:22 +0800 Subject: [PATCH 07/16] =?UTF-8?q?[server][=20executor]-=20=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E9=83=A8=E5=88=86=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../wdd/agent/executor/CommandExecutor.java | 34 +++++++++++++--- .../executor/thread/CommandExecLogCache.java | 39 ++++++++++++++++++- .../handler/OMessageHandlerServer.java | 2 - 3 files changed, 66 insertions(+), 9 deletions(-) diff --git a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java index b805cb4..a0f530a 100644 --- a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java @@ -16,6 +16,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -160,6 +161,30 @@ public class CommandExecutor { process ); + // 获取到命令执行的结果, 即时执行的命令 + final int[] processExitCode = new int[0]; + final Duration[] commandExecDuration = new Duration[1]; + process + .onExit() + .thenAccept( + pro -> { + int exitValue = pro.exitValue(); + + processExitCode[0] = exitValue; + ProcessHandle.Info info = pro.info(); + commandExecDuration[0] = info + .totalCpuDuration() + .get(); + + log.info( + "任务 [ {} ]命令执行完成,执行时间为 [ {} ], 执行命令的结果为 {}", + info.commandLine(), + info.totalCpuDuration(), + exitValue + ); + } + ); + boolean commandExecComplete = false; try { @@ -182,8 +207,6 @@ public class CommandExecutor { throw new RuntimeException(e); } finally { - System.out.println("process isAlive = " + process.isAlive()); - // 任务提前执行结束,或者超过了最长等待时间 // 判断命令是否正确处理完成 if (!commandExecComplete) { @@ -212,9 +235,11 @@ public class CommandExecutor { //commandExecLogCache.PrintCommandCachedLog(streamKey); // 关停任务执行的缓存日志收集 BufferedReader 否则无法终止 - commandExecLogCache.StopExecLogBufferedReader( + commandExecLogCache.StopExecLogCollect( streamKey, - process + process, + processExitCode[0], + commandExecDuration[0] ); // 异步执行日志的发送工作 @@ -255,7 +280,6 @@ public class CommandExecutor { } - @Deprecated private ByteBuffer cvToByteBuffer(InputStream inputStream) throws IOException { diff --git a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java index 2499504..fd1aafe 100644 --- a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java +++ b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java @@ -18,6 +18,7 @@ import javax.annotation.Resource; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.concurrent.ExecutorService; @@ -48,7 +49,7 @@ public class CommandExecLogCache { /** * 固定单进程,用于缓存命令执行日志,关闭命令输入管道 */ - private static ExecutorService LogCacheDaemonThread = null; + private static ExecutorService LogCacheDaemonThread; static { @@ -161,6 +162,8 @@ public class CommandExecLogCache { commandCachedLog::add ); + // 对于即时执行完成的任务,需要在这里增加尾巴内容 + log.debug( "命令代码 [ {} ] 的执行日志内容为 {} ", streamKey, @@ -174,7 +177,7 @@ public class CommandExecLogCache { * 对于一些没有中止的任务,必须要手动将读取的 InputStream流关闭 * 否则部分任务的日志无法收集 */ - public void StopExecLogBufferedReader(String streamKey, Process process) { + public void StopExecLogCollect(String streamKey, Process process, int commandExitValue, Duration duration) { BufferedReader bufferedReader = CommandLogBufferedReaderMap.get(streamKey); @@ -184,6 +187,19 @@ public class CommandExecLogCache { bufferedReader ); + // 延迟任务,在此时的process还没有关闭,需要获取其中的信息 + if (ObjectUtils.isNotEmpty(process) && process.isAlive()) { + duration = process + .info() + .totalCpuDuration() + .get(); + + commandExitValue = process.exitValue(); + + } + + Duration finalDuration = duration; + int finalCommandExitValue = commandExitValue; LogCacheDaemonThread.submit( () -> { try { @@ -202,6 +218,25 @@ public class CommandExecLogCache { streamKey ); + // 添加任务结束的一些信息 + String execTimeCostString = String.format( + "execution time-cost is => [ %s ]", + finalDuration + ); + + String execResultString = String.format( + "execution result code is => [ %s ]", + finalCommandExitValue + ); + + ArrayList commandExecCachedLog = CachedCommandLogMap.get(streamKey); + + commandExecCachedLog.add("--------------- command result are as above --------------------"); + commandExecCachedLog.add(execTimeCostString); + commandExecCachedLog.add(execResultString); + + + // 从缓存读取其中去掉这个内容 CommandLogBufferedReaderMap.remove(streamKey); } }); diff --git a/server/src/main/java/io/wdd/rpc/message/handler/OMessageHandlerServer.java b/server/src/main/java/io/wdd/rpc/message/handler/OMessageHandlerServer.java index 82ca28a..570ff21 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/OMessageHandlerServer.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/OMessageHandlerServer.java @@ -14,7 +14,6 @@ import org.springframework.data.redis.core.RedisTemplate; import javax.annotation.Resource; import java.io.IOException; import java.util.ArrayDeque; -import java.util.concurrent.ArrayBlockingQueue; @Configuration @Slf4j(topic = "Octopus Message Handler") @@ -60,7 +59,6 @@ public class OMessageHandlerServer { octopusMessage ); - // 获取Agent的版本信息 if (octopusMessage .getUuid() From 00858e9184ebc12f1717668ddb5a356a8e945a54 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Tue, 28 Feb 2023 17:13:15 +0800 Subject: [PATCH 08/16] =?UTF-8?q?[server][=20executor]-=20=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E9=83=A8=E5=88=86=E4=BB=A3=E7=A0=81=20-=202?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../wdd/agent/executor/CommandExecutor.java | 4 +- .../executor/thread/CommandExecLogCache.java | 55 ++++++++++++++++++- 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java index b805cb4..e2540a9 100644 --- a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java @@ -182,8 +182,6 @@ public class CommandExecutor { throw new RuntimeException(e); } finally { - System.out.println("process isAlive = " + process.isAlive()); - // 任务提前执行结束,或者超过了最长等待时间 // 判断命令是否正确处理完成 if (!commandExecComplete) { @@ -212,7 +210,7 @@ public class CommandExecutor { //commandExecLogCache.PrintCommandCachedLog(streamKey); // 关停任务执行的缓存日志收集 BufferedReader 否则无法终止 - commandExecLogCache.StopExecLogBufferedReader( + commandExecLogCache.StopCollectExecLog( streamKey, process ); diff --git a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java index 2499504..2ca5aee 100644 --- a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java +++ b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java @@ -161,6 +161,13 @@ public class CommandExecLogCache { commandCachedLog::add ); + // 对于即时执行完成的任务,需要在这里增加尾巴内容 + addCommandExecTailInfo( + process, + streamKey, + false + ); + log.debug( "命令代码 [ {} ] 的执行日志内容为 {} ", streamKey, @@ -170,11 +177,46 @@ public class CommandExecLogCache { ); } + /** + * 增加尾巴内容 + * + * @param process + * @param streamKey + */ + private void addCommandExecTailInfo(Process process, String streamKey, boolean isKillProcess) { + + log.debug("开始添加任务日志的尾部信息!"); + + // 添加任务结束的一些信息 + String execTimeCostString = String.format( + "execution time-cost is => [ %s ]", + process + .info() + .totalCpuDuration() + ); + + // 是否需要强行关闭 process 杀死任务进程 + if (isKillProcess) { + process.destroyForcibly(); + } + + String execResultString = String.format( + "execution result code is => [ %s ]", + process.exitValue() + ); + + ArrayList commandExecCachedLog = CachedCommandLogMap.get(streamKey); + + commandExecCachedLog.add("--------------- command result are as above --------------------"); + commandExecCachedLog.add(execTimeCostString); + commandExecCachedLog.add(execResultString); + } + /** * 对于一些没有中止的任务,必须要手动将读取的 InputStream流关闭 * 否则部分任务的日志无法收集 */ - public void StopExecLogBufferedReader(String streamKey, Process process) { + public void StopCollectExecLog(String streamKey, Process process) { BufferedReader bufferedReader = CommandLogBufferedReaderMap.get(streamKey); @@ -202,6 +244,17 @@ public class CommandExecLogCache { streamKey ); + // 任务卡住了到现在,需要强行中止,并且添加部分日志信息 + if (ObjectUtils.isNotEmpty(process) && process.isAlive()) { + + addCommandExecTailInfo( + process, + streamKey, + true + ); + } + + // 移除 CommandLogBufferedReaderMap.remove(streamKey); } }); From 895e1a09644314abc621a26bba5c025a8ef45468 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Tue, 28 Feb 2023 17:24:39 +0800 Subject: [PATCH 09/16] =?UTF-8?q?[server][=20executor]-=20=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E9=83=A8=E5=88=86=E4=BB=A3=E7=A0=81=20-=202?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io/wdd/agent/executor/thread/CommandExecLogCache.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java index 2ca5aee..0c0a4c8 100644 --- a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java +++ b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java @@ -193,12 +193,13 @@ public class CommandExecLogCache { process .info() .totalCpuDuration() + .get() ); // 是否需要强行关闭 process 杀死任务进程 - if (isKillProcess) { + /*if (isKillProcess) { process.destroyForcibly(); - } + }*/ String execResultString = String.format( "execution result code is => [ %s ]", From ac495f17745786a0e6bdc8db34e842d546faf4b7 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Tue, 28 Feb 2023 17:27:34 +0800 Subject: [PATCH 10/16] =?UTF-8?q?[server][=20executor]-=20=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E9=83=A8=E5=88=86=E4=BB=A3=E7=A0=81=20-=202?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io/wdd/agent/executor/thread/CommandExecLogCache.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java index 0c0a4c8..7764542 100644 --- a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java +++ b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java @@ -211,6 +211,11 @@ public class CommandExecLogCache { commandExecCachedLog.add("--------------- command result are as above --------------------"); commandExecCachedLog.add(execTimeCostString); commandExecCachedLog.add(execResultString); + + log.debug( + "添加尾部信息完成, 结果为 => {}", + commandExecCachedLog + ); } /** From f88cb39adc2a446f3400bcf65de1d846a25b3677 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Tue, 28 Feb 2023 17:30:19 +0800 Subject: [PATCH 11/16] =?UTF-8?q?[server][=20executor]-=20=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E9=83=A8=E5=88=86=E4=BB=A3=E7=A0=81=20-=202?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io/wdd/agent/executor/CommandExecutor.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java index e2540a9..7992336 100644 --- a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java @@ -215,16 +215,9 @@ public class CommandExecutor { process ); - // 异步执行日志的发送工作 - commandExecLogCache.CollectAndSendExecLog( - streamKey, - needResultReplay, - octopusMessage - ); - // 执行到这里,说明整个任务流程结束(超时结束) log.debug( - "命令 [ {} ] [ {} ]执行全流程结束! 开始释放所有资源", + "命令 [ {} ] [ {} ] 执行全流程结束! 开始释放所有资源", streamKey, process.info() ); @@ -248,6 +241,14 @@ public class CommandExecutor { // shutdown the process process.destroyForcibly(); } + + + // 异步执行日志的发送工作 + commandExecLogCache.CollectAndSendExecLog( + streamKey, + needResultReplay, + octopusMessage + ); } } From 371fa71b50898e7b92b8ef1ce6a2d6727b2d33ab Mon Sep 17 00:00:00 2001 From: zeaslity Date: Tue, 28 Feb 2023 17:39:10 +0800 Subject: [PATCH 12/16] =?UTF-8?q?[server][=20executor]-=20=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E9=83=A8=E5=88=86=E4=BB=A3=E7=A0=81=20-=203?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../wdd/agent/executor/CommandExecutor.java | 3 ++- .../executor/thread/CommandExecLogCache.java | 24 +++++++------------ 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java index 7992336..923c76b 100644 --- a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java @@ -212,7 +212,8 @@ public class CommandExecutor { // 关停任务执行的缓存日志收集 BufferedReader 否则无法终止 commandExecLogCache.StopCollectExecLog( streamKey, - process + process, + commandExecWaitTimeout ); // 执行到这里,说明整个任务流程结束(超时结束) diff --git a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java index 7764542..0606658 100644 --- a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java +++ b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java @@ -163,9 +163,9 @@ public class CommandExecLogCache { // 对于即时执行完成的任务,需要在这里增加尾巴内容 addCommandExecTailInfo( - process, + 1, streamKey, - false + 0 ); log.debug( @@ -183,27 +183,19 @@ public class CommandExecLogCache { * @param process * @param streamKey */ - private void addCommandExecTailInfo(Process process, String streamKey, boolean isKillProcess) { + private void addCommandExecTailInfo(int timeOut, String streamKey, int exitValue) { log.debug("开始添加任务日志的尾部信息!"); // 添加任务结束的一些信息 String execTimeCostString = String.format( "execution time-cost is => [ %s ]", - process - .info() - .totalCpuDuration() - .get() + timeOut ); - // 是否需要强行关闭 process 杀死任务进程 - /*if (isKillProcess) { - process.destroyForcibly(); - }*/ - String execResultString = String.format( "execution result code is => [ %s ]", - process.exitValue() + exitValue ); ArrayList commandExecCachedLog = CachedCommandLogMap.get(streamKey); @@ -222,7 +214,7 @@ public class CommandExecLogCache { * 对于一些没有中止的任务,必须要手动将读取的 InputStream流关闭 * 否则部分任务的日志无法收集 */ - public void StopCollectExecLog(String streamKey, Process process) { + public void StopCollectExecLog(String streamKey, Process process, int commandExecWaitTimeout) { BufferedReader bufferedReader = CommandLogBufferedReaderMap.get(streamKey); @@ -254,9 +246,9 @@ public class CommandExecLogCache { if (ObjectUtils.isNotEmpty(process) && process.isAlive()) { addCommandExecTailInfo( - process, + commandExecWaitTimeout, streamKey, - true + process.exitValue() ); } From c183b3c474dfcb75aeab6dc2e2c080604f601ee4 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Tue, 28 Feb 2023 21:30:00 +0800 Subject: [PATCH 13/16] =?UTF-8?q?[server][=20executor]-=20=E5=AE=8C?= =?UTF-8?q?=E6=88=90=E5=BA=95=E5=B1=82=E7=9A=84=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../executor/thread/CommandExecLogCache.java | 10 +++- .../rpc/controller/ExecutionController.java | 57 +++++++++++++++++-- .../service/AsyncExecutionService.java | 4 +- .../service/AsyncExecutionServiceImpl.java | 31 ++++++++-- .../execute/service/SyncExecutionService.java | 4 +- .../service/SyncExecutionServiceImpl.java | 22 ++++++- 6 files changed, 114 insertions(+), 14 deletions(-) diff --git a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java index 0606658..dcea5fc 100644 --- a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java +++ b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java @@ -117,14 +117,20 @@ public class CommandExecLogCache { ); String execTimeString = String.format( - "execution time is => [ %s ]", + "execution date time is => [ %s ]", TimeUtils.currentTimeString() ); + String execMachineString = String.format( + "execution machine is => [ %s ]", + streamKey.split("-Execution")[0] + ); + // add the command commandCachedLog.add(execCommandString); commandCachedLog.add(execTimeString); + commandCachedLog.add(execMachineString); commandCachedLog.add("--------------- command result are as below --------------------"); commandCachedLog.add(""); @@ -248,7 +254,7 @@ public class CommandExecLogCache { addCommandExecTailInfo( commandExecWaitTimeout, streamKey, - process.exitValue() + 233 ); } diff --git a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java index 9a6156f..5409272 100644 --- a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java +++ b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java @@ -14,6 +14,7 @@ import org.springframework.web.bind.annotation.RestController; import javax.annotation.Nullable; import javax.annotation.Resource; +import java.util.ArrayList; import java.util.List; import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER; @@ -88,22 +89,27 @@ public class ExecutionController { public R> patchCommandToHealthyAgent( @RequestParam(value = "commandList", required = false) @ApiParam(name = "commandList", value = "命令行") @Nullable List commandList, + @RequestParam(value = "completeCommandList", required = false) + @ApiParam(name = "completeCommandList", value = "完整命令行,优先,可为空") @Nullable List> completeCommandList, @RequestParam(value = "type", required = false) @Nullable String type ) { - return R.ok(asyncExecutionService.SendCommandToAgent( + return R.ok(asyncExecutionService.SendCommandToAgentComplete( ALL_HEALTHY_AGENT_TOPIC_NAME_LIST, type, - commandList + commandList, + completeCommandList )); } @PostMapping("/command/sync/one") - @ApiOperation("[命令] [同步] - 同步等待命令结果") + @ApiOperation("[命令] [同步] - 单机-等待命令结果") public R> SyncPatchCommandToAgent( @RequestParam(value = "topicName") @ApiParam(name = "topicName", value = "目标主机名称") String topicName, @RequestParam(value = "commandList", required = false) @ApiParam(name = "commandList", value = "命令行") @Nullable List commandList, + @RequestParam(value = "completeCommandList", required = false) + @ApiParam(name = "completeCommandList", value = "完整命令行,优先,可为空") @Nullable List> completeCommandList, @RequestParam(value = "type", required = false) @ApiParam(name = "type", value = "执行命令类型") @Nullable String type ) { @@ -111,7 +117,50 @@ public class ExecutionController { syncExecutionService.SyncSendCommandToAgent( topicName, type, - commandList + commandList, + completeCommandList + ) + ); + } + + @PostMapping("/command/sync/batch") + @ApiOperation("[命令] [同步] - 批量-等待命令结果") + public R>> SyncPatchCommandToAgentBatch( + @RequestParam(value = "topicNameList") + @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList, + @RequestParam(value = "commandList", required = false) + @ApiParam(name = "commandList", value = "命令行") @Nullable List commandList, + @RequestParam(value = "completeCommandList", required = false) + @ApiParam(name = "completeCommandList", value = "完整命令行,优先,可为空") @Nullable List> completeCommandList, + @RequestParam(value = "type", required = false) @ApiParam(name = "type", value = "执行命令类型") @Nullable String type + ) { + + return R.ok( + syncExecutionService.SyncSendCommandToAgentComplete( + topicNameList, + type, + commandList, + completeCommandList + ) + ); + } + + @PostMapping("/command/sync/all") + @ApiOperation("[命令] [同步] - 全部-同步等待命令结果") + public R>> SyncPatchCommandToAgentAll( + @RequestParam(value = "commandList", required = false) + @ApiParam(name = "commandList", value = "命令行") @Nullable List commandList, + @RequestParam(value = "completeCommandList", required = false) + @ApiParam(name = "completeCommandList", value = "完整命令行,优先,可为空") @Nullable List> completeCommandList, + @RequestParam(value = "type", required = false) @ApiParam(name = "type", value = "执行命令类型") @Nullable String type + ) { + + return R.ok( + syncExecutionService.SyncSendCommandToAgentComplete( + ALL_AGENT_TOPIC_NAME_LIST, + type, + commandList, + completeCommandList ) ); } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java index f068d07..8795595 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java @@ -41,7 +41,9 @@ public interface AsyncExecutionService { * ------------------------------------------------- */ - String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete); + String SendCommandToAgentComplete(String agentTopicName, String type, List commandList, List> commandListComplete); + + List SendCommandToAgentComplete(List agentTopicNameList, String type, List commandList, List> commandListComplete); /** * 通常为 页面定时脚本任务调用 diff --git a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java index fb70cf3..9097269 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java @@ -79,17 +79,37 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService { } @Override - public String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete) { + public String SendCommandToAgentComplete(String agentTopicName, String type, List commandList, List> commandListComplete) { return this.SendCommandToAgent( agentTopicName, type, commandList, commandListComplete, - null + false, + null, + false ); } + @Override + public List SendCommandToAgentComplete(List agentTopicNameList, String type, List commandList, List> commandListComplete) { + return agentTopicNameList + .stream() + .map( + agentTopicName -> this.SendCommandToAgent( + agentTopicName, + type, + commandList, + commandListComplete, + false, + null, + false + ) + ) + .collect(Collectors.toList()); + } + @Override public String SendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, String futureKey) { @@ -265,7 +285,7 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService { return agentTopicNameList .stream() .map( - agentTopicName -> this.SendCommandToAgent( + agentTopicName -> this.SendCommandToAgentComplete( agentTopicName, type, null, @@ -287,7 +307,10 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService { type, null, completeCommandList, - atnFutureKey.get(agentTopicName) + atnFutureKey.getOrDefault( + agentTopicName, + null + ) ) ) .collect(Collectors.toList()); diff --git a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java index 5bd7e39..f46d593 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java @@ -44,7 +44,9 @@ public interface SyncExecutionService { * ------------------------------------------------- */ - ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete); + ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> completeCommandList); + + List> SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List commandList, List> completeCommandList); /** * 通常为 页面定时脚本任务调用 diff --git a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java index 5949c89..22d6ed9 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java @@ -96,18 +96,36 @@ public class SyncExecutionServiceImpl implements SyncExecutionService { } @Override - public ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete) { + public ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> completeCommandList) { return this.SyncSendCommandToAgent( agentTopicName, type, commandList, - commandListComplete, + completeCommandList, COMMAND_EXEC_NEED_REPLAY, null, false ); } + @Override + public List> SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List commandList, List> completeCommandList) { + return agentTopicNameList + .stream() + .map( + agentTopicName -> this.SyncSendCommandToAgent( + agentTopicName, + type, + commandList, + completeCommandList, + COMMAND_EXEC_NEED_REPLAY, + null, + false + ) + ) + .collect(Collectors.toList()); + } + @Override public List> SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List> completeCommandList) { From a47ede755f43e8d061bd05f16987bae1ee27db5c Mon Sep 17 00:00:00 2001 From: zeaslity Date: Tue, 28 Feb 2023 21:39:33 +0800 Subject: [PATCH 14/16] =?UTF-8?q?[server][=20executor]-=20=E5=AE=8C?= =?UTF-8?q?=E6=88=90=E5=BA=95=E5=B1=82=E7=9A=84=E4=BB=A3=E7=A0=81=20-=201?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../executor/thread/CommandExecLogCache.java | 2 ++ .../rpc/controller/ExecutionController.java | 30 +++++++++++++------ 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java index dcea5fc..039a73b 100644 --- a/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java +++ b/agent/src/main/java/io/wdd/agent/executor/thread/CommandExecLogCache.java @@ -206,6 +206,8 @@ public class CommandExecLogCache { ArrayList commandExecCachedLog = CachedCommandLogMap.get(streamKey); + commandExecCachedLog.add(""); + commandExecCachedLog.add(""); commandExecCachedLog.add("--------------- command result are as above --------------------"); commandExecCachedLog.add(execTimeCostString); commandExecCachedLog.add(execResultString); diff --git a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java index 5409272..c94ffdf 100644 --- a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java +++ b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java @@ -34,10 +34,12 @@ public class ExecutionController { SyncExecutionService syncExecutionService; @PostMapping("/command/one") - @ApiOperation("[命令] - 手动发送命令") + @ApiOperation("[命令] [异步]- 单台主机") public R patchCommandToAgent( @RequestParam(value = "topicName") @ApiParam(name = "topicName", value = "目标主机名称") String topicName, @RequestParam(value = "commandList", required = false) @Nullable List commandList, + @RequestParam(value = "completeCommandList", required = false) + @ApiParam(name = "completeCommandList", value = "完整命令行,优先,可为空") @Nullable List> completeCommandList, @RequestParam(value = "type", required = false) @Nullable String type ) { @@ -45,47 +47,57 @@ public class ExecutionController { .SendCommandToAgent( topicName, type, - commandList + commandList, + completeCommandList, + false, + null, + false ); return R.ok(streamKey); } @PostMapping("/command/batch") - @ApiOperation("[命令] - 批量发送命令") + @ApiOperation("[命令] [异步] - 批量主机") public R> patchCommandToAgentList( @RequestParam(value = "topicNameList") @ApiParam(name = "topicNameList", value = "目标机器列表") List topicNameList, @RequestParam(value = "commandList", required = false) @ApiParam(name = "commandList", value = "命令行") @Nullable List commandList, + @RequestParam(value = "completeCommandList", required = false) + @ApiParam(name = "completeCommandList", value = "完整命令行,优先,可为空") @Nullable List> completeCommandList, @RequestParam(value = "type", required = false) @Nullable String type ) { - return R.ok(asyncExecutionService.SendCommandToAgent( + return R.ok(asyncExecutionService.SendCommandToAgentComplete( topicNameList, type, - commandList + commandList, + completeCommandList )); } @PostMapping("/command/all") - @ApiOperation("[命令] - 发送命令至所有的主机") + @ApiOperation("[命令] [异步] - 所有的主机") public R> patchCommandToAllAgent( @RequestParam(value = "commandList", required = false) @ApiParam(name = "commandList", value = "命令行") @Nullable List commandList, + @RequestParam(value = "completeCommandList", required = false) + @ApiParam(name = "completeCommandList", value = "完整命令行,优先,可为空") @Nullable List> completeCommandList, @RequestParam(value = "type", required = false) @Nullable String type ) { - return R.ok(asyncExecutionService.SendCommandToAgent( + return R.ok(asyncExecutionService.SendCommandToAgentComplete( ALL_AGENT_TOPIC_NAME_LIST, type, - commandList + commandList, + completeCommandList )); } @PostMapping("/command/healthy") - @ApiOperation("[命令] - 发送命令至健康的主机") + @ApiOperation("[命令] [异步] - 健康的主机") public R> patchCommandToHealthyAgent( @RequestParam(value = "commandList", required = false) @ApiParam(name = "commandList", value = "命令行") @Nullable List commandList, From 0959aebf67ddf8f05aafb55c6a442affe1a235e0 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Tue, 28 Feb 2023 21:56:31 +0800 Subject: [PATCH 15/16] =?UTF-8?q?[server][=20executor]-=20=E5=AE=8C?= =?UTF-8?q?=E6=88=90=E5=BA=95=E5=B1=82=E7=9A=84=E4=BB=A3=E7=A0=81=20-=202?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io/wdd/rpc/execute/service/AsyncExecutionService.java | 2 +- .../io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java | 4 ++-- .../io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java | 4 +--- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java index 8795595..5fddc6e 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionService.java @@ -104,7 +104,7 @@ public interface AsyncExecutionService { * @param durationTask * @return */ - OctopusMessage SyncCallSendCommandToAgent( + OctopusMessage AsyncCallSendCommandToAgent( String agentTopicName, String type, List commandList, diff --git a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java index 9097269..f7555a1 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java @@ -135,7 +135,7 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService { } // 调用最底层的方法 - this.SyncCallSendCommandToAgent( + this.AsyncCallSendCommandToAgent( agentTopicName, type, commandList, @@ -149,7 +149,7 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService { } @Override - public OctopusMessage SyncCallSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { + public OctopusMessage AsyncCallSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { // 检查agentTopicName是否存在 if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { diff --git a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java index 22d6ed9..15d364c 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java @@ -180,7 +180,7 @@ public class SyncExecutionServiceImpl implements SyncExecutionService { @Override public ArrayList SyncSendCommandToAgent(String agentTopicName, String type, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { - OctopusMessage octopusMessage = asyncExecutionService.SyncCallSendCommandToAgent( + OctopusMessage octopusMessage = asyncExecutionService.AsyncCallSendCommandToAgent( agentTopicName, type, commandList, @@ -220,7 +220,6 @@ public class SyncExecutionServiceImpl implements SyncExecutionService { TimeUnit.SECONDS ); - } catch (InterruptedException e) { throw new RuntimeException(e); } finally { @@ -229,7 +228,6 @@ public class SyncExecutionServiceImpl implements SyncExecutionService { // 停止等待结果 asyncWaitOMResult.stopWaiting(omReplayContend); - // 解析结果 omReplayContend .getReplayOMList() From 295e3ea024cc2d2a434b33c1a2c8b83fb8cb8af4 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Wed, 1 Mar 2023 10:58:33 +0800 Subject: [PATCH 16/16] =?UTF-8?q?[server][=20xray]-=20=E4=BC=98=E5=8C=96Xr?= =?UTF-8?q?ay=E9=85=8D=E7=BD=AE=E9=83=A8=E5=88=86=E7=9A=84=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .run/Server-dev.run.xml | 4 +- .../wdd/func/controller/XrayController.java | 11 +- .../func/xray/beans/node/ProxyNodeSet.java | 62 ++++++-- .../protocol/inbound/vmess/ClientObject.java | 3 + .../xray/service/XrayConfigPersistor.java | 133 +++++++++++++++--- .../xray/service/XrayCoreServiceImpl.java | 56 ++------ 6 files changed, 190 insertions(+), 79 deletions(-) diff --git a/.run/Server-dev.run.xml b/.run/Server-dev.run.xml index fb98f5e..15e77dc 100644 --- a/.run/Server-dev.run.xml +++ b/.run/Server-dev.run.xml @@ -1,12 +1,14 @@ +

* 写入文件的教程 https://cloud.tencent.com/developer/article/1895274 */ @Slf4j @Service public class XrayConfigPersistor { - private static String XrayResultPath = "xrayResult/"; - - + // 参考 https://github.com/FasterXML/jackson-databind/issues/585 + private static final ObjectWriter objectWriter = new ObjectMapper() + // 忽略掉 null的字段 + .setSerializationInclusion(JsonInclude.Include.NON_NULL) + // 写的文件必须是unix类型的分隔符号 + .writer( + new DefaultPrettyPrinter() + .withObjectIndenter( + new DefaultIndenter() + .withLinefeed("\n") + )); + private static final String XrayResultPath = "xrayResult/"; public static AtomicInteger cleanVersion = new AtomicInteger(0); + /** + * 执行Xray生成配置文件的持久化工作,沈成伟临时文件保存至当前目录中 + * + * @param xrayConfig + * @param currentVersion + * @param proxyNode + * @return + */ + public XrayConfigInfo persist(XrayConfig xrayConfig, int currentVersion, ProxyNode proxyNode) { + try { + // 将生成的xrayConfig直接写为字符串 + String resultContent = objectWriter + .writeValueAsString(xrayConfig); - public File write(String fileName , String content, int currentVersion) { + // 获得到文件名称 + String timeString = TimeUtils.currentFormatTimeString(); + String fileName = buildXrayConfigFileName( + proxyNode, + timeString + ); - System.out.println("currentVersion = " + currentVersion); + // 文件持久化! + File xrayConfigFile = write( + fileName, + resultContent, + currentVersion + ); + + // 文件写入完成,保存文件信息 + XrayConfigInfo xrayConfigInfo = new XrayConfigInfo(); + xrayConfigInfo.setXrayConfigFile(xrayConfigFile); + xrayConfigInfo.setXrayConfigFileName(fileName); + + return xrayConfigInfo; + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + private String buildXrayConfigFileName(ProxyNode proxyNode, String timeString) { + + return proxyNode.getNum() + "-" + proxyNode.getAgentName() + "-" + timeString + ".json"; + + } + + private File write(String fileName, String content, int currentVersion) { + + log.debug( + "currentVersion = {}", + currentVersion + ); if (cleanVersion.get() == currentVersion) { // 清除旧的内容 @@ -41,29 +108,51 @@ public class XrayConfigPersistor { // 构造对象开始写入, 生成文件 File resultFile = getResultFile(fileName); - try { - log.debug("开始写入XrayConfig进入文件中,文件名为 => {}",fileName); - FileWriter fileWriter = new FileWriter( + FileWriter fileWriter = null; + BufferedWriter bufferedWriter = null; + + try { + log.debug( + "开始写入XrayConfig进入文件中,文件名为 => {}", + fileName + ); + fileWriter = new FileWriter( resultFile ); - BufferedWriter bufferedWriter = new BufferedWriter( + bufferedWriter = new BufferedWriter( fileWriter ); - log.debug("文件内容为 => {}", content); + log.debug( + "文件内容为 => {}", + content + ); bufferedWriter.write(content); - // must close - bufferedWriter.close(); - fileWriter.close(); - - return resultFile; } catch (IOException e) { - log.error("打开文件失败,写入tmp文件失败! 文件为 => {}", resultFile.getName()); + log.error( + "打开文件失败,写入tmp文件失败! 文件为 => {}", + resultFile.getName() + ); throw new MyRuntimeException(e); + } finally { + + try { + // must close + bufferedWriter.close(); + fileWriter.close(); + + } catch (IOException e) { + log.error( + "关闭文件写入流失败!, 请检查 文件为 => [ {} ], 内容为 => {}", + fileName, + content + ); + throw new MyRuntimeException(e); + } } } @@ -82,11 +171,12 @@ public class XrayConfigPersistor { } /** - * 根据文件名,需要创建一个文件 + * 根据文件名,需要创建一个文件 + * * @param fileName 文件名,如 xxx.json * @return */ - private File getResultFile(String fileName ){ + private File getResultFile(String fileName) { ClassPathResource classPathResource = new ClassPathResource(XrayResultPath); @@ -99,7 +189,10 @@ public class XrayConfigPersistor { ); } catch (IOException e) { - log.error("获取文件失败请检查! fileName is => {}", fileName); + log.error( + "获取文件失败请检查! fileName is => {}", + fileName + ); throw new MyRuntimeException(e); } diff --git a/server/src/main/java/io/wdd/func/xray/service/XrayCoreServiceImpl.java b/server/src/main/java/io/wdd/func/xray/service/XrayCoreServiceImpl.java index e2ecd42..0e15ff0 100644 --- a/server/src/main/java/io/wdd/func/xray/service/XrayCoreServiceImpl.java +++ b/server/src/main/java/io/wdd/func/xray/service/XrayCoreServiceImpl.java @@ -1,9 +1,5 @@ package io.wdd.func.xray.service; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.wdd.common.utils.TimeUtils; import io.wdd.func.xray.beans.node.ProxyNode; import io.wdd.func.xray.beans.node.XrayConfigInfo; import io.wdd.func.xray.beans.xray.RoutingObject; @@ -20,7 +16,6 @@ import org.apache.commons.beanutils.BeanUtils; import org.springframework.stereotype.Service; import javax.annotation.Resource; -import java.io.File; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; @@ -36,13 +31,9 @@ import static io.wdd.func.xray.service.XrayConfigPersistor.cleanVersion; @Slf4j public class XrayCoreServiceImpl implements XrayCoreService { - @Resource - ObjectMapper objectMapper; - @Resource XrayConfigPersistor xrayConfigPersistor; - @Override public void generateXrayJsonFromNodeList(ArrayList> allNetworkPathList) { @@ -81,9 +72,6 @@ public class XrayCoreServiceImpl implements XrayCoreService { private void generateXrayJsonSinglePath(ArrayList networkPathList) { int pathLength = networkPathList.size(); - // 忽略掉 null的字段 - objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); - // 采用 VMESS + websocket的形式形成 链状代理 // 由于 Vlss+XTLS的形式形成 链状结构 String tag = generatePathTag(networkPathList); @@ -97,7 +85,8 @@ public class XrayCoreServiceImpl implements XrayCoreService { .builder() .id(uuid) .level(0) - .alterId(23) + // 为了进一步防止被探测,一个用户可以在主 ID 的基础上,再额外生成多个 ID。这里只需要指定额外的 ID 的数量,推荐值为 0 代表启用 VMessAEAD + .alterId(0) .email(tag + "@octopus.io") .build(); @@ -123,46 +112,19 @@ public class XrayCoreServiceImpl implements XrayCoreService { pos ); - // 持久化 - try { - String resultContent = objectMapper - .writerWithDefaultPrettyPrinter() - .writeValueAsString(xrayConfig); + // 持久化 Xray生成的文件信息 + XrayConfigInfo xrayConfigInfo = xrayConfigPersistor.persist( + xrayConfig, + currentVersion, + proxyNode + ); - // 获得到文件名称 - String timeString = TimeUtils.currentFormatTimeString(); - String fileName = buildXrayConfigFileName( - proxyNode, - timeString - ); - - // 文件持久化! - File xrayConfigFile = xrayConfigPersistor.write( - fileName, - resultContent, - currentVersion - ); - - // 文件写入完成,保存文件信息 - XrayConfigInfo xrayConfigInfo = new XrayConfigInfo(); - xrayConfigInfo.setXrayConfigFile(xrayConfigFile); - xrayConfigInfo.setXrayConfigFileName(fileName); - - proxyNode.setXrayConfigInfo(xrayConfigInfo); - - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } + proxyNode.setXrayConfigInfo(xrayConfigInfo); } - } - private String buildXrayConfigFileName(ProxyNode proxyNode, String timeString) { - return proxyNode.getNum() + "-" + proxyNode.getAgentName() + "-" + timeString + ".json"; - - } private XrayConfig doBuildXrayConfig(boolean isOutBoundFree, String tag, ClientObject clientObject, int port, ArrayList networkPathList, int pos) {