[ Service ] [ Executor ] 修改OctopusObjectMapper

This commit is contained in:
zeaslity
2023-08-11 15:47:54 +08:00
parent 06170c1024
commit 0a78f9a02b
15 changed files with 175 additions and 152 deletions

View File

@@ -12,6 +12,8 @@ import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import static io.wdd.rpc.message.handler.sync.OMessageHandler.StopWaitingResult;
import static io.wdd.rpc.message.handler.sync.OMessageHandler.WaitFromAgent;
import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_SET;
@Service
@@ -51,18 +53,26 @@ public class ExecutionServiceImpl implements ExecutionService {
// send the message
oMessageToAgentSender.send(octopusMessage);
System.out.println("originOctopusMessage = " + octopusMessage.hashCode());
// 需要返回结果
if (!durationTask) {
// 等待结果
WaitFromAgent(octopusMessage);
synchronized (octopusMessage) {
try {
octopusMessage.wait(10000);
log.debug("等待结束!");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
// 转换结果
commandResultLog = (ArrayList<String>) octopusMessage.getResult();
@@ -76,6 +86,9 @@ public class ExecutionServiceImpl implements ExecutionService {
}
// 释放等待队列
StopWaitingResult(octopusMessage);
return commandResultLog;
}
@@ -85,7 +98,7 @@ public class ExecutionServiceImpl implements ExecutionService {
return OctopusMessage
.builder()
.octopusMessageType(OctopusMessageType.EXECUTOR)
.init_time(TimeUtils.currentFormatTime())
.init_time(TimeUtils.currentTime())
.uuid(agentTopicName)
.content(
executionMessage

View File

@@ -27,7 +27,7 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import static io.wdd.common.utils.OctopusObjectMapperConfig.OctopusObjectMapper;
import static io.wdd.common.config.OctopusObjectMapperConfig.OctopusObjectMapper;
/**
* The type Accept boot up info message.

View File

@@ -9,6 +9,7 @@ import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.FROM_AGENT_MATCH_TO_AGENT_MAP;
import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.OCTOPUS_MESSAGE_FROM_AGENT;
@@ -43,6 +44,42 @@ public class OMessageHandler {
}
public static void WaitFromAgent(OctopusMessage octopusMessage) {
// 构建 MatchKey
String matchKey = GenerateOMessageMatchKey(
octopusMessage.getOctopusMessageType(),
octopusMessage.getInit_time()
);
// 开始等待
FROM_AGENT_MATCH_TO_AGENT_MAP.put(
matchKey,
octopusMessage
);
//debug
log.info(
"wait from agent map is => {}",
FROM_AGENT_MATCH_TO_AGENT_MAP
);
}
public static void StopWaitingResult(OctopusMessage octopusMessage) {
// 构建 MatchKey
String matchKey = GenerateOMessageMatchKey(
octopusMessage.getOctopusMessageType(),
octopusMessage.getInit_time()
);
// 开始等待
if (FROM_AGENT_MATCH_TO_AGENT_MAP.containsKey(matchKey)) {
FROM_AGENT_MATCH_TO_AGENT_MAP.remove(matchKey);
}
}
/**
* 解析所有从Agent传回的消息中央集中化处理
*/
@@ -54,7 +91,7 @@ public class OMessageHandler {
if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) {
try {
OCTOPUS_MESSAGE_FROM_AGENT.wait(5000);
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@@ -70,6 +107,7 @@ public class OMessageHandler {
replayOMessage.getOctopusMessageType(),
replayOMessage.getInit_time()
);
if (!FROM_AGENT_MATCH_TO_AGENT_MAP.containsKey(matchKey)) {
// 没有这个Key,说明等待结果已经超时了,直接丢弃,然后继续循环
// todo 错误的数据需要放置于某处
@@ -93,32 +131,4 @@ public class OMessageHandler {
}
}
protected void waitFor(OctopusMessage octopusMessage) {
// 构建 MatchKey
String matchKey = GenerateOMessageMatchKey(
octopusMessage.getOctopusMessageType(),
octopusMessage.getInit_time()
);
// 开始等待
FROM_AGENT_MATCH_TO_AGENT_MAP.put(
matchKey,
octopusMessage
);
}
public void stopWaiting(OctopusMessage octopusMessage) {
// 构建 MatchKey
String matchKey = GenerateOMessageMatchKey(
octopusMessage.getOctopusMessageType(),
octopusMessage.getInit_time()
);
// 开始等待
FROM_AGENT_MATCH_TO_AGENT_MAP.remove(matchKey);
}
}

View File

@@ -15,7 +15,7 @@ import java.io.IOException;
import java.util.ArrayDeque;
import java.util.HashMap;
import static io.wdd.common.utils.OctopusObjectMapperConfig.OctopusObjectMapper;
import static io.wdd.common.config.OctopusObjectMapperConfig.OctopusObjectMapper;
@Configuration
@Slf4j(topic = "Octopus Message Listener")
@@ -72,6 +72,8 @@ public class OMessageToServerListener {
octopusMessage
);
System.out.println("receivedOctopusMessage = " + octopusMessage.hashCode());
// 获取Agent的版本信息
if (octopusMessage
.getUuid()
@@ -96,9 +98,5 @@ public class OMessageToServerListener {
// 将收到的消息,直接存储到 缓存队列中
log.debug("cache the octopus message to inner cache list !");
OCTOPUS_MESSAGE_FROM_AGENT.offer(octopusMessage);
oMessageHandler.waitFor(octopusMessage);
// 唤醒等待线程
OCTOPUS_MESSAGE_FROM_AGENT.notify();
}
}

View File

@@ -2,7 +2,6 @@ package io.wdd.rpc.message.sender;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.handler.MyRuntimeException;
import io.wdd.rpc.init.InitRabbitMQConfig;
import io.wdd.rpc.message.OctopusMessage;
@@ -15,7 +14,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import static io.wdd.common.utils.OctopusObjectMapperConfig.OctopusObjectMapper;
import static io.wdd.common.config.OctopusObjectMapperConfig.OctopusObjectMapper;
/**
* adaptor
@@ -31,8 +30,6 @@ public class OMessageToAgentSender {
@Resource
InitRabbitMQConfig initRabbitMQConfig;
@Resource
ObjectMapper objectMapper;
/**
* send to Queue -- InitFromServer
@@ -106,7 +103,7 @@ public class OMessageToAgentSender {
@SneakyThrows
private byte[] writeData(Object data) {
return objectMapper.writeValueAsBytes(data);
return OctopusObjectMapper.writeValueAsBytes(data);
}
}

View File

@@ -12,7 +12,7 @@ import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Map;
import static io.wdd.common.utils.OctopusObjectMapperConfig.OctopusObjectMapper;
import static io.wdd.common.config.OctopusObjectMapperConfig.OctopusObjectMapper;
import static io.wdd.rpc.status.CommonAndStatusCache.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
/**

View File

@@ -10,7 +10,7 @@ import lombok.experimental.SuperBuilder;
import java.time.LocalDateTime;
import static io.wdd.common.utils.OctopusObjectMapperConfig.OctopusObjectMapper;
import static io.wdd.common.config.OctopusObjectMapperConfig.OctopusObjectMapper;
@Data
@AllArgsConstructor

View File

@@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static io.wdd.common.utils.OctopusObjectMapperConfig.OctopusObjectMapper;
import static io.wdd.common.config.OctopusObjectMapperConfig.OctopusObjectMapper;
import static io.wdd.rpc.status.OctopusStatusMessage.*;
@Slf4j