[Exec] modify sync and async execution function

This commit is contained in:
zeaslity
2023-06-15 10:37:45 +08:00
parent e6c71612aa
commit ec3d5bba1e
20 changed files with 668 additions and 654 deletions

View File

@@ -3,7 +3,7 @@ package io.wdd.func.xray.service;
import io.wdd.common.utils.TimeUtils; import io.wdd.common.utils.TimeUtils;
import io.wdd.func.oss.config.OctopusObjectSummary; import io.wdd.func.oss.config.OctopusObjectSummary;
import io.wdd.func.xray.beans.node.ProxyNode; import io.wdd.func.xray.beans.node.ProxyNode;
import io.wdd.rpc.execute.service.AsyncExecutionService; import io.wdd.rpc.execute.service.SyncExecutionService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@@ -83,7 +83,7 @@ public class XrayCallAgent {
} }
@Resource @Resource
AsyncExecutionService executionService; SyncExecutionService executionService;
/** /**
* 为代理链的每一个节点 构建Xray配置更新命令然后发送至对应的Agent中 * 为代理链的每一个节点 构建Xray配置更新命令然后发送至对应的Agent中
@@ -131,7 +131,7 @@ public class XrayCallAgent {
); );
// 向Agent发送命令执行更新操作 // 向Agent发送命令执行更新操作
String resultKey = executionService.SendCommandToAgent( String resultKey = executionService.SyncSendCommandToAgent(
proxyNode.getAgentTopicName(), proxyNode.getAgentTopicName(),
updateCommandType, updateCommandType,
null, null,

View File

@@ -26,8 +26,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_SET; import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_SET;
import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.init.AgentStatusCacheService.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
import static io.wdd.rpc.message.handler.OMessageHandlerServer.AGENT_LATEST_VERSION; import static io.wdd.rpc.message.handler.OMessageHandlerServer.AGENT_LATEST_VERSION;
import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT; import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT;

View File

@@ -15,11 +15,12 @@ import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER; import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER;
import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_LIST;
import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.init.AgentStatusCacheService.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
@RestController @RestController
@RequestMapping("/octopus/server/executor") @RequestMapping("/octopus/server/executor")
@@ -27,11 +28,11 @@ import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_HEALTHY_AGENT_TOPIC_NAM
public class ExecutionController { public class ExecutionController {
@Resource @Resource
AsyncExecutionService asyncExecutionService; SyncExecutionService syncExecutionService;
@Resource @Resource
BuildStreamReader buildStreamReader; BuildStreamReader buildStreamReader;
@Resource @Resource
SyncExecutionService syncExecutionService; AsyncExecutionService asyncExecutionService;
@PostMapping("/command/one") @PostMapping("/command/one")
@ApiOperation("[命令] [异步]- 单台主机") @ApiOperation("[命令] [异步]- 单台主机")
@@ -44,8 +45,8 @@ public class ExecutionController {
@ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask @ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask
) { ) {
String streamKey = asyncExecutionService ArrayList<String> streamKeyList = asyncExecutionService
.SendCommandToAgent( .AsyncSendCommandToAgentComplete(
topicName, topicName,
type, type,
commandList, commandList,
@@ -55,12 +56,13 @@ public class ExecutionController {
isDurationTask isDurationTask
); );
return R.ok(streamKey);
return R.ok(streamKeyList.toString());
} }
@PostMapping("/command/batch") @PostMapping("/command/batch")
@ApiOperation("[命令] [异步] - 批量主机") @ApiOperation("[命令] [异步] - 批量主机")
public R<List<String>> patchCommandToAgentList( public R<List<ArrayList<String>>> patchCommandToAgentList(
@RequestParam(value = "topicNameList") @RequestParam(value = "topicNameList")
@ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList, @ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList,
@RequestParam(value = "commandList", required = false) @RequestParam(value = "commandList", required = false)
@@ -71,19 +73,20 @@ public class ExecutionController {
@ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask @ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask
) { ) {
return R.ok(asyncExecutionService.SendCommandToAgentComplete( List<ArrayList<String>> arrayListList = asyncExecutionService.AsyncSendCommandToAgentComplete(
topicNameList, topicNameList,
type, type,
commandList, commandList,
completeCommandList, completeCommandList,
isDurationTask isDurationTask
)); );
return R.ok(arrayListList);
} }
@PostMapping("/command/all") @PostMapping("/command/all")
@ApiOperation("[命令] [异步] - 所有的主机") @ApiOperation("[命令] [异步] - 所有的主机")
public R<List<String>> patchCommandToAllAgent( public R<List<ArrayList<String>>> patchCommandToAllAgent(
@RequestParam(value = "commandList", required = false) @RequestParam(value = "commandList", required = false)
@ApiParam(name = "commandList", value = "命令行") @Nullable List<String> commandList, @ApiParam(name = "commandList", value = "命令行") @Nullable List<String> commandList,
@RequestParam(value = "completeCommandList", required = false) @RequestParam(value = "completeCommandList", required = false)
@@ -92,7 +95,7 @@ public class ExecutionController {
@ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask @ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask
) { ) {
return R.ok(asyncExecutionService.SendCommandToAgentComplete( return R.ok(asyncExecutionService.AsyncSendCommandToAgentComplete(
ALL_AGENT_TOPIC_NAME_LIST, ALL_AGENT_TOPIC_NAME_LIST,
type, type,
commandList, commandList,
@@ -103,7 +106,7 @@ public class ExecutionController {
@PostMapping("/command/healthy") @PostMapping("/command/healthy")
@ApiOperation("[命令] [异步] - 健康的主机") @ApiOperation("[命令] [异步] - 健康的主机")
public R<List<String>> patchCommandToHealthyAgent( public R<List<ArrayList<String>>> patchCommandToHealthyAgent(
@RequestParam(value = "commandList", required = false) @RequestParam(value = "commandList", required = false)
@ApiParam(name = "commandList", value = "命令行") @Nullable List<String> commandList, @ApiParam(name = "commandList", value = "命令行") @Nullable List<String> commandList,
@RequestParam(value = "completeCommandList", required = false) @RequestParam(value = "completeCommandList", required = false)
@@ -112,7 +115,7 @@ public class ExecutionController {
@ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask @ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask
) { ) {
return R.ok(asyncExecutionService.SendCommandToAgentComplete( return R.ok(asyncExecutionService.AsyncSendCommandToAgentComplete(
ALL_HEALTHY_AGENT_TOPIC_NAME_LIST, ALL_HEALTHY_AGENT_TOPIC_NAME_LIST,
type, type,
commandList, commandList,
@@ -133,18 +136,18 @@ public class ExecutionController {
) { ) {
return R.ok( return R.ok(
syncExecutionService.SyncSendCommandToAgent( Collections.singletonList(syncExecutionService.SyncSendCommandToAgentComplete(
topicName, topicName,
type, type,
commandList, commandList,
completeCommandList completeCommandList
) ))
); );
} }
@PostMapping("/command/sync/batch") @PostMapping("/command/sync/batch")
@ApiOperation("[命令] [同步] - 批量-等待命令结果") @ApiOperation("[命令] [同步] - 批量-等待命令结果")
public R<List<ArrayList<String>>> SyncPatchCommandToAgentBatch( public R<List<String>> SyncPatchCommandToAgentBatch(
@RequestParam(value = "topicNameList") @RequestParam(value = "topicNameList")
@ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList, @ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList,
@RequestParam(value = "commandList", required = false) @RequestParam(value = "commandList", required = false)
@@ -168,7 +171,7 @@ public class ExecutionController {
@PostMapping("/command/sync/all") @PostMapping("/command/sync/all")
@ApiOperation("[命令] [同步] - 全部-同步等待命令结果") @ApiOperation("[命令] [同步] - 全部-同步等待命令结果")
public R<List<ArrayList<String>>> SyncPatchCommandToAgentAll( public R<List<String>> SyncPatchCommandToAgentAll(
@RequestParam(value = "commandList", required = false) @RequestParam(value = "commandList", required = false)
@ApiParam(name = "commandList", value = "命令行") @Nullable List<String> commandList, @ApiParam(name = "commandList", value = "命令行") @Nullable List<String> commandList,
@RequestParam(value = "completeCommandList", required = false) @RequestParam(value = "completeCommandList", required = false)
@@ -206,81 +209,81 @@ public class ExecutionController {
// auth required // auth required
@PostMapping("/function/update") // @PostMapping("/function/update")
@ApiOperation("升级") // @ApiOperation("升级")
public R<List<String>> AgentUpdate( // public R<List<String>> AgentUpdate(
@RequestParam(value = "topicNameList") // @RequestParam(value = "topicNameList")
@ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList // @ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList
) { // ) {
//
return R.ok( // return R.ok(
asyncExecutionService // syncExecutionService
.SendCommandToAgent( // .SyncSendCommandToAgent(
topicNameList, // topicNameList,
"AgentUpdate", // "AgentUpdate",
null, // null,
false, // false,
null, // null,
true // true
)); // ));
} // }
//
@PostMapping("/function/reboot") // @PostMapping("/function/reboot")
@ApiOperation("重启") // @ApiOperation("重启")
public R<List<String>> AgentReboot( // public R<List<String>> AgentReboot(
@RequestParam(value = "topicNameList") // @RequestParam(value = "topicNameList")
@ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList // @ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList
) { // ) {
//
return R.ok( // return R.ok(
asyncExecutionService // asyncExecutionService
.SendCommandToAgent( // .SyncSendCommandToAgent(
topicNameList, // topicNameList,
"AgentReboot", // "AgentReboot",
null, // null,
false, // false,
null, // null,
true // true
)); // ));
} // }
//
@PostMapping("/function/shutdown") // @PostMapping("/function/shutdown")
@ApiOperation("关闭") // @ApiOperation("关闭")
public R<List<String>> AgentShutdown( // public R<List<String>> AgentShutdown(
@RequestParam(value = "topicNameList") // @RequestParam(value = "topicNameList")
@ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList // @ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList
) { // ) {
//
return R.ok( // return R.ok(
asyncExecutionService // syncExecutionService
.SendCommandToAgent( // .SyncSendCommandToAgent(
topicNameList, // topicNameList,
"AgentShutdown", // "AgentShutdown",
null, // null,
false, // false,
null, // null,
true // true
)); // ));
} // }
//
@PostMapping("/function/bootUp") // @PostMapping("/function/bootUp")
@ApiOperation("重新部署") // @ApiOperation("重新部署")
public R<List<String>> AgentBootUp( // public R<List<String>> AgentBootUp(
@RequestParam(value = "topicNameList") // @RequestParam(value = "topicNameList")
@ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList // @ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList
) { // ) {
//
return R.ok( // return R.ok(
asyncExecutionService // asyncExecutionService
.SendCommandToAgent( // .SyncSendCommandToAgent(
topicNameList, // topicNameList,
"AgentBootUp", // "AgentBootUp",
null, // null,
false, // false,
null, // null,
true // true
)); // ));
} // }
} }

View File

@@ -4,7 +4,7 @@ package io.wdd.rpc.controller;
import io.swagger.annotations.Api; import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiOperation;
import io.wdd.common.response.R; import io.wdd.common.response.R;
import io.wdd.rpc.init.ServerCacheAgentStatus; import io.wdd.rpc.init.AgentStatusCacheService;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
@@ -14,7 +14,7 @@ import javax.annotation.Resource;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static io.wdd.rpc.init.ServerCacheAgentStatus.*; import static io.wdd.rpc.init.AgentStatusCacheService.*;
@RestController @RestController
@@ -23,7 +23,7 @@ import static io.wdd.rpc.init.ServerCacheAgentStatus.*;
public class StatusController { public class StatusController {
@Resource @Resource
ServerCacheAgentStatus serverCacheAgentStatus; AgentStatusCacheService agentStatusCacheService;
@ApiOperation("[ Agent-状态 ] Map") @ApiOperation("[ Agent-状态 ] Map")
@GetMapping("/agent/status") @GetMapping("/agent/status")
@@ -76,7 +76,7 @@ public class StatusController {
public R<Map<String, List<String>>> ManualUpdateAgentStatus() { public R<Map<String, List<String>>> ManualUpdateAgentStatus() {
// 手动调用更新 // 手动调用更新
serverCacheAgentStatus.updateAgentStatusMapCache(); agentStatusCacheService.updateAgentStatusMapCache();
return R.ok(STATUS_AGENT_LIST_MAP); return R.ok(STATUS_AGENT_LIST_MAP);
} }

View File

@@ -1,20 +1,23 @@
package io.wdd.rpc.execute.service; package io.wdd.rpc.execute.service;
import io.wdd.rpc.message.OctopusMessage; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
/**
* 同步命令执行的核心类
* 需要等待命令执行完毕,完后返回相应的结果
*/
public interface AsyncExecutionService { public interface AsyncExecutionService {
String SendCommandToAgent(String agentTopicName, String command); /**
* ------------------------ Sync Command Executor ------------------------------
*/
ArrayList<String> AsyncSendCommandToAgent(String agentTopicName, List<String> commandList);
String SendCommandToAgent(String agentTopicName, List<String> commandList); ArrayList<String> AsyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList);
String SendCommandToAgent(String agentTopicName, String type, List<String> commandList); List<ArrayList<String>> AsyncSendCommandToAgent(List<String> agentTopicNameList, String type, List<String> commandList);
List<String> SendCommandToAgent(List<String> agentTopicNameList, String type, List<String> commandList, boolean needResultReplay, String futureKey, boolean durationTask);
/** /**
* 调用 单行命令脚本的 最底层函数 * 调用 单行命令脚本的 最底层函数
@@ -27,7 +30,7 @@ public interface AsyncExecutionService {
* @param durationTask * @param durationTask
* @return * @return
*/ */
String SendCommandToAgent( ArrayList<String> AsyncSendCommandToAgent(
String agentTopicName, String agentTopicName,
String type, String type,
List<String> commandList, List<String> commandList,
@@ -41,14 +44,21 @@ public interface AsyncExecutionService {
* ------------------------------------------------- * -------------------------------------------------
*/ */
String SendCommandToAgentComplete(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete); ArrayList<String> AsyncSendCommandToAgentComplete(String agentTopicName, String type, List<String> commandList, List<List<String>> completeCommandList);
List<ArrayList<String>> AsyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<String> commandList, List<List<String>> completeCommandList, boolean isDurationTask);
List<String> SendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<String> commandList, List<List<String>> commandListComplete, boolean isDurationTask); /**
* 通常为 页面定时脚本任务调用
*
* @param agentTopicNameList 目标Agent的TopicName列表
* @param type 任务类型
* @param completeCommandList 完整的类型
* @return 每个Agent只返回一个 ResultKeyScript脚本的结果全部拼接到一起全部的resultKey
*/
List<ArrayList<String>> AsyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList);
List<String> SendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList);
/** /**
* 通常为 页面定时脚本任务调用 * 通常为 页面定时脚本任务调用
* *
@@ -58,10 +68,10 @@ public interface AsyncExecutionService {
* @param atnFutureKey 由于脚本任务为延迟调用,故需要提前生成未来的ResultKey * @param atnFutureKey 由于脚本任务为延迟调用,故需要提前生成未来的ResultKey
* @return 每个Agent只返回一个 ResultKeyScript脚本的结果全部拼接到一起全部的resultKey * @return 每个Agent只返回一个 ResultKeyScript脚本的结果全部拼接到一起全部的resultKey
*/ */
List<String> SendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList, HashMap<String, String> atnFutureKey); List<ArrayList<String>> AsyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList, HashMap<String, String> atnFutureKey);
String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey); ArrayList<String> AsyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey);
/** /**
* 调用 完整脚本的 最底层函数 * 调用 完整脚本的 最底层函数
@@ -74,30 +84,7 @@ public interface AsyncExecutionService {
* @param durationTask * @param durationTask
* @return resultKey 本次操作在Redis中记录的结果Key * @return resultKey 本次操作在Redis中记录的结果Key
*/ */
String SendCommandToAgent( ArrayList<String> AsyncSendCommandToAgentComplete(
String agentTopicName,
String type,
List<String> commandList,
List<List<String>> commandListComplete,
boolean needResultReplay,
String futureKey,
boolean durationTask
);
/**
* 同步命令调用的方法
*
* @param agentTopicName
* @param type
* @param commandList
* @param commandListComplete
* @param needResultReplay
* @param futureKey
* @param durationTask
* @return
*/
OctopusMessage AsyncCallSendCommandToAgent(
String agentTopicName, String agentTopicName,
String type, String type,
List<String> commandList, List<String> commandList,

View File

@@ -1,108 +1,124 @@
package io.wdd.rpc.execute.service; package io.wdd.rpc.execute.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.execute.ExecutionMessage;
import io.wdd.rpc.execute.config.ExecutionLog;
import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType; import io.wdd.rpc.message.OctopusMessageType;
import io.wdd.rpc.message.sender.OMessageToAgentSender; import io.wdd.rpc.message.handler.AsyncWaitOMResult;
import io.wdd.rpc.message.handler.OMReplayContend;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_SET;
@Service @Service
@Slf4j @Slf4j
public class AsyncExecutionServiceImpl implements AsyncExecutionService { public class AsyncExecutionServiceImpl implements AsyncExecutionService {
private static final String MANUAL_COMMAND_TYPE = "manual-command"; private static final boolean COMMAND_EXEC_NEED_REPLAY = true;
private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.EXECUTOR;
@Resource @Resource
OMessageToAgentSender oMessageToAgentSender; AsyncWaitOMResult asyncWaitOMResult;
@Resource @Resource
ObjectMapper objectMapper; SyncExecutionService asyncExecutionService;
@Resource
RedisTemplate redisTemplate; /**
* 一个命令执行的最长等待时间
*/
int processMaxWaitSeconds = 10;
@Override @Override
public String SendCommandToAgent(String agentTopicName, String command) { public ArrayList<String> AsyncSendCommandToAgent(String agentTopicName, List<String> commandList) {
return this.SendCommandToAgent(
agentTopicName,
List.of(command)
);
}
@Override return this.AsyncSendCommandToAgentComplete(
public String SendCommandToAgent(String agentTopicName, List<String> commandList) {
return this.SendCommandToAgent(
agentTopicName, agentTopicName,
MANUAL_COMMAND_TYPE,
commandList
);
}
@Override
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList) {
return SendCommandToAgent(
agentTopicName,
type,
commandList,
false,
null, null,
false
);
}
@Override
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, boolean needResultReplay, String futureKey, boolean durationTask) {
return this.SendCommandToAgent(
agentTopicName,
type,
commandList, commandList,
null, null,
needResultReplay, COMMAND_EXEC_NEED_REPLAY,
futureKey,
durationTask
);
}
@Override
public String SendCommandToAgentComplete(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete) {
return this.SendCommandToAgent(
agentTopicName,
type,
commandList,
commandListComplete,
false,
null, null,
false false
); );
} }
@Override @Override
public List<String> SendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<String> commandList, List<List<String>> commandListComplete, boolean isDurationTask) { public ArrayList<String> AsyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList) {
return this.AsyncSendCommandToAgentComplete(
agentTopicName,
type,
commandList,
null,
COMMAND_EXEC_NEED_REPLAY,
null,
false
);
}
@Override
public List<ArrayList<String>> AsyncSendCommandToAgent(List<String> agentTopicNameList, String type, List<String> commandList) {
return agentTopicNameList return agentTopicNameList
.stream() .stream()
.map( .map(
agentTopicName -> this.SendCommandToAgent( agentTopicName -> this.AsyncSendCommandToAgentComplete(
agentTopicName, agentTopicName,
type, type,
commandList, commandList,
commandListComplete, null,
false, COMMAND_EXEC_NEED_REPLAY,
null,
false
)
)
.collect(Collectors.toList());
}
@Override
public ArrayList<String> AsyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, boolean needResultReplay, String futureKey, boolean durationTask) {
return this.AsyncSendCommandToAgentComplete(
agentTopicName,
type,
commandList,
null,
COMMAND_EXEC_NEED_REPLAY,
futureKey,
false
);
}
@Override
public ArrayList<String> AsyncSendCommandToAgentComplete(String agentTopicName, String type, List<String> commandList, List<List<String>> completeCommandList) {
return this.AsyncSendCommandToAgentComplete(
agentTopicName,
type,
commandList,
completeCommandList,
COMMAND_EXEC_NEED_REPLAY,
null,
false
);
}
@Override
public List<ArrayList<String>> AsyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<String> commandList, List<List<String>> completeCommandList, boolean isDurationTask) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.AsyncSendCommandToAgentComplete(
agentTopicName,
type,
commandList,
completeCommandList,
COMMAND_EXEC_NEED_REPLAY,
null, null,
isDurationTask isDurationTask
) )
@@ -111,31 +127,60 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService {
} }
@Override @Override
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey) { public List<ArrayList<String>> AsyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList) {
return this.SendCommandToAgent( return agentTopicNameList
agentTopicName, .stream()
type, .map(
commandList, agentTopicName -> this.AsyncSendCommandToAgentComplete(
commandListComplete, agentTopicName,
false, type,
futureKey, null,
false completeCommandList,
); COMMAND_EXEC_NEED_REPLAY,
null,
false
)
)
.collect(Collectors.toList());
} }
@Override @Override
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { public List<ArrayList<String>> AsyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList, HashMap<String, String> atnFutureKey) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.AsyncSendCommandToAgentComplete(
agentTopicName,
type,
null,
completeCommandList,
COMMAND_EXEC_NEED_REPLAY,
atnFutureKey.get(agentTopicName),
false
)
)
.collect(Collectors.toList());
}
String resultKey = futureKey; @Override
// 判定是否是 FutureKey public ArrayList<String> AsyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey) {
if (null == futureKey) { return this.AsyncSendCommandToAgentComplete(
resultKey = ExecutionMessage.GetResultKey(agentTopicName); agentTopicName,
} type,
commandList,
commandListComplete,
COMMAND_EXEC_NEED_REPLAY,
futureKey,
false
);
}
// 调用最底层的方法 @Override
this.AsyncCallSendCommandToAgent( public ArrayList<String> AsyncSendCommandToAgentComplete(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
OctopusMessage octopusMessage = asyncExecutionService.AsyncCallSendCommandToAgent(
agentTopicName, agentTopicName,
type, type,
commandList, commandList,
@@ -145,225 +190,66 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService {
durationTask durationTask
); );
return resultKey; LocalDateTime initTime = octopusMessage.getInit_time();
}
@Override ArrayList<String> result = new ArrayList<>();
public OctopusMessage AsyncCallSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
// 检查agentTopicName是否存在 // 构造消息等待对象
if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { int commandCount = 1;
log.error( if (null != commandListComplete) {
"agentTopicName异常! 输入为 => {}", commandCount = Math.max(
agentTopicName commandListComplete.size(),
1
); );
return null;
//throw new MyRuntimeException("agentTopicName异常!" + agentTopicName);
} }
// 归一化type
if (StringUtils.isEmpty(type)) {
type = MANUAL_COMMAND_TYPE;
}
String resultKey = futureKey; OMReplayContend omReplayContend = OMReplayContend.build(
// 判定是否是 FutureKey commandCount,
if (null == futureKey) { CurrentAppOctopusMessageType,
resultKey = ExecutionMessage.GetResultKey(agentTopicName); initTime
}
// 构造 Execution Command对应的消息体
ExecutionMessage executionMessage = this
.generateExecutionMessage(
type,
commandList,
resultKey,
commandListComplete,
needResultReplay,
durationTask
);
OctopusMessage octopusMessage = this.generateOctopusMessage(
agentTopicName,
executionMessage
); );
CountDownLatch countDownLatch = omReplayContend.getCountDownLatch();
// send the message // 开始等待结果
oMessageToAgentSender.send(octopusMessage); asyncWaitOMResult.waitFor(omReplayContend);
// set up the stream read group
String group = redisTemplate
.opsForStream()
.createGroup(
resultKey,
resultKey
);
log.debug(
"set consumer group [{}] for the stream key with => [ {} ]",
group,
resultKey
);
// change the redis stream listener container
// createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey);
// construct the persistent Bean
/*ExecutionLog executionLog = buildPersistentLogBeanFromOctopusMessage(
octopusMessage,
executionMessage
);*/
// send resultKey to ExecutionResultDaemonHandler
// 当批量执行,产生大量的resultKey的时候,会出现线程爆炸,导致所有的全部失效
/*WAIT_EXECUTION_RESULT_LIST.put(
resultKey,
executionLog
);*/
// help gc
executionMessage = null;
return octopusMessage;
}
private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) {
// 监听结果
try { try {
boolean await = countDownLatch.await(
processMaxWaitSeconds,
TimeUnit.SECONDS
);
return OctopusMessage } catch (InterruptedException e) {
.builder() throw new RuntimeException(e);
.type(OctopusMessageType.EXECUTOR) } finally {
.init_time(TimeUtils.currentFormatTime())
.uuid(agentTopicName) // 等待所有的结果返回
.content( // 停止等待结果
objectMapper.writeValueAsString(executionMessage) asyncWaitOMResult.stopWaiting(omReplayContend);
// 解析结果
omReplayContend
.getReplayOMList()
.stream()
.map(
om -> {
log.debug(
"replay message is => {}",
om
);
return (ArrayList<String>) om.getResult();
}
) )
.build(); .forEachOrdered(
singleResult -> result.addAll(singleResult)
);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
} }
// 返回
return result;
} }
private ExecutionLog buildPersistentLogBeanFromOctopusMessage(OctopusMessage octopusMessage, ExecutionMessage executionMessage) {
ExecutionLog executionLog = new ExecutionLog();
executionLog.setAgentTopicName(octopusMessage.getUuid());
executionLog.setResultKey((String) octopusMessage.getContent());
executionLog.setCommandList(String.valueOf(executionMessage.getSingleLineCommand()));
executionLog.setType(executionMessage.getType());
executionLog.setResultKey(executionMessage.getResultKey());
return executionLog;
}
@Override
public List<String> SendCommandToAgent(List<String> agentagentTopicNameList, String type, List<String> commandList, boolean needResultReplay, String futureKey, boolean durationTask) {
return agentagentTopicNameList
.stream()
.map(
agentTopicName -> this
.SendCommandToAgent
(
agentTopicName,
type,
commandList,
null,
needResultReplay,
futureKey,
durationTask
)
)
.collect(Collectors.toList());
}
/**
* @param agentTopicNameList 目标Agent的TopicName列表
* @param type 任务类型
* @param completeCommandList 完整的类型
* @return
*/
@Override
public List<String> SendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.SendCommandToAgentComplete(
agentTopicName,
type,
null,
completeCommandList
)
)
.collect(Collectors.toList());
}
@Override
public List<String> SendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList, HashMap<String, String> atnFutureKey) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.SendCommandToAgent(
agentTopicName,
type,
null,
completeCommandList,
atnFutureKey.getOrDefault(
agentTopicName,
null
)
)
)
.collect(Collectors.toList());
}
@Deprecated
private OctopusMessage generateOctopusMessage(String agentTopicName, String resultKey, String type, List<String> commandList, List<List<String>> commandListComplete) {
ExecutionMessage executionMessage = this.generateExecutionMessage(
type,
commandList,
resultKey,
commandListComplete,
false,
false
);
String executionMessageString;
try {
executionMessageString = objectMapper.writeValueAsString(executionMessage);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return OctopusMessage
.builder()
.type(OctopusMessageType.EXECUTOR)
.init_time(LocalDateTime.now())
.content(executionMessageString)
.uuid(agentTopicName)
.build();
}
private ExecutionMessage generateExecutionMessage(String type, List<String> commandList, String resultKey, List<List<String>> commandListComplete, boolean needResultReplay, boolean durationTask) {
return ExecutionMessage
.builder()
.resultKey(resultKey)
.type(type)
.singleLineCommand(commandList)
.multiLineCommand(commandListComplete)
.needResultReplay(needResultReplay)
.durationTask(durationTask)
.build();
}
} }

