[ Service ] [ Executor ] 初步重构Executor部分的代码

This commit is contained in:
zeaslity
2023-08-10 17:09:51 +08:00
parent e5af31bb38
commit fbd8348e15
43 changed files with 1020 additions and 1650 deletions

View File

@@ -5,8 +5,14 @@ import (
"time"
)
type OctopusFunc interface {
Exec(baseFuncName string, funcArgs ...string) []string
Deploy(appFuncName string, funcArgs ...string) (bool, []string)
}
type AppFunc interface {
Deploy(appFuncName string, funcArgs ...string) []string
Deploy(appFuncName string, funcArgs ...string) (bool, []string)
}
var AppExecuteErrorLogPrefix = []string{"App指令执行错误 => "}

View File

@@ -13,7 +13,7 @@ type ExecutionMessage struct {
NeedResultReplay bool `json:"needResultReplay"`
DurationTask bool `json:"durationTask,default:false"`
Type string `json:"type"`
BaseFuncContent []string `json:"baseFuncContent"`
FuncContent []string `json:"funcContent"`
SingleLineCommand []string `json:"singleLineCommand"`
MultiLineCommand [][]string `json:"multiLineCommand"`
PipeLineCommand [][]string `json:"pipeLineCommand"`
@@ -32,9 +32,18 @@ func Execute(em *ExecutionMessage) ([]string, error) {
if strings.HasPrefix(em.Type, "BASE") {
// base function
resultLog = AgentOsOperatorCache.Exec(em.BaseFuncContent[0], em.BaseFuncContent[1:]...)
resultLog = AgentOsOperatorCache.Exec(em.FuncContent[0], em.FuncContent[1:]...)
err = nil
} else if strings.HasPrefix(em.Type, "APP") {
// app function
ok, resultLog := AgentOsOperatorCache.Deploy(em.FuncContent[0], em.FuncContent[1:]...)
if ok {
return resultLog, nil
} else {
return resultLog, nil
}
} else {
// shell command

View File

@@ -0,0 +1,15 @@
package io.wdd.func.script.service;
import java.util.List;
public interface FuncService {
List<String> callBaseFuncService(String agentTopicName, String baseFunctionName, List<String> funcArgs);
List<String> callAppFuncService(String agentTopicName, String appFunctionName, List<String> funcArgs);
}

View File

@@ -0,0 +1,67 @@
package io.wdd.func.script.service;
import io.wdd.rpc.execute.ExecutionMessageType;
import io.wdd.rpc.execute.ExecutionService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
@Service
@Slf4j
public class FuncServiceImpl implements FuncService {
@Resource
ExecutionService executionService;
@Override
public List<String> callBaseFuncService(String agentTopicName, String baseFunctionName, List<String> funcArgs) {
return this.syncCallFunction(
agentTopicName,
ExecutionMessageType.BASE,
baseFunctionName,
funcArgs
);
}
@Override
public List<String> callAppFuncService(String agentTopicName, String appFunctionName, List<String> funcArgs) {
return this.syncCallFunction(
agentTopicName,
ExecutionMessageType.APP,
appFunctionName,
funcArgs
);
}
private List<String> syncCallFunction(String agentTopicName, ExecutionMessageType emType, String funcName, List<String> funcArgs) {
// 重新构造内容
funcArgs.add(
0,
funcName
);
// 调用
ArrayList<String> resultLog = executionService.SendCommandToAgent(
agentTopicName,
emType.toString(),
funcArgs,
null,
null,
true,
"",
false
);
return resultLog;
}
}

View File

@@ -3,7 +3,7 @@ package io.wdd.func.xray.service;
import io.wdd.common.utils.TimeUtils;
import io.wdd.func.oss.config.OctopusObjectSummary;
import io.wdd.func.xray.beans.node.ProxyNode;
import io.wdd.rpc.execute.service.SyncExecutionService;
import io.wdd.rpc.execute.ExecutionService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
@@ -83,7 +83,7 @@ public class XrayCallAgent {
}
@Resource
SyncExecutionService executionService;
ExecutionService executionService;
/**
* 为代理链的每一个节点 构建Xray配置更新命令然后发送至对应的Agent中
@@ -131,17 +131,18 @@ public class XrayCallAgent {
);
// 向Agent发送命令执行更新操作
String resultKey = executionService.SyncSendCommandToAgent(
executionService.SendCommandToAgent(
proxyNode.getAgentTopicName(),
updateCommandType,
null,
null,
updateXrayCommandList,
false,
null,
false
);
return resultKey;
return "";
}
)
.collect(Collectors.toList());

View File

@@ -8,15 +8,16 @@ public interface OctopusAgentService {
/**
* 获取所有Agent的版本信息,附带最新的版本信息
* 超时时间为 5s
* 获取所有Agent的版本信息,附带最新的版本信息
* 超时时间为 5s
*
* @return key - AgentTopicName value - version Info
*/
Map<String, String> getAllAgentVersion();
/**
* 获取所有Agent的核心信息,方便更新系统信息
* 获取所有Agent的核心信息,方便更新系统信息
*
* @return agent-topic-name value -agentServerInfo
*/
@@ -24,9 +25,9 @@ public interface OctopusAgentService {
/**
* 执行 Agent 关键操作的接口
* 关机
* */
* 执行 Agent 关键操作的接口
* 关机
*/
String shutdownAgentDanger(String agentTopicName);
}

View File

@@ -7,13 +7,12 @@ import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType;
import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService;
import io.wdd.rpc.message.handler.async.OctopusMessageSynScReplayContend;
import io.wdd.rpc.message.handler.async.OctopusMessageSyncReplayContend;
import io.wdd.rpc.message.sender.OMessageToAgentSender;
import io.wdd.server.beans.vo.ServerInfoVO;
import io.wdd.server.config.ServerCommonPool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDateTime;
@@ -26,12 +25,12 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static io.wdd.rpc.message.handler.sync.OMessageHandlerServer.LATEST_VERSION;
import static io.wdd.rpc.message.handler.sync.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT;
import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.LATEST_VERSION;
import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.OCTOPUS_MESSAGE_FROM_AGENT;
import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_SET;
import static io.wdd.rpc.status.CommonAndStatusCache.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
@Service
//@Service
@Slf4j
public class OctopusAgentServiceImpl implements OctopusAgentService {
@@ -70,7 +69,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
);
// 构造 异步结果监听内容
OctopusMessageSynScReplayContend agentReplayContend = OctopusMessageSynScReplayContend.build(
OctopusMessageSyncReplayContend agentReplayContend = OctopusMessageSyncReplayContend.build(
ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(),
CurrentAppOctopusMessageType,
currentTime
@@ -147,16 +146,16 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
);
// 构造结果
OctopusMessageSynScReplayContend octopusMessageSynScReplayContend = OctopusMessageSynScReplayContend.build(
OctopusMessageSyncReplayContend octopusMessageSyncReplayContend = OctopusMessageSyncReplayContend.build(
ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(),
CurrentAppOctopusMessageType,
currentTime
);
CountDownLatch countDownLatch = octopusMessageSynScReplayContend.getCountDownLatch();
CountDownLatch countDownLatch = octopusMessageSyncReplayContend.getCountDownLatch();
// 调用后台接收处理所有的Replay信息
asyncWaitOctopusMessageResultService.waitFor(octopusMessageSynScReplayContend);
asyncWaitOctopusMessageResultService.waitFor(octopusMessageSyncReplayContend);
/* CompletableFuture<Void> getAllAgentCoreInfoFuture = waitCollectAllAgentCoreInfo(
result,
@@ -176,10 +175,10 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
// 超时,或者 全部信息已经收集
// 此处调用,即可中断 异步任务的收集工作
asyncWaitOctopusMessageResultService.stopWaiting(octopusMessageSynScReplayContend);
asyncWaitOctopusMessageResultService.stopWaiting(octopusMessageSyncReplayContend);
// 处理结果
octopusMessageSynScReplayContend
octopusMessageSyncReplayContend
.getReplayOMList()
.stream()
.forEach(
@@ -207,7 +206,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
);
// help gc
octopusMessageSynScReplayContend = null;
octopusMessageSyncReplayContend = null;
}
return result;
@@ -265,7 +264,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
// 不是当前应用需要的的OM将信息放置与Cache队列的末尾
OCTOPUS_MESSAGE_FROM_AGENT.offer(message);
OCTOPUS_MESSAGE_FROM_AGENT.add(message);
// 返回,继续死循环
continue;
@@ -362,7 +361,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
// OctopusMessageType判断
boolean OMTypeEqual = message
.getType()
.getOctopusMessageType()
.equals(CurrentAppOctopusMessageType);
return startTimeEqual && OMTypeEqual;
@@ -411,7 +410,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
return OctopusMessage
.builder()
.type(CurrentAppOctopusMessageType)
.octopusMessageType(CurrentAppOctopusMessageType)
.uuid(agentTopicName)
.init_time(currentTime)
.content(ops)

View File

@@ -1,4 +1,4 @@
package io.wdd.rpc.execute.config;
package io.wdd.rpc.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

View File

@@ -6,14 +6,12 @@ import io.wdd.common.response.R;
import io.wdd.rpc.agent.OctopusAgentService;
import io.wdd.server.beans.vo.ServerInfoVO;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Map;
@RestController
@RequestMapping("/octopus/server/agent")
//@RestController
//@RequestMapping("/octopus/server/agent")
@Api(value = "处理Agent核心内容的Controller", tags = "Agent")
public class AgentController {
@@ -22,7 +20,7 @@ public class AgentController {
@GetMapping("/version")
@ApiOperation("[版本] - 所有OctopusAgent")
public R<Map<String, String>> getAllAgentVersion(){
public R<Map<String, String>> getAllAgentVersion() {
return R.ok(octopusAgentService.getAllAgentVersion());
}

View File

@@ -4,8 +4,7 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.wdd.common.response.R;
import io.wdd.rpc.execute.service.AsyncExecutionService;
import io.wdd.rpc.execute.service.SyncExecutionService;
import io.wdd.rpc.execute.ExecutionService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@@ -14,25 +13,19 @@ import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Nullable;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_LIST;
import static io.wdd.rpc.status.CommonAndStatusCache.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
@RestController
@RequestMapping("/octopus/server/executor")
@Api(value = "Agent执行命令的Controller", tags = "Execution")
public class ExecutionController {
@Resource
SyncExecutionService syncExecutionService;
@Resource
AsyncExecutionService asyncExecutionService;
ExecutionService executionService;
@PostMapping("/command/one")
@ApiOperation("[命令] [异步]- 单台主机")
public R<String> patchCommandToAgent(
public R<ArrayList<String>> patchCommandToAgent(
@RequestParam(value = "topicName") @ApiParam(name = "topicName", value = "目标主机名称") String topicName,
@RequestParam(value = "commandList", required = false) @Nullable List<String> commandList,
@RequestParam(value = "completeCommandList", required = false)
@@ -41,19 +34,7 @@ public class ExecutionController {
@ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask
) {
ArrayList<String> streamKeyList = asyncExecutionService
.AsyncSendCommandToAgentComplete(
topicName,
type,
commandList,
completeCommandList,
false,
null,
isDurationTask
);
return R.ok(streamKeyList.toString());
return R.ok(null);
}
@PostMapping("/command/batch")
@@ -68,15 +49,7 @@ public class ExecutionController {
@RequestParam(value = "type", required = false) @Nullable String type,
@ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask
) {
List<ArrayList<String>> arrayListList = asyncExecutionService.AsyncSendCommandToAgentComplete(
topicNameList,
type,
commandList,
completeCommandList,
isDurationTask
);
return R.ok(arrayListList);
return R.ok(null);
}
@@ -91,13 +64,7 @@ public class ExecutionController {
@ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask
) {
return R.ok(asyncExecutionService.AsyncSendCommandToAgentComplete(
ALL_AGENT_TOPIC_NAME_LIST,
type,
commandList,
completeCommandList,
isDurationTask
));
return R.ok(null);
}
@PostMapping("/command/healthy")
@@ -111,15 +78,26 @@ public class ExecutionController {
@ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask
) {
return R.ok(asyncExecutionService.AsyncSendCommandToAgentComplete(
ALL_HEALTHY_AGENT_TOPIC_NAME_LIST,
type,
commandList,
completeCommandList,
isDurationTask
));
// List<ArrayList<String>> pathResult = syncExecutionService
// .SyncSendCommandToAgentComplete(
// ALL_HEALTHY_AGENT_TOPIC_NAME_LIST,
// type,
// null,
// commandList,
// completeCommandList,
// false,
// null,
// isDurationTask
// );
//
//
// return R.ok(pathResult);
return R.ok(null);
}
@PostMapping("/command/sync/one")
@ApiOperation("[命令] [同步] - 单机-等待命令结果")
public R<List<String>> SyncPatchCommandToAgent(
@@ -131,19 +109,25 @@ public class ExecutionController {
@RequestParam(value = "type", required = false) @ApiParam(name = "type", value = "执行命令类型") @Nullable String type
) {
return R.ok(
Collections.singletonList(syncExecutionService.SyncSendCommandToAgentComplete(
ArrayList<String> resultLog = executionService
.SendCommandToAgent(
topicName,
type,
null,
commandList,
completeCommandList
))
);
completeCommandList,
false,
null,
false
);
return R.ok(resultLog);
}
@PostMapping("/command/sync/batch")
@ApiOperation("[命令] [同步] - 批量-等待命令结果")
public R<List<String>> SyncPatchCommandToAgentBatch(
public R<List<ArrayList<String>>> SyncPatchCommandToAgentBatch(
@RequestParam(value = "topicNameList")
@ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList,
@RequestParam(value = "commandList", required = false)
@@ -154,20 +138,28 @@ public class ExecutionController {
@ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask
) {
return R.ok(
syncExecutionService.SyncSendCommandToAgentComplete(
topicNameList,
type,
commandList,
completeCommandList,
isDurationTask
)
);
// List<ArrayList<String>> pathResult = syncExecutionService
// .SyncSendCommandToAgentComplete(
// topicNameList,
// type,
// null,
// commandList,
// completeCommandList,
// false,
// null,
// isDurationTask
// );
//
//
// return R.ok(pathResult);
return R.ok(null);
}
@PostMapping("/command/sync/all")
@ApiOperation("[命令] [同步] - 全部-同步等待命令结果")
public R<List<String>> SyncPatchCommandToAgentAll(
public R<List<ArrayList<String>>> SyncPatchCommandToAgentAll(
@RequestParam(value = "commandList", required = false)
@ApiParam(name = "commandList", value = "命令行") @Nullable List<String> commandList,
@RequestParam(value = "completeCommandList", required = false)
@@ -176,20 +168,59 @@ public class ExecutionController {
@ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask
) {
return R.ok(
syncExecutionService.SyncSendCommandToAgentComplete(
ALL_AGENT_TOPIC_NAME_LIST,
type,
commandList,
completeCommandList,
isDurationTask
)
);
// List<ArrayList<String>> pathResult = syncExecutionService
// .SyncSendCommandToAgentComplete(
// ALL_AGENT_TOPIC_NAME_LIST,
// type,
// null,
// commandList,
// completeCommandList,
// false,
// null,
// isDurationTask
// );
//
//
// return R.ok(pathResult);
return R.ok(null);
}
@PostMapping("/command/sync/healthy")
@ApiOperation("[命令] [同步] - 健康的主机")
public R<List<ArrayList<String>>> SyncPatchCommandToHealthyAgent(
@RequestParam(value = "commandList", required = false)
@ApiParam(name = "commandList", value = "命令行") @Nullable List<String> commandList,
@RequestParam(value = "completeCommandList", required = false)
@ApiParam(name = "completeCommandList", value = "完整命令行,优先,可为空") @Nullable List<List<String>> completeCommandList,
@RequestParam(value = "type", required = false) @Nullable String type,
@ApiParam(name = "isDurationTask", value = "是否是持久化任务") @RequestParam(value = "isDurationTask", defaultValue = "false", required = false) boolean isDurationTask
) {
// List<ArrayList<String>> pathResult = syncExecutionService
// .SyncSendCommandToAgentComplete(
// ALL_HEALTHY_AGENT_TOPIC_NAME_LIST,
// type,
// null,
// commandList,
// completeCommandList,
// false,
// null,
// isDurationTask
// );
//
//
// return R.ok(pathResult);
return R.ok(null);
}
@PostMapping("/agentStatusStream")
@ApiOperation("切换Console查看Agent状态日志")
@Deprecated
public R<String> getAgentStatusStrem(
@RequestParam(value = "streamKey") @ApiParam(value = "status的Stream Key") String streamKey
) {
@@ -199,82 +230,4 @@ public class ExecutionController {
}
// auth required
// @PostMapping("/function/update")
// @ApiOperation("升级")
// public R<List<String>> AgentUpdate(
// @RequestParam(value = "topicNameList")
// @ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList
// ) {
//
// return R.ok(
// syncExecutionService
// .SyncSendCommandToAgent(
// topicNameList,
// "AgentUpdate",
// null,
// false,
// null,
// true
// ));
// }
//
// @PostMapping("/function/reboot")
// @ApiOperation("重启")
// public R<List<String>> AgentReboot(
// @RequestParam(value = "topicNameList")
// @ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList
// ) {
//
// return R.ok(
// asyncExecutionService
// .SyncSendCommandToAgent(
// topicNameList,
// "AgentReboot",
// null,
// false,
// null,
// true
// ));
// }
//
// @PostMapping("/function/shutdown")
// @ApiOperation("关闭")
// public R<List<String>> AgentShutdown(
// @RequestParam(value = "topicNameList")
// @ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList
// ) {
//
// return R.ok(
// syncExecutionService
// .SyncSendCommandToAgent(
// topicNameList,
// "AgentShutdown",
// null,
// false,
// null,
// true
// ));
// }
//
// @PostMapping("/function/bootUp")
// @ApiOperation("重新部署")
// public R<List<String>> AgentBootUp(
// @RequestParam(value = "topicNameList")
// @ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList
// ) {
//
// return R.ok(
// asyncExecutionService
// .SyncSendCommandToAgent(
// topicNameList,
// "AgentBootUp",
// null,
// false,
// null,
// true
// ));
// }
}

View File

@@ -8,7 +8,9 @@ import io.wdd.rpc.beans.request.MetricQueryEntity;
import io.wdd.rpc.scheduler.service.status.AgentAliveStatusMonitorService;
import io.wdd.rpc.status.beans.AgentStatus;
import io.wdd.rpc.status.service.SyncStatusService;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import javax.annotation.Resource;
import java.util.ArrayList;
@@ -18,9 +20,9 @@ import java.util.Map;
import static io.wdd.rpc.status.CommonAndStatusCache.*;
@RestController
//@RestController
@Api(value = "Agent运行状态Controller", tags = "Status")
@RequestMapping("/octopus/server/status")
//@RequestMapping("/octopus/server/status")
public class StatusController {
@Resource

View File

@@ -33,10 +33,15 @@ public class ExecutionMessage {
/**
* 用于区分 ExecutionMessage的类型
* 直接执行预定函数,则为 Nacos配置中的 方法名称,例如 AgentUpdate AgentReboot
* BASE APP
*/
private String type;
/**
* 执行功能脚本时需要的参数
*/
private List<String> funcContent;
/**
* 只有一行的命令行
*/

View File

@@ -0,0 +1,13 @@
package io.wdd.rpc.execute;
public enum ExecutionMessageType {
// 基础类型,执行基础脚本类
BASE,
// 应用类,执行特定功能
APP,
}

View File

@@ -0,0 +1,21 @@
package io.wdd.rpc.execute;
import java.util.ArrayList;
import java.util.List;
public interface ExecutionService {
ArrayList<String> SendCommandToAgent(
String agentTopicName,
String type,
List<String> funcContent,
List<String> commandList,
List<List<String>> commandListComplete,
boolean needResultReplay,
String resultKey,
boolean durationTask
);
}

View File

@@ -0,0 +1,125 @@
package io.wdd.rpc.execute;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType;
import io.wdd.rpc.message.sender.OMessageToAgentSender;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_SET;
@Service
@Slf4j
public class ExecutionServiceImpl implements ExecutionService {
private static final String MANUAL_COMMAND_TYPE = "manual-command";
@Resource
OMessageToAgentSender oMessageToAgentSender;
@Override
public ArrayList<String> SendCommandToAgent(String agentTopicName, String type, List<String> funcContent, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String resultKey, boolean durationTask) {
ArrayList<String> commandResultLog = null;
// 归一化type
if (StringUtils.isEmpty(type)) {
type = MANUAL_COMMAND_TYPE;
}
// 构造 Execution Command对应的消息体
ExecutionMessage executionMessage = this
.generateExecutionMessage(
type,
commandList,
resultKey,
commandListComplete,
needResultReplay,
durationTask
);
OctopusMessage octopusMessage = this.generateOctopusMessage(
agentTopicName,
executionMessage
);
// send the message
oMessageToAgentSender.send(octopusMessage);
// 需要返回结果
if (!durationTask) {
synchronized (octopusMessage) {
try {
octopusMessage.wait(10000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 转换结果
commandResultLog = (ArrayList<String>) octopusMessage.getResult();
// debug
log.debug(
"执行命令 {} 的结果为 {} 内容为 {}",
executionMessage.getSingleLineCommand() == null ? executionMessage.getMultiLineCommand() : executionMessage.getSingleLineCommand(),
octopusMessage.getResultCode(),
octopusMessage.getResult()
);
}
return commandResultLog;
}
private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) {
return OctopusMessage
.builder()
.octopusMessageType(OctopusMessageType.EXECUTOR)
.init_time(TimeUtils.currentFormatTime())
.uuid(agentTopicName)
.content(
executionMessage
)
.build();
}
private ExecutionMessage generateExecutionMessage(String type, List<String> commandList, String resultKey, List<List<String>> commandListComplete, boolean needResultReplay, boolean durationTask) {
return ExecutionMessage
.builder()
.resultKey(resultKey)
.type(type)
.singleLineCommand(commandList)
.multiLineCommand(commandListComplete)
.needResultReplay(needResultReplay)
.durationTask(durationTask)
.build();
}
private boolean validateCommandInfo(String agentTopicName, String type) {
// 检查agentTopicName是否存在
if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) {
log.error(
"agentTopicName异常! 输入为 => {}",
agentTopicName
);
return false;
//throw new MyRuntimeException("agentTopicName异常!" + agentTopicName);
}
return true;
}
}

View File

@@ -1,38 +0,0 @@
package io.wdd.rpc.execute.config;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.ArrayList;
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder(toBuilder = true)
public class CommandReaderConfig {
/**
* 消费者类型:独立消费、消费组消费
*/
private String consumerType;
/**
* 消费组
*/
private String group;
/**
* 消费组中的某个消费者
*/
private String consumerName;
private String streamKey;
private String recordId;
/**
* 执行的结果对象,保存在此处
*/
private ArrayList<String> ExecutionResult;
}

View File

@@ -1,26 +0,0 @@
//package io.wdd.rpc.execute.config;
//
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//
//
//@Configuration
//public class CommandReaderConfigBean {
//
// // todo must support for multi thread
// // its not thread safe now
// @Bean
// public CommandReaderConfig commandReaderConfig() {
//
// return CommandReaderConfig
// .builder()
// .consumerName(REDIS_STREAM_LISTENER_CONSUMER_NAME)
// .streamKey("ccc")
// .consumerType(REDIS_STREAM_LISTENER_CONSUMER_NAME)
// .group("ccc")
// .ExecutionResult(null)
// .build();
// }
//
//
//}

View File

@@ -1,16 +0,0 @@
package io.wdd.rpc.execute.config;
import io.swagger.annotations.ApiModel;
import io.wdd.server.beans.po.ExecutionLogPO;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.time.LocalDateTime;
@Data
@AllArgsConstructor
@ApiModel("Execution模快持久化Bean对象")
public class ExecutionLog extends ExecutionLogPO {
}

View File

@@ -1,33 +0,0 @@
package io.wdd.rpc.execute.config;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
public class ExecutionResultStringDeserializer {
public static ArrayList<String> format(String executionResultString) {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY,
true);
try {
String tmp = objectMapper.readValue(executionResultString,
new TypeReference<String>() {
});
return objectMapper.readValue(tmp,
new TypeReference<ArrayList<String>>() {
});
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -1,189 +0,0 @@
//package io.wdd.rpc.execute.result;
//
//import io.wdd.rpc.execute.config.CommandReaderConfig;
//import io.wdd.server.utils.SpringUtils;
//import lombok.SneakyThrows;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.data.redis.connection.stream.ReadOffset;
//import org.springframework.data.redis.connection.stream.StreamOffset;
//import org.springframework.data.redis.stream.StreamMessageListenerContainer;
//import org.springframework.stereotype.Component;
//
//import java.util.ArrayList;
//import java.util.HashMap;
//import java.util.concurrent.TimeUnit;
//
//import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER;
//
//
//@Component
//@Slf4j
//public class BuildStreamReader {
//
// private final HashMap<String, StreamMessageListenerContainer> REDIS_STREAM_LISTENER_CONTAINER_CACHE = new HashMap<>(16);
// private RedisStreamReaderConfig redisStreamReaderConfig;
//
// private StreamMessageListenerContainer streamMessageListenerContainer;
//
// private CommandReaderConfig commandReaderConfig;
//
// public void buildStreamReader(CommandReaderConfig commandReaderConfig) {
//
// // prepare the environment
// prepareExecutionEnv();
//
//
// // just modify the redis listener container and it's ok
// modifyExecutionStreamReader(commandReaderConfig);
//
// }
//
// @SneakyThrows
// private void modifyExecutionStreamReader(CommandReaderConfig commandReaderConfig) {
//
// // stop the old stream listener container
// if (this.streamMessageListenerContainer.isRunning()) {
// this.streamMessageListenerContainer.stop();
// }
//
// // modify container
// this.streamMessageListenerContainer.receive(
// StreamOffset.create(
// commandReaderConfig.getStreamKey(),
// ReadOffset.lastConsumed()),
//
// new CommandResultReader(
// commandReaderConfig
// )
// );
//
//
// // very important
// TimeUnit.MILLISECONDS.sleep(500);
// this.streamMessageListenerContainer.start();
// }
//
// private void prepareExecutionEnv() {
//
// getRedisStreamListenerContainer();
//
// getRedisStreamReaderConfig();
//
// }
//
// private void getRedisStreamReaderConfig() {
//
// this.commandReaderConfig = SpringUtils.getBean("commandReaderConfig",
// CommandReaderConfig.class);
// }
//
// private void getRedisStreamListenerContainer() {
//
// this.streamMessageListenerContainer = SpringUtils.getBean(
// EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER,
// StreamMessageListenerContainer.class
// );
// }
//
// public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey) {
// registerStreamReader(redisStreamListenerContainerBeanName,
// streamKey,
// null);
// }
//
// public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey, ArrayList<String> ExecutionResult) {
//
// // prepare the environment
// prepareEnv();
//
// // oldStreamKey equals streamKey don't need to do anything , just return
// if (redisStreamReaderConfig.getStreamKey()
// .equals(streamKey)) {
// log.debug("redis listener container not change !");
// return;
// }
//
// // destroy the old REDIS_STREAM_LISTENER_CONTAINER
// destroyStreamReader(streamKey);
//
// // modify the configuration ==> streamKey
// modifyStreamReader(streamKey,
// ExecutionResult);
//
// // re-create the REDIS_STREAM_LISTENER_CONTAINER
// createStreamReader(redisStreamListenerContainerBeanName,
// streamKey);
//
// }
//
// private void prepareEnv() {
//
// getRedisStreamConfig();
//
// }
//
// private void getRedisStreamConfig() {
//
// this.redisStreamReaderConfig = SpringUtils.getBean("redisStreamReaderConfig",
// RedisStreamReaderConfig.class);
// }
//
//
// private void createStreamReader(String redisStreamListenerContainerBeanName, String streamKey) {
//
// log.debug("start to create the redis stream listener container");
// // create the lazy bean
//
// StreamMessageListenerContainer streamMessageListenerContainer = SpringUtils.getBean(redisStreamListenerContainerBeanName,
// StreamMessageListenerContainer.class);
//
// REDIS_STREAM_LISTENER_CONTAINER_CACHE.put(streamKey,
// streamMessageListenerContainer);
//
// // very important
// log.debug("start the listener container");
// streamMessageListenerContainer.start();
//
//
// }
//
// private void modifyStreamReader(String streamKey, ArrayList<String> executionResult) {
//
// log.debug("start to modify the redis stream listener container stream key");
// String oldStreamKey = redisStreamReaderConfig.getStreamKey();
//
// log.debug("change stream key from [{}] to [{}]",
// oldStreamKey,
// streamKey);
//
// log.debug("start to set the Redis Stream Reader key");
// redisStreamReaderConfig.setStreamKey(streamKey);
//
// log.debug("start to set the Redis Stream Execution Result Container");
// redisStreamReaderConfig.setExecutionResult(executionResult);
//
// }
//
//
// private void destroyStreamReader(String streamKey) {
//
// String oldStreamKey = redisStreamReaderConfig.getStreamKey();
//
// if (REDIS_STREAM_LISTENER_CONTAINER_CACHE.containsKey(oldStreamKey)) {
//
// StreamMessageListenerContainer streamMessageListenerContainer = REDIS_STREAM_LISTENER_CONTAINER_CACHE.get(oldStreamKey);
//
// log.debug("destroyed old redis stream listener container is [ {} ]",
// streamMessageListenerContainer);
//
//
// // double destroy
// SpringUtils.destroyBean(streamMessageListenerContainer);
// streamMessageListenerContainer.stop();
// // help gc
// streamMessageListenerContainer = null;
// }
//
//
// }
//}

View File

@@ -1,87 +0,0 @@
package io.wdd.rpc.execute.result;
import io.wdd.rpc.execute.config.CommandReaderConfig;
import io.wdd.rpc.execute.config.ExecutionResultStringDeserializer;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import java.util.ArrayList;
@Getter
@Setter
@Slf4j
public class CommandResultReader implements StreamListener<String, MapRecord<String, String, String>> {
// https://medium.com/nerd-for-tech/event-driven-architecture-with-redis-streams-using-spring-boot-a81a1c9a4cde
//https://segmentfault.com/a/1190000040946712
//https://docs.spring.io/spring-data/redis/docs/2.5.5/reference/html/#redis.streams.receive.containers
private CommandReaderConfig commandReaderConfig;
public CommandResultReader(String consumerType, String group, String consumerName) {
new CommandResultReader(consumerType,
group,
consumerName,
null);
}
public CommandResultReader(String consumerType, String group, String consumerName, ArrayList<String> executionResult) {
this.commandReaderConfig = CommandReaderConfig
.builder()
.consumerName(consumerName)
.group(group)
.consumerType(consumerType)
.ExecutionResult(executionResult)
.build();
}
public CommandResultReader(CommandReaderConfig commandReaderConfig) {
this.commandReaderConfig = commandReaderConfig;
}
@Override
public void onMessage(MapRecord<String, String, String> message) {
String streamKey = message.getStream();
RecordId messageId = message.getId();
String key = (String) message.getValue()
.keySet()
.toArray()[0];
String value = message.getValue()
.get(key);
ArrayList<String> executionResultFormat = ExecutionResultStringDeserializer.format(value);
// 赋值给外部的结果,是的执行的结果可以被拿到
this.commandReaderConfig.setExecutionResult(executionResultFormat);
this.commandReaderConfig.setRecordId(String.valueOf(messageId));
log.info("Octopus Agent [ {} ] execution of [ {} ] Time is [ {} ] stream recordId is [{}]",
streamKey,
executionResultFormat.get(1),
key,
messageId);
// print to console
executionResultFormat
.stream()
.forEach(
System.out::println
);
}
}

View File

@@ -1,121 +0,0 @@
//package io.wdd.rpc.execute.result;
//
//
//import io.wdd.rpc.scheduler.service.status.AgentStatusStreamReader;
//import lombok.Getter;
//import lombok.Setter;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.context.annotation.Lazy;
//import org.springframework.context.annotation.Scope;
//import org.springframework.data.redis.connection.RedisConnectionFactory;
//import org.springframework.data.redis.connection.stream.MapRecord;
//import org.springframework.data.redis.connection.stream.ReadOffset;
//import org.springframework.data.redis.connection.stream.StreamOffset;
//import org.springframework.data.redis.stream.StreamMessageListenerContainer;
//
//import javax.annotation.Resource;
//import java.time.Duration;
//import java.util.ArrayList;
//
//@Configuration
//@Slf4j
//@Getter
//@Setter
//public class RedisStreamReaderConfig {
//
// @Resource
// private RedisConnectionFactory redisConnectionFactory;
//
// public static final String COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER = "commandResultRedisStreamListenerContainer";
//
// public static final String EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER = "executionResultRedisStreamListenerContainer";
//
// public static final String AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER = "agentStatusRedisStreamListenerContainer";
//
// public static final String REDIS_STREAM_LISTENER_CONSUMER_NAME = "OctopusServer";
//
// /**
// * used in old model
// */
// private String streamKey = "cccc";
//
// /**
// * no use
// */
// private ArrayList<String> executionResult = null;
//
//
// @Bean(value = EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER)
// @Lazy
// public StreamMessageListenerContainer<String, MapRecord<String, String, String>> executionResultRedisStreamListenerContainer(){
//
// StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
// .builder()
// .pollTimeout(Duration.ofSeconds(2))
// .build();
//
// StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
//
// return listenerContainer;
// }
//
//
// @Bean(value = COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER)
// @Scope("prototype")
// @Lazy
// public StreamMessageListenerContainer<String, MapRecord<String, String, String>> commandResultRedisStreamListenerContainer(){
//
// StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
// .builder()
// .pollTimeout(Duration.ofSeconds(2))
// .build();
//
// StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
//
// // todo 此部分可以被移出到另外的位置会更加方便就不需要对此Bean进行创建和销毁了
// listenerContainer.receive(
//
// StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
//
// new CommandResultReader(
// REDIS_STREAM_LISTENER_CONSUMER_NAME,
// streamKey,
// REDIS_STREAM_LISTENER_CONSUMER_NAME,
// executionResult
// )
//
// );
//
// return listenerContainer;
// }
//
// @Bean(value = AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER)
// @Scope("prototype")
// @Lazy
// public StreamMessageListenerContainer<String, MapRecord<String, String, String>> agentStatusRedisStreamListenerContainer(){
//
// StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
// .builder()
// .pollTimeout(Duration.ofSeconds(2))
// .build();
//
// StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
//
// listenerContainer.receive(
//
// StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
//
// new AgentStatusStreamReader(
// REDIS_STREAM_LISTENER_CONSUMER_NAME,
// REDIS_STREAM_LISTENER_CONSUMER_NAME,
// REDIS_STREAM_LISTENER_CONSUMER_NAME)
//
// );
//
// return listenerContainer;
// }
//
//
//}

View File

@@ -1,92 +1,16 @@
package io.wdd.rpc.execute.service;
import java.util.ArrayList;
import java.util.HashMap;
import io.wdd.rpc.message.OctopusMessage;
import java.util.List;
/**
* 同步命令执行的核心类
* 需要等待命令执行完毕,完后返回相应的结果
*/
public interface AsyncExecutionService {
/**
* ------------------------ Sync Command Executor ------------------------------
*/
ArrayList<String> AsyncSendCommandToAgent(String agentTopicName, List<String> commandList);
ArrayList<String> AsyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList);
List<ArrayList<String>> AsyncSendCommandToAgent(List<String> agentTopicNameList, String type, List<String> commandList);
/**
* 调用 单行命令脚本的 最底层函数
*
* @param agentTopicName
* @param type
* @param commandList
* @param needResultReplay
* @param futureKey
* @param durationTask
* @return
*/
ArrayList<String> AsyncSendCommandToAgent(
String agentTopicName,
String type,
List<String> commandList,
boolean needResultReplay,
String futureKey,
boolean durationTask
);
/**
* -------------------------------------------------
*/
ArrayList<String> AsyncSendCommandToAgentComplete(String agentTopicName, String type, List<String> commandList, List<List<String>> completeCommandList);
List<ArrayList<String>> AsyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<String> commandList, List<List<String>> completeCommandList, boolean isDurationTask);
/**
* 通常为 页面定时脚本任务调用
*
* @param agentTopicNameList 目标Agent的TopicName列表
* @param type 任务类型
* @param completeCommandList 完整的类型
* @return 每个Agent只返回一个 ResultKeyScript脚本的结果全部拼接到一起全部的resultKey
*/
List<ArrayList<String>> AsyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList);
/**
* 通常为 页面定时脚本任务调用
*
* @param agentTopicNameList 目标Agent的TopicName列表
* @param type 任务类型
* @param completeCommandList 完整的类型
* @param atnFutureKey 由于脚本任务为延迟调用,故需要提前生成未来的ResultKey
* @return 每个Agent只返回一个 ResultKeyScript脚本的结果全部拼接到一起全部的resultKey
*/
List<ArrayList<String>> AsyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList, HashMap<String, String> atnFutureKey);
ArrayList<String> AsyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey);
/**
* 调用 完整脚本的 最底层函数
*
* @param agentTopicName
* @param type
* @param commandList
* @param commandListComplete
* @param futureKey
* @param durationTask
* @return resultKey 本次操作在Redis中记录的结果Key
*/
ArrayList<String> AsyncSendCommandToAgentComplete(
String agentTopicName,
List<OctopusMessage> AsyncCallSendCommandToAgent(
List<String> agentTopicNameList,
String type,
List<String> funcContent,
List<String> commandList,
List<List<String>> commandListComplete,
boolean needResultReplay,
@@ -94,4 +18,30 @@ public interface AsyncExecutionService {
boolean durationTask
);
/**
* 同步命令调用的方法
*
* @param agentTopicName
* @param type
* @param funcContent
* @param commandList
* @param commandListComplete
* @param needResultReplay
* @param futureKey
* @param durationTask
* @return
*/
OctopusMessage AsyncCallSendCommandToAgent(
String agentTopicName,
String type,
List<String> funcContent,
List<String> commandList,
List<List<String>> commandListComplete,
boolean needResultReplay,
String futureKey,
boolean durationTask
);
}

View File

@@ -1,256 +1,152 @@
package io.wdd.rpc.execute.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.execute.ExecutionMessage;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType;
import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService;
import io.wdd.rpc.message.handler.async.OctopusMessageSynScReplayContend;
import io.wdd.rpc.message.sender.OMessageToAgentSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Service
import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_SET;
//@Service
@Slf4j
public class AsyncExecutionServiceImpl implements AsyncExecutionService {
private static final boolean COMMAND_EXEC_NEED_REPLAY = true;
private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.EXECUTOR;
private static final String MANUAL_COMMAND_TYPE = "manual-command";
@Resource
AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService;
OMessageToAgentSender oMessageToAgentSender;
@Resource
SyncExecutionService asyncExecutionService;
ObjectMapper objectMapper;
@Resource
RedisTemplate redisTemplate;
/**
* 一个命令执行的最长等待时间
*/
int processMaxWaitSeconds = 10;
@Override
public ArrayList<String> AsyncSendCommandToAgent(String agentTopicName, List<String> commandList) {
return this.AsyncSendCommandToAgentComplete(
agentTopicName,
null,
commandList,
null,
COMMAND_EXEC_NEED_REPLAY,
null,
false
);
}
@Override
public ArrayList<String> AsyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList) {
return this.AsyncSendCommandToAgentComplete(
agentTopicName,
type,
commandList,
null,
COMMAND_EXEC_NEED_REPLAY,
null,
false
);
}
@Override
public List<ArrayList<String>> AsyncSendCommandToAgent(List<String> agentTopicNameList, String type, List<String> commandList) {
public List<OctopusMessage> AsyncCallSendCommandToAgent(List<String> agentTopicNameList, String type, List<String> funcContent, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.AsyncSendCommandToAgentComplete(
agentTopicName,
type,
commandList,
null,
COMMAND_EXEC_NEED_REPLAY,
null,
false
)
agentTopicName -> {
return this.AsyncCallSendCommandToAgent(
agentTopicName,
type,
funcContent,
commandList,
commandListComplete,
needResultReplay,
futureKey,
durationTask
);
}
)
.collect(Collectors.toList());
}
@Override
public ArrayList<String> AsyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, boolean needResultReplay, String futureKey, boolean durationTask) {
public OctopusMessage AsyncCallSendCommandToAgent(String agentTopicName, String type, List<String> funcContent, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
return this.AsyncSendCommandToAgentComplete(
agentTopicName,
type,
commandList,
null,
COMMAND_EXEC_NEED_REPLAY,
futureKey,
false
);
}
@Override
public ArrayList<String> AsyncSendCommandToAgentComplete(String agentTopicName, String type, List<String> commandList, List<List<String>> completeCommandList) {
return this.AsyncSendCommandToAgentComplete(
agentTopicName,
type,
commandList,
completeCommandList,
COMMAND_EXEC_NEED_REPLAY,
null,
false
);
}
@Override
public List<ArrayList<String>> AsyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<String> commandList, List<List<String>> completeCommandList, boolean isDurationTask) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.AsyncSendCommandToAgentComplete(
agentTopicName,
type,
commandList,
completeCommandList,
COMMAND_EXEC_NEED_REPLAY,
null,
isDurationTask
)
)
.collect(Collectors.toList());
}
@Override
public List<ArrayList<String>> AsyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.AsyncSendCommandToAgentComplete(
agentTopicName,
type,
null,
completeCommandList,
COMMAND_EXEC_NEED_REPLAY,
null,
false
)
)
.collect(Collectors.toList());
}
@Override
public List<ArrayList<String>> AsyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList, HashMap<String, String> atnFutureKey) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.AsyncSendCommandToAgentComplete(
agentTopicName,
type,
null,
completeCommandList,
COMMAND_EXEC_NEED_REPLAY,
atnFutureKey.get(agentTopicName),
false
)
)
.collect(Collectors.toList());
}
@Override
public ArrayList<String> AsyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey) {
return this.AsyncSendCommandToAgentComplete(
agentTopicName,
type,
commandList,
commandListComplete,
COMMAND_EXEC_NEED_REPLAY,
futureKey,
false
);
}
@Override
public ArrayList<String> AsyncSendCommandToAgentComplete(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
OctopusMessage octopusMessage = asyncExecutionService.AsyncCallSendCommandToAgent(
agentTopicName,
type,
commandList,
commandListComplete,
needResultReplay,
futureKey,
durationTask
);
LocalDateTime initTime = octopusMessage.getInit_time();
// OM 中的result保存
ArrayList<String> result = new ArrayList<>();
// 构造消息等待对象
int commandCount = 1;
if (null != commandListComplete) {
commandCount = Math.max(
commandListComplete.size(),
1
// 检查agentTopicName是否存在
if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) {
log.error(
"agentTopicName异常! 输入为 => {}",
agentTopicName
);
return null;
//throw new MyRuntimeException("agentTopicName异常!" + agentTopicName);
}
// 构造回复信息的内容
OctopusMessageSynScReplayContend executionReplayContent = OctopusMessageSynScReplayContend.build(
commandCount,
CurrentAppOctopusMessageType,
initTime
// 归一化type
if (StringUtils.isEmpty(type)) {
type = MANUAL_COMMAND_TYPE;
}
String resultKey = futureKey;
// 判定是否是 FutureKey
if (null == futureKey) {
resultKey = ExecutionMessage.GetResultKey(agentTopicName);
}
// 构造 Execution Command对应的消息体
ExecutionMessage executionMessage = this
.generateExecutionMessage(
type,
commandList,
resultKey,
commandListComplete,
needResultReplay,
durationTask
);
OctopusMessage octopusMessage = this.generateOctopusMessage(
agentTopicName,
executionMessage
);
CountDownLatch countDownLatch = executionReplayContent.getCountDownLatch();
// 开始等待结果
asyncWaitOctopusMessageResultService.waitFor(executionReplayContent);
// send the message
oMessageToAgentSender.send(octopusMessage);
// set up the stream read group
String group = redisTemplate
.opsForStream()
.createGroup(
resultKey,
resultKey
);
log.debug(
"set consumer group [{}] for the stream key with => [ {} ]",
group,
resultKey
);
// help gc
executionMessage = null;
return octopusMessage;
}
private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) {
// 监听结果
try {
boolean await = countDownLatch.await(
processMaxWaitSeconds,
TimeUnit.SECONDS
);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 等待所有的结果返回
// 停止等待结果
asyncWaitOctopusMessageResultService.stopWaiting(executionReplayContent);
// 解析结果
executionReplayContent
.getReplayOMList()
.stream()
.map(
om -> {
log.debug(
"replay message is => {}",
om
);
return (ArrayList<String>) om.getResult();
}
return OctopusMessage
.builder()
.octopusMessageType(OctopusMessageType.EXECUTOR)
.init_time(TimeUtils.currentFormatTime())
.uuid(agentTopicName)
.content(
objectMapper.writeValueAsString(executionMessage)
)
.forEachOrdered(
singleResult -> result.addAll(singleResult)
);
.build();
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
// 返回 执行的结果
return result;
}
private ExecutionMessage generateExecutionMessage(String type, List<String> commandList, String resultKey, List<List<String>> commandListComplete, boolean needResultReplay, boolean durationTask) {
return ExecutionMessage
.builder()
.resultKey(resultKey)
.type(type)
.singleLineCommand(commandList)
.multiLineCommand(commandListComplete)
.needResultReplay(needResultReplay)
.durationTask(durationTask)
.build();
}
}

View File

@@ -1,82 +1,18 @@
package io.wdd.rpc.execute.service;
import io.wdd.rpc.message.OctopusMessage;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.List;
/**
* 同步命令执行的核心类
* 需要等待命令执行完毕,完后返回相应的结果
*/
public interface SyncExecutionService {
String SyncSendCommandToAgent(String agentTopicName, String command);
String SyncSendCommandToAgent(String agentTopicName, List<String> commandList);
String SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList);
List<String> SyncSendCommandToAgent(List<String> agentTopicNameList, String type, List<String> commandList, boolean needResultReplay, String futureKey, boolean durationTask);
/**
* 调用 单行命令脚本的 最底层函数
*
* @param agentTopicName
* @param type
* @param commandList
* @param needResultReplay
* @param futureKey
* @param durationTask
* @return
*/
String SyncSendCommandToAgent(
String agentTopicName,
String type,
List<String> commandList,
boolean needResultReplay,
String futureKey,
boolean durationTask
);
/**
* -------------------------------------------------
*/
String SyncSendCommandToAgentComplete(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete);
List<String> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<String> commandList, List<List<String>> commandListComplete, boolean isDurationTask);
List<String> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList);
/**
* 通常为 页面定时脚本任务调用
*
* @param agentTopicNameList 目标Agent的TopicName列表
* @param type 任务类型
* @param completeCommandList 完整的类型
* @param atnFutureKey 由于脚本任务为延迟调用,故需要提前生成未来的ResultKey
* @return 每个Agent只返回一个 ResultKeyScript脚本的结果全部拼接到一起全部的resultKey
*/
List<String> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList, HashMap<String, String> atnFutureKey);
String SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey);
/**
* 调用 完整脚本的 最底层函数
*
* @param agentTopicName
* @param type
* @param commandList
* @param commandListComplete
* @param futureKey
* @param durationTask
* @return resultKey 本次操作在Redis中记录的结果Key
*/
String SyncSendCommandToAgent(
String agentTopicName,
List<ArrayList<String>> SyncSendCommandToAgentComplete(
List<String> agentTopicNameList,
String type,
List<String> funcContent,
List<String> commandList,
List<List<String>> commandListComplete,
boolean needResultReplay,
@@ -86,20 +22,21 @@ public interface SyncExecutionService {
/**
* 同步命令调用的方法
* 调用 完整脚本的 最底层函数
*
* @param agentTopicName
* @param type
* @param funcContent
* @param commandList
* @param commandListComplete
* @param needResultReplay
* @param futureKey
* @param durationTask
* @return
* @return resultKey 本次操作在Redis中记录的结果Key
*/
OctopusMessage AsyncCallSendCommandToAgent(
ArrayList<String> SyncSendCommandToAgentComplete(
String agentTopicName,
String type,
List<String> funcContent,
List<String> commandList,
List<List<String>> commandListComplete,
boolean needResultReplay,

View File

@@ -1,370 +1,135 @@
package io.wdd.rpc.execute.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.execute.ExecutionMessage;
import io.wdd.rpc.execute.config.ExecutionLog;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType;
import io.wdd.rpc.message.sender.OMessageToAgentSender;
import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService;
import io.wdd.rpc.message.handler.async.OctopusMessageSyncReplayContend;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_SET;
@Service
//@Service
@Slf4j
public class SyncExecutionServiceImpl implements SyncExecutionService {
private static final String MANUAL_COMMAND_TYPE = "manual-command";
private static final boolean COMMAND_EXEC_NEED_REPLAY = true;
private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.EXECUTOR;
@Resource
OMessageToAgentSender oMessageToAgentSender;
AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService;
@Resource
ObjectMapper objectMapper;
@Resource
RedisTemplate redisTemplate;
@Override
public String SyncSendCommandToAgent(String agentTopicName, String command) {
return this.SyncSendCommandToAgent(
agentTopicName,
List.of(command)
);
}
@Override
public String SyncSendCommandToAgent(String agentTopicName, List<String> commandList) {
return this.SyncSendCommandToAgent(
agentTopicName,
MANUAL_COMMAND_TYPE,
commandList
);
}
@Override
public String SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList) {
return SyncSendCommandToAgent(
agentTopicName,
type,
commandList,
false,
null,
false
);
}
@Override
public String SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, boolean needResultReplay, String futureKey, boolean durationTask) {
return this.SyncSendCommandToAgent(
agentTopicName,
type,
commandList,
null,
needResultReplay,
futureKey,
durationTask
);
}
@Override
public String SyncSendCommandToAgentComplete(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete) {
return this.SyncSendCommandToAgent(
agentTopicName,
type,
commandList,
commandListComplete,
false,
null,
false
);
}
@Override
public List<String> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<String> commandList, List<List<String>> commandListComplete, boolean isDurationTask) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.SyncSendCommandToAgent(
agentTopicName,
type,
commandList,
commandListComplete,
false,
null,
isDurationTask
)
)
.collect(Collectors.toList());
}
@Override
public String SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey) {
return this.SyncSendCommandToAgent(
agentTopicName,
type,
commandList,
commandListComplete,
false,
futureKey,
false
);
}
@Override
public String SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
String resultKey = futureKey;
// 判定是否是 FutureKey
if (null == futureKey) {
resultKey = ExecutionMessage.GetResultKey(agentTopicName);
}
// 调用最底层的方法
this.AsyncCallSendCommandToAgent(
agentTopicName,
type,
commandList,
commandListComplete,
needResultReplay,
futureKey,
durationTask
);
return resultKey;
}
@Override
public OctopusMessage AsyncCallSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
// 检查agentTopicName是否存在
if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) {
log.error(
"agentTopicName异常! 输入为 => {}",
agentTopicName
);
return null;
//throw new MyRuntimeException("agentTopicName异常!" + agentTopicName);
}
// 归一化type
if (StringUtils.isEmpty(type)) {
type = MANUAL_COMMAND_TYPE;
}
String resultKey = futureKey;
// 判定是否是 FutureKey
if (null == futureKey) {
resultKey = ExecutionMessage.GetResultKey(agentTopicName);
}
// 构造 Execution Command对应的消息体
ExecutionMessage executionMessage = this
.generateExecutionMessage(
type,
commandList,
resultKey,
commandListComplete,
needResultReplay,
durationTask
);
OctopusMessage octopusMessage = this.generateOctopusMessage(
agentTopicName,
executionMessage
);
// send the message
oMessageToAgentSender.send(octopusMessage);
// set up the stream read group
String group = redisTemplate
.opsForStream()
.createGroup(
resultKey,
resultKey
);
log.debug(
"set consumer group [{}] for the stream key with => [ {} ]",
group,
resultKey
);
// change the redis stream listener container
// createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey);
// construct the persistent Bean
/*ExecutionLog executionLog = buildPersistentLogBeanFromOctopusMessage(
octopusMessage,
executionMessage
);*/
// send resultKey to ExecutionResultDaemonHandler
// 当批量执行,产生大量的resultKey的时候,会出现线程爆炸,导致所有的全部失效
/*WAIT_EXECUTION_RESULT_LIST.put(
resultKey,
executionLog
);*/
// help gc
executionMessage = null;
return octopusMessage;
}
private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) {
try {
return OctopusMessage
.builder()
.type(OctopusMessageType.EXECUTOR)
.init_time(TimeUtils.currentFormatTime())
.uuid(agentTopicName)
.content(
objectMapper.writeValueAsString(executionMessage)
)
.build();
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
private ExecutionLog buildPersistentLogBeanFromOctopusMessage(OctopusMessage octopusMessage, ExecutionMessage executionMessage) {
ExecutionLog executionLog = new ExecutionLog();
executionLog.setAgentTopicName(octopusMessage.getUuid());
executionLog.setResultKey((String) octopusMessage.getContent());
executionLog.setCommandList(String.valueOf(executionMessage.getSingleLineCommand()));
executionLog.setType(executionMessage.getType());
executionLog.setResultKey(executionMessage.getResultKey());
return executionLog;
}
@Override
public List<String> SyncSendCommandToAgent(List<String> agentagentTopicNameList, String type, List<String> commandList, boolean needResultReplay, String futureKey, boolean durationTask) {
return agentagentTopicNameList
.stream()
.map(
agentTopicName -> this
.SyncSendCommandToAgent
(
agentTopicName,
type,
commandList,
null,
needResultReplay,
futureKey,
durationTask
)
)
.collect(Collectors.toList());
}
AsyncExecutionService asyncExecutionService;
/**
* @param agentTopicNameList 目标Agent的TopicName列表
* @param type 任务类型
* @param completeCommandList 完整的类型
* @return
* 一个命令执行的最长等待时间
*/
int processMaxWaitSeconds = 10;
@Override
public List<String> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList) {
public List<ArrayList<String>> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<String> funcContent, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.SyncSendCommandToAgentComplete(
agentTopicName,
type,
null,
completeCommandList
)
agentTopicName -> {
return this.SyncSendCommandToAgentComplete(
agentTopicName,
type,
null,
commandList,
commandListComplete,
needResultReplay,
futureKey,
durationTask
);
}
)
.collect(Collectors.toList());
}
@Override
public List<String> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList, HashMap<String, String> atnFutureKey) {
public ArrayList<String> SyncSendCommandToAgentComplete(String agentTopicName, String type, List<String> funcContent, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.SyncSendCommandToAgent(
agentTopicName,
type,
null,
completeCommandList,
atnFutureKey.getOrDefault(
agentTopicName,
null
)
)
)
.collect(Collectors.toList());
}
@Deprecated
private OctopusMessage generateOctopusMessage(String agentTopicName, String resultKey, String type, List<String> commandList, List<List<String>> commandListComplete) {
ExecutionMessage executionMessage = this.generateExecutionMessage(
// 异步访问
OctopusMessage octopusMessage = asyncExecutionService.AsyncCallSendCommandToAgent(
agentTopicName,
type,
null,
commandList,
resultKey,
commandListComplete,
false,
false
needResultReplay,
futureKey,
durationTask
);
String executionMessageString;
LocalDateTime initTime = octopusMessage.getInit_time();
try {
executionMessageString = objectMapper.writeValueAsString(executionMessage);
// OM 中的result保存
ArrayList<String> result = new ArrayList<>();
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
// 构造消息等待对象
int commandCount = 1;
if (null != commandListComplete) {
commandCount = Math.max(
commandListComplete.size(),
1
);
}
return OctopusMessage
.builder()
.type(OctopusMessageType.EXECUTOR)
.init_time(LocalDateTime.now())
.content(executionMessageString)
.uuid(agentTopicName)
.build();
// 构造回复信息的内容
OctopusMessageSyncReplayContend executionReplayContent = OctopusMessageSyncReplayContend.build(
commandCount,
CurrentAppOctopusMessageType,
initTime
);
CountDownLatch countDownLatch = executionReplayContent.getCountDownLatch();
// 开始等待结果
asyncWaitOctopusMessageResultService.waitFor(executionReplayContent);
// 监听结果
try {
boolean await = countDownLatch.await(
processMaxWaitSeconds,
TimeUnit.SECONDS
);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 等待所有的结果返回
// 停止等待结果
asyncWaitOctopusMessageResultService.stopWaiting(executionReplayContent);
// 解析结果
executionReplayContent
.getReplayOMList()
.stream()
.map(
om -> {
log.debug(
"replay message is => {}",
om
);
return (ArrayList<String>) om.getResult();
}
)
.forEachOrdered(
singleResult -> result.addAll(singleResult)
);
}
// 返回 执行的结果
return result;
}
private ExecutionMessage generateExecutionMessage(String type, List<String> commandList, String resultKey, List<List<String>> commandListComplete, boolean needResultReplay, boolean durationTask) {
return ExecutionMessage
.builder()
.resultKey(resultKey)
.type(type)
.singleLineCommand(commandList)
.multiLineCommand(commandListComplete)
.needResultReplay(needResultReplay)
.durationTask(durationTask)
.build();
}
}

View File

@@ -244,7 +244,7 @@ public class AcceptAgentInitInfo {
OctopusMessage octopusMessage = OctopusMessage
.builder()
.type(OctopusMessageType.INIT)
.octopusMessageType(OctopusMessageType.INIT)
// should be the OctopusExchange Name
.content(serverInfoContent)
.init_time(TimeUtils.currentFormatTime())

View File

@@ -15,32 +15,36 @@ import java.time.LocalDateTime;
@SuperBuilder(toBuilder = true)
public class OctopusMessage {
/**
* 应该为64位的UUID
*/
String uuid;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
LocalDateTime init_time;
/**
* 执行操作的类型
*/
OctopusMessageType type;
OctopusMessageType octopusMessageType;
// server send message content
Object content;
// agent reply message content
Object result;
/**
* 执行结果的状态Code
*/
String ResultCode;
/**
* Agent 完成操作的时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
LocalDateTime ac_time;
// agent reply message content
Object result;
/**
* 执行结果的状态Code
*/
String resultCode;
}

View File

@@ -3,14 +3,14 @@ package io.wdd.rpc.message.handler.async;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.server.config.ServerCommonPool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static io.wdd.rpc.message.handler.sync.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT;
import static io.wdd.rpc.message.handler.sync.OMessageHandler.GenerateOMessageMatchKey;
import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.OCTOPUS_MESSAGE_FROM_AGENT;
/**
* 从Agent收集返回信息的统一处理地点
@@ -18,41 +18,10 @@ import static io.wdd.rpc.message.handler.sync.OMessageHandlerServer.OCTOPUS_MESS
* <p>
* 调用结束之后,需要从 REPLAY_WAITING_TARGET 中移除此部分内容
*/
@Service
//@Service
@Slf4j
public class AsyncWaitOctopusMessageResultService {
/**
* 为了避免线程不安全的问题,增加一层缓存,仅仅由当前类操作此部分
* KEY -> replayMatchKey
* VALUE -> OctopusMessageSynScReplayContend - 包含countDownLatch 和 result
*/
private static final HashMap<String, OctopusMessageSynScReplayContend> OM_REPLAY_WAITING_TARGET_MAP = new HashMap<>();
public void waitFor(OctopusMessageSynScReplayContend OctopusMessageSynScReplayContend) {
// 向 REPLAY_CACHE_MAP中写入 Key
OM_REPLAY_WAITING_TARGET_MAP.put(
OctopusMessageSynScReplayContend.getReplayMatchKey(),
OctopusMessageSynScReplayContend
);
// 在调用线程的countDownLunch结束之后,关闭
// 清除 REPLAY_CACHE_MAP 中的队列
}
public void stopWaiting(OctopusMessageSynScReplayContend OctopusMessageSynScReplayContend) {
// 在调用线程的countDownLunch结束之后,关闭 清除 REPLAY_CACHE_MAP 中的队列
OctopusMessageSynScReplayContend contend = OM_REPLAY_WAITING_TARGET_MAP.get(OctopusMessageSynScReplayContend.getReplayMatchKey());
// 移除该内容
OM_REPLAY_WAITING_TARGET_MAP.remove(OctopusMessageSynScReplayContend.getReplayMatchKey());
// help gc
contend = null;
}
@PostConstruct
public void daemonHandleReplayOMFromAgent() {
@@ -65,6 +34,13 @@ public class AsyncWaitOctopusMessageResultService {
}
/**
* 为了避免线程不安全的问题,增加一层缓存,仅仅由当前类操作此部分
* KEY -> replayMatchKey
* VALUE -> OctopusMessageSyncReplayContend - 包含countDownLatch 和 result
*/
private static final HashMap<String, OctopusMessageSyncReplayContend> OM_REPLAY_WAITING_TARGET_MAP = new HashMap<>();
/**
* 操作 OCTOPUS_MESSAGE_FROM_AGENT 获取相应的Message放入内容中
*/
@@ -88,8 +64,8 @@ public class AsyncWaitOctopusMessageResultService {
OctopusMessage replayOMessage = OCTOPUS_MESSAGE_FROM_AGENT.poll();
// 构造 replayMatchKey
String matchKey = OctopusMessageSynScReplayContend.generateMatchKey(
replayOMessage.getType(),
String matchKey = GenerateOMessageMatchKey(
replayOMessage.getOctopusMessageType(),
replayOMessage.getInit_time()
);
if (!OM_REPLAY_WAITING_TARGET_MAP.containsKey(matchKey)) {
@@ -97,7 +73,7 @@ public class AsyncWaitOctopusMessageResultService {
// todo 错误的数据需要放置于某处
log.debug(
"等待队列没有该回复的结果key =>",
"等待队列里面没有该回复的结果key =>",
matchKey
);
@@ -105,12 +81,12 @@ public class AsyncWaitOctopusMessageResultService {
}
// Map中包含有Key,那么放置进去
OctopusMessageSynScReplayContend replayContend = OM_REPLAY_WAITING_TARGET_MAP.get(matchKey);
OctopusMessageSyncReplayContend replayContend = OM_REPLAY_WAITING_TARGET_MAP.get(matchKey);
replayContend
.getReplayOMList()
.add(replayOMessage);
// 需要操作countDown
replayContend
.getCountDownLatch()
@@ -119,6 +95,30 @@ public class AsyncWaitOctopusMessageResultService {
// 结束操作,继续循环
}
}
public void waitFor(OctopusMessageSyncReplayContend OctopusMessageSyncReplayContend) {
// 向 REPLAY_CACHE_MAP中写入 Key
OM_REPLAY_WAITING_TARGET_MAP.put(
OctopusMessageSyncReplayContend.getReplayMatchKey(),
OctopusMessageSyncReplayContend
);
// 在调用线程的countDownLunch结束之后,关闭
// 清除 REPLAY_CACHE_MAP 中的队列
}
public void stopWaiting(OctopusMessageSyncReplayContend OctopusMessageSyncReplayContend) {
// 在调用线程的countDownLunch结束之后,关闭 清除 REPLAY_CACHE_MAP 中的队列
OctopusMessageSyncReplayContend contend = OM_REPLAY_WAITING_TARGET_MAP.get(OctopusMessageSyncReplayContend.getReplayMatchKey());
// 移除该内容
OM_REPLAY_WAITING_TARGET_MAP.remove(OctopusMessageSyncReplayContend.getReplayMatchKey());
// help gc
contend = null;
}

View File

@@ -14,12 +14,14 @@ import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import static io.wdd.rpc.message.handler.sync.OMessageHandler.GenerateOMessageMatchKey;
@Data
@AllArgsConstructor
@NoArgsConstructor
@SuperBuilder(toBuilder = true)
@ApiModel("众多业务调用RPC,异步等待需要确定返回消息是谁的")
public class OctopusMessageSynScReplayContend {
public class OctopusMessageSyncReplayContend {
@ApiModelProperty("rpc消息的类型")
OctopusMessageType type;
@@ -37,34 +39,23 @@ public class OctopusMessageSynScReplayContend {
@ApiModelProperty("回复的结果列表, 临时保存")
ArrayList<OctopusMessage> replayOMList;
/**
* @param messageType
* @param messageInitTime 必须使用 TimeUtils.currentFormatTime();
* @return
*/
public static String generateMatchKey(OctopusMessageType messageType, LocalDateTime messageInitTime) {
String relayMatchKey = messageType.toString() + messageInitTime.toString();
return relayMatchKey;
}
/**
* Execution模块使用的模板
*
* @return
*/
public static OctopusMessageSynScReplayContend build(int waitForReplayNum, OctopusMessageType currentOMType, LocalDateTime currentTime) {
public static OctopusMessageSyncReplayContend build(int waitForReplayNum, OctopusMessageType currentOMType, LocalDateTime currentTime) {
CountDownLatch latch = null;
if (waitForReplayNum != 0) {
latch = new CountDownLatch(waitForReplayNum);
}
return new OctopusMessageSynScReplayContend(
return new OctopusMessageSyncReplayContend(
currentOMType,
currentTime,
generateMatchKey(
GenerateOMessageMatchKey(
currentOMType,
currentTime
),

View File

@@ -0,0 +1,124 @@
package io.wdd.rpc.message.handler.sync;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType;
import io.wdd.server.config.ServerCommonPool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;
import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.FROM_AGENT_MATCH_TO_AGENT_MAP;
import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.OCTOPUS_MESSAGE_FROM_AGENT;
@Service
@Slf4j(topic = "Octopus Message Handler")
public class OMessageHandler {
/**
* 创建 发送和接收 OctopusMessage之间的比对关系
*
* @param messageType
* @param messageInitTime 必须使用 TimeUtils.currentFormatTime();
* @return
*/
public static String GenerateOMessageMatchKey(OctopusMessageType messageType, LocalDateTime messageInitTime) {
String relayMatchKey = messageType.toString() + messageInitTime.toString();
return relayMatchKey;
}
@PostConstruct
public void daemonHandleReplayOMFromAgent() {
// 异步任务启动
CompletableFuture.runAsync(
() -> doHandleOMessageFromAgent(),
ServerCommonPool.pool
);
}
/**
* 解析所有从Agent传回的消息中央集中化处理
*/
private void doHandleOMessageFromAgent() {
// 死循环,不断的轮询 OCTOPUS_MESSAGE_FROM_AGENT
while (true) {
if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) {
try {
OCTOPUS_MESSAGE_FROM_AGENT.wait(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 返回,继续死循环
continue;
}
// 拿到消息
OctopusMessage replayOMessage = OCTOPUS_MESSAGE_FROM_AGENT.poll();
// 构造 replayMatchKey
String matchKey = GenerateOMessageMatchKey(
replayOMessage.getOctopusMessageType(),
replayOMessage.getInit_time()
);
if (!FROM_AGENT_MATCH_TO_AGENT_MAP.containsKey(matchKey)) {
// 没有这个Key,说明等待结果已经超时了,直接丢弃,然后继续循环
// todo 错误的数据需要放置于某处
log.debug(
"等待队列里面没有该回复的结果key =>",
matchKey
);
continue;
}
// 归还信息
// 拿到原始信息
OctopusMessage originOMessage = FROM_AGENT_MATCH_TO_AGENT_MAP.get(matchKey);
originOMessage.setResultCode(replayOMessage.getResultCode());
originOMessage.setResult(replayOMessage.getResult());
// 通知等待线程
originOMessage.notify();
}
}
protected void waitFor(OctopusMessage octopusMessage) {
// 构建 MatchKey
String matchKey = GenerateOMessageMatchKey(
octopusMessage.getOctopusMessageType(),
octopusMessage.getInit_time()
);
// 开始等待
FROM_AGENT_MATCH_TO_AGENT_MAP.put(
matchKey,
octopusMessage
);
}
public void stopWaiting(OctopusMessage octopusMessage) {
// 构建 MatchKey
String matchKey = GenerateOMessageMatchKey(
octopusMessage.getOctopusMessageType(),
octopusMessage.getInit_time()
);
// 开始等待
FROM_AGENT_MATCH_TO_AGENT_MAP.remove(matchKey);
}
}

View File

@@ -1,7 +1,6 @@
package io.wdd.rpc.message.handler.sync;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.handler.MyRuntimeException;
import io.wdd.rpc.message.OctopusMessage;
import lombok.extern.slf4j.Slf4j;
@@ -14,16 +13,20 @@ import org.springframework.data.redis.core.RedisTemplate;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.HashMap;
import static io.wdd.common.utils.OctopusObjectMapperConfig.OctopusObjectMapper;
@Configuration
@Slf4j(topic = "Octopus Message Handler")
public class OMessageHandlerServer {
@Slf4j(topic = "Octopus Message Listener")
public class OMessageToServerListener {
/**
* Redis Key 用于保存Agent的最新版本
* GitHubAction发送至 RabbitMQ中然后此处获取处理发送至Redis中
*/
public static final String LATEST_VERSION = "LATEST_VERSION";
/**
* 存储所有的从 Agent过来的 OctopusMessage
* 各个业务模块需要自己手动去获取自己需要的内容
@@ -32,10 +35,20 @@ public class OMessageHandlerServer {
public static ArrayDeque<OctopusMessage> OCTOPUS_MESSAGE_FROM_AGENT = new ArrayDeque<>(
128
);
/**
* 发送出去的OctopusMessage需要和返回回来的内容对比
* 返回来的OM反序列化之后就不是原对象需要进行 通过MatchKey比较
* <p>
* omMatchKey -- OctopusMessage
*/
public static HashMap<String, OctopusMessage> FROM_AGENT_MATCH_TO_AGENT_MAP = new HashMap<>();
@Resource
RedisTemplate redisTemplate;
@Resource
ObjectMapper objectMapper;
OMessageHandler oMessageHandler;
@RabbitHandler
@RabbitListener(queues = "${octopus.message.octopus_to_server}"
@@ -45,7 +58,7 @@ public class OMessageHandlerServer {
OctopusMessage octopusMessage;
try {
octopusMessage = objectMapper.readValue(
octopusMessage = OctopusObjectMapper.readValue(
message.getBody(),
OctopusMessage.class
);
@@ -70,6 +83,7 @@ public class OMessageHandlerServer {
"开始向Redis中缓存Agent的最新版本 => {}",
latestVersion
);
redisTemplate
.opsForValue()
.set(
@@ -82,11 +96,9 @@ public class OMessageHandlerServer {
// 将收到的消息,直接存储到 缓存队列中
log.debug("cache the octopus message to inner cache list !");
OCTOPUS_MESSAGE_FROM_AGENT.offer(octopusMessage);
oMessageHandler.waitFor(octopusMessage);
// collect all message from agent and log to somewhere
// 1. send some info to the specific topic name
// 2. judge from which agent the message are
//
// 唤醒等待线程
OCTOPUS_MESSAGE_FROM_AGENT.notify();
}
}

View File

@@ -1,6 +1,7 @@
package io.wdd.rpc.message.sender;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.handler.MyRuntimeException;
import io.wdd.rpc.init.InitRabbitMQConfig;
@@ -14,6 +15,8 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import static io.wdd.common.utils.OctopusObjectMapperConfig.OctopusObjectMapper;
/**
* adaptor
* provide override method to convert Object and send to rabbitmq
@@ -34,25 +37,38 @@ public class OMessageToAgentSender {
/**
* send to Queue -- InitFromServer
*
* @param message octopus message
* @param octopusMessage octopus octopusMessage
*/
public void sendINIT(OctopusMessage message) {
public void sendINIT(OctopusMessage octopusMessage) {
// only accept INIT type message
if (!OctopusMessageType.INIT.equals(message.getType())) {
// only accept INIT type octopusMessage
if (!OctopusMessageType.INIT.equals(octopusMessage.getOctopusMessageType())) {
throw new MyRuntimeException("To Agent Order method usage wrong !");
}
// send to Queue -- InitFromServer
log.info(
"send INIT OrderCommand to Agent = {}",
message
octopusMessage
);
// 统一处理Content
if (octopusMessage.getContent() instanceof String) {
try {
String contendString = OctopusObjectMapper.writeValueAsString(octopusMessage.getContent());
octopusMessage.setContent(contendString);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
rabbitTemplate.convertAndSend(
initRabbitMQConfig.INIT_EXCHANGE,
initRabbitMQConfig.INIT_FROM_SERVER_KEY,
writeData(message)
writeData(octopusMessage)
);
}
@@ -66,6 +82,19 @@ public class OMessageToAgentSender {
octopusMessage.getUuid()
);
// 统一处理Content
if (!(octopusMessage.getContent() instanceof String)) {
try {
String contendString = OctopusObjectMapper.writeValueAsString(octopusMessage.getContent());
octopusMessage.setContent(contendString);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
rabbitTemplate.convertAndSend(
initRabbitMQConfig.OCTOPUS_EXCHANGE,
octopusMessage.getUuid() + "*",

View File

@@ -1,32 +1,32 @@
package io.wdd.rpc.scheduler.job;
import io.wdd.rpc.scheduler.config.QuartzLogOperator;
import io.wdd.rpc.scheduler.service.status.AgentAliveStatusMonitorService;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;
import javax.annotation.Resource;
public class AgentAliveStatusMonitorJob extends QuartzJobBean {
@Resource
AgentAliveStatusMonitorService agentAliveStatusMonitorService;
@Resource
QuartzLogOperator quartzLogOperator;
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
// get the jobMetaMap
//JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
// actually execute the monitor service
agentAliveStatusMonitorService.collectAllAgentAliveStatus();
// log to somewhere
quartzLogOperator.save();
}
}
//package io.wdd.rpc.scheduler.job;
//
//import io.wdd.rpc.scheduler.config.QuartzLogOperator;
//import io.wdd.rpc.scheduler.service.status.AgentAliveStatusMonitorService;
//import org.quartz.JobExecutionContext;
//import org.quartz.JobExecutionException;
//import org.springframework.scheduling.quartz.QuartzJobBean;
//
//import javax.annotation.Resource;
//
//public class AgentAliveStatusMonitorJob extends QuartzJobBean {
//
// @Resource
// AgentAliveStatusMonitorService agentAliveStatusMonitorService;
//
// @Resource
// QuartzLogOperator quartzLogOperator;
//
// @Override
// protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
//
// // get the jobMetaMap
// //JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
//
// // actually execute the monitor service
// agentAliveStatusMonitorService.collectAllAgentAliveStatus();
//
// // log to somewhere
// quartzLogOperator.save();
//
// }
//}

View File

@@ -1,29 +1,21 @@
package io.wdd.rpc.scheduler.job;
import io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;
import javax.annotation.Resource;
public class AgentMetricStatusJob extends QuartzJobBean {
@Resource
AgentMetricStatusCollectService agentMetricStatusCollectService;
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
// 从JobDetailContext中获取相应的信息
// JobDataMap jobDataMap = jobExecutionContext
// .getJobDetail()
// .getJobDataMap();
// 执行Agent Metric 状态收集任务
agentMetricStatusCollectService.collectHealthyAgentMetric();
}
}
//public class AgentMetricStatusJob extends QuartzJobBean {
//
// @Resource
// AgentMetricStatusCollectService agentMetricStatusCollectService;
//
// @Override
// protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
//
// // 从JobDetailContext中获取相应的信息
//// JobDataMap jobDataMap = jobExecutionContext
//// .getJobDetail()
//// .getJobDataMap();
//
// // 执行Agent Metric 状态收集任务
// agentMetricStatusCollectService.collectHealthyAgentMetric();
//
// }
//
//}

View File

@@ -1,17 +1,14 @@
package io.wdd.rpc.scheduler.service;
import io.wdd.rpc.scheduler.job.AgentAliveStatusMonitorJob;
import io.wdd.rpc.scheduler.job.AgentMetricStatusJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@Component
//@Component
@Slf4j
public class BuildStatusScheduleTask {
@@ -31,10 +28,10 @@ public class BuildStatusScheduleTask {
private void buildAllPreScheduledTask() {
// Agent存活健康状态检查
buildMonitorAllAgentAliveStatusScheduleTask();
// buildMonitorAllAgentAliveStatusScheduleTask();
// Agent运行信息检查 Metric
buildAgentMetricScheduleTask();
// buildAgentMetricScheduleTask();
}
@@ -45,43 +42,19 @@ public class BuildStatusScheduleTask {
* <p>
* 2023年7月10日 更改为按照cron表达式进行执行
*/
public void buildAgentMetricScheduleTask() {
// 2023年7月10日 更改为按照cron表达式进行执行
octopusQuartzService.addMission(
AgentMetricStatusJob.class,
"agentRunMetricStatusJob",
JOB_GROUP_NAME,
metricReportStartDelaySeconds,
metricReportCronExpress,
null
);
// 计算 Metric检测的时间间隔
/*int metricReportTimesCount = 19;
try {
CronExpression cronExpression = new CronExpression(healthyCronTimeExpress);
Date now = new Date();
Date nextValidTime = cronExpression.getNextValidTimeAfter(now);
long totalSeconds = (nextValidTime.getTime() - now.getTime()) / 1000;
metricReportTimesCount = (int) (totalSeconds / metricReportTimePinch) - 1;
*//*System.out.println("totalSeconds = " + totalSeconds);
System.out.println("metricReportTimesCount = " + metricReportTimesCount);*//*
} catch (ParseException e) {
throw new RuntimeException(e);
}
HashMap<String, Integer> map = new HashMap<String, Integer>();
map.put(METRIC_REPORT_TIME_PINCH, metricReportTimePinch);
map.put(METRIC_REPORT_TIMES_COUNT, metricReportTimesCount);*/
//
}
// public void buildAgentMetricScheduleTask() {
//
// // 2023年7月10日 更改为按照cron表达式进行执行
// octopusQuartzService.addMission(
// AgentMetricStatusJob.class,
// "agentRunMetricStatusJob",
// JOB_GROUP_NAME,
// metricReportStartDelaySeconds,
// metricReportCronExpress,
// null
// );
//
// }
/**
* Agent存活健康状态检查
@@ -90,19 +63,19 @@ public class BuildStatusScheduleTask {
* 延迟触发时间 healthyCheckStartDelaySeconds
* 定时任务间隔 healthyCronTimeExpress
*/
private void buildMonitorAllAgentAliveStatusScheduleTask() {
// build the Job
octopusQuartzService.addMission(
AgentAliveStatusMonitorJob.class,
"monitorAllAgentAliveStatusJob",
JOB_GROUP_NAME,
healthyCheckStartDelaySeconds,
healthyCronTimeExpress,
null
);
}
// private void buildMonitorAllAgentAliveStatusScheduleTask() {
//
// // build the Job
// octopusQuartzService.addMission(
// AgentAliveStatusMonitorJob.class,
// "monitorAllAgentAliveStatusJob",
// JOB_GROUP_NAME,
// healthyCheckStartDelaySeconds,
// healthyCronTimeExpress,
// null
// );
//
// }
}

View File

@@ -1,27 +1,29 @@
package io.wdd.rpc.scheduler.service.script;
import io.wdd.rpc.execute.service.SyncExecutionService;
import io.wdd.rpc.execute.ExecutionMessage;
import io.wdd.rpc.execute.service.AsyncExecutionService;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.scheduler.beans.ScriptSchedulerDTO;
import io.wdd.rpc.scheduler.config.QuartzSchedulerUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
/**
* 1.
*/
@Service
//@Service
@Slf4j
@Deprecated
public class AgentApplyScheduledScript {
@Resource
SyncExecutionService asyncExecutionService;
AsyncExecutionService asyncExecutionService;
@Resource
QuartzSchedulerUtils quartzSchedulerUtils;
@@ -46,13 +48,35 @@ public class AgentApplyScheduledScript {
}
// 发送命令到Agent中
List<String> resultKeyList = asyncExecutionService
.SyncSendCommandToAgentComplete(
targetMachineList,
scriptType,
completeCommandList,
futureResultKeyMap
);
String finalScriptType = scriptType;
List<String> resultKeyList = targetMachineList
.stream()
.map(
targetMachine -> {
OctopusMessage octopusMessage = asyncExecutionService
.AsyncCallSendCommandToAgent(
targetMachine,
finalScriptType,
null,
null,
completeCommandList,
false,
null,
false
);
String resultKey = ((ExecutionMessage) octopusMessage.getContent()).getResultKey();
// 构建Map
futureResultKeyMap.put(
targetMachine,
resultKey
);
return resultKey;
}
)
.collect(Collectors.toList());
// 将 resultKeyList 放入这个DTO中
scriptSchedulerDTO.setResultKeyList(resultKeyList);

View File

@@ -7,7 +7,6 @@ import io.wdd.rpc.status.beans.AgentStatus;
import io.wdd.rpc.status.service.SyncStatusService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
@@ -19,7 +18,7 @@ import static io.wdd.rpc.status.CommonAndStatusCache.ALL_HEALTHY_AGENT_TOPIC_NAM
/**
* 定时任务 收集Agent的运行Metric的实际执行类
*/
@Service
//@Service
@Slf4j
public class AgentMetricStatusCollectService {

View File

@@ -51,7 +51,7 @@ public class OctopusStatusMessage {
return OctopusMessage
.builder()
.type(OctopusMessageType.STATUS)
.octopusMessageType(OctopusMessageType.STATUS)
.uuid(agentTopicName)
.init_time(currentTime)
.content(ops)

View File

@@ -6,12 +6,11 @@ import io.wdd.rpc.beans.request.MetricQueryEntity;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType;
import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService;
import io.wdd.rpc.message.handler.async.OctopusMessageSynScReplayContend;
import io.wdd.rpc.message.handler.async.OctopusMessageSyncReplayContend;
import io.wdd.rpc.message.sender.OMessageToAgentSender;
import io.wdd.rpc.status.beans.AgentStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDateTime;
@@ -27,7 +26,7 @@ import static io.wdd.common.utils.OctopusObjectMapperConfig.OctopusObjectMapper;
import static io.wdd.rpc.status.OctopusStatusMessage.*;
@Slf4j
@Service
//@Service
public class SyncStatusServiceImpl implements SyncStatusService {
private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.STATUS;
@@ -64,7 +63,7 @@ public class SyncStatusServiceImpl implements SyncStatusService {
);
// 同步收集消息
OctopusMessageSynScReplayContend statusSyncReplayContend = OctopusMessageSynScReplayContend.build(
OctopusMessageSyncReplayContend statusSyncReplayContend = OctopusMessageSyncReplayContend.build(
agentTopicNameList.size(),
CurrentAppOctopusMessageType,
currentTime
@@ -128,7 +127,7 @@ public class SyncStatusServiceImpl implements SyncStatusService {
);
// 同步等待结果, 并且解析结果
OctopusMessageSynScReplayContend metricSyncReplayContend = OctopusMessageSynScReplayContend.build(
OctopusMessageSyncReplayContend metricSyncReplayContend = OctopusMessageSyncReplayContend.build(
agentTopicNameList.size(),
CurrentAppOctopusMessageType,
currentTime

View File

@@ -83,14 +83,16 @@ mybatis-plus:
banner: false
configuration:
# 希望知道所有的sql是怎么执行的, 配置输出日志
#log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
# log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl
# 数据库下划线--实体类也是下划线 需要为false
map-underscore-to-camel-case: true
# 一级缓存的 缓存级别默认为 session如果要关闭一级缓存可以设置为 statement
local-cache-scope: session
# 是否开启二级缓存
cache-enabled: false
# 分页插件配置
interceptor: com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor
# 默认地址为 classpath*:/mapper/**/*.xml
# mapper-locations: classpath*:/real-mappers/**/*.xml
@@ -162,6 +164,6 @@ oss:
# 开启debug模式
logging:
level:
io.wdd: debug
io.wdd.rpc.execute: debug
debug: true

View File

@@ -1,6 +1,6 @@
package io.wdd.server;
import io.wdd.rpc.execute.service.SyncExecutionService;
import io.wdd.rpc.execute.service.AsyncExecutionService;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@@ -13,7 +13,7 @@ class ServerApplicationTests {
@Resource
SyncExecutionService asyncExecutionService;
AsyncExecutionService asyncExecutionService;
@Test
void testCoreExecutionCompleteScript() {

View File

@@ -1,10 +1,5 @@
package io.wdd.server;
import io.wdd.rpc.status.AgentHealthyStatusEnum;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -13,33 +8,6 @@ public class SimpleTest {
public static void main(String[] args) {
HashMap<String, String> map = new HashMap<>();
HashMap<String, List<String>> hashMap = new HashMap<>();
hashMap.put(
"HEALTHY",
new ArrayList<>(
List.of(
"Tokyo-amd64-07-f66a41",
"Tokyo-amd64-03-dc543f"
)
)
);
hashMap
.get(AgentHealthyStatusEnum.FAILED.getStatus())
.stream()
.forEach(
agentTopicName -> {
map.put(
agentTopicName,
"0"
);
}
);
}
private void CompletableFutureTest() {