Merge remote-tracking branch 'origin/local-dev' into local-dev

This commit is contained in:
IceDerce
2023-03-01 14:33:36 +08:00
28 changed files with 990 additions and 209 deletions

View File

@@ -0,0 +1,37 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Agent-dev-oracle-s5" type="SpringBootApplicationConfigurationType" factoryName="Spring Boot">
<option name="ALTERNATIVE_JRE_PATH" value="11" />
<envs>
<env name="agentVersion" value="2023-02-13" />
<env name="archInfo" value="&quot;x86_64 (64 Bit)&quot;" />
<env name="cpuBrand" value="Oracle Armpre A1" />
<env name="cpuCore" value="4 @2400MHz" />
<env name="diskTotal" value="100GB" />
<env name="diskUsage" value="&quot;12.3 GB&quot;" />
<env name="ioSpeed" value="&quot;259 MB/s&quot;" />
<env name="location" value="Seoul Seoul KR" />
<env name="machineId" value="oracles5dsaasdadas" />
<env name="managePort" value="22333" />
<env name="memoryTotal" value="&quot;7.6 GB&quot;" />
<env name="osInfo" value="&quot;Ubuntu 20.04.5 LTS&quot;" />
<env name="osKernelInfo" value="&quot;5.4.0-135-generic&quot;" />
<env name="provider" value="&quot;AS139080 The Internet Data Center of Sichuan Mobile Communication Company Limited&quot;" />
<env name="server.port" value="8001" />
<env name="serverIpInV4" value="&quot;&quot;" />
<env name="serverIpInV6" value="&quot;&quot;" />
<env name="serverIpPbV4" value="146.56.159.175" />
<env name="serverIpPbV6" value="&quot;&quot;" />
<env name="serverName" value="Seoul-arm64-02" />
<env name="tcpControl" value="bbr" />
<env name="virtualization" value="&quot;Dedicated&quot;" />
</envs>
<option name="HIDE_BANNER" value="true" />
<module name="agent" />
<option name="SPRING_BOOT_MAIN_CLASS" value="io.wdd.agent.AgentApplication" />
<option name="VM_PARAMETERS" value="-Dserver.port=8001 -Dfile.encoding=utf-8 -Ddebug=true -Dlogging.level.io.wdd.agent=debug -Dspring.profiles.active=dev -Dspring.cloud.nacos.config.group=dev -Dspring.cloud.nacos.config.extension-configs[0].dataId=common-dev.yaml -Dspring.cloud.nacos.config.extension-configs[0].group=dev" />
<method v="2">
<option name="ToolBeforeRunTask" enabled="true" actionId="Tool_External Tools_git pull" />
<option name="Make" enabled="true" />
</method>
</configuration>
</component>

View File

@@ -1,12 +1,14 @@
<component name="ProjectRunConfigurationManager"> <component name="ProjectRunConfigurationManager">
<configuration default="false" name="Server-dev" type="SpringBootApplicationConfigurationType" factoryName="Spring Boot" activateToolWindowBeforeRun="false"> <configuration default="false" name="Server-dev" type="SpringBootApplicationConfigurationType" factoryName="Spring Boot" activateToolWindowBeforeRun="false">
<option name="DEBUG_MODE" value="true"/>
<option name="HIDE_BANNER" value="true" /> <option name="HIDE_BANNER" value="true" />
<module name="server" /> <module name="server" />
<selectedOptions> <selectedOptions>
<option name="spring.boot.profiles" visible="false" /> <option name="spring.boot.profiles" visible="false" />
</selectedOptions> </selectedOptions>
<option name="SPRING_BOOT_MAIN_CLASS" value="io.wdd.ServerApplication" /> <option name="SPRING_BOOT_MAIN_CLASS" value="io.wdd.ServerApplication" />
<option name="VM_PARAMETERS" value="-Dfile.encoding=utf-8 -Dspring.profiles.active=dev -Dspring.cloud.nacos.config.group=dev -Dspring.cloud.nacos.config.extension-configs[0].dataId=common-dev.yaml -Dspring.cloud.nacos.config.extension-configs[0].group=dev" /> <option name="VM_PARAMETERS"
value="-Ddebug=true -Dlogging.level.io.wdd.server=debug -Dfile.encoding=utf-8 -Dspring.profiles.active=dev -Dspring.cloud.nacos.config.group=dev -Dspring.cloud.nacos.config.extension-configs[0].dataId=common-dev.yaml -Dspring.cloud.nacos.config.extension-configs[0].group=dev"/>
<method v="2"> <method v="2">
<option name="Make" enabled="true" /> <option name="Make" enabled="true" />
</method> </method>

View File

@@ -8,6 +8,7 @@ import io.wdd.common.beans.executor.ExecutionMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessage;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource; import javax.annotation.Resource;
@@ -33,7 +34,8 @@ public class CommandExecutor {
/** /**
* 一个命令执行的最长等待时间 * 一个命令执行的最长等待时间
*/ */
int processMaxWaitSeconds = 10; @Value("${octopus.agent.executor.processMaxTimeOut}")
Integer processMaxWaitSeconds;
/** /**
* 持久化命令执行的最长等待时间 * 持久化命令执行的最长等待时间
@@ -180,8 +182,6 @@ public class CommandExecutor {
throw new RuntimeException(e); throw new RuntimeException(e);
} finally { } finally {
System.out.println("process isAlive = " + process.isAlive());
// 任务提前执行结束,或者超过了最长等待时间 // 任务提前执行结束,或者超过了最长等待时间
// 判断命令是否正确处理完成 // 判断命令是否正确处理完成
if (!commandExecComplete) { if (!commandExecComplete) {
@@ -210,21 +210,15 @@ public class CommandExecutor {
//commandExecLogCache.PrintCommandCachedLog(streamKey); //commandExecLogCache.PrintCommandCachedLog(streamKey);
// 关停任务执行的缓存日志收集 BufferedReader 否则无法终止 // 关停任务执行的缓存日志收集 BufferedReader 否则无法终止
commandExecLogCache.StopExecLogBufferedReader( commandExecLogCache.StopCollectExecLog(
streamKey, streamKey,
process process,
); commandExecWaitTimeout
// 异步执行日志的发送工作
commandExecLogCache.CollectAndSendExecLog(
streamKey,
needResultReplay,
octopusMessage
); );
// 执行到这里,说明整个任务流程结束(超时结束) // 执行到这里,说明整个任务流程结束(超时结束)
log.debug( log.debug(
"命令 [ {} ] [ {} ]执行全流程结束! 开始释放所有资源", "命令 [ {} ] [ {} ] 执行全流程结束! 开始释放所有资源",
streamKey, streamKey,
process.info() process.info()
); );
@@ -248,6 +242,14 @@ public class CommandExecutor {
// shutdown the process // shutdown the process
process.destroyForcibly(); process.destroyForcibly();
} }
// 异步执行日志的发送工作
commandExecLogCache.CollectAndSendExecLog(
streamKey,
needResultReplay,
octopusMessage
);
} }
} }

View File

@@ -117,14 +117,20 @@ public class CommandExecLogCache {
); );
String execTimeString = String.format( String execTimeString = String.format(
"execution time is => [ %s ]", "execution date time is => [ %s ]",
TimeUtils.currentTimeString() TimeUtils.currentTimeString()
); );
String execMachineString = String.format(
"execution machine is => [ %s ]",
streamKey.split("-Execution")[0]
);
// add the command // add the command
commandCachedLog.add(execCommandString); commandCachedLog.add(execCommandString);
commandCachedLog.add(execTimeString); commandCachedLog.add(execTimeString);
commandCachedLog.add(execMachineString);
commandCachedLog.add("--------------- command result are as below --------------------"); commandCachedLog.add("--------------- command result are as below --------------------");
commandCachedLog.add(""); commandCachedLog.add("");
@@ -161,6 +167,13 @@ public class CommandExecLogCache {
commandCachedLog::add commandCachedLog::add
); );
// 对于即时执行完成的任务,需要在这里增加尾巴内容
addCommandExecTailInfo(
1,
streamKey,
0
);
log.debug( log.debug(
"命令代码 [ {} ] 的执行日志内容为 {} ", "命令代码 [ {} ] 的执行日志内容为 {} ",
streamKey, streamKey,
@@ -170,11 +183,46 @@ public class CommandExecLogCache {
); );
} }
/**
* 增加尾巴内容
*
* @param process
* @param streamKey
*/
private void addCommandExecTailInfo(int timeOut, String streamKey, int exitValue) {
log.debug("开始添加任务日志的尾部信息!");
// 添加任务结束的一些信息
String execTimeCostString = String.format(
"execution time-cost is => [ %s ]",
timeOut
);
String execResultString = String.format(
"execution result code is => [ %s ]",
exitValue
);
ArrayList<String> commandExecCachedLog = CachedCommandLogMap.get(streamKey);
commandExecCachedLog.add("");
commandExecCachedLog.add("");
commandExecCachedLog.add("--------------- command result are as above --------------------");
commandExecCachedLog.add(execTimeCostString);
commandExecCachedLog.add(execResultString);
log.debug(
"添加尾部信息完成, 结果为 => {}",
commandExecCachedLog
);
}
/** /**
* 对于一些没有中止的任务,必须要手动将读取的 InputStream流关闭 * 对于一些没有中止的任务,必须要手动将读取的 InputStream流关闭
* 否则部分任务的日志无法收集 * 否则部分任务的日志无法收集
*/ */
public void StopExecLogBufferedReader(String streamKey, Process process) { public void StopCollectExecLog(String streamKey, Process process, int commandExecWaitTimeout) {
BufferedReader bufferedReader = CommandLogBufferedReaderMap.get(streamKey); BufferedReader bufferedReader = CommandLogBufferedReaderMap.get(streamKey);
@@ -202,6 +250,17 @@ public class CommandExecLogCache {
streamKey streamKey
); );
// 任务卡住了到现在,需要强行中止,并且添加部分日志信息
if (ObjectUtils.isNotEmpty(process) && process.isAlive()) {
addCommandExecTailInfo(
commandExecWaitTimeout,
streamKey,
233
);
}
// 移除
CommandLogBufferedReaderMap.remove(streamKey); CommandLogBufferedReaderMap.remove(streamKey);
} }
}); });