View File

@@ -1,23 +1,20 @@
package io.wdd.rpc.execute.service; package io.wdd.rpc.execute.service;
import java.util.ArrayList; import io.wdd.rpc.message.OctopusMessage;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
/**
* 同步命令执行的核心类
* 需要等待命令执行完毕,完后返回相应的结果
*/
public interface SyncExecutionService { public interface SyncExecutionService {
/** String SyncSendCommandToAgent(String agentTopicName, String command);
* ------------------------ Sync Command Executor ------------------------------
*/
ArrayList<String> SyncSendCommandToAgent(String agentTopicName, List<String> commandList);
ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList); String SyncSendCommandToAgent(String agentTopicName, List<String> commandList);
List<ArrayList<String>> SyncSendCommandToAgent(List<String> agentTopicNameList, String type, List<String> commandList); String SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList);
List<String> SyncSendCommandToAgent(List<String> agentTopicNameList, String type, List<String> commandList, boolean needResultReplay, String futureKey, boolean durationTask);
/** /**
* 调用 单行命令脚本的 最底层函数 * 调用 单行命令脚本的 最底层函数
@@ -30,7 +27,7 @@ public interface SyncExecutionService {
* @param durationTask * @param durationTask
* @return * @return
*/ */
ArrayList<String> SyncSendCommandToAgent( String SyncSendCommandToAgent(
String agentTopicName, String agentTopicName,
String type, String type,
List<String> commandList, List<String> commandList,
@@ -44,21 +41,14 @@ public interface SyncExecutionService {
* ------------------------------------------------- * -------------------------------------------------
*/ */
ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> completeCommandList); String SyncSendCommandToAgentComplete(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete);
List<ArrayList<String>> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<String> commandList, List<List<String>> completeCommandList, boolean isDurationTask);
/** List<String> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<String> commandList, List<List<String>> commandListComplete, boolean isDurationTask);
* 通常为 页面定时脚本任务调用
*
* @param agentTopicNameList 目标Agent的TopicName列表
* @param type 任务类型
* @param completeCommandList 完整的类型
* @return 每个Agent只返回一个 ResultKeyScript脚本的结果全部拼接到一起全部的resultKey
*/
List<ArrayList<String>> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList);
List<String> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList);
/** /**
* 通常为 页面定时脚本任务调用 * 通常为 页面定时脚本任务调用
* *
@@ -68,10 +58,10 @@ public interface SyncExecutionService {
* @param atnFutureKey 由于脚本任务为延迟调用,故需要提前生成未来的ResultKey * @param atnFutureKey 由于脚本任务为延迟调用,故需要提前生成未来的ResultKey
* @return 每个Agent只返回一个 ResultKeyScript脚本的结果全部拼接到一起全部的resultKey * @return 每个Agent只返回一个 ResultKeyScript脚本的结果全部拼接到一起全部的resultKey
*/ */
List<ArrayList<String>> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList, HashMap<String, String> atnFutureKey); List<String> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList, HashMap<String, String> atnFutureKey);
ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey); String SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey);
/** /**
* 调用 完整脚本的 最底层函数 * 调用 完整脚本的 最底层函数
@@ -84,7 +74,30 @@ public interface SyncExecutionService {
* @param durationTask * @param durationTask
* @return resultKey 本次操作在Redis中记录的结果Key * @return resultKey 本次操作在Redis中记录的结果Key
*/ */
ArrayList<String> SyncSendCommandToAgent( String SyncSendCommandToAgent(
String agentTopicName,
String type,
List<String> commandList,
List<List<String>> commandListComplete,
boolean needResultReplay,
String futureKey,
boolean durationTask
);
/**
* 同步命令调用的方法
*
* @param agentTopicName
* @param type
* @param commandList
* @param commandListComplete
* @param needResultReplay
* @param futureKey
* @param durationTask
* @return
*/
OctopusMessage AsyncCallSendCommandToAgent(
String agentTopicName, String agentTopicName,
String type, String type,
List<String> commandList, List<String> commandList,

View File

@@ -1,115 +1,100 @@
package io.wdd.rpc.execute.service; package io.wdd.rpc.execute.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.execute.ExecutionMessage;
import io.wdd.rpc.execute.config.ExecutionLog;
import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType; import io.wdd.rpc.message.OctopusMessageType;
import io.wdd.rpc.message.handler.AsyncWaitOMResult; import io.wdd.rpc.message.sender.OMessageToAgentSender;
import io.wdd.rpc.message.handler.OMReplayContend;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_SET;
@Service @Service
@Slf4j @Slf4j
public class SyncExecutionServiceImpl implements SyncExecutionService { public class SyncExecutionServiceImpl implements SyncExecutionService {
private static final boolean COMMAND_EXEC_NEED_REPLAY = true; private static final String MANUAL_COMMAND_TYPE = "manual-command";
private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.EXECUTOR;
@Resource @Resource
AsyncWaitOMResult asyncWaitOMResult; OMessageToAgentSender oMessageToAgentSender;
@Resource @Resource
AsyncExecutionService asyncExecutionService; ObjectMapper objectMapper;
@Resource
/** RedisTemplate redisTemplate;
* 一个命令执行的最长等待时间
*/
int processMaxWaitSeconds = 10;
@Override @Override
public ArrayList<String> SyncSendCommandToAgent(String agentTopicName, List<String> commandList) { public String SyncSendCommandToAgent(String agentTopicName, String command) {
return this.SyncSendCommandToAgent( return this.SyncSendCommandToAgent(
agentTopicName, agentTopicName,
null, List.of(command)
commandList,
null,
COMMAND_EXEC_NEED_REPLAY,
null,
false
); );
} }
@Override @Override
public ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList) { public String SyncSendCommandToAgent(String agentTopicName, List<String> commandList) {
return this.SyncSendCommandToAgent(
agentTopicName,
MANUAL_COMMAND_TYPE,
commandList
);
}
@Override
public String SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList) {
return SyncSendCommandToAgent(
agentTopicName,
type,
commandList,
false,
null,
false
);
}
@Override
public String SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, boolean needResultReplay, String futureKey, boolean durationTask) {
return this.SyncSendCommandToAgent( return this.SyncSendCommandToAgent(
agentTopicName, agentTopicName,
type, type,
commandList, commandList,
null, null,
COMMAND_EXEC_NEED_REPLAY, needResultReplay,
null,
false
);
}
@Override
public List<ArrayList<String>> SyncSendCommandToAgent(List<String> agentTopicNameList, String type, List<String> commandList) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.SyncSendCommandToAgent(
agentTopicName,
type,
commandList,
null,
COMMAND_EXEC_NEED_REPLAY,
null,
false
)
)
.collect(Collectors.toList());
}
@Override
public ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, boolean needResultReplay, String futureKey, boolean durationTask) {
return this.SyncSendCommandToAgent(
agentTopicName,
type,
commandList,
null,
COMMAND_EXEC_NEED_REPLAY,
futureKey, futureKey,
false durationTask
); );
} }
@Override @Override
public ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> completeCommandList) { public String SyncSendCommandToAgentComplete(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete) {
return this.SyncSendCommandToAgent( return this.SyncSendCommandToAgent(
agentTopicName, agentTopicName,
type, type,
commandList, commandList,
completeCommandList, commandListComplete,
COMMAND_EXEC_NEED_REPLAY, false,
null, null,
false false
); );
} }
@Override @Override
public List<ArrayList<String>> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<String> commandList, List<List<String>> completeCommandList, boolean isDurationTask) { public List<String> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<String> commandList, List<List<String>> commandListComplete, boolean isDurationTask) {
return agentTopicNameList return agentTopicNameList
.stream() .stream()
.map( .map(
@@ -117,8 +102,8 @@ public class SyncExecutionServiceImpl implements SyncExecutionService {
agentTopicName, agentTopicName,
type, type,
commandList, commandList,
completeCommandList, commandListComplete,
COMMAND_EXEC_NEED_REPLAY, false,
null, null,
isDurationTask isDurationTask
) )
@@ -127,60 +112,31 @@ public class SyncExecutionServiceImpl implements SyncExecutionService {
} }
@Override @Override
public List<ArrayList<String>> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList) { public String SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.SyncSendCommandToAgent(
agentTopicName,
type,
null,
completeCommandList,
COMMAND_EXEC_NEED_REPLAY,
null,
false
)
)
.collect(Collectors.toList());
}
@Override
public List<ArrayList<String>> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList, HashMap<String, String> atnFutureKey) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.SyncSendCommandToAgent(
agentTopicName,
type,
null,
completeCommandList,
COMMAND_EXEC_NEED_REPLAY,
atnFutureKey.get(agentTopicName),
false
)
)
.collect(Collectors.toList());
}
@Override
public ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey) {
return this.SyncSendCommandToAgent( return this.SyncSendCommandToAgent(
agentTopicName, agentTopicName,
type, type,
commandList, commandList,
commandListComplete, commandListComplete,
COMMAND_EXEC_NEED_REPLAY, false,
futureKey, futureKey,
false false
); );
} }
@Override @Override
public ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { public String SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
OctopusMessage octopusMessage = asyncExecutionService.AsyncCallSendCommandToAgent( String resultKey = futureKey;
// 判定是否是 FutureKey
if (null == futureKey) {
resultKey = ExecutionMessage.GetResultKey(agentTopicName);
}
// 调用最底层的方法
this.AsyncCallSendCommandToAgent(
agentTopicName, agentTopicName,
type, type,
commandList, commandList,
@@ -190,65 +146,225 @@ public class SyncExecutionServiceImpl implements SyncExecutionService {
durationTask durationTask
); );
LocalDateTime initTime = octopusMessage.getInit_time(); return resultKey;
ArrayList<String> result = new ArrayList<>();
// 构造消息等待对象
int commandCount = 1;
if (null != commandListComplete) {
commandCount = Math.max(
commandListComplete.size(),
1
);
}
OMReplayContend omReplayContend = OMReplayContend.build(
commandCount,
CurrentAppOctopusMessageType,
initTime
);
CountDownLatch countDownLatch = omReplayContend.getCountDownLatch();
// 开始等待结果
asyncWaitOMResult.waitFor(omReplayContend);
// 监听结果
try {
boolean await = countDownLatch.await(
processMaxWaitSeconds,
TimeUnit.SECONDS
);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 等待所有的结果返回
// 停止等待结果
asyncWaitOMResult.stopWaiting(omReplayContend);
// 解析结果
omReplayContend
.getReplayOMList()
.stream()
.map(
om -> {
log.debug(
"replay message is => {}",
om
);
return (ArrayList<String>) om.getResult();
}
)
.forEachOrdered(
singleResult -> result.addAll(singleResult)
);
}
// 返回
return result;
} }
@Override
public OctopusMessage AsyncCallSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
// 检查agentTopicName是否存在
if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) {
log.error(
"agentTopicName异常! 输入为 => {}",
agentTopicName
);
return null;
//throw new MyRuntimeException("agentTopicName异常!" + agentTopicName);
}
// 归一化type
if (StringUtils.isEmpty(type)) {
type = MANUAL_COMMAND_TYPE;
}
String resultKey = futureKey;
// 判定是否是 FutureKey
if (null == futureKey) {
resultKey = ExecutionMessage.GetResultKey(agentTopicName);
}
// 构造 Execution Command对应的消息体
ExecutionMessage executionMessage = this
.generateExecutionMessage(
type,
commandList,
resultKey,
commandListComplete,
needResultReplay,
durationTask
);
OctopusMessage octopusMessage = this.generateOctopusMessage(
agentTopicName,
executionMessage
);
// send the message
oMessageToAgentSender.send(octopusMessage);
// set up the stream read group
String group = redisTemplate
.opsForStream()
.createGroup(
resultKey,
resultKey
);
log.debug(
"set consumer group [{}] for the stream key with => [ {} ]",
group,
resultKey
);
// change the redis stream listener container
// createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey);
// construct the persistent Bean
/*ExecutionLog executionLog = buildPersistentLogBeanFromOctopusMessage(
octopusMessage,
executionMessage
);*/
// send resultKey to ExecutionResultDaemonHandler
// 当批量执行,产生大量的resultKey的时候,会出现线程爆炸,导致所有的全部失效
/*WAIT_EXECUTION_RESULT_LIST.put(
resultKey,
executionLog
);*/
// help gc
executionMessage = null;
return octopusMessage;
}
private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) {
try {
return OctopusMessage
.builder()
.type(OctopusMessageType.EXECUTOR)
.init_time(TimeUtils.currentFormatTime())
.uuid(agentTopicName)
.content(
objectMapper.writeValueAsString(executionMessage)
)
.build();
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
private ExecutionLog buildPersistentLogBeanFromOctopusMessage(OctopusMessage octopusMessage, ExecutionMessage executionMessage) {
ExecutionLog executionLog = new ExecutionLog();
executionLog.setAgentTopicName(octopusMessage.getUuid());
executionLog.setResultKey((String) octopusMessage.getContent());
executionLog.setCommandList(String.valueOf(executionMessage.getSingleLineCommand()));
executionLog.setType(executionMessage.getType());
executionLog.setResultKey(executionMessage.getResultKey());
return executionLog;
}
@Override
public List<String> SyncSendCommandToAgent(List<String> agentagentTopicNameList, String type, List<String> commandList, boolean needResultReplay, String futureKey, boolean durationTask) {
return agentagentTopicNameList
.stream()
.map(
agentTopicName -> this
.SyncSendCommandToAgent
(
agentTopicName,
type,
commandList,
null,
needResultReplay,
futureKey,
durationTask
)
)
.collect(Collectors.toList());
}
/**
* @param agentTopicNameList 目标Agent的TopicName列表
* @param type 任务类型
* @param completeCommandList 完整的类型
* @return
*/
@Override
public List<String> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.SyncSendCommandToAgentComplete(
agentTopicName,
type,
null,
completeCommandList
)
)
.collect(Collectors.toList());
}
@Override
public List<String> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList, HashMap<String, String> atnFutureKey) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.SyncSendCommandToAgent(
agentTopicName,
type,
null,
completeCommandList,
atnFutureKey.getOrDefault(
agentTopicName,
null
)
)
)
.collect(Collectors.toList());
}
@Deprecated
private OctopusMessage generateOctopusMessage(String agentTopicName, String resultKey, String type, List<String> commandList, List<List<String>> commandListComplete) {
ExecutionMessage executionMessage = this.generateExecutionMessage(
type,
commandList,
resultKey,
commandListComplete,
false,
false
);
String executionMessageString;
try {
executionMessageString = objectMapper.writeValueAsString(executionMessage);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return OctopusMessage
.builder()
.type(OctopusMessageType.EXECUTOR)
.init_time(LocalDateTime.now())
.content(executionMessageString)
.uuid(agentTopicName)
.build();
}
private ExecutionMessage generateExecutionMessage(String type, List<String> commandList, String resultKey, List<List<String>> commandListComplete, boolean needResultReplay, boolean durationTask) {
return ExecutionMessage
.builder()
.resultKey(resultKey)
.type(type)
.singleLineCommand(commandList)
.multiLineCommand(commandListComplete)
.needResultReplay(needResultReplay)
.durationTask(durationTask)
.build();
}
} }

