[ Agent ] [ Executor ] - fix bugs
This commit is contained in:
@@ -46,7 +46,7 @@ func INIT(agentServerInfoConf string) chan bool {
|
|||||||
initToServerQueue := &rabbitmq.RabbitQueue{
|
initToServerQueue := &rabbitmq.RabbitQueue{
|
||||||
RabbitProp: initToServerProp,
|
RabbitProp: initToServerProp,
|
||||||
}
|
}
|
||||||
defer initToServerQueue.Close()
|
//defer initToServerQueue.Close()
|
||||||
|
|
||||||
// 建立连接
|
// 建立连接
|
||||||
initToServerQueue.Connect()
|
initToServerQueue.Connect()
|
||||||
|
|||||||
@@ -106,7 +106,7 @@
|
|||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.baomidou</groupId>
|
<groupId>com.baomidou</groupId>
|
||||||
<artifactId>mybatis-plus-boot-starter</artifactId>
|
<artifactId>mybatis-plus-boot-starter</artifactId>
|
||||||
<version>3.5.2</version>
|
<version>3.5.4</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
|
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import io.wdd.func.auto.beans.BaseFunctionEnum;
|
|||||||
import io.wdd.func.auto.beans.ProjectDeployContext;
|
import io.wdd.func.auto.beans.ProjectDeployContext;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@@ -47,21 +48,24 @@ public class BaseFuncScheduler {
|
|||||||
.getMasterNode()
|
.getMasterNode()
|
||||||
.getTopicName();
|
.getTopicName();
|
||||||
|
|
||||||
|
if (!MasterNodeProcedure(masterTopicName)) {
|
||||||
|
log.error("初始化过程错误! 即将终止!");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (!CollectionUtils.isEmpty(projectDeployContext
|
||||||
|
.getAgentNodeList())) {
|
||||||
|
|
||||||
List<String> agentTopicNameList = projectDeployContext
|
List<String> agentTopicNameList = projectDeployContext
|
||||||
.getAgentNodeList()
|
.getAgentNodeList()
|
||||||
.stream()
|
.stream()
|
||||||
.map(agentNode -> agentNode.getTopicName())
|
.map(agentNode -> agentNode.getTopicName())
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
|
||||||
if (!MasterNodeProcedure(masterTopicName)) {
|
|
||||||
log.error("初始化过程错误! 即将终止!");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!AgentNodeProcedure(agentTopicNameList)) {
|
if (!AgentNodeProcedure(agentTopicNameList)) {
|
||||||
log.error("初始化过程错误! 即将终止!");
|
log.error("初始化过程错误! 即将终止!");
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
log.info("初始化过程完成!");
|
log.info("初始化过程完成!");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -85,6 +85,9 @@ public class FuncServiceImpl implements FuncService {
|
|||||||
private List<String> syncCallFunction(String agentTopicName, ExecutionMessageType emType, String funcName, List<String> funcArgs) {
|
private List<String> syncCallFunction(String agentTopicName, ExecutionMessageType emType, String funcName, List<String> funcArgs) {
|
||||||
|
|
||||||
// 重新构造内容,增加Function Name
|
// 重新构造内容,增加Function Name
|
||||||
|
if (CollectionUtils.isEmpty(funcArgs)) {
|
||||||
|
funcArgs = new ArrayList<>();
|
||||||
|
}
|
||||||
funcArgs.add(
|
funcArgs.add(
|
||||||
0,
|
0,
|
||||||
funcName
|
funcName
|
||||||
|
|||||||
@@ -20,7 +20,6 @@ import io.wdd.func.xray.persisit.XrayOssOperator;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.beanutils.BeanUtils;
|
import org.apache.commons.beanutils.BeanUtils;
|
||||||
import org.apache.commons.lang3.ObjectUtils;
|
import org.apache.commons.lang3.ObjectUtils;
|
||||||
import org.jetbrains.annotations.NotNull;
|
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
@@ -61,7 +60,6 @@ public class XrayCoreServiceImpl implements XrayCoreService {
|
|||||||
* @param allNetworkPathList
|
* @param allNetworkPathList
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
@NotNull
|
|
||||||
private static Set<ProxyNode> getAllProxyNodeSet(List<List<ProxyNode>> allNetworkPathList, boolean includeSubscribeNode) {
|
private static Set<ProxyNode> getAllProxyNodeSet(List<List<ProxyNode>> allNetworkPathList, boolean includeSubscribeNode) {
|
||||||
// 拿到所有的proxyNode
|
// 拿到所有的proxyNode
|
||||||
Set<ProxyNode> proxyNodeSet = allNetworkPathList
|
Set<ProxyNode> proxyNodeSet = allNetworkPathList
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import io.wdd.rpc.message.sender.OMessageToAgentSender;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@@ -49,6 +50,7 @@ public class ExecutionServiceImpl implements ExecutionService {
|
|||||||
ExecutionMessage executionMessage = this
|
ExecutionMessage executionMessage = this
|
||||||
.generateExecutionMessage(
|
.generateExecutionMessage(
|
||||||
executionType,
|
executionType,
|
||||||
|
funcContent,
|
||||||
pipeLineCommand,
|
pipeLineCommand,
|
||||||
commandList,
|
commandList,
|
||||||
commandListComplete,
|
commandListComplete,
|
||||||
@@ -89,7 +91,7 @@ public class ExecutionServiceImpl implements ExecutionService {
|
|||||||
|
|
||||||
// 2023年10月8日 根据执行的命令数量设定超时等待时间
|
// 2023年10月8日 根据执行的命令数量设定超时等待时间
|
||||||
int commandMaxWaitTimeout = COMMAND_MAX_WAIT_TIMEOUT;
|
int commandMaxWaitTimeout = COMMAND_MAX_WAIT_TIMEOUT;
|
||||||
if (commandListComplete.size() > 0) {
|
if (!CollectionUtils.isEmpty(commandListComplete)) {
|
||||||
commandMaxWaitTimeout = COMMAND_MAX_WAIT_TIMEOUT * commandListComplete.size();
|
commandMaxWaitTimeout = COMMAND_MAX_WAIT_TIMEOUT * commandListComplete.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -137,11 +139,12 @@ public class ExecutionServiceImpl implements ExecutionService {
|
|||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
private ExecutionMessage generateExecutionMessage(String executionType, List<List<String>> pipeLineCommand, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, boolean durationTask) {
|
private ExecutionMessage generateExecutionMessage(String executionType, List<String> funcContent, List<List<String>> pipeLineCommand, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, boolean durationTask) {
|
||||||
|
|
||||||
return ExecutionMessage
|
return ExecutionMessage
|
||||||
.builder()
|
.builder()
|
||||||
.executionType(executionType)
|
.executionType(executionType)
|
||||||
|
.funcContent(funcContent)
|
||||||
.singleLineCommand(commandList)
|
.singleLineCommand(commandList)
|
||||||
.multiLineCommand(commandListComplete)
|
.multiLineCommand(commandListComplete)
|
||||||
.pipeLineCommand(pipeLineCommand)
|
.pipeLineCommand(pipeLineCommand)
|
||||||
|
|||||||
@@ -55,7 +55,9 @@ public class AcceptAgentInitInfo {
|
|||||||
"LosAngeles",
|
"LosAngeles",
|
||||||
7,
|
7,
|
||||||
"Beijing",
|
"Beijing",
|
||||||
8
|
8,
|
||||||
|
"Paripark",
|
||||||
|
9
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
public static Set<String> ALL_SERVER_ARCH_INFO = new HashSet<>(
|
public static Set<String> ALL_SERVER_ARCH_INFO = new HashSet<>(
|
||||||
@@ -288,7 +290,7 @@ public class AcceptAgentInitInfo {
|
|||||||
String[] split = serverName.split("-");
|
String[] split = serverName.split("-");
|
||||||
if (split.length <= 2 || !ALL_SERVER_CITY_INDEX.containsKey(split[0]) || !ALL_SERVER_ARCH_INFO.contains(split[1])) {
|
if (split.length <= 2 || !ALL_SERVER_CITY_INDEX.containsKey(split[0]) || !ALL_SERVER_ARCH_INFO.contains(split[1])) {
|
||||||
log.error(
|
log.error(
|
||||||
"server info from agent are {}",
|
"city are not validated from agent are {}",
|
||||||
serverInfoVO
|
serverInfoVO
|
||||||
);
|
);
|
||||||
throw new MyRuntimeException("server name not validated !");
|
throw new MyRuntimeException("server name not validated !");
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ public class ServerInfoPO implements Serializable {
|
|||||||
/**
|
/**
|
||||||
* server primary key
|
* server primary key
|
||||||
*/
|
*/
|
||||||
@TableId(value = "server_id", type = IdType.AUTO)
|
@TableId(value = "server_id", type = IdType.ASSIGN_ID)
|
||||||
private Long serverId;
|
private Long serverId;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -190,7 +190,7 @@ public class CoreServerServiceImpl implements CoreServerService {
|
|||||||
)
|
)
|
||||||
.one();
|
.one();
|
||||||
|
|
||||||
if (po == null) {
|
if (po != null) {
|
||||||
// 已经存在,需要把更新的信息 复制到旧的内容之上
|
// 已经存在,需要把更新的信息 复制到旧的内容之上
|
||||||
copyProperties(
|
copyProperties(
|
||||||
serverInfoVO,
|
serverInfoVO,
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import io.wdd.func.auto.service.BaseFuncScheduler;
|
|||||||
import io.wdd.server.beans.po.ServerInfoPO;
|
import io.wdd.server.beans.po.ServerInfoPO;
|
||||||
import io.wdd.server.beans.request.ServerQueryEntity;
|
import io.wdd.server.beans.request.ServerQueryEntity;
|
||||||
import io.wdd.server.coreService.CoreServerService;
|
import io.wdd.server.coreService.CoreServerService;
|
||||||
import org.junit.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.springframework.boot.test.context.SpringBootTest;
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
@@ -13,14 +13,12 @@ import javax.annotation.Resource;
|
|||||||
@SpringBootTest
|
@SpringBootTest
|
||||||
public class TestBaseFuncScheduler {
|
public class TestBaseFuncScheduler {
|
||||||
|
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
BaseFuncScheduler baseFuncScheduler;
|
BaseFuncScheduler baseFuncScheduler;
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
CoreServerService serverService;
|
CoreServerService serverService;
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRunProcedure() {
|
public void testRunProcedure() {
|
||||||
|
|
||||||
@@ -29,12 +27,14 @@ public class TestBaseFuncScheduler {
|
|||||||
projectDeployContext.setProjectId(1716372290994155522L);
|
projectDeployContext.setProjectId(1716372290994155522L);
|
||||||
|
|
||||||
ServerQueryEntity serverQueryEntity = new ServerQueryEntity();
|
ServerQueryEntity serverQueryEntity = new ServerQueryEntity();
|
||||||
serverQueryEntity.setServerName("Osaka");
|
serverQueryEntity.setServerName("Paripark");
|
||||||
ServerInfoPO serverInfoPO = serverService
|
ServerInfoPO serverInfoPO = serverService
|
||||||
.serverGetByPage(serverQueryEntity)
|
.serverGetByPage(serverQueryEntity)
|
||||||
.getRecords()
|
.getRecords()
|
||||||
.get(0);
|
.get(0);
|
||||||
|
|
||||||
|
System.out.println("serverInfoPO = " + serverInfoPO);
|
||||||
|
|
||||||
projectDeployContext.setMasterNode(serverInfoPO);
|
projectDeployContext.setMasterNode(serverInfoPO);
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user