[ Service ] [ Executor ] 重构Executor部分

This commit is contained in:
zeaslity
2023-08-14 10:09:23 +08:00
parent a2b6b01fd3
commit 92390b4d6f
14 changed files with 72 additions and 462 deletions

View File

@@ -1,7 +1,7 @@
package io.wdd.func.script.service;
import io.wdd.rpc.execute.ExecutionMessageType;
import io.wdd.rpc.execute.ExecutionService;
import io.wdd.rpc.execute.service.ExecutionService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -56,7 +56,6 @@ public class FuncServiceImpl implements FuncService {
null,
null,
true,
"",
false
);

View File

@@ -3,7 +3,7 @@ package io.wdd.func.xray.service;
import io.wdd.common.utils.TimeUtils;
import io.wdd.func.oss.config.OctopusObjectSummary;
import io.wdd.func.xray.beans.node.ProxyNode;
import io.wdd.rpc.execute.ExecutionService;
import io.wdd.rpc.execute.service.ExecutionService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
@@ -138,7 +138,6 @@ public class XrayCallAgent {
null,
updateXrayCommandList,
false,
null,
false
);

View File

@@ -4,7 +4,7 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.wdd.common.response.R;
import io.wdd.rpc.execute.ExecutionService;
import io.wdd.rpc.execute.service.ExecutionService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@@ -117,7 +117,6 @@ public class ExecutionController {
commandList,
completeCommandList,
false,
null,
false
);

View File

@@ -1,13 +1,11 @@
package io.wdd.rpc.execute;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.wdd.common.utils.TimeUtils;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.time.LocalDateTime;
import java.util.List;
@Data
@@ -35,7 +33,7 @@ public class ExecutionMessage {
* 用于区分 ExecutionMessage的类型
* BASE APP
*/
private String type;
private String executionType;
/**
* 执行功能脚本时需要的参数
@@ -52,30 +50,5 @@ public class ExecutionMessage {
*/
private List<List<String>> multiLineCommand;
/**
* 词条执行命令的返回结果在Redis中的ResultKey
*/
private String resultKey;
/**
* 生成 Command结果的 resultKey
*
* @param topicName
* @return
*/
public static String GetResultKey(String topicName) {
return topicName + "-Execution:" + TimeUtils.currentTimeStringFullSplit();
}
/**
* 延迟执行任务执行的Key为未来的生成这个和Key
*
* @param topicName
* @param futureExecutionTime
* @return
*/
public static String GetFutureResultKey(String topicName, LocalDateTime futureExecutionTime) {
return topicName + "-Execution:" + TimeUtils.localDateTimeString(futureExecutionTime);
}
}

View File

@@ -1,21 +0,0 @@
package io.wdd.rpc.execute;
import java.util.ArrayList;
import java.util.List;
public interface ExecutionService {
ArrayList<String> SendCommandToAgent(
String agentTopicName,
String type,
List<String> funcContent,
List<String> commandList,
List<List<String>> commandListComplete,
boolean needResultReplay,
String resultKey,
boolean durationTask
);
}

View File