View File

@@ -27,7 +27,7 @@ import static io.wdd.rpc.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY;
*/ */
@Service @Service
@Slf4j @Slf4j
public class ServerCacheAgentStatus { public class AgentStatusCacheService {
/** /**
* 存储所有的AgentTopicName的缓存 * 存储所有的AgentTopicName的缓存

View File

@@ -27,12 +27,12 @@ public class AsyncWaitOMResult {
* KEY -> replayMatchKey * KEY -> replayMatchKey
* VALUE -> OMReplayContend - 包含countDownLatch 和 result * VALUE -> OMReplayContend - 包含countDownLatch 和 result
*/ */
private static final HashMap<String, OMReplayContend> REPLAY_WAITING_TARGET = new HashMap<>(); private static final HashMap<String, OMReplayContend> OM_REPLAY_WAITING_TARGET_MAP = new HashMap<>();
public void waitFor(OMReplayContend omReplayContend) { public void waitFor(OMReplayContend omReplayContend) {
// 向 REPLAY_CACHE_MAP中写入 Key // 向 REPLAY_CACHE_MAP中写入 Key
REPLAY_WAITING_TARGET.put( OM_REPLAY_WAITING_TARGET_MAP.put(
omReplayContend.getReplayMatchKey(), omReplayContend.getReplayMatchKey(),
omReplayContend omReplayContend
); );
@@ -44,7 +44,7 @@ public class AsyncWaitOMResult {
public void stopWaiting(OMReplayContend omReplayContend) { public void stopWaiting(OMReplayContend omReplayContend) {
// 在调用线程的countDownLunch结束之后,关闭 清除 REPLAY_CACHE_MAP 中的队列 // 在调用线程的countDownLunch结束之后,关闭 清除 REPLAY_CACHE_MAP 中的队列
REPLAY_WAITING_TARGET.remove(omReplayContend.getReplayMatchKey()); OM_REPLAY_WAITING_TARGET_MAP.remove(omReplayContend.getReplayMatchKey());
} }
@@ -86,16 +86,20 @@ public class AsyncWaitOMResult {
replayOMessage.getType(), replayOMessage.getType(),
replayOMessage.getInit_time() replayOMessage.getInit_time()
); );
if (!REPLAY_WAITING_TARGET.containsKey(matchKey)) { if (!OM_REPLAY_WAITING_TARGET_MAP.containsKey(matchKey)) {
// 没有这个Key,说明等待结果已经超时了,直接丢弃,然后继续循环 // 没有这个Key,说明等待结果已经超时了,直接丢弃,然后继续循环
// todo 错误的数据需要放置于某处 // todo 错误的数据需要放置于某处
log.debug(
"等待队列力没有该回复的结果key =>",
matchKey
);
continue; continue;
} }
// Map中包含有Key,那么放置进去 // Map中包含有Key,那么放置进去
OMReplayContend replayContend = REPLAY_WAITING_TARGET.get(matchKey); OMReplayContend replayContend = OM_REPLAY_WAITING_TARGET_MAP.get(matchKey);
replayContend replayContend
.getReplayOMList() .getReplayOMList()
.add(replayOMessage); .add(replayOMessage);

View File

@@ -1,7 +1,7 @@
package io.wdd.rpc.scheduler.job; package io.wdd.rpc.scheduler.job;
import io.wdd.rpc.scheduler.config.QuartzLogOperator; import io.wdd.rpc.scheduler.config.QuartzLogOperator;
import io.wdd.rpc.scheduler.service.status.MonitorAllAgentStatus; import io.wdd.rpc.scheduler.service.status.CheckAgentAliveStatus;
import org.quartz.JobExecutionContext; import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException; import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean; import org.springframework.scheduling.quartz.QuartzJobBean;
@@ -11,7 +11,7 @@ import javax.annotation.Resource;
public class AgentStatusMonitorJob extends QuartzJobBean { public class AgentStatusMonitorJob extends QuartzJobBean {
@Resource @Resource
MonitorAllAgentStatus monitorAllAgentStatus; CheckAgentAliveStatus checkAgentAliveStatus;
@Resource @Resource
QuartzLogOperator quartzLogOperator; QuartzLogOperator quartzLogOperator;
@@ -23,7 +23,7 @@ public class AgentStatusMonitorJob extends QuartzJobBean {
//JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap(); //JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
// actually execute the monitor service // actually execute the monitor service
monitorAllAgentStatus.go(); checkAgentAliveStatus.go();
// log to somewhere // log to somewhere
quartzLogOperator.save(); quartzLogOperator.save();

View File

@@ -1,7 +1,7 @@
package io.wdd.rpc.scheduler.service.script; package io.wdd.rpc.scheduler.service.script;
import io.wdd.rpc.execute.service.AsyncExecutionService; import io.wdd.rpc.execute.service.SyncExecutionService;
import io.wdd.rpc.scheduler.beans.ScriptSchedulerDTO; import io.wdd.rpc.scheduler.beans.ScriptSchedulerDTO;
import io.wdd.rpc.scheduler.config.QuartzSchedulerUtils; import io.wdd.rpc.scheduler.config.QuartzSchedulerUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -20,7 +20,7 @@ import java.util.List;
public class AgentApplyScheduledScript { public class AgentApplyScheduledScript {
@Resource @Resource
AsyncExecutionService asyncExecutionService; SyncExecutionService asyncExecutionService;
@Resource @Resource
QuartzSchedulerUtils quartzSchedulerUtils; QuartzSchedulerUtils quartzSchedulerUtils;
@@ -46,7 +46,7 @@ public class AgentApplyScheduledScript {
// 发送命令到Agent中 // 发送命令到Agent中
List<String> resultKeyList = asyncExecutionService List<String> resultKeyList = asyncExecutionService
.SendCommandToAgentComplete( .SyncSendCommandToAgentComplete(
targetMachineList, targetMachineList,
scriptType, scriptType,
completeCommandList, completeCommandList,

View File

@@ -10,7 +10,7 @@ import javax.annotation.Resource;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.init.AgentStatusCacheService.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
import static io.wdd.rpc.status.OctopusStatusMessage.METRIC_STATUS_MESSAGE_TYPE; import static io.wdd.rpc.status.OctopusStatusMessage.METRIC_STATUS_MESSAGE_TYPE;
/** /**

View File

@@ -1,7 +1,7 @@
package io.wdd.rpc.scheduler.service.status; package io.wdd.rpc.scheduler.service.status;
import io.wdd.common.utils.TimeUtils; import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.init.ServerCacheAgentStatus; import io.wdd.rpc.init.AgentStatusCacheService;
import io.wdd.rpc.scheduler.service.BuildStatusScheduleTask; import io.wdd.rpc.scheduler.service.BuildStatusScheduleTask;
import io.wdd.rpc.status.OctopusStatusMessage; import io.wdd.rpc.status.OctopusStatusMessage;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -13,10 +13,11 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_LIST;
import static io.wdd.rpc.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY; import static io.wdd.rpc.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY;
import static io.wdd.rpc.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE; import static io.wdd.rpc.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE;
@@ -37,7 +38,7 @@ import static io.wdd.rpc.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE
@Service @Service
@Slf4j @Slf4j
@Lazy @Lazy
public class MonitorAllAgentStatus { public class CheckAgentAliveStatus {
private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5; private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5;
@Resource @Resource
@@ -46,7 +47,7 @@ public class MonitorAllAgentStatus {
CollectAgentStatus collectAgentStatus; CollectAgentStatus collectAgentStatus;
@Resource @Resource
ServerCacheAgentStatus serverCacheAgentStatus; AgentStatusCacheService agentStatusCacheService;
@Resource @Resource
BuildStatusScheduleTask buildStatusScheduleTask; BuildStatusScheduleTask buildStatusScheduleTask;
@@ -57,7 +58,7 @@ public class MonitorAllAgentStatus {
try { try {
// 1. 获取所有注册的Agent 手动更新 // 1. 获取所有注册的Agent 手动更新
serverCacheAgentStatus.updateAllAgentTopicNameCache(); agentStatusCacheService.updateAllAgentTopicNameCache();
if (CollectionUtils.isEmpty(ALL_AGENT_TOPIC_NAME_LIST)) { if (CollectionUtils.isEmpty(ALL_AGENT_TOPIC_NAME_LIST)) {
log.warn("[Scheduler] No Agent Registered ! End Up Status Monitor !"); log.warn("[Scheduler] No Agent Registered ! End Up Status Monitor !");
return; return;
@@ -67,6 +68,10 @@ public class MonitorAllAgentStatus {
checkOrCreateRedisHealthyKey(); checkOrCreateRedisHealthyKey();
// 2.发送状态检查信息 agent需要update相应的HashMap的值 // 2.发送状态检查信息 agent需要update相应的HashMap的值
// 2023年6月14日 2. 发送ping等待所有的Agent返回PONG, 然后进行redis的状态修改
CountDownLatch aliveStatusCDL = new CountDownLatch(ALL_AGENT_TOPIC_NAME_LIST.size());
buildAndSendAgentHealthMessage(); buildAndSendAgentHealthMessage();
// 3. 休眠 MAX_WAIT_AGENT_REPORT_STATUS_TIME 等待agent的状态上报 // 3. 休眠 MAX_WAIT_AGENT_REPORT_STATUS_TIME 等待agent的状态上报
@@ -139,7 +144,7 @@ public class MonitorAllAgentStatus {
String currentTimeString = TimeUtils.currentTimeString(); String currentTimeString = TimeUtils.currentTimeString();
// 更新所有的缓存状态 // 更新所有的缓存状态
serverCacheAgentStatus.updateAgentStatusMapCache(); agentStatusCacheService.updateAgentStatusMapCache();
// 执行Metric上报定时任务 // 执行Metric上报定时任务
buildStatusScheduleTask.buildAgentMetricScheduleTask(); buildStatusScheduleTask.buildAgentMetricScheduleTask();

View File

@@ -9,15 +9,15 @@ import lombok.NoArgsConstructor;
public class AgentStatus { public class AgentStatus {
@JsonProperty("CPUStatus") @JsonProperty("CPUStatus")
private CPUInfo cPUStatus; private CPUStatus cpuStatus;
@JsonProperty("MemoryStatus") @JsonProperty("MemoryStatus")
private MemoryInfo memoryStatus; private MemoryStatus memoryStatus;
@JsonProperty("NetworkStatus") @JsonProperty("NetworkStatus")
private NetworkInfo networkStatus; private NetworkStatus networkStatus;
@JsonProperty("DiskStatus") @JsonProperty("DiskStatus")
private DiskInfo diskStatus; private DiskStatus diskStatus;
} }

View File

@@ -12,12 +12,12 @@ import java.util.List;
@AllArgsConstructor @AllArgsConstructor
@NoArgsConstructor @NoArgsConstructor
@SuperBuilder(toBuilder = true) @SuperBuilder(toBuilder = true)
public class CPUInfo { public class CPUStatus {
@JsonProperty("NumCores") @JsonProperty("NumCores")
private Integer numCores; private Integer numCores;
@JsonProperty("CPUInfo") @JsonProperty("CPUStatus")
private List<CPUInfoDTO> cPUInfo; private List<CPUInfoDTO> cPUInfo;
@JsonProperty("CPUPercent") @JsonProperty("CPUPercent")
private Double cPUPercent; private Double cPUPercent;

View File

@@ -8,7 +8,7 @@ import java.util.List;
@NoArgsConstructor @NoArgsConstructor
@Data @Data
public class DiskInfo { public class DiskStatus {
@JsonProperty("Total") @JsonProperty("Total")
private Long total; private Long total;

View File

@@ -6,7 +6,7 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor @NoArgsConstructor
@Data @Data
public class MemoryInfo { public class MemoryStatus {
@JsonProperty("TotalMemory") @JsonProperty("TotalMemory")
private Long totalMemory; private Long totalMemory;

View File

@@ -12,7 +12,7 @@ import java.util.List;
@Data @Data
@AllArgsConstructor @AllArgsConstructor
@SuperBuilder(toBuilder = true) @SuperBuilder(toBuilder = true)
public class NetworkInfo { public class NetworkStatus {
@JsonProperty("name") @JsonProperty("name")
private String name; private String name;

View File

@@ -1,6 +1,6 @@
package io.wdd.server; package io.wdd.server;
import io.wdd.rpc.execute.service.AsyncExecutionService; import io.wdd.rpc.execute.service.SyncExecutionService;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
@@ -13,7 +13,7 @@ class ServerApplicationTests {
@Resource @Resource
AsyncExecutionService asyncExecutionService; SyncExecutionService asyncExecutionService;
@Test @Test
void testCoreExecutionCompleteScript() { void testCoreExecutionCompleteScript() {
@@ -61,7 +61,7 @@ class ServerApplicationTests {
) )
); );
List<String> resultList = asyncExecutionService.SendCommandToAgentComplete( List<String> resultList = asyncExecutionService.SyncSendCommandToAgentComplete(
targetMachineList, targetMachineList,
"Scheduled Script", "Scheduled Script",
completeScript completeScript