View File

@@ -9,7 +9,6 @@ import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.handler.MyRuntimeException; import io.wdd.common.handler.MyRuntimeException;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.units.qual.K;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders; import org.springframework.amqp.support.AmqpHeaders;
@@ -88,7 +87,9 @@ public class OctopusAgentInitService {
// response chain to handle all kind of type of octopus message // response chain to handle all kind of type of octopus message
if (!octopusMessageHandler.handle(octopusMessage)) { if (!octopusMessageHandler.handle(octopusMessage)) {
throw new MyRuntimeException(" Handle Octopus Message Error !"); String s = "Handle Octopus Message Error !";
log.error(s);
throw new MyRuntimeException(s);
} }
} catch (Exception e) { } catch (Exception e) {
@@ -98,8 +99,7 @@ public class OctopusAgentInitService {
// long deliveryTag, boolean requeue // long deliveryTag, boolean requeue
// channel.basicReject(deliveryTag,true); // channel.basicReject(deliveryTag,true);
log.error("Octopus Agent Initialization Error, please check !"); log.error("Octopus Agent Initialization Error, please check ! Waiting for 5 seconds");
log.info("waiting for 5 seconds");
// 这里只是便于出现死循环时查看 // 这里只是便于出现死循环时查看
TimeUnit.SECONDS.sleep(5); TimeUnit.SECONDS.sleep(5);

View File

@@ -48,17 +48,22 @@ public class GenOctopusRabbitMQConnection {
// reboot judgement of existing exchange // reboot judgement of existing exchange
QueueInformation queueInfo = rabbitAdmin.getQueueInfo(agentTopicName); QueueInformation queueInfo = rabbitAdmin.getQueueInfo(agentTopicName);
if (ObjectUtils.isEmpty(queueInfo)) {
if (ObjectUtils.isNotEmpty(queueInfo) && queueInfo.getConsumerCount() > 0 ) { log.debug("开始为Agent创建相形的消息队列");
log.info("Octopus Agent Specific Topic Queue Already Existed ! == {}", agentTopicName);
return;
} }
Queue queue = new Queue(agentTopicName, true, false, false); Queue queue = new Queue(
agentTopicName,
true,
false,
false
);
Binding binding = new Binding( Binding binding = new Binding(
agentTopicName, agentTopicName,
Binding.DestinationType.QUEUE, Binding.DestinationType.QUEUE,
octopusMessage.getContent().toString(), octopusMessage
.getContent()
.toString(),
agentTopicName + "*", agentTopicName + "*",
null null
); );
@@ -74,8 +79,7 @@ public class GenOctopusRabbitMQConnection {
listenerContainer.setMessageListener(this::AgentListenToSpecificTopicOctopusMessage); listenerContainer.setMessageListener(this::AgentListenToSpecificTopicOctopusMessage);
listenerContainer.start(); listenerContainer.start();
log.info("每个Agent特定的Octopus Topic Queue创建成功!");
log.info("Specific Octopus Topic Queue Generate Successfully !");
messageListenerContainerList.add(listenerContainer); messageListenerContainerList.add(listenerContainer);
} }

View File

@@ -10,6 +10,7 @@ import org.springframework.dao.DuplicateKeyException;
import org.springframework.validation.BindException; import org.springframework.validation.BindException;
import org.springframework.validation.FieldError; import org.springframework.validation.FieldError;
import org.springframework.validation.ObjectError; import org.springframework.validation.ObjectError;
import org.springframework.web.HttpRequestMethodNotSupportedException;
import org.springframework.web.bind.MethodArgumentNotValidException; import org.springframework.web.bind.MethodArgumentNotValidException;
import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice; import org.springframework.web.bind.annotation.RestControllerAdvice;
@@ -91,6 +92,20 @@ public class GlobalExceptionHandler {
return vo; return vo;
} }
/**
* 处理此种异常
*
* @param httpRequestMethodNotSupportedException
*/
@ExceptionHandler(value = {HttpRequestMethodNotSupportedException.class})
public void methodNotMatchHandler(HttpRequestMethodNotSupportedException httpRequestMethodNotSupportedException) {
log.debug(
httpRequestMethodNotSupportedException.getMessage()
);
}
/** /**
* 拦截数据库异常 * 拦截数据库异常
* *

View File

@@ -117,7 +117,7 @@ public class TimeUtils {
} }
/** /**
* @return UTC+8 [ yyyy-MM-dd HH:mm:ss ] Time String * @return UTC+8 [ yyyy-MM-dd-HH-mm-ss ] Time String
*/ */
public static String currentTimeStringFullSplit() { public static String currentTimeStringFullSplit() {

View File

@@ -61,8 +61,15 @@ public class XrayController {
) )
); );
ArrayList<ProxyNode> pathE = new ArrayList<>(
Collections.singletonList(
seoul5
)
);
// allNetworkPathList.add(pathA); // allNetworkPathList.add(pathA);
allNetworkPathList.add(pathD); // allNetworkPathList.add(pathD);
allNetworkPathList.add(pathE);
// allNetworkPathList.add(pathB); // allNetworkPathList.add(pathB);
// allNetworkPathList.add(pathC); // allNetworkPathList.add(pathC);
@@ -72,7 +79,7 @@ public class XrayController {
xrayConfigDistribute.buildXrayUpdateResult(allNetworkPathList.get(0)); xrayConfigDistribute.buildXrayUpdateResult(allNetworkPathList.get(0));
System.out.println("结束!"); System.out.println("Xray controller test() 结束!");
} }
} }

View File

