[ server ] [ executor ] - optimize code

This commit is contained in:
zeaslity
2022-12-27 16:02:32 +08:00
parent 9bc516bdbd
commit 37752ca07c
16 changed files with 189 additions and 166 deletions

View File

@@ -1,5 +1,7 @@
package io.wdd.rpc.execute.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.beans.executor.ExecutionMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
@@ -18,6 +20,8 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
@Resource
ToAgentMessageSender messageSender;
@Resource
ObjectMapper objectMapper;
@Override
public String SendCommandToAgent(String topicName, String command) {
@@ -34,11 +38,22 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
OctopusMessage octopusMessage = this.generateOctopusMessage(topicName, type, commandList);
ExecutionMessage executionMessage = (ExecutionMessage) octopusMessage.getContent();
String executionMsg;
try {
executionMsg = objectMapper.writeValueAsString(executionMessage);
octopusMessage.setContent(executionMsg);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
messageSender.send(octopusMessage);
ExecutionMessage content = (ExecutionMessage) octopusMessage.getContent();
return content.getResultKey();
return executionMessage.getResultKey();
}
@@ -53,15 +68,16 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
private OctopusMessage generateOctopusMessage(String topicName, String type, List<String> commandList){
ExecutionMessage executionMessage = generateExecutionMessage(
type,
commandList,
generateCommandResultKey(topicName)
);
return OctopusMessage.builder()
.type(OctopusMessageType.EXECUTOR)
.init_time(LocalDateTime.now())
.content(generateExecutionMessage(
type,
commandList,
generateCommandResultKey(topicName)
))
.content(executionMessage)
.uuid(topicName)
.build();
}

View File

@@ -104,6 +104,7 @@ public class AcceptAgentInitInfo {
if (!checkAgentAlreadyRegister(agentQueueTopic)) {
// 3. save the agent info into database
// backend fixed thread daemon to operate the database ensuring the operation is correct !
log.info("[AGENT INIT] - agent not exist ! start to register !");
if (!databaseOperator.saveInitOctopusAgentInfo(serverInfoVO)) {
throw new MyRuntimeException("database save agent info error !");
}

View File

@@ -40,8 +40,10 @@ public class OctopusMessageHandlerServer {
// todo what to do after received the result
// log info ?
// update the database
// handle the result
// collect all message from agent and log to somewhere
// 1. send some info to the specific topic name
// 2. judge from which agent the message are
//
}
}

View File

@@ -53,9 +53,13 @@ public class ToAgentMessageSender {
public void send(OctopusMessage octopusMessage) {
log.info("OctopusMessage {} send to agent {}",octopusMessage, octopusMessage.getUuid());
log.info("OctopusMessage {} send to agent {}", octopusMessage, octopusMessage.getUuid());
rabbitTemplate.convertAndSend(
initRabbitMQConfig.OCTOPUS_EXCHANGE,
octopusMessage.getUuid()+"*",
writeData(octopusMessage));
}

View File

@@ -10,6 +10,7 @@ import io.wdd.server.beans.vo.ServerInfoVO;
import io.wdd.server.coreService.CoreServerService;
import io.wdd.server.service.*;
import io.wdd.server.utils.EntityUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
@@ -24,6 +25,7 @@ import java.util.stream.Collectors;
@Service
@Slf4j
public class CoreServerServiceImpl implements CoreServerService {
@Resource

View File

@@ -2,11 +2,11 @@ spring:
application:
name: octopus-server
profiles:
active: k3s
active: local
cloud:
nacos:
config:
group: k3s
group: local
config-retry-time: 3000
file-extension: yaml
max-retry: 3
@@ -16,5 +16,5 @@ spring:
timeout: 5000
config-long-poll-timeout: 5000
extension-configs:
- group: k3s
data-id: common-k3s.yaml
- group: local
data-id: common-local.yaml