@@ -1,152 +0,0 @@
package io.wdd.rpc.execute.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.execute.ExecutionMessage;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType;
import io.wdd.rpc.message.sender.OMessageToAgentSender;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;
import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_SET;
//@Service
@Slf4j
public class AsyncExecutionServiceImpl implements AsyncExecutionService {
private static final String MANUAL_COMMAND_TYPE = "manual-command";
@Resource
OMessageToAgentSender oMessageToAgentSender;
@Resource
ObjectMapper objectMapper;
@Resource
RedisTemplate redisTemplate;
@Override
public List<OctopusMessage> AsyncCallSendCommandToAgent(List<String> agentTopicNameList, String type, List<String> funcContent, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> {
return this.AsyncCallSendCommandToAgent(
agentTopicName,
type,
funcContent,
commandList,
commandListComplete,
needResultReplay,
futureKey,
durationTask
);
}
)
.collect(Collectors.toList());
}
@Override
public OctopusMessage AsyncCallSendCommandToAgent(String agentTopicName, String type, List<String> funcContent, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
// 检查agentTopicName是否存在
if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) {
log.error(
"agentTopicName异常! 输入为 => {}",
agentTopicName
);
return null;
//throw new MyRuntimeException("agentTopicName异常!" + agentTopicName);
}
// 归一化type
if (StringUtils.isEmpty(type)) {
type = MANUAL_COMMAND_TYPE;
}
String resultKey = futureKey;
// 判定是否是 FutureKey
if (null == futureKey) {
resultKey = ExecutionMessage.GetResultKey(agentTopicName);
}
// 构造 Execution Command对应的消息体
ExecutionMessage executionMessage = this
.generateExecutionMessage(
type,
commandList,
resultKey,
commandListComplete,
needResultReplay,
durationTask
);
OctopusMessage octopusMessage = this.generateOctopusMessage(
agentTopicName,
executionMessage
);
// send the message
oMessageToAgentSender.send(octopusMessage);
// set up the stream read group
String group = redisTemplate
.opsForStream()
.createGroup(
resultKey,
resultKey
);
log.debug(
"set consumer group [{}] for the stream key with => [ {} ]",
group,
resultKey
);
// help gc
executionMessage = null;
return octopusMessage;
}
private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) {
try {
return OctopusMessage
.builder()
.octopusMessageType(OctopusMessageType.EXECUTOR)
.init_time(TimeUtils.currentFormatTime())
.uuid(agentTopicName)
.content(
objectMapper.writeValueAsString(executionMessage)
)
.build();
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
private ExecutionMessage generateExecutionMessage(String type, List<String> commandList, String resultKey, List<List<String>> commandListComplete, boolean needResultReplay, boolean durationTask) {
return ExecutionMessage
.builder()
.resultKey(resultKey)
.type(type)
.singleLineCommand(commandList)
.multiLineCommand(commandListComplete)
.needResultReplay(needResultReplay)
.durationTask(durationTask)
.build();
}
}

View File

@@ -1,203 +0,0 @@
//package io.wdd.rpc.execute.service;
//
//
//import io.wdd.common.utils.TimeUtils;
//import io.wdd.rpc.execute.config.CommandReaderConfig;
//import io.wdd.rpc.execute.config.ExecutionLog;
//import io.wdd.rpc.execute.result.BuildStreamReader;
//import io.wdd.server.service.ExecutionLogService;
//import lombok.extern.slf4j.Slf4j;
//import org.apache.commons.collections.CollectionUtils;
//import org.springframework.context.annotation.Lazy;
//
//import javax.annotation.PostConstruct;
//import javax.annotation.Resource;
//import java.util.ArrayList;
//import java.util.Collection;
//import java.util.List;
//import java.util.concurrent.*;
//
///**
// * 1. [waiting strategy ]
// * 2. [build the redis stream listener]
// * 3. [call persistence]
// */
////@Service
//@Slf4j
//@Lazy
//@Deprecated
//public class ExecutionResultDaemonHandler {
//
// /**
// * store all execution result key
// * <p>
// * which means there are execution running , waiting for their result to handle
// */
// public static final ConcurrentHashMap<String, ExecutionLog> WAIT_EXECUTION_RESULT_LIST = new ConcurrentHashMap<>(32);
// private final int MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT = 70;
//
// @Resource
// BuildStreamReader buildStreamReader;
//
// @Resource
// CommandReaderConfig commandReaderConfig;
//
// @Resource
// ExecutionLogService executionLogService;
//
// @PostConstruct
// public void startExecutionDaemonHandler() {
//
// // 启动一个异步线程,运行 Execution结果处理守护进程
// CompletableFuture.runAsync(
// () -> realStartExecutionDaemonHandler()
// );
//
// }
//
// private void realStartExecutionDaemonHandler() {
//
// while (true) {
//
// while (WAIT_EXECUTION_RESULT_LIST.size() == 0) {
// try {
// // no execution result need to handle
//
// // wait for 5 seconds
// log.debug("realStartExecutionDaemonHandler start to sleep waiting for result !");
// TimeUnit.SECONDS.sleep(5);
//
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
//
// // has result to handle , just handle one result at one time
// String resultKey = WAIT_EXECUTION_RESULT_LIST
// .keys()
// .nextElement();
//
// log.debug(
// "current result key is [{}]",
// resultKey
// );
//
//
// CompletableFuture<ArrayList<String>> executionResultFuture =
// CompletableFuture
// .supplyAsync(
// () -> {
// // 修改相应的参数
// commandReaderConfig.setStreamKey(resultKey);
// // listener container 实际上是根据这个绑定的
// commandReaderConfig.setGroup(resultKey);
// // 必须归零
// commandReaderConfig.setExecutionResult(null);
//
// // 构造 resultKey对应的 Redis Stream Listener Container
// buildStreamReader
// .buildStreamReader(commandReaderConfig);
//
// // 获得结果
// ArrayList<String> s = new ArrayList<>(
// List.of("no no no")
// );
//
// try {
// s = CompletableFuture
// .supplyAsync(
// () -> {
// while (true) {
// // todo 多条命令时,这里只能获取到一个结果
// if (CollectionUtils.isNotEmpty(commandReaderConfig.getExecutionResult())) {
// return commandReaderConfig.getExecutionResult();
// }
//
// try {
// TimeUnit.SECONDS.sleep(3);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
// }
// )
// // 获取相应的结果
// .get(
// MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT,
// TimeUnit.SECONDS
// );
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// } catch (ExecutionException e) {
// throw new RuntimeException(e);
// } catch (TimeoutException e) {
// throw new RuntimeException(e);
// }
//
//
// return s;
// }
// );
//
// CompletableFuture<ArrayList<String>> falloutTimeFuture = CompletableFuture.supplyAsync(
// () -> {
// try {
// TimeUnit.SECONDS.sleep(MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
//
// return null;
// }
// );
//
// // 获取结果然后销毁Stream Listener Container
// CompletableFuture<Object> complete = CompletableFuture
// .anyOf(
// falloutTimeFuture,
// executionResultFuture
// );
//
// complete
// .whenComplete(
// (result, e) -> {
//
// log.debug(
// "execution result are => {}",
// result
// );
//
// // 持久化存储对应的结果
// ExecutionLog executionLog = WAIT_EXECUTION_RESULT_LIST.get(resultKey);
// executionLog.setAcTime(TimeUtils.currentTime());
// executionLog.setResultContent(String.valueOf(commandReaderConfig.getExecutionResult()));
// executionLog.setResultCode(
// CollectionUtils.isEmpty((Collection) result) ? 1 : 0
// );
// executionLog.setRecordId(commandReaderConfig.getRecordId());
//
//
// // 保存操作
// executionLogService.save(executionLog);
//
// // 清除此次任务的内容
// WAIT_EXECUTION_RESULT_LIST.remove(resultKey);
// log.info(
// "[Execution] - command {} result are {} result code is {} ,whole process are complete !",
// executionLog.getCommandList(),
// executionLog.getResultContent(),
// executionLog.getResultCode()
// );
// }
// );
//
// // very important
// // stuck the main thread , otherwise it will create a dead loop
// complete.join();
//
// }
//
// }
//
//
//}

View File

@@ -1,45 +1,31 @@
package io.wdd.rpc.execute.service;
import io.wdd.rpc.message.OctopusMessage;
import java.util.ArrayList;
import java.util.List;
public interface AsyncExecutionService {
List<OctopusMessage> AsyncCallSendCommandToAgent(
List<String> agentTopicNameList,
String type,
List<String> funcContent,
List<String> commandList,
List<List<String>> commandListComplete,
boolean needResultReplay,
String futureKey,
boolean durationTask
);
public interface ExecutionService {
/**
* 同步命令调用的方法
*
* @param agentTopicName
* @param type
* @param funcContent
* @param commandList
* @param commandListComplete
* @param needResultReplay
* @param futureKey
* @param durationTask
* @return
* 发送 指令 给Agent, 正确错误信息全都会被接收
*/
OctopusMessage AsyncCallSendCommandToAgent(
ArrayList<String> SendCommandToAgent(
String agentTopicName,
String type,
String executionType,
List<String> funcContent,
List<String> commandList,
List<List<String>> commandListComplete,
boolean needResultReplay,
boolean durationTask
);
List<ArrayList<String>> SendCommandToAgentBatch(
List<String> agentTopicNameList,
String executionType,
List<String> funcContent,
List<String> commandList,
List<List<String>> commandListComplete,
boolean needResultReplay,
String futureKey,
boolean durationTask
);

View File

@@ -1,6 +1,7 @@
package io.wdd.rpc.execute;
package io.wdd.rpc.execute.service;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.execute.ExecutionMessage;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType;
import io.wdd.rpc.message.handler.OMessageReplayContent;
@@ -14,6 +15,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static io.wdd.rpc.message.handler.OMessageHandler.*;
import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_SET;
@@ -24,25 +26,30 @@ public class ExecutionServiceImpl implements ExecutionService {
private static final String MANUAL_COMMAND_TYPE = "manual-command";
private static final int COMMAND_MAX_WAIT_TIMEOUT = 15;
@Resource
OMessageToAgentSender oMessageToAgentSender;
@Override
public ArrayList<String> SendCommandToAgent(String agentTopicName, String type, List<String> funcContent, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String resultKey, boolean durationTask) {
public ArrayList<String> SendCommandToAgent(String agentTopicName, String executionType, List<String> funcContent, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, boolean durationTask) {
ArrayList<String> commandResultLog = null;
if (!validateCommandInfo(agentTopicName)) {
return commandResultLog;
}
// 归一化type
if (StringUtils.isEmpty(type)) {
type = MANUAL_COMMAND_TYPE;
if (StringUtils.isEmpty(executionType)) {
executionType = MANUAL_COMMAND_TYPE;
}
// 构造 Execution Command对应的消息体
ExecutionMessage executionMessage = this
.generateExecutionMessage(
type,
executionType,
commandList,
resultKey,
commandListComplete,
needResultReplay,
durationTask
@@ -67,6 +74,7 @@ public class ExecutionServiceImpl implements ExecutionService {
// 需要返回结果
if (!durationTask) {
// 等待结果
// countDownLatch的个数约定为1
OMessageReplayContent replayContent = WaitFromAgent(
octopusMessage,
1
@@ -77,7 +85,7 @@ public class ExecutionServiceImpl implements ExecutionService {
try {
waitOK = replayLatch.await(
10,
COMMAND_MAX_WAIT_TIMEOUT,
TimeUnit.SECONDS
);
@@ -120,12 +128,11 @@ public class ExecutionServiceImpl implements ExecutionService {
.build();
}
private ExecutionMessage generateExecutionMessage(String type, List<String> commandList, String resultKey, List<List<String>> commandListComplete, boolean needResultReplay, boolean durationTask) {
private ExecutionMessage generateExecutionMessage(String executionType, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, boolean durationTask) {
return ExecutionMessage
.builder()
.resultKey(resultKey)
.type(type)
.executionType(executionType)
.singleLineCommand(commandList)
.multiLineCommand(commandListComplete)
.needResultReplay(needResultReplay)
@@ -134,7 +141,7 @@ public class ExecutionServiceImpl implements ExecutionService {
}
private boolean validateCommandInfo(String agentTopicName, String type) {
private boolean validateCommandInfo(String agentTopicName) {
// 检查agentTopicName是否存在
if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) {
@@ -149,4 +156,29 @@ public class ExecutionServiceImpl implements ExecutionService {
return true;
}
@Override
public List<ArrayList<String>> SendCommandToAgentBatch(List<String> agentTopicNameList, String executionType, List<String> funcContent, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, boolean durationTask) {
List<ArrayList<String>> allResult = agentTopicNameList
.stream()
.map(
agentTopicName -> {
return this.SendCommandToAgent(
agentTopicName,
executionType,
funcContent,
commandList,
commandListComplete,
needResultReplay,
durationTask
);
}
)
.collect(Collectors.toList());
return allResult;
}
}

View File

@@ -1,7 +1,6 @@
package io.wdd.rpc.scheduler.config;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.execute.ExecutionMessage;
import io.wdd.rpc.scheduler.beans.ScriptSchedulerDTO;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
@@ -45,10 +44,7 @@ public class QuartzSchedulerUtils {
targetMachine -> {
keyMap.put(
targetMachine,
ExecutionMessage.GetFutureResultKey(
targetMachine,
nextExecutionTime
)
"todo 2023年8月14日"
);
}
);

View File

@@ -2,7 +2,6 @@ package io.wdd.rpc.scheduler.service.script;
import io.wdd.rpc.execute.ExecutionMessage;
import io.wdd.rpc.execute.service.AsyncExecutionService;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.scheduler.beans.ScriptSchedulerDTO;
import io.wdd.rpc.scheduler.config.QuartzSchedulerUtils;

View File

@@ -1,6 +1,5 @@
package io.wdd.server;
import io.wdd.rpc.execute.service.AsyncExecutionService;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;