@@ -17,6 +17,7 @@ public class ProxyNodeSet {
// 开发使用 // 开发使用
public static ProxyNode chengduAgent; public static ProxyNode chengduAgent;
public static ProxyNode tokyoDev; public static ProxyNode tokyoDev;
public static ProxyNode seoul5;
static { static {
@@ -117,15 +118,58 @@ public class ProxyNodeSet {
.agentTopicName("Chengdu-amd64-77-remote") .agentTopicName("Chengdu-amd64-77-remote")
.build(); .build();
ProxyNodeMap.put(chengdu.getNum(), chengdu);
ProxyNodeMap.put(hongkong.getNum(), hongkong); seoul5 = ProxyNode
ProxyNodeMap.put(shanghai.getNum(), shanghai); .builder()
ProxyNodeMap.put(seoul2.getNum(), seoul2); .location("Seoul")
ProxyNodeMap.put(tokyo2.getNum(), tokyo2); .num(97)
ProxyNodeMap.put(phoenix2.getNum(), phoenix2); .publicIPv4("146.56.159.175")
ProxyNodeMap.put(london2.getNum(), london2); .proxyNodeType(ProxyNodeType.EXTERNAL)
ProxyNodeMap.put(chengduAgent.getNum(), chengduAgent); .name("oracle-seoul5")
ProxyNodeMap.put(tokyoDev.getNum(), tokyoDev); .agentName("Seoul-arm64-02")
.agentTopicName("Seoul-arm64-02-oracle")
.build();
ProxyNodeMap.put(
chengdu.getNum(),
chengdu
);
ProxyNodeMap.put(
hongkong.getNum(),
hongkong
);
ProxyNodeMap.put(
shanghai.getNum(),
shanghai
);
ProxyNodeMap.put(
seoul2.getNum(),
seoul2
);
ProxyNodeMap.put(
tokyo2.getNum(),
tokyo2
);
ProxyNodeMap.put(
phoenix2.getNum(),
phoenix2
);
ProxyNodeMap.put(
london2.getNum(),
london2
);
ProxyNodeMap.put(
chengduAgent.getNum(),
chengduAgent
);
ProxyNodeMap.put(
tokyoDev.getNum(),
tokyoDev
);
ProxyNodeMap.put(
seoul5.getNum(),
seoul5
);
} }

View File

@@ -6,6 +6,9 @@ import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
/**
* https://xtls.github.io/config/inbounds/vmess.html#clientobject
*/
@Data @Data
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor

View File

@@ -7,7 +7,7 @@ import io.wdd.func.oss.service.OSSCoreService;
import io.wdd.func.oss.service.OssBackendSelect; import io.wdd.func.oss.service.OssBackendSelect;
import io.wdd.func.xray.beans.node.ProxyNode; import io.wdd.func.xray.beans.node.ProxyNode;
import io.wdd.func.xray.beans.node.XrayConfigInfo; import io.wdd.func.xray.beans.node.XrayConfigInfo;
import io.wdd.rpc.execute.service.CoreExecutionService; import io.wdd.rpc.execute.service.AsyncExecutionService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@@ -112,7 +112,7 @@ public class XrayConfigDistribute {
OSSCoreService ossCoreService; OSSCoreService ossCoreService;
@Resource @Resource
CoreExecutionService executionService; AsyncExecutionService executionService;
public void uploadXrayConfigToOSS(ArrayList<ProxyNode> networkPathList) { public void uploadXrayConfigToOSS(ArrayList<ProxyNode> networkPathList) {

View File

@@ -1,7 +1,17 @@
package io.wdd.func.xray.service; package io.wdd.func.xray.service;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.util.DefaultIndenter;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import io.wdd.common.handler.MyRuntimeException; import io.wdd.common.handler.MyRuntimeException;
import io.wdd.common.utils.TimeUtils;
import io.wdd.func.xray.beans.node.ProxyNode;
import io.wdd.func.xray.beans.node.XrayConfigInfo;
import io.wdd.func.xray.beans.xray.XrayConfig;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
@@ -15,22 +25,79 @@ import java.util.concurrent.atomic.AtomicInteger;
/** /**
* 获取rerouces目录 https://blog.csdn.net/pengpengpeng85/article/details/84785575 * 获取rerouces目录 https://blog.csdn.net/pengpengpeng85/article/details/84785575
* * <p>
* 写入文件的教程 https://cloud.tencent.com/developer/article/1895274 * 写入文件的教程 https://cloud.tencent.com/developer/article/1895274
*/ */
@Slf4j @Slf4j
@Service @Service
public class XrayConfigPersistor { public class XrayConfigPersistor {
private static String XrayResultPath = "xrayResult/"; // 参考 https://github.com/FasterXML/jackson-databind/issues/585
private static final ObjectWriter objectWriter = new ObjectMapper()
// 忽略掉 null的字段
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
// 写的文件必须是unix类型的分隔符号
.writer(
new DefaultPrettyPrinter()
.withObjectIndenter(
new DefaultIndenter()
.withLinefeed("\n")
));
private static final String XrayResultPath = "xrayResult/";
public static AtomicInteger cleanVersion = new AtomicInteger(0); public static AtomicInteger cleanVersion = new AtomicInteger(0);
/**
* 执行Xray生成配置文件的持久化工作,沈成伟临时文件保存至当前目录中
*
* @param xrayConfig
* @param currentVersion
* @param proxyNode
* @return
*/
public XrayConfigInfo persist(XrayConfig xrayConfig, int currentVersion, ProxyNode proxyNode) {
try {
// 将生成的xrayConfig直接写为字符串
String resultContent = objectWriter
.writeValueAsString(xrayConfig);
public File write(String fileName , String content, int currentVersion) { // 获得到文件名称
String timeString = TimeUtils.currentFormatTimeString();
String fileName = buildXrayConfigFileName(
proxyNode,
timeString
);
System.out.println("currentVersion = " + currentVersion); // 文件持久化!
File xrayConfigFile = write(
fileName,
resultContent,
currentVersion
);
// 文件写入完成,保存文件信息
XrayConfigInfo xrayConfigInfo = new XrayConfigInfo();
xrayConfigInfo.setXrayConfigFile(xrayConfigFile);
xrayConfigInfo.setXrayConfigFileName(fileName);
return xrayConfigInfo;
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
private String buildXrayConfigFileName(ProxyNode proxyNode, String timeString) {
return proxyNode.getNum() + "-" + proxyNode.getAgentName() + "-" + timeString + ".json";
}
private File write(String fileName, String content, int currentVersion) {
log.debug(
"currentVersion = {}",
currentVersion
);
if (cleanVersion.get() == currentVersion) { if (cleanVersion.get() == currentVersion) {
// 清除旧的内容 // 清除旧的内容
@@ -41,29 +108,51 @@ public class XrayConfigPersistor {
// 构造对象开始写入, 生成文件 // 构造对象开始写入, 生成文件
File resultFile = getResultFile(fileName); File resultFile = getResultFile(fileName);
try {
log.debug("开始写入XrayConfig进入文件中文件名为 => {}",fileName); FileWriter fileWriter = null;
FileWriter fileWriter = new FileWriter( BufferedWriter bufferedWriter = null;
try {
log.debug(
"开始写入XrayConfig进入文件中文件名为 => {}",
fileName
);
fileWriter = new FileWriter(
resultFile resultFile
); );
BufferedWriter bufferedWriter = new BufferedWriter( bufferedWriter = new BufferedWriter(
fileWriter fileWriter
); );
log.debug("文件内容为 => {}", content); log.debug(
"文件内容为 => {}",
content
);
bufferedWriter.write(content); bufferedWriter.write(content);
// must close
bufferedWriter.close();
fileWriter.close();
return resultFile; return resultFile;
} catch (IOException e) { } catch (IOException e) {
log.error("打开文件失败写入tmp文件失败 文件为 => {}", resultFile.getName()); log.error(
"打开文件失败写入tmp文件失败 文件为 => {}",
resultFile.getName()
);
throw new MyRuntimeException(e); throw new MyRuntimeException(e);
} finally {
try {
// must close
bufferedWriter.close();
fileWriter.close();
} catch (IOException e) {
log.error(
"关闭文件写入流失败!, 请检查 文件为 => [ {} ], 内容为 => {}",
fileName,
content
);
throw new MyRuntimeException(e);
}
} }
} }
@@ -82,11 +171,12 @@ public class XrayConfigPersistor {
} }
/** /**
* 根据文件名,需要创建一个文件 * 根据文件名,需要创建一个文件
*
* @param fileName 文件名,如 xxx.json * @param fileName 文件名,如 xxx.json
* @return * @return
*/ */
private File getResultFile(String fileName ){ private File getResultFile(String fileName) {
ClassPathResource classPathResource = new ClassPathResource(XrayResultPath); ClassPathResource classPathResource = new ClassPathResource(XrayResultPath);
@@ -99,7 +189,10 @@ public class XrayConfigPersistor {
); );
} catch (IOException e) { } catch (IOException e) {
log.error("获取文件失败请检查! fileName is => {}", fileName); log.error(
"获取文件失败请检查! fileName is => {}",
fileName
);
throw new MyRuntimeException(e); throw new MyRuntimeException(e);
} }

View File

@@ -1,9 +1,5 @@
package io.wdd.func.xray.service; package io.wdd.func.xray.service;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.utils.TimeUtils;
import io.wdd.func.xray.beans.node.ProxyNode; import io.wdd.func.xray.beans.node.ProxyNode;
import io.wdd.func.xray.beans.node.XrayConfigInfo; import io.wdd.func.xray.beans.node.XrayConfigInfo;
import io.wdd.func.xray.beans.xray.RoutingObject; import io.wdd.func.xray.beans.xray.RoutingObject;
@@ -20,7 +16,6 @@ import org.apache.commons.beanutils.BeanUtils;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.File;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@@ -36,13 +31,9 @@ import static io.wdd.func.xray.service.XrayConfigPersistor.cleanVersion;
@Slf4j @Slf4j
public class XrayCoreServiceImpl implements XrayCoreService { public class XrayCoreServiceImpl implements XrayCoreService {
@Resource
ObjectMapper objectMapper;
@Resource @Resource
XrayConfigPersistor xrayConfigPersistor; XrayConfigPersistor xrayConfigPersistor;
@Override @Override
public void generateXrayJsonFromNodeList(ArrayList<ArrayList<ProxyNode>> allNetworkPathList) { public void generateXrayJsonFromNodeList(ArrayList<ArrayList<ProxyNode>> allNetworkPathList) {
@@ -81,9 +72,6 @@ public class XrayCoreServiceImpl implements XrayCoreService {
private void generateXrayJsonSinglePath(ArrayList<ProxyNode> networkPathList) { private void generateXrayJsonSinglePath(ArrayList<ProxyNode> networkPathList) {
int pathLength = networkPathList.size(); int pathLength = networkPathList.size();
// 忽略掉 null的字段
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
// 采用 VMESS + websocket的形式形成 链状代理 // 采用 VMESS + websocket的形式形成 链状代理
// 由于 Vlss+XTLS的形式形成 链状结构 // 由于 Vlss+XTLS的形式形成 链状结构
String tag = generatePathTag(networkPathList); String tag = generatePathTag(networkPathList);
@@ -97,7 +85,8 @@ public class XrayCoreServiceImpl implements XrayCoreService {
.builder() .builder()
.id(uuid) .id(uuid)
.level(0) .level(0)
.alterId(23) // 为了进一步防止被探测,一个用户可以在主 ID 的基础上,再额外生成多个 ID。这里只需要指定额外的 ID 的数量,推荐值为 0 代表启用 VMessAEAD
.alterId(0)
.email(tag + "@octopus.io") .email(tag + "@octopus.io")
.build(); .build();
@@ -123,46 +112,19 @@ public class XrayCoreServiceImpl implements XrayCoreService {
pos pos
); );
// 持久化 // 持久化 Xray生成的文件信息
try { XrayConfigInfo xrayConfigInfo = xrayConfigPersistor.persist(
String resultContent = objectMapper xrayConfig,
.writerWithDefaultPrettyPrinter() currentVersion,
.writeValueAsString(xrayConfig); proxyNode
);
// 获得到文件名称 proxyNode.setXrayConfigInfo(xrayConfigInfo);
String timeString = TimeUtils.currentFormatTimeString();
String fileName = buildXrayConfigFileName(
proxyNode,
timeString
);
// 文件持久化!
File xrayConfigFile = xrayConfigPersistor.write(
fileName,
resultContent,
currentVersion
);
// 文件写入完成,保存文件信息
XrayConfigInfo xrayConfigInfo = new XrayConfigInfo();
xrayConfigInfo.setXrayConfigFile(xrayConfigFile);
xrayConfigInfo.setXrayConfigFileName(fileName);
proxyNode.setXrayConfigInfo(xrayConfigInfo);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
} }
} }
private String buildXrayConfigFileName(ProxyNode proxyNode, String timeString) {
return proxyNode.getNum() + "-" + proxyNode.getAgentName() + "-" + timeString + ".json";
}
private XrayConfig doBuildXrayConfig(boolean isOutBoundFree, String tag, ClientObject clientObject, int port, ArrayList<ProxyNode> networkPathList, int pos) { private XrayConfig doBuildXrayConfig(boolean isOutBoundFree, String tag, ClientObject clientObject, int port, ArrayList<ProxyNode> networkPathList, int pos) {

View File

@@ -19,7 +19,6 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -31,7 +30,6 @@ import java.util.stream.Collectors;
import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_SET; import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_SET;
import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
import static io.wdd.rpc.message.handler.AsyncWaitOMResult.REPLAY_CACHE_MAP;
import static io.wdd.rpc.message.handler.OMessageHandlerServer.AGENT_LATEST_VERSION; import static io.wdd.rpc.message.handler.OMessageHandlerServer.AGENT_LATEST_VERSION;
import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT; import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT;
@@ -67,7 +65,6 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
// 组装信息至集合中 // 组装信息至集合中
LocalDateTime currentTime = TimeUtils.currentFormatTime(); LocalDateTime currentTime = TimeUtils.currentFormatTime();
// 发送OctopusMessage-Agent // 发送OctopusMessage-Agent
buildOMessageAndSendToAllHealthyAgent( buildOMessageAndSendToAllHealthyAgent(
AgentOperationType.VERSION, AgentOperationType.VERSION,
@@ -75,21 +72,14 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
); );
// 构造 异步结果监听内容 // 构造 异步结果监听内容
CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size()); OMReplayContend omReplayContend = OMReplayContend.build(
ArrayList<OctopusMessage> replayOMList = new ArrayList<>(); ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(),
OMReplayContend omReplayContend = OMReplayContend CurrentAppOctopusMessageType,
.builder() currentTime
.initTime(currentTime) );
.countDownLatch(countDownLatch)
.replayOMList(replayOMList) CountDownLatch countDownLatch = omReplayContend.getCountDownLatch();
.replayMatchKey(
OMReplayContend.generateMatchKey(
CurrentAppOctopusMessageType,
currentTime
)
)
.type(CurrentAppOctopusMessageType)
.build();
// 调用后台接收处理所有的Replay信息 // 调用后台接收处理所有的Replay信息
asyncWaitOMResult.waitFor(omReplayContend); asyncWaitOMResult.waitFor(omReplayContend);
@@ -101,21 +91,24 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
countDownLatch countDownLatch
);*/ );*/
boolean isAllHealthyAgentReport = false;
try { try {
// 超时等待5秒钟, 或者所有的Agent均已经完成上报 // 超时等待5秒钟, 或者所有的Agent均已经完成上报
countDownLatch.await( isAllHealthyAgentReport = countDownLatch.await(
5, 5,
TimeUnit.SECONDS TimeUnit.SECONDS
); );
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.warn("存在部分Agent没有上报 版本信息!");
} finally { } finally {
// 超时,或者 全部信息已经收集 // 超时,或者 全部信息已经收集
if (!isAllHealthyAgentReport) {
log.warn("存在部分Agent没有上报 版本信息!");
}
// 此处调用,即可中断 异步任务的收集工作 // 此处调用,即可中断 异步任务的收集工作
REPLAY_CACHE_MAP.remove( asyncWaitOMResult.stopWaiting(omReplayContend);
omReplayContend.getReplayMatchKey()
);
// 处理结果 // 处理结果
omReplayContend omReplayContend
@@ -132,7 +125,6 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
// help gc // help gc
omReplayContend = null; omReplayContend = null;
} }
return result; return result;
@@ -165,21 +157,14 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
currentTime currentTime
); );
CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size()); // 构造结果
ArrayList<OctopusMessage> replayOMList = new ArrayList<>(); OMReplayContend omReplayContend = OMReplayContend.build(
OMReplayContend omReplayContend = OMReplayContend ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(),
.builder() CurrentAppOctopusMessageType,
.initTime(currentTime) currentTime
.countDownLatch(countDownLatch) );
.replayOMList(replayOMList)
.replayMatchKey( CountDownLatch countDownLatch = omReplayContend.getCountDownLatch();
OMReplayContend.generateMatchKey(
CurrentAppOctopusMessageType,
currentTime
)
)
.type(CurrentAppOctopusMessageType)
.build();
// 调用后台接收处理所有的Replay信息 // 调用后台接收处理所有的Replay信息
asyncWaitOMResult.waitFor(omReplayContend); asyncWaitOMResult.waitFor(omReplayContend);
@@ -202,9 +187,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
// 超时,或者 全部信息已经收集 // 超时,或者 全部信息已经收集
// 此处调用,即可中断 异步任务的收集工作 // 此处调用,即可中断 异步任务的收集工作
REPLAY_CACHE_MAP.remove( asyncWaitOMResult.stopWaiting(omReplayContend);
omReplayContend.getReplayMatchKey()
);
// 处理结果 // 处理结果
omReplayContend omReplayContend

View File

@@ -14,21 +14,21 @@ import java.util.Map;
@RestController @RestController
@RequestMapping("/octopus/server/agent") @RequestMapping("/octopus/server/agent")
@Api("处理Agent核心内容的Controller") @Api(value = "处理Agent核心内容的Controller", tags = "Agent")
public class AgentController { public class AgentController {
@Resource @Resource
OctopusAgentService octopusAgentService; OctopusAgentService octopusAgentService;
@GetMapping("/version") @GetMapping("/version")
@ApiOperation("[版本]-所有OctopusAgent") @ApiOperation("[版本] - 所有OctopusAgent")
public R<Map<String, String>> getAllAgentVersion(){ public R<Map<String, String>> getAllAgentVersion(){
return R.ok(octopusAgentService.getAllAgentVersion()); return R.ok(octopusAgentService.getAllAgentVersion());
} }
@GetMapping("/coreInfo") @GetMapping("/coreInfo")
@ApiOperation("[核心信息]-所有OctopusAgent") @ApiOperation("[核心信息] - 所有OctopusAgent")
public R<Map<String, ServerInfoVO>> getAllAgentCoreInfo(){ public R<Map<String, ServerInfoVO>> getAllAgentCoreInfo(){
return R.ok(octopusAgentService.getAllAgentCoreInfo()); return R.ok(octopusAgentService.getAllAgentCoreInfo());

View File

@@ -5,7 +5,8 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiParam;
import io.wdd.common.beans.response.R; import io.wdd.common.beans.response.R;
import io.wdd.rpc.execute.result.BuildStreamReader; import io.wdd.rpc.execute.result.BuildStreamReader;
import io.wdd.rpc.execute.service.CoreExecutionService; import io.wdd.rpc.execute.service.AsyncExecutionService;
import io.wdd.rpc.execute.service.SyncExecutionService;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RequestParam;
@@ -13,72 +14,169 @@ import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER; import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER;
import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_LIST;
import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
@RestController @RestController
@RequestMapping("/octopus/server/executor") @RequestMapping("/octopus/server/executor")
@Api("Agent执行命令的Controller") @Api(value = "Agent执行命令的Controller", tags = "Execution")
public class ExecutionController { public class ExecutionController {
@Resource @Resource
CoreExecutionService coreExecutionService; AsyncExecutionService asyncExecutionService;
@Resource @Resource
BuildStreamReader buildStreamReader; BuildStreamReader buildStreamReader;
@Resource
SyncExecutionService syncExecutionService;
@PostMapping("/command/one") @PostMapping("/command/one")
@ApiOperation("[命令]-手动发送命令") @ApiOperation("[命令] [异步]- 单台主机")
public R<String> patchCommandToAgent( public R<String> patchCommandToAgent(
@RequestParam(value = "topicName") String topicName, @RequestParam(value = "topicName") @ApiParam(name = "topicName", value = "目标主机名称") String topicName,
@RequestParam(value = "commandList", required = false) @Nullable List<String> commandList, @RequestParam(value = "commandList", required = false) @Nullable List<String> commandList,
@RequestParam(value = "completeCommandList", required = false)
@ApiParam(name = "completeCommandList", value = "完整命令行,优先,可为空") @Nullable List<List<String>> completeCommandList,
@RequestParam(value = "type", required = false) @Nullable String type @RequestParam(value = "type", required = false) @Nullable String type
) { ) {
String streamKey = coreExecutionService String streamKey = asyncExecutionService
.SendCommandToAgent( .SendCommandToAgent(
topicName, topicName,
type, type,
commandList commandList,
completeCommandList,
false,
null,
false
); );
return R.ok(streamKey); return R.ok(streamKey);
} }
@PostMapping("/command/batch") @PostMapping("/command/batch")
@ApiOperation("[命令]- 批量发送命令") @ApiOperation("[命令] [异步] - 批量主机")
public R<List<String>> patchCommandToAgentList( public R<List<String>> patchCommandToAgentList(
@RequestParam(value = "topicNameList") @RequestParam(value = "topicNameList")
@ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList, @ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList,
@RequestParam(value = "commandList", required = false) @RequestParam(value = "commandList", required = false)
@ApiParam(name = "commandList", value = "命令行") @Nullable List<String> commandList, @ApiParam(name = "commandList", value = "命令行") @Nullable List<String> commandList,
@RequestParam(value = "completeCommandList", required = false)
@ApiParam(name = "completeCommandList", value = "完整命令行,优先,可为空") @Nullable List<List<String>> completeCommandList,
@RequestParam(value = "type", required = false) @Nullable String type @RequestParam(value = "type", required = false) @Nullable String type
) { ) {
return R.ok(coreExecutionService.SendCommandToAgent( return R.ok(asyncExecutionService.SendCommandToAgentComplete(
topicNameList, topicNameList,
type, type,
commandList commandList,
completeCommandList
)); ));
} }
@PostMapping("/command/all") @PostMapping("/command/all")
@ApiOperation("[命令]- 发送命令至所有的主机") @ApiOperation("[命令] [异步] - 所有的主机")
public R<List<String>> patchCommandToAgentAll( public R<List<String>> patchCommandToAllAgent(
@RequestParam(value = "commandList", required = false) @RequestParam(value = "commandList", required = false)
@ApiParam(name = "commandList", value = "命令行") @Nullable List<String> commandList, @ApiParam(name = "commandList", value = "命令行") @Nullable List<String> commandList,
@RequestParam(value = "completeCommandList", required = false)
@ApiParam(name = "completeCommandList", value = "完整命令行,优先,可为空") @Nullable List<List<String>> completeCommandList,
@RequestParam(value = "type", required = false) @Nullable String type @RequestParam(value = "type", required = false) @Nullable String type
) { ) {
return R.ok(coreExecutionService.SendCommandToAgent( return R.ok(asyncExecutionService.SendCommandToAgentComplete(
ALL_AGENT_TOPIC_NAME_LIST, ALL_AGENT_TOPIC_NAME_LIST,
type, type,
commandList commandList,
completeCommandList
)); ));
} }
@PostMapping("/command/healthy")
@ApiOperation("[命令] [异步] - 健康的主机")
public R<List<String>> patchCommandToHealthyAgent(
@RequestParam(value = "commandList", required = false)
@ApiParam(name = "commandList", value = "命令行") @Nullable List<String> commandList,
@RequestParam(value = "completeCommandList", required = false)
@ApiParam(name = "completeCommandList", value = "完整命令行,优先,可为空") @Nullable List<List<String>> completeCommandList,
@RequestParam(value = "type", required = false) @Nullable String type
) {
return R.ok(asyncExecutionService.SendCommandToAgentComplete(
ALL_HEALTHY_AGENT_TOPIC_NAME_LIST,
type,
commandList,
completeCommandList
));
}
@PostMapping("/command/sync/one")
@ApiOperation("[命令] [同步] - 单机-等待命令结果")
public R<List<String>> SyncPatchCommandToAgent(
@RequestParam(value = "topicName") @ApiParam(name = "topicName", value = "目标主机名称") String topicName,
@RequestParam(value = "commandList", required = false)
@ApiParam(name = "commandList", value = "命令行") @Nullable List<String> commandList,
@RequestParam(value = "completeCommandList", required = false)
@ApiParam(name = "completeCommandList", value = "完整命令行,优先,可为空") @Nullable List<List<String>> completeCommandList,
@RequestParam(value = "type", required = false) @ApiParam(name = "type", value = "执行命令类型") @Nullable String type
) {
return R.ok(
syncExecutionService.SyncSendCommandToAgent(
topicName,
type,
commandList,
completeCommandList
)
);
}
@PostMapping("/command/sync/batch")
@ApiOperation("[命令] [同步] - 批量-等待命令结果")
public R<List<ArrayList<String>>> SyncPatchCommandToAgentBatch(
@RequestParam(value = "topicNameList")
@ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList,
@RequestParam(value = "commandList", required = false)
@ApiParam(name = "commandList", value = "命令行") @Nullable List<String> commandList,
@RequestParam(value = "completeCommandList", required = false)
@ApiParam(name = "completeCommandList", value = "完整命令行,优先,可为空") @Nullable List<List<String>> completeCommandList,
@RequestParam(value = "type", required = false) @ApiParam(name = "type", value = "执行命令类型") @Nullable String type
) {
return R.ok(
syncExecutionService.SyncSendCommandToAgentComplete(
topicNameList,
type,
commandList,
completeCommandList
)
);
}
@PostMapping("/command/sync/all")
@ApiOperation("[命令] [同步] - 全部-同步等待命令结果")
public R<List<ArrayList<String>>> SyncPatchCommandToAgentAll(
@RequestParam(value = "commandList", required = false)
@ApiParam(name = "commandList", value = "命令行") @Nullable List<String> commandList,
@RequestParam(value = "completeCommandList", required = false)
@ApiParam(name = "completeCommandList", value = "完整命令行,优先,可为空") @Nullable List<List<String>> completeCommandList,
@RequestParam(value = "type", required = false) @ApiParam(name = "type", value = "执行命令类型") @Nullable String type
) {
return R.ok(
syncExecutionService.SyncSendCommandToAgentComplete(
ALL_AGENT_TOPIC_NAME_LIST,
type,
commandList,
completeCommandList
)
);
}
@PostMapping("/agentStatusStream") @PostMapping("/agentStatusStream")
@ApiOperation("切换Console查看Agent状态日志") @ApiOperation("切换Console查看Agent状态日志")
@@ -105,7 +203,7 @@ public class ExecutionController {
) { ) {
return R.ok( return R.ok(
coreExecutionService asyncExecutionService
.SendCommandToAgent( .SendCommandToAgent(
topicNameList, topicNameList,
"AgentUpdate", "AgentUpdate",
@@ -121,7 +219,7 @@ public class ExecutionController {
) { ) {
return R.ok( return R.ok(
coreExecutionService asyncExecutionService
.SendCommandToAgent( .SendCommandToAgent(
topicNameList, topicNameList,
"AgentReboot", "AgentReboot",
@@ -137,7 +235,7 @@ public class ExecutionController {
) { ) {
return R.ok( return R.ok(
coreExecutionService asyncExecutionService
.SendCommandToAgent( .SendCommandToAgent(
topicNameList, topicNameList,
"AgentShutdown", "AgentShutdown",
@@ -145,5 +243,21 @@ public class ExecutionController {
)); ));
} }
@PostMapping("/function/bootUp")
@ApiOperation("重新部署")
public R<List<String>> AgentBootUp(
@RequestParam(value = "topicNameList")
@ApiParam(name = "topicNameList", value = "目标机器列表") List<String> topicNameList
) {
return R.ok(
asyncExecutionService
.SendCommandToAgent(
topicNameList,
"AgentBootUp",
null
));
}
} }

View File

@@ -17,7 +17,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
@RestController @RestController
@Api(value = "定时任务控制中心的Controller") @Api(value = "定时任务控制中心的Controller", tags = "Scheduler")
@RequestMapping(value = "/octopus/server/scheduler") @RequestMapping(value = "/octopus/server/scheduler")
public class SchedulerController { public class SchedulerController {

View File

@@ -18,7 +18,7 @@ import static io.wdd.rpc.init.ServerCacheAgentStatus.*;
@RestController @RestController
@Api("Agent运行状态Controller") @Api(value = "Agent运行状态Controller", tags = "Status")
@RequestMapping("/octopus/server/status") @RequestMapping("/octopus/server/status")
public class StatusController { public class StatusController {

View File

@@ -1,19 +1,19 @@
package io.wdd.rpc.execute.service; package io.wdd.rpc.execute.service;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
public interface CoreExecutionService { public interface AsyncExecutionService {
String SendCommandToAgent(String agentTopicName, String command); String SendCommandToAgent(String agentTopicName, String command);
String SendCommandToAgent(String agentTopicName, List<String> commandList); String SendCommandToAgent(String agentTopicName, List<String> commandList);
String SendCommandToAgent(String agentTopicName, String type, List<String> commandList); String SendCommandToAgent(String agentTopicName, String type, List<String> commandList);
List<String> SendCommandToAgent(List<String> agentTopicNameList, String type, List<String> command); List<String> SendCommandToAgent(List<String> agentTopicNameList, String type, List<String> command);
/** /**
@@ -37,10 +37,13 @@ public interface CoreExecutionService {
); );
/** ------------------------------------------------- */ /**
* -------------------------------------------------
*/
String SendCommandToAgentComplete(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete);
String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete); List<String> SendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<String> commandList, List<List<String>> commandListComplete);
/** /**
* 通常为 页面定时脚本任务调用 * 通常为 页面定时脚本任务调用
@@ -89,4 +92,26 @@ public interface CoreExecutionService {
); );
/**
* 同步命令调用的方法
*
* @param agentTopicName
* @param type
* @param commandList
* @param commandListComplete
* @param needResultReplay
* @param futureKey
* @param durationTask
* @return
*/
OctopusMessage AsyncCallSendCommandToAgent(
String agentTopicName,
String type,
List<String> commandList,
List<List<String>> commandListComplete,
boolean needResultReplay,
String futureKey,
boolean durationTask
);
} }

View File

@@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.beans.executor.ExecutionMessage; import io.wdd.common.beans.executor.ExecutionMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessageType; import io.wdd.common.beans.rabbitmq.OctopusMessageType;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.execute.config.ExecutionLog; import io.wdd.rpc.execute.config.ExecutionLog;
import io.wdd.rpc.message.sender.OMessageToAgentSender; import io.wdd.rpc.message.sender.OMessageToAgentSender;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -22,7 +23,7 @@ import static io.wdd.rpc.init.ServerCacheAgentStatus.ALL_AGENT_TOPIC_NAME_SET;
@Service @Service
@Slf4j @Slf4j
public class CoreExecutionServiceImpl implements CoreExecutionService { public class AsyncExecutionServiceImpl implements AsyncExecutionService {
private static final String MANUAL_COMMAND_TYPE = "manual-command"; private static final String MANUAL_COMMAND_TYPE = "manual-command";
@Resource @Resource
@@ -52,11 +53,6 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
@Override @Override
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList) { public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList) {
// 归一化type
if (StringUtils.isEmpty(type)) {
type = MANUAL_COMMAND_TYPE;
}
return SendCommandToAgent( return SendCommandToAgent(
agentTopicName, agentTopicName,
type, type,
@@ -83,17 +79,37 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
} }
@Override @Override
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete) { public String SendCommandToAgentComplete(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete) {
return this.SendCommandToAgent( return this.SendCommandToAgent(
agentTopicName, agentTopicName,
type, type,
commandList, commandList,
commandListComplete, commandListComplete,
null false,
null,
false
); );
} }
@Override
public List<String> SendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<String> commandList, List<List<String>> commandListComplete) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.SendCommandToAgent(
agentTopicName,
type,
commandList,
commandListComplete,
false,
null,
false
)
)
.collect(Collectors.toList());
}
@Override @Override
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey) { public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey) {
@@ -112,6 +128,29 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
@Override @Override
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
String resultKey = futureKey;
// 判定是否是 FutureKey
if (null == futureKey) {
resultKey = ExecutionMessage.GetResultKey(agentTopicName);
}
// 调用最底层的方法
this.AsyncCallSendCommandToAgent(
agentTopicName,
type,
commandList,
commandListComplete,
needResultReplay,
futureKey,
durationTask
);
return resultKey;
}
@Override
public OctopusMessage AsyncCallSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
// 检查agentTopicName是否存在 // 检查agentTopicName是否存在
if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) {
log.error( log.error(
@@ -122,7 +161,11 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
//throw new MyRuntimeException("agentTopicName异常!" + agentTopicName); //throw new MyRuntimeException("agentTopicName异常!" + agentTopicName);
} }
// 归一化type类型 不行 // 归一化type
if (StringUtils.isEmpty(type)) {
type = MANUAL_COMMAND_TYPE;
}
String resultKey = futureKey; String resultKey = futureKey;
// 判定是否是 FutureKey // 判定是否是 FutureKey
if (null == futureKey) { if (null == futureKey) {
@@ -178,10 +221,8 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
// help gc // help gc
executionMessage = null; executionMessage = null;
octopusMessage = null;
return resultKey;
return octopusMessage;
} }
private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) { private OctopusMessage generateOctopusMessage(String agentTopicName, ExecutionMessage executionMessage) {
@@ -191,7 +232,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
return OctopusMessage return OctopusMessage
.builder() .builder()
.type(OctopusMessageType.EXECUTOR) .type(OctopusMessageType.EXECUTOR)
.init_time(LocalDateTime.now()) .init_time(TimeUtils.currentFormatTime())
.uuid(agentTopicName) .uuid(agentTopicName)
.content( .content(
objectMapper.writeValueAsString(executionMessage) objectMapper.writeValueAsString(executionMessage)
@@ -244,7 +285,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
return agentTopicNameList return agentTopicNameList
.stream() .stream()
.map( .map(
agentTopicName -> this.SendCommandToAgent( agentTopicName -> this.SendCommandToAgentComplete(
agentTopicName, agentTopicName,
type, type,
null, null,
@@ -266,7 +307,10 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
type, type,
null, null,
completeCommandList, completeCommandList,
atnFutureKey.get(agentTopicName) atnFutureKey.getOrDefault(
agentTopicName,
null
)
) )
) )
.collect(Collectors.toList()); .collect(Collectors.toList());

View File

@@ -0,0 +1,97 @@
package io.wdd.rpc.execute.service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* 同步命令执行的核心类
* 需要等待命令执行完毕,完后返回相应的结果
*/
public interface SyncExecutionService {
/**
* ------------------------ Sync Command Executor ------------------------------
*/
ArrayList<String> SyncSendCommandToAgent(String agentTopicName, List<String> commandList);
ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList);
List<ArrayList<String>> SyncSendCommandToAgent(List<String> agentTopicNameList, String type, List<String> commandList);
/**
* 调用 单行命令脚本的 最底层函数
*
* @param agentTopicName
* @param type
* @param commandList
* @param needResultReplay
* @param futureKey
* @param durationTask
* @return
*/
ArrayList<String> SyncSendCommandToAgent(
String agentTopicName,
String type,
List<String> commandList,
boolean needResultReplay,
String futureKey,
boolean durationTask
);
/**
* -------------------------------------------------
*/
ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> completeCommandList);
List<ArrayList<String>> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<String> commandList, List<List<String>> completeCommandList);
/**
* 通常为 页面定时脚本任务调用
*
* @param agentTopicNameList 目标Agent的TopicName列表
* @param type 任务类型
* @param completeCommandList 完整的类型
* @return 每个Agent只返回一个 ResultKeyScript脚本的结果全部拼接到一起全部的resultKey
*/
List<ArrayList<String>> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList);
/**
* 通常为 页面定时脚本任务调用
*
* @param agentTopicNameList 目标Agent的TopicName列表
* @param type 任务类型
* @param completeCommandList 完整的类型
* @param atnFutureKey 由于脚本任务为延迟调用,故需要提前生成未来的ResultKey
* @return 每个Agent只返回一个 ResultKeyScript脚本的结果全部拼接到一起全部的resultKey
*/
List<ArrayList<String>> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList, HashMap<String, String> atnFutureKey);
ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey);
/**
* 调用 完整脚本的 最底层函数
*
* @param agentTopicName
* @param type
* @param commandList
* @param commandListComplete
* @param futureKey
* @param durationTask
* @return resultKey 本次操作在Redis中记录的结果Key
*/
ArrayList<String> SyncSendCommandToAgent(
String agentTopicName,
String type,
List<String> commandList,
List<List<String>> commandListComplete,
boolean needResultReplay,
String futureKey,
boolean durationTask
);
}

View File

@@ -0,0 +1,254 @@
package io.wdd.rpc.execute.service;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
import io.wdd.rpc.message.handler.AsyncWaitOMResult;
import io.wdd.rpc.message.handler.OMReplayContend;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Service
@Slf4j
public class SyncExecutionServiceImpl implements SyncExecutionService {
private static final boolean COMMAND_EXEC_NEED_REPLAY = true;
private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.EXECUTOR;
@Resource
AsyncWaitOMResult asyncWaitOMResult;
@Resource
AsyncExecutionService asyncExecutionService;
/**
* 一个命令执行的最长等待时间
*/
int processMaxWaitSeconds = 10;
@Override
public ArrayList<String> SyncSendCommandToAgent(String agentTopicName, List<String> commandList) {
return this.SyncSendCommandToAgent(
agentTopicName,
null,
commandList,
null,
COMMAND_EXEC_NEED_REPLAY,
null,
false
);
}
@Override
public ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList) {
return this.SyncSendCommandToAgent(
agentTopicName,
type,
commandList,
null,
COMMAND_EXEC_NEED_REPLAY,
null,
false
);
}
@Override
public List<ArrayList<String>> SyncSendCommandToAgent(List<String> agentTopicNameList, String type, List<String> commandList) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.SyncSendCommandToAgent(
agentTopicName,
type,
commandList,
null,
COMMAND_EXEC_NEED_REPLAY,
null,
false
)
)
.collect(Collectors.toList());
}
@Override
public ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, boolean needResultReplay, String futureKey, boolean durationTask) {
return this.SyncSendCommandToAgent(
agentTopicName,
type,
commandList,
null,
COMMAND_EXEC_NEED_REPLAY,
futureKey,
false
);
}
@Override
public ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> completeCommandList) {
return this.SyncSendCommandToAgent(
agentTopicName,
type,
commandList,
completeCommandList,
COMMAND_EXEC_NEED_REPLAY,
null,
false
);
}
@Override
public List<ArrayList<String>> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<String> commandList, List<List<String>> completeCommandList) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.SyncSendCommandToAgent(
agentTopicName,
type,
commandList,
completeCommandList,
COMMAND_EXEC_NEED_REPLAY,
null,
false
)
)
.collect(Collectors.toList());
}
@Override
public List<ArrayList<String>> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.SyncSendCommandToAgent(
agentTopicName,
type,
null,
completeCommandList,
COMMAND_EXEC_NEED_REPLAY,
null,
false
)
)
.collect(Collectors.toList());
}
@Override
public List<ArrayList<String>> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<List<String>> completeCommandList, HashMap<String, String> atnFutureKey) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> this.SyncSendCommandToAgent(
agentTopicName,
type,
null,
completeCommandList,
COMMAND_EXEC_NEED_REPLAY,
atnFutureKey.get(agentTopicName),
false
)
)
.collect(Collectors.toList());
}
@Override
public ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, String futureKey) {
return this.SyncSendCommandToAgent(
agentTopicName,
type,
commandList,
commandListComplete,
COMMAND_EXEC_NEED_REPLAY,
futureKey,
false
);
}
@Override
public ArrayList<String> SyncSendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
OctopusMessage octopusMessage = asyncExecutionService.AsyncCallSendCommandToAgent(
agentTopicName,
type,
commandList,
commandListComplete,
needResultReplay,
futureKey,
durationTask
);
LocalDateTime initTime = octopusMessage.getInit_time();
ArrayList<String> result = new ArrayList<>();
// 构造消息等待对象
int commandCount = 1;
if (null != commandListComplete) {
commandCount = Math.max(
commandListComplete.size(),
1
);
}
OMReplayContend omReplayContend = OMReplayContend.build(
commandCount,
CurrentAppOctopusMessageType,
initTime
);
CountDownLatch countDownLatch = omReplayContend.getCountDownLatch();
// 开始等待结果
asyncWaitOMResult.waitFor(omReplayContend);
// 监听结果
try {
boolean await = countDownLatch.await(
processMaxWaitSeconds,
TimeUnit.SECONDS
);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 等待所有的结果返回
// 停止等待结果
asyncWaitOMResult.stopWaiting(omReplayContend);
// 解析结果
omReplayContend
.getReplayOMList()
.stream()
.map(
om -> {
log.debug(
"replay message is => {}",
om
);
return (ArrayList<String>) om.getResult();
}
)
.forEachOrdered(
singleResult -> result.addAll(singleResult)
);
}
// 返回
return result;
}
}

View File

@@ -12,6 +12,12 @@ import java.util.concurrent.TimeUnit;
import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT; import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT;
/**
* 从Agent收集返回信息的统一处理地点
* 使用方法: 业务类构造 OMReplayContend对象调用AsyncWaitOMResult.waitFor()方法
* <p>
* 调用结束之后,需要从 REPLAY_WAITING_TARGET 中移除此部分内容
*/
@Service @Service
@Slf4j @Slf4j
public class AsyncWaitOMResult { public class AsyncWaitOMResult {
@@ -21,18 +27,27 @@ public class AsyncWaitOMResult {
* KEY -> replayMatchKey * KEY -> replayMatchKey
* VALUE -> OMReplayContend - 包含countDownLatch 和 result * VALUE -> OMReplayContend - 包含countDownLatch 和 result
*/ */
public static final HashMap<String, OMReplayContend> REPLAY_CACHE_MAP = new HashMap<>(); private static final HashMap<String, OMReplayContend> REPLAY_WAITING_TARGET = new HashMap<>();
public void waitFor(OMReplayContend omReplayContend) { public void waitFor(OMReplayContend omReplayContend) {
// 向 REPLAY_CACHE_MAP中写入 Key // 向 REPLAY_CACHE_MAP中写入 Key
REPLAY_CACHE_MAP.put(omReplayContend.getReplayMatchKey(), REPLAY_WAITING_TARGET.put(
omReplayContend); omReplayContend.getReplayMatchKey(),
omReplayContend
);
// 在调用线程的countDownLunch结束之后,关闭 // 在调用线程的countDownLunch结束之后,关闭
// 清除 REPLAY_CACHE_MAP 中的队列 // 清除 REPLAY_CACHE_MAP 中的队列
} }
public void stopWaiting(OMReplayContend omReplayContend) {
// 在调用线程的countDownLunch结束之后,关闭 清除 REPLAY_CACHE_MAP 中的队列
REPLAY_WAITING_TARGET.remove(omReplayContend.getReplayMatchKey());
}
@PostConstruct @PostConstruct
public void daemonHandleReplayOMFromAgent() { public void daemonHandleReplayOMFromAgent() {
@@ -71,7 +86,7 @@ public class AsyncWaitOMResult {
replayOMessage.getType(), replayOMessage.getType(),
replayOMessage.getInit_time() replayOMessage.getInit_time()
); );
if (!REPLAY_CACHE_MAP.containsKey(matchKey)) { if (!REPLAY_WAITING_TARGET.containsKey(matchKey)) {
// 没有这个Key,说明等待结果已经超时了,直接丢弃,然后继续循环 // 没有这个Key,说明等待结果已经超时了,直接丢弃,然后继续循环
// todo 错误的数据需要放置于某处 // todo 错误的数据需要放置于某处
@@ -80,7 +95,7 @@ public class AsyncWaitOMResult {
} }
// Map中包含有Key,那么放置进去 // Map中包含有Key,那么放置进去
OMReplayContend replayContend = REPLAY_CACHE_MAP.get(matchKey); OMReplayContend replayContend = REPLAY_WAITING_TARGET.get(matchKey);
replayContend replayContend
.getReplayOMList() .getReplayOMList()
.add(replayOMessage); .add(replayOMessage);

View File

@@ -11,7 +11,7 @@ import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List; import java.util.ArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@Data @Data
@@ -35,8 +35,7 @@ public class OMReplayContend {
CountDownLatch countDownLatch; CountDownLatch countDownLatch;
@ApiModelProperty("回复的结果列表, 临时保存") @ApiModelProperty("回复的结果列表, 临时保存")
List<OctopusMessage> replayOMList; ArrayList<OctopusMessage> replayOMList;
protected static String generateMatchKey(OMReplayContend replayIdentifier) { protected static String generateMatchKey(OMReplayContend replayIdentifier) {
@@ -49,6 +48,11 @@ public class OMReplayContend {
return relayMatchKey; return relayMatchKey;
} }
/**
* @param messageType
* @param messageInitTime 必须使用 TimeUtils.currentFormatTime();
* @return
*/
public static String generateMatchKey(OctopusMessageType messageType, LocalDateTime messageInitTime) { public static String generateMatchKey(OctopusMessageType messageType, LocalDateTime messageInitTime) {
String relayMatchKey = messageType.toString() + messageInitTime.toString(); String relayMatchKey = messageType.toString() + messageInitTime.toString();
@@ -56,4 +60,23 @@ public class OMReplayContend {
return relayMatchKey; return relayMatchKey;
} }
/**
* 方便使用的一个构造方法
*
* @return
*/
public static OMReplayContend build(int waitForReplayNum, OctopusMessageType currentOMType, LocalDateTime currentTime) {
return new OMReplayContend(
currentOMType,
currentTime,
generateMatchKey(
currentOMType,
currentTime
),
new CountDownLatch(waitForReplayNum),
new ArrayList<>()
);
}
} }

View File

@@ -14,7 +14,6 @@ import org.springframework.data.redis.core.RedisTemplate;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.concurrent.ArrayBlockingQueue;
@Configuration @Configuration
@Slf4j(topic = "Octopus Message Handler") @Slf4j(topic = "Octopus Message Handler")
@@ -60,7 +59,6 @@ public class OMessageHandlerServer {
octopusMessage octopusMessage
); );
// 获取Agent的版本信息 // 获取Agent的版本信息
if (octopusMessage if (octopusMessage
.getUuid() .getUuid()

View File

@@ -1,7 +1,7 @@
package io.wdd.rpc.scheduler.service.script; package io.wdd.rpc.scheduler.service.script;
import io.wdd.rpc.execute.service.CoreExecutionService; import io.wdd.rpc.execute.service.AsyncExecutionService;
import io.wdd.rpc.scheduler.beans.ScriptSchedulerDTO; import io.wdd.rpc.scheduler.beans.ScriptSchedulerDTO;
import io.wdd.rpc.scheduler.config.QuartzSchedulerUtils; import io.wdd.rpc.scheduler.config.QuartzSchedulerUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -20,7 +20,7 @@ import java.util.List;
public class AgentApplyScheduledScript { public class AgentApplyScheduledScript {
@Resource @Resource
CoreExecutionService coreExecutionService; AsyncExecutionService asyncExecutionService;
@Resource @Resource
QuartzSchedulerUtils quartzSchedulerUtils; QuartzSchedulerUtils quartzSchedulerUtils;
@@ -45,7 +45,7 @@ public class AgentApplyScheduledScript {
} }
// 发送命令到Agent中 // 发送命令到Agent中
List<String> resultKeyList = coreExecutionService List<String> resultKeyList = asyncExecutionService
.SendCommandToAgentComplete( .SendCommandToAgentComplete(
targetMachineList, targetMachineList,
scriptType, scriptType,

View File

@@ -1,6 +1,6 @@
package io.wdd.server; package io.wdd.server;
import io.wdd.rpc.execute.service.CoreExecutionService; import io.wdd.rpc.execute.service.AsyncExecutionService;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
@@ -13,7 +13,7 @@ class ServerApplicationTests {
@Resource @Resource
CoreExecutionService coreExecutionService; AsyncExecutionService asyncExecutionService;
@Test @Test
void testCoreExecutionCompleteScript() { void testCoreExecutionCompleteScript() {
@@ -61,7 +61,7 @@ class ServerApplicationTests {
) )
); );
List<String> resultList = coreExecutionService.SendCommandToAgentComplete( List<String> resultList = asyncExecutionService.SendCommandToAgentComplete(
targetMachineList, targetMachineList,
"Scheduled Script", "Scheduled Script",
completeScript completeScript