[server][ executor]- 完成同步调用命令的部分代码
This commit is contained in:
@@ -117,7 +117,7 @@ public class TimeUtils {
|
||||
}
|
||||
|
||||
/**
|
||||
* @return UTC+8 [ yyyy-MM-dd HH:mm:ss ] Time String
|
||||
* @return UTC+8 [ yyyy-MM-dd-HH-mm-ss ] Time String
|
||||
*/
|
||||
public static String currentTimeStringFullSplit() {
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ import io.wdd.func.oss.service.OSSCoreService;
|
||||
import io.wdd.func.oss.service.OssBackendSelect;
|
||||
import io.wdd.func.xray.beans.node.ProxyNode;
|
||||
import io.wdd.func.xray.beans.node.XrayConfigInfo;
|
||||
import io.wdd.rpc.execute.service.CoreExecutionService;
|
||||
import io.wdd.rpc.execute.service.AsyncExecutionService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -112,7 +112,7 @@ public class XrayConfigDistribute {
|
||||
OSSCoreService ossCoreService;
|
||||
|
||||
@Resource
|
||||
CoreExecutionService executionService;
|
||||
AsyncExecutionService executionService;
|
||||
|
||||
public void uploadXrayConfigToOSS(ArrayList<ProxyNode> networkPathList) {
|
||||
|
||||
|
||||
@@ -19,7 +19,6 @@ import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -31,7 +30,6 @@ import java.util.stream.Collectors;
|
||||
|
||||
import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_SET;
|
||||
import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
|
||||
import static io.wdd.rpc.message.handler.AsyncWaitOMResult.REPLAY_CACHE_MAP;
|
||||
import static io.wdd.rpc.message.handler.OMessageHandlerServer.AGENT_LATEST_VERSION;
|
||||
import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT;
|
||||
|
||||
@@ -67,7 +65,6 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
|
||||
// 组装信息至集合中
|
||||
LocalDateTime currentTime = TimeUtils.currentFormatTime();
|
||||
|
||||
|
||||
// 发送OctopusMessage-Agent
|
||||
buildOMessageAndSendToAllHealthyAgent(
|
||||
AgentOperationType.VERSION,
|
||||
@@ -75,21 +72,14 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
|
||||
);
|
||||
|
||||
// 构造 异步结果监听内容
|
||||
CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size());
|
||||
ArrayList<OctopusMessage> replayOMList = new ArrayList<>();
|
||||
OMReplayContend omReplayContend = OMReplayContend
|
||||
.builder()
|
||||
.initTime(currentTime)
|
||||
.countDownLatch(countDownLatch)
|
||||
.replayOMList(replayOMList)
|
||||
.replayMatchKey(
|
||||
OMReplayContend.generateMatchKey(
|
||||
CurrentAppOctopusMessageType,
|
||||
currentTime
|
||||
)
|
||||
)
|
||||
.type(CurrentAppOctopusMessageType)
|
||||
.build();
|
||||
OMReplayContend omReplayContend = OMReplayContend.build(
|
||||
ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(),
|
||||
CurrentAppOctopusMessageType,
|
||||
currentTime
|
||||
);
|
||||
|
||||
CountDownLatch countDownLatch = omReplayContend.getCountDownLatch();
|
||||
|
||||
|
||||
// 调用后台接收处理所有的Replay信息
|
||||
asyncWaitOMResult.waitFor(omReplayContend);
|
||||
@@ -101,21 +91,24 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
|
||||
countDownLatch
|
||||
);*/
|
||||
|
||||
|
||||
boolean isAllHealthyAgentReport = false;
|
||||
try {
|
||||
// 超时等待5秒钟, 或者所有的Agent均已经完成上报
|
||||
countDownLatch.await(
|
||||
isAllHealthyAgentReport = countDownLatch.await(
|
||||
5,
|
||||
TimeUnit.SECONDS
|
||||
);
|
||||
} catch (InterruptedException e) {
|
||||
log.warn("存在部分Agent没有上报 版本信息!");
|
||||
|
||||
} finally {
|
||||
// 超时,或者 全部信息已经收集
|
||||
if (!isAllHealthyAgentReport) {
|
||||
log.warn("存在部分Agent没有上报 版本信息!");
|
||||
}
|
||||
|
||||
// 此处调用,即可中断 异步任务的收集工作
|
||||
REPLAY_CACHE_MAP.remove(
|
||||
omReplayContend.getReplayMatchKey()
|
||||
);
|
||||
asyncWaitOMResult.stopWaiting(omReplayContend);
|
||||
|
||||
// 处理结果
|
||||
omReplayContend
|
||||
@@ -132,7 +125,6 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
|
||||
|
||||
// help gc
|
||||
omReplayContend = null;
|
||||
|
||||
}
|
||||
|
||||
return result;
|
||||
@@ -165,21 +157,14 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
|
||||
currentTime
|
||||
);
|
||||
|
||||
CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size());
|
||||
ArrayList<OctopusMessage> replayOMList = new ArrayList<>();
|
||||
OMReplayContend omReplayContend = OMReplayContend
|
||||
.builder()
|
||||
.initTime(currentTime)
|
||||
.countDownLatch(countDownLatch)
|
||||
.replayOMList(replayOMList)
|
||||
.replayMatchKey(
|
||||
OMReplayContend.generateMatchKey(
|
||||
CurrentAppOctopusMessageType,
|
||||
currentTime
|
||||
)
|
||||
)
|
||||
.type(CurrentAppOctopusMessageType)
|
||||
.build();
|
||||
// 构造结果
|
||||
OMReplayContend omReplayContend = OMReplayContend.build(
|
||||
ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(),
|
||||
CurrentAppOctopusMessageType,
|
||||
currentTime
|
||||
);
|
||||
|
||||
CountDownLatch countDownLatch = omReplayContend.getCountDownLatch();
|
||||
|
||||
// 调用后台接收处理所有的Replay信息
|
||||
asyncWaitOMResult.waitFor(omReplayContend);
|
||||
@@ -202,9 +187,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
|
||||
// 超时,或者 全部信息已经收集
|
||||
|
||||
// 此处调用,即可中断 异步任务的收集工作
|
||||
REPLAY_CACHE_MAP.remove(
|
||||
omReplayContend.getReplayMatchKey()
|
||||
);
|
||||
asyncWaitOMResult.stopWaiting(omReplayContend);
|
||||
|
||||
// 处理结果
|
||||
omReplayContend
|
||||
|
||||
@@ -21,14 +21,14 @@ public class AgentController {
|
||||
OctopusAgentService octopusAgentService;
|
||||
|
||||
@GetMapping("/version")
|
||||
@ApiOperation("[版本]-所有OctopusAgent")
|
||||
@ApiOperation("[版本] - 所有OctopusAgent")
|
||||
public R<Map<String, String>> getAllAgentVersion(){
|
||||
|
||||
return R.ok(octopusAgentService.getAllAgentVersion());
|
||||
}
|
||||
|
||||
@GetMapping("/coreInfo")
|
||||
@ApiOperation("[核心信息]-所有OctopusAgent")
|
||||
@ApiOperation("[核心信息] - 所有OctopusAgent")
|
||||
public R<Map<String, ServerInfoVO>> getAllAgentCoreInfo(){
|
||||
|
||||
return R.ok(octopusAgentService.getAllAgentCoreInfo());
|
||||
|
||||
@@ -5,7 +5,7 @@ import io.swagger.annotations.ApiOperation;
|
||||
import io.swagger.annotations.ApiParam;
|
||||
import io.wdd.common.beans.response.R;
|
||||
import io.wdd.rpc.execute.result.BuildStreamReader;
|
||||
import io.wdd.rpc.execute.service.CoreExecutionService;
|
||||
import io.wdd.rpc.execute.service.AsyncExecutionService;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
@@ -17,26 +17,27 @@ import java.util.List;
|
||||
|
||||
import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER;
|
||||
import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_LIST;
|
||||
import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/octopus/server/executor")
|
||||
@Api("Agent执行命令的Controller")
|
||||
@Api(value = "Agent执行命令的Controller", tags = "Execution")
|
||||
public class ExecutionController {
|
||||
|
||||
@Resource
|
||||
CoreExecutionService coreExecutionService;
|
||||
AsyncExecutionService asyncExecutionService;
|
||||
@Resource
|
||||
BuildStreamReader buildStreamReader;
|
||||
|
||||
@PostMapping("/command/one")
|
||||
@ApiOperation("[命令]-手动发送命令")
|
||||
@ApiOperation("[命令] - 手动发送命令")
|
||||
public R<String> patchCommandToAgent(
|
||||
@RequestParam(value = "topicName") String topicName,
|
||||
@RequestParam(value = "topicName") @ApiParam(name = "topicName", value = "目标主机名称") String topicName,
|
||||
@RequestParam(value = "commandList", required = false) @Nullable List<String> commandList,
|
||||
@RequestParam(value = "type", required = false) @Nullable String type
|
||||
) {
|
||||
|
||||
String streamKey = coreExecutionService
|
||||
String streamKey = asyncExecutionService
|
||||
.SendCommandToAgent(
|
||||
topicName,
|
||||
type,
|
||||
@@ -47,7 +48,7 @@ public class ExecutionController {
|
||||
}
|
||||
|
||||
@PostMapping("/command/batch")
|
||||
@ApiOperation("[命令]- 批量发送命令")
|
||||
@ApiOperation("[命令] - 批量发送命令")
|
||||
public R<List<String>> patchCommandToAgentList(
|
||||
@RequestParam(value = "topicNameList")
|
||||
@ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList,
|
||||
@@ -56,7 +57,7 @@ public class ExecutionController {
|
||||
@RequestParam(value = "type", required = false) @Nullable String type
|
||||
) {
|
||||
|
||||
return R.ok(coreExecutionService.SendCommandToAgent(
|
||||
return R.ok(asyncExecutionService.SendCommandToAgent(
|
||||
topicNameList,
|
||||
type,
|
||||
commandList
|
||||
@@ -65,20 +66,51 @@ public class ExecutionController {
|
||||
|
||||
|
||||
@PostMapping("/command/all")
|
||||
@ApiOperation("[命令]- 发送命令至所有的主机")
|
||||
public R<List<String>> patchCommandToAgentAll(
|
||||
@ApiOperation("[命令] - 发送命令至所有的主机")
|
||||
public R<List<String>> patchCommandToAllAgent(
|
||||
@RequestParam(value = "commandList", required = false)
|
||||
@ApiParam(name = "commandList", value = "命令行") @Nullable List<String> commandList,
|
||||
@RequestParam(value = "type", required = false) @Nullable String type
|
||||
) {
|
||||
|
||||
return R.ok(coreExecutionService.SendCommandToAgent(
|
||||
return R.ok(asyncExecutionService.SendCommandToAgent(
|
||||
ALL_AGENT_TOPIC_NAME_LIST,
|
||||
type,
|
||||
commandList
|
||||
));
|
||||
}
|
||||
|
||||
@PostMapping("/command/healthy")
|
||||
@ApiOperation("[命令] - 发送命令至健康的主机")
|
||||
public R<List<String>> patchCommandToHealthyAgent(
|
||||
@RequestParam(value = "commandList", required = false)
|
||||
@ApiParam(name = "commandList", value = "命令行") @Nullable List<String> commandList,
|
||||
@RequestParam(value = "type", required = false) @Nullable String type
|
||||
) {
|
||||
|
||||
return R.ok(asyncExecutionService.SendCommandToAgent(
|
||||
ALL_HEALTHY_AGENT_TOPIC_NAME_LIST,
|
||||
type,
|
||||
commandList
|
||||
));
|
||||
}
|
||||
|
||||
@PostMapping("/command/sync/one")
|
||||
@ApiOperation("[命令] [同步] - 同步等待命令结果")
|
||||
public R<List<String>> SyncPatchCommandToAgent(
|
||||
@RequestParam(value = "topicName") @ApiParam(name = "topicName", value = "目标主机名称") String topicName,
|
||||
@RequestParam(value = "commandList", required = false)
|
||||
@ApiParam(name = "commandList", value = "命令行") @Nullable List<String> commandList,
|
||||
@RequestParam(value = "type", required = false) @ApiParam(name = "type", value = "执行命令类型") @Nullable String type
|
||||
) {
|
||||
|
||||
return R.ok(asyncExecutionService.SendCommandToAgent(
|
||||
ALL_HEALTHY_AGENT_TOPIC_NAME_LIST,
|
||||
type,
|
||||
commandList
|
||||
));
|
||||
}
|
||||
|
||||
|
||||
@PostMapping("/agentStatusStream")
|
||||
@ApiOperation("切换Console查看Agent状态日志")
|
||||
@@ -105,7 +137,7 @@ public class ExecutionController {
|
||||
) {
|
||||
|
||||
return R.ok(
|
||||
coreExecutionService
|
||||
asyncExecutionService
|
||||
.SendCommandToAgent(
|
||||
topicNameList,
|
||||
"AgentUpdate",
|
||||
@@ -121,7 +153,7 @@ public class ExecutionController {
|
||||
) {
|
||||
|
||||
return R.ok(
|
||||
coreExecutionService
|
||||
asyncExecutionService
|
||||
.SendCommandToAgent(
|
||||
topicNameList,
|
||||
"AgentReboot",
|
||||
@@ -137,7 +169,7 @@ public class ExecutionController {
|
||||
) {
|
||||
|
||||
return R.ok(
|
||||
coreExecutionService
|
||||
asyncExecutionService
|
||||
.SendCommandToAgent(
|
||||
topicNameList,
|
||||
"AgentShutdown",
|
||||
@@ -145,5 +177,21 @@ public class ExecutionController {
|
||||
));
|
||||
}
|
||||
|
||||
@PostMapping("/function/bootUp")
|
||||
@ApiOperation("重新部署")
|
||||
public R<List<String>> AgentBootUp(
|
||||
@RequestParam(value = "topicNameList")
|
||||
@ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList
|
||||
) {
|
||||
|
||||
return R.ok(
|
||||
asyncExecutionService
|
||||
.SendCommandToAgent(
|
||||
topicNameList,
|
||||
"AgentBootUp",
|
||||
null
|
||||
));
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -1,19 +1,19 @@
|
||||
package io.wdd.rpc.execute.service;
|
||||
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
public interface CoreExecutionService {
|
||||
public interface AsyncExecutionService {
|
||||
|
||||
String SendCommandToAgent(String agentTopicName, String command);
|
||||
|
||||
String SendCommandToAgent(String agentTopicName, List<String> commandList);
|
||||
|
||||
|
||||
String SendCommandToAgent(String agentTopicName, String type, List<String> commandList);
|
||||
|
||||
|
||||
List<String> SendCommandToAgent(List<String> agentTopicNameList, String type, List<String> command);
|
||||
|
||||
/**
|
||||
@@ -37,8 +37,9 @@ public interface CoreExecutionService {
|
||||
);
|
||||
|
||||
|
||||
/** ------------------------------------------------- */
|
||||
|
||||
/**
|
||||
* -------------------------------------------------
|
||||
*/
|
||||
|
||||
String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete);
|
||||
|
||||
@@ -89,4 +90,26 @@ public interface CoreExecutionService {
|
||||
);
|
||||
|
||||
|
||||
/**
|
||||
* 同步命令调用的方法
|
||||
*
|
||||
* @param agentTopicName
|
||||
* @param type
|
||||
* @param commandList
|
||||
* @param commandListComplete
|
||||
* @param needResultReplay
|
||||
* @param futureKey
|
||||
* @param durationTask
|
||||
* @return
|
||||
*/
|
||||
OctopusMessage SyncCallSendCommandToAgent(
|
||||
String agentTopicName,
|
||||
String type,
|
||||
List<String> commandList,
|
||||
List<List<String>> commandListComplete,
|
||||
boolean needResultReplay,
|
||||
String futureKey,
|
||||
boolean durationTask
|
||||
);
|
||||
|
||||
}
|
||||
@@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.wdd.common.beans.executor.ExecutionMessage;
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
|
||||
import io.wdd.common.utils.TimeUtils;
|
||||
import io.wdd.rpc.execute.config.ExecutionLog;
|
||||
import io.wdd.rpc.message.sender.OMessageToAgentSender;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -22,7 +23,7 @@ import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_SET;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class CoreExecutionServiceImpl implements CoreExecutionService {
|
||||
public class AsyncExecutionServiceImpl implements AsyncExecutionService {
|
||||
|
||||
private static final String MANUAL_COMMAND_TYPE = "manual-command";
|
||||
@Resource
|
||||
@@ -52,11 +53,6 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
|
||||
@Override
|
||||
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList) {
|
||||
|
||||
// 归一化type
|
||||
if (StringUtils.isEmpty(type)) {
|
||||
type = MANUAL_COMMAND_TYPE;
|
||||
}
|
||||
|
||||
return SendCommandToAgent(
|
||||
agentTopicName,
|
||||
type,
|
||||
@@ -112,6 +108,29 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
|
||||
@Override
|
||||
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
|
||||
|
||||
String resultKey = futureKey;
|
||||
// 判定是否是 FutureKey
|
||||
if (null == futureKey) {
|
||||
resultKey = ExecutionMessage.GetResultKey(agentTopicName);
|
||||
}
|
||||
|
||||
// 调用最底层的方法
|
||||
this.SyncCallSendCommandToAgent(
|
||||
agentTopicName,
|
||||
type,
|
||||
commandList,
|
||||
commandListComplete,
|
||||
needResultReplay,
|
||||
futureKey,
|
||||
durationTask
|
||||
);
|
||||
|
||||
return resultKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OctopusMessage SyncCallSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
|
||||
|
||||
// 检查agentTopicName是否存在
|
||||
if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) {
|
||||
log.error(
|
||||
@@ -122,7 +141,11 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
|
||||
//throw new MyRuntimeException("agentTopicName异常!" + agentTopicName);
|
||||
}
|
||||
|
||||
// 归一化type类型 不行
|
||||
// 归一化type
|
||||
if (StringUtils.isEmpty(type)) {
|
||||
type = MANUAL_COMMAND_TYPE;
|
||||
}
|
||||
|
||||
String resultKey = futureKey;
|
||||
// 判定是否是 FutureKey
|
||||
if (null == futureKey) {
|
||||
@@ -178,10 +201,8 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
|
||||
|
||||
// help gc
|
||||
executionMessage = null;
|
||||
octopusMessage = null;
|
||||
|
||||
return resultKey;
|
||||
|
||||
return octopusMessage;
|
||||
}
|
||||
|
||||
private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) {
|
||||
@@ -191,7 +212,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
|
||||
return OctopusMessage
|
||||
.builder()
|
||||
.type(OctopusMessageType.EXECUTOR)
|
||||
.init_time(LocalDateTime.now())
|
||||
.init_time(TimeUtils.currentFormatTime())
|
||||
.uuid(agentTopicName)
|
||||
.content(
|
||||
objectMapper.writeValueAsString(executionMessage)
|
||||
@@ -0,0 +1,95 @@
|
||||
package io.wdd.rpc.execute.service;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 同步命令执行的核心类
|
||||
* 需要等待命令执行完毕,完后返回相应的结果
|
||||
*/
|
||||
public interface SyncExecutionService {
|
||||
|
||||
/**
|
||||
* ------------------------ Sync Command Executor ------------------------------
|
||||
*/
|
||||
ArrayList<String> SyncSendCommandToAgent(String agentTopicName, List<String> commandList);
|
||||
|
||||
ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList);
|
||||
|
||||
List<ArrayList<String>> SyncSendCommandToAgent(List<String> agentTopicNameList, String type, List<String> commandList);
|
||||
|
||||
/**
|
||||
* 调用 单行命令脚本的 最底层函数
|
||||
*
|
||||
* @param agentTopicName
|
||||
* @param type
|
||||
* @param commandList
|
||||
* @param needResultReplay
|
||||
* @param futureKey
|
||||
* @param durationTask
|
||||
* @return
|
||||
*/
|
||||
ArrayList<String> SyncSendCommandToAgent(
|
||||
String agentTopicName,
|
||||
String type,
|
||||
List<String> commandList,
|
||||
boolean needResultReplay,
|
||||
String futureKey,
|
||||
boolean durationTask
|
||||
);
|
||||
|
||||
|
||||
/**
|
||||
* -------------------------------------------------
|
||||
*/
|
||||
|
||||
ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete);
|
||||
|
||||
/**
|
||||
* 通常为 页面定时脚本任务调用
|
||||
*
|
||||
* @param agentTopicNameList 目标Agent的TopicName列表
|
||||
* @param type 任务类型
|
||||
* @param completeCommandList 完整的类型
|
||||
* @return 每个Agent只返回一个 ResultKey(Script脚本的结果全部拼接到一起),全部的resultKey
|
||||
*/
|
||||
List<ArrayList<String>> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList);
|
||||
|
||||
|
||||
/**
|
||||
* 通常为 页面定时脚本任务调用
|
||||
*
|
||||
* @param agentTopicNameList 目标Agent的TopicName列表
|
||||
* @param type 任务类型
|
||||
* @param completeCommandList 完整的类型
|
||||
* @param atnFutureKey 由于脚本任务为延迟调用,故需要提前生成未来的ResultKey
|
||||
* @return 每个Agent只返回一个 ResultKey(Script脚本的结果全部拼接到一起),全部的resultKey
|
||||
*/
|
||||
List<ArrayList<String>> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList, HashMap<String, String> atnFutureKey);
|
||||
|
||||
|
||||
ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey);
|
||||
|
||||
/**
|
||||
* 调用 完整脚本的 最底层函数
|
||||
*
|
||||
* @param agentTopicName
|
||||
* @param type
|
||||
* @param commandList
|
||||
* @param commandListComplete
|
||||
* @param futureKey
|
||||
* @param durationTask
|
||||
* @return resultKey 本次操作在Redis中记录的结果Key
|
||||
*/
|
||||
ArrayList<String> SyncSendCommandToAgent(
|
||||
String agentTopicName,
|
||||
String type,
|
||||
List<String> commandList,
|
||||
List<List<String>> commandListComplete,
|
||||
boolean needResultReplay,
|
||||
String futureKey,
|
||||
boolean durationTask
|
||||
);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,236 @@
|
||||
package io.wdd.rpc.execute.service;
|
||||
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
|
||||
import io.wdd.rpc.message.handler.AsyncWaitOMResult;
|
||||
import io.wdd.rpc.message.handler.OMReplayContend;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class SyncExecutionServiceImpl implements SyncExecutionService {
|
||||
|
||||
private static final boolean COMMAND_EXEC_NEED_REPLAY = true;
|
||||
|
||||
private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.EXECUTOR;
|
||||
@Resource
|
||||
AsyncWaitOMResult asyncWaitOMResult;
|
||||
@Resource
|
||||
AsyncExecutionService asyncExecutionService;
|
||||
|
||||
/**
|
||||
* 一个命令执行的最长等待时间
|
||||
*/
|
||||
@Value("${octopus.agent.executor.processMaxTimeOut}")
|
||||
Integer processMaxWaitSeconds;
|
||||
|
||||
@Override
|
||||
public ArrayList<String> SyncSendCommandToAgent(String agentTopicName, List<String> commandList) {
|
||||
|
||||
return this.SyncSendCommandToAgent(
|
||||
agentTopicName,
|
||||
null,
|
||||
commandList,
|
||||
null,
|
||||
COMMAND_EXEC_NEED_REPLAY,
|
||||
null,
|
||||
false
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList) {
|
||||
|
||||
|
||||
return this.SyncSendCommandToAgent(
|
||||
agentTopicName,
|
||||
type,
|
||||
commandList,
|
||||
null,
|
||||
COMMAND_EXEC_NEED_REPLAY,
|
||||
null,
|
||||
false
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ArrayList<String>> SyncSendCommandToAgent(List<String> agentTopicNameList, String type, List<String> commandList) {
|
||||
|
||||
return agentTopicNameList
|
||||
.stream()
|
||||
.map(
|
||||
agentTopicName -> this.SyncSendCommandToAgent(
|
||||
agentTopicName,
|
||||
type,
|
||||
commandList,
|
||||
null,
|
||||
COMMAND_EXEC_NEED_REPLAY,
|
||||
null,
|
||||
false
|
||||
)
|
||||
)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, boolean needResultReplay, String futureKey, boolean durationTask) {
|
||||
|
||||
return this.SyncSendCommandToAgent(
|
||||
agentTopicName,
|
||||
type,
|
||||
commandList,
|
||||
null,
|
||||
COMMAND_EXEC_NEED_REPLAY,
|
||||
futureKey,
|
||||
false
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete) {
|
||||
return this.SyncSendCommandToAgent(
|
||||
agentTopicName,
|
||||
type,
|
||||
commandList,
|
||||
commandListComplete,
|
||||
COMMAND_EXEC_NEED_REPLAY,
|
||||
null,
|
||||
false
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ArrayList<String>> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList) {
|
||||
|
||||
return agentTopicNameList
|
||||
.stream()
|
||||
.map(
|
||||
agentTopicName -> this.SyncSendCommandToAgent(
|
||||
agentTopicName,
|
||||
type,
|
||||
null,
|
||||
completeCommandList,
|
||||
COMMAND_EXEC_NEED_REPLAY,
|
||||
null,
|
||||
false
|
||||
)
|
||||
)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ArrayList<String>> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList, HashMap<String, String> atnFutureKey) {
|
||||
return agentTopicNameList
|
||||
.stream()
|
||||
.map(
|
||||
agentTopicName -> this.SyncSendCommandToAgent(
|
||||
agentTopicName,
|
||||
type,
|
||||
null,
|
||||
completeCommandList,
|
||||
COMMAND_EXEC_NEED_REPLAY,
|
||||
atnFutureKey.get(agentTopicName),
|
||||
false
|
||||
)
|
||||
)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey) {
|
||||
return this.SyncSendCommandToAgent(
|
||||
agentTopicName,
|
||||
type,
|
||||
commandList,
|
||||
commandListComplete,
|
||||
COMMAND_EXEC_NEED_REPLAY,
|
||||
futureKey,
|
||||
false
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
|
||||
|
||||
OctopusMessage octopusMessage = asyncExecutionService.SyncCallSendCommandToAgent(
|
||||
agentTopicName,
|
||||
type,
|
||||
commandList,
|
||||
commandListComplete,
|
||||
needResultReplay,
|
||||
futureKey,
|
||||
durationTask
|
||||
);
|
||||
|
||||
LocalDateTime initTime = octopusMessage.getInit_time();
|
||||
|
||||
ArrayList<String> result = new ArrayList<>();
|
||||
|
||||
// 构造消息等待对象
|
||||
int commandCount = Math.max(
|
||||
commandListComplete.size(),
|
||||
1
|
||||
);
|
||||
OMReplayContend omReplayContend = OMReplayContend.build(
|
||||
commandCount,
|
||||
CurrentAppOctopusMessageType,
|
||||
initTime
|
||||
);
|
||||
CountDownLatch countDownLatch = omReplayContend.getCountDownLatch();
|
||||
|
||||
// 开始等待结果
|
||||
asyncWaitOMResult.waitFor(omReplayContend);
|
||||
|
||||
// 监听结果
|
||||
try {
|
||||
boolean await = countDownLatch.await(
|
||||
processMaxWaitSeconds,
|
||||
TimeUnit.SECONDS
|
||||
);
|
||||
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
|
||||
// 等待所有的结果返回
|
||||
// 停止等待结果
|
||||
asyncWaitOMResult.stopWaiting(omReplayContend);
|
||||
|
||||
|
||||
// 解析结果
|
||||
omReplayContend
|
||||
.getReplayOMList()
|
||||
.stream()
|
||||
.map(
|
||||
om -> {
|
||||
log.debug(
|
||||
"replay message is => {}",
|
||||
om
|
||||
);
|
||||
|
||||
return (ArrayList<String>) om.getResult();
|
||||
}
|
||||
)
|
||||
.forEachOrdered(
|
||||
singleResult -> result.addAll(singleResult)
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
// 返回
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@@ -12,6 +12,12 @@ import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT;
|
||||
|
||||
/**
|
||||
* 从Agent收集返回信息的统一处理地点
|
||||
* 使用方法: 业务类构造 OMReplayContend对象,调用AsyncWaitOMResult.waitFor()方法
|
||||
* <p>
|
||||
* 调用结束之后,需要从 REPLAY_WAITING_TARGET 中移除此部分内容
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class AsyncWaitOMResult {
|
||||
@@ -21,18 +27,27 @@ public class AsyncWaitOMResult {
|
||||
* KEY -> replayMatchKey
|
||||
* VALUE -> OMReplayContend - 包含countDownLatch 和 result
|
||||
*/
|
||||
public static final HashMap<String, OMReplayContend> REPLAY_CACHE_MAP = new HashMap<>();
|
||||
private static final HashMap<String, OMReplayContend> REPLAY_WAITING_TARGET = new HashMap<>();
|
||||
|
||||
public void waitFor(OMReplayContend omReplayContend) {
|
||||
|
||||
// 向 REPLAY_CACHE_MAP中写入 Key
|
||||
REPLAY_CACHE_MAP.put(omReplayContend.getReplayMatchKey(),
|
||||
omReplayContend);
|
||||
REPLAY_WAITING_TARGET.put(
|
||||
omReplayContend.getReplayMatchKey(),
|
||||
omReplayContend
|
||||
);
|
||||
|
||||
// 在调用线程的countDownLunch结束之后,关闭
|
||||
// 清除 REPLAY_CACHE_MAP 中的队列
|
||||
}
|
||||
|
||||
public void stopWaiting(OMReplayContend omReplayContend) {
|
||||
|
||||
// 在调用线程的countDownLunch结束之后,关闭 清除 REPLAY_CACHE_MAP 中的队列
|
||||
REPLAY_WAITING_TARGET.remove(omReplayContend.getReplayMatchKey());
|
||||
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void daemonHandleReplayOMFromAgent() {
|
||||
|
||||
@@ -71,7 +86,7 @@ public class AsyncWaitOMResult {
|
||||
replayOMessage.getType(),
|
||||
replayOMessage.getInit_time()
|
||||
);
|
||||
if (!REPLAY_CACHE_MAP.containsKey(matchKey)) {
|
||||
if (!REPLAY_WAITING_TARGET.containsKey(matchKey)) {
|
||||
// 没有这个Key,说明等待结果已经超时了,直接丢弃,然后继续循环
|
||||
|
||||
// todo 错误的数据需要放置于某处
|
||||
@@ -80,7 +95,7 @@ public class AsyncWaitOMResult {
|
||||
}
|
||||
|
||||
// Map中包含有Key,那么放置进去
|
||||
OMReplayContend replayContend = REPLAY_CACHE_MAP.get(matchKey);
|
||||
OMReplayContend replayContend = REPLAY_WAITING_TARGET.get(matchKey);
|
||||
replayContend
|
||||
.getReplayOMList()
|
||||
.add(replayOMessage);
|
||||
|
||||
@@ -11,7 +11,7 @@ import lombok.NoArgsConstructor;
|
||||
import lombok.experimental.SuperBuilder;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
@Data
|
||||
@@ -35,8 +35,7 @@ public class OMReplayContend {
|
||||
CountDownLatch countDownLatch;
|
||||
|
||||
@ApiModelProperty("回复的结果列表, 临时保存")
|
||||
List<OctopusMessage> replayOMList;
|
||||
|
||||
ArrayList<OctopusMessage> replayOMList;
|
||||
|
||||
protected static String generateMatchKey(OMReplayContend replayIdentifier) {
|
||||
|
||||
@@ -49,6 +48,11 @@ public class OMReplayContend {
|
||||
return relayMatchKey;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param messageType
|
||||
* @param messageInitTime 必须使用 TimeUtils.currentFormatTime();
|
||||
* @return
|
||||
*/
|
||||
public static String generateMatchKey(OctopusMessageType messageType, LocalDateTime messageInitTime) {
|
||||
|
||||
String relayMatchKey = messageType.toString() + messageInitTime.toString();
|
||||
@@ -56,4 +60,23 @@ public class OMReplayContend {
|
||||
return relayMatchKey;
|
||||
}
|
||||
|
||||
/**
|
||||
* 方便使用的一个构造方法
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public static OMReplayContend build(int waitForReplayNum, OctopusMessageType currentOMType, LocalDateTime currentTime) {
|
||||
|
||||
return new OMReplayContend(
|
||||
currentOMType,
|
||||
currentTime,
|
||||
generateMatchKey(
|
||||
currentOMType,
|
||||
currentTime
|
||||
),
|
||||
new CountDownLatch(waitForReplayNum),
|
||||
new ArrayList<>()
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package io.wdd.rpc.scheduler.service.script;
|
||||
|
||||
|
||||
import io.wdd.rpc.execute.service.CoreExecutionService;
|
||||
import io.wdd.rpc.execute.service.AsyncExecutionService;
|
||||
import io.wdd.rpc.scheduler.beans.ScriptSchedulerDTO;
|
||||
import io.wdd.rpc.scheduler.config.QuartzSchedulerUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -20,7 +20,7 @@ import java.util.List;
|
||||
public class AgentApplyScheduledScript {
|
||||
|
||||
@Resource
|
||||
CoreExecutionService coreExecutionService;
|
||||
AsyncExecutionService asyncExecutionService;
|
||||
|
||||
@Resource
|
||||
QuartzSchedulerUtils quartzSchedulerUtils;
|
||||
@@ -45,7 +45,7 @@ public class AgentApplyScheduledScript {
|
||||
}
|
||||
|
||||
// 发送命令到Agent中
|
||||
List<String> resultKeyList = coreExecutionService
|
||||
List<String> resultKeyList = asyncExecutionService
|
||||
.SendCommandToAgentComplete(
|
||||
targetMachineList,
|
||||
scriptType,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package io.wdd.server;
|
||||
|
||||
import io.wdd.rpc.execute.service.CoreExecutionService;
|
||||
import io.wdd.rpc.execute.service.AsyncExecutionService;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
|
||||
@@ -13,7 +13,7 @@ class ServerApplicationTests {
|
||||
|
||||
|
||||
@Resource
|
||||
CoreExecutionService coreExecutionService;
|
||||
AsyncExecutionService asyncExecutionService;
|
||||
|
||||
@Test
|
||||
void testCoreExecutionCompleteScript() {
|
||||
@@ -61,7 +61,7 @@ class ServerApplicationTests {
|
||||
)
|
||||
);
|
||||
|
||||
List<String> resultList = coreExecutionService.SendCommandToAgentComplete(
|
||||
List<String> resultList = asyncExecutionService.SendCommandToAgentComplete(
|
||||
targetMachineList,
|
||||
"Scheduled Script",
|
||||
completeScript
|
||||
|
||||
Reference in New Issue
Block a user