[server] [rpc] - 修复rpc返回信息处理逻辑, 解决cpu过高的问题
This commit is contained in:
@@ -8,6 +8,8 @@ import io.wdd.common.beans.agent.AgentOperationType;
|
|||||||
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||||
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
|
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
|
||||||
import io.wdd.common.utils.TimeUtils;
|
import io.wdd.common.utils.TimeUtils;
|
||||||
|
import io.wdd.rpc.message.handler.AsyncWaitOMResult;
|
||||||
|
import io.wdd.rpc.message.handler.OMReplayContend;
|
||||||
import io.wdd.rpc.message.sender.OMessageToAgentSender;
|
import io.wdd.rpc.message.sender.OMessageToAgentSender;
|
||||||
import io.wdd.server.beans.vo.ServerInfoVO;
|
import io.wdd.server.beans.vo.ServerInfoVO;
|
||||||
import io.wdd.server.config.ServerCommonPool;
|
import io.wdd.server.config.ServerCommonPool;
|
||||||
@@ -17,6 +19,7 @@ import org.springframework.stereotype.Service;
|
|||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@@ -28,8 +31,9 @@ import java.util.stream.Collectors;
|
|||||||
|
|
||||||
import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_TOPIC_NAME_SET;
|
import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_TOPIC_NAME_SET;
|
||||||
import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
|
import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
|
||||||
import static io.wdd.rpc.message.handler.OctopusMessageHandler.AGENT_LATEST_VERSION;
|
import static io.wdd.rpc.message.handler.AsyncWaitOMResult.REPLAY_CACHE_MAP;
|
||||||
import static io.wdd.rpc.message.handler.OctopusMessageHandler.OCTOPUS_MESSAGE_FROM_AGENT;
|
import static io.wdd.rpc.message.handler.OMessageHandlerServer.AGENT_LATEST_VERSION;
|
||||||
|
import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@@ -44,6 +48,9 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
|
|||||||
@Resource
|
@Resource
|
||||||
RedisTemplate redisTemplate;
|
RedisTemplate redisTemplate;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
AsyncWaitOMResult asyncWaitOMResult;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, String> getAllAgentVersion() {
|
public Map<String, String> getAllAgentVersion() {
|
||||||
HashMap<String, String> result = new HashMap<>();
|
HashMap<String, String> result = new HashMap<>();
|
||||||
@@ -55,24 +62,44 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// 获取Agent的版本信息 -- 超时时间
|
// 获取Agent的版本信息 -- 超时时间
|
||||||
// 发送OctopusMessage-Agent
|
|
||||||
// 从OctopusToServer中收集到所有Agent的版本信息
|
// 从OctopusToServer中收集到所有Agent的版本信息
|
||||||
// 组装信息至集合中
|
// 组装信息至集合中
|
||||||
LocalDateTime currentTime = TimeUtils.currentFormatTime();
|
LocalDateTime currentTime = TimeUtils.currentFormatTime();
|
||||||
|
|
||||||
buildAndSendToAllAgent(
|
|
||||||
|
// 发送OctopusMessage-Agent
|
||||||
|
buildOMessageAndSendToAllHealthyAgent(
|
||||||
AgentOperationType.VERSION,
|
AgentOperationType.VERSION,
|
||||||
currentTime
|
currentTime
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// 构造 异步结果监听内容
|
||||||
CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size());
|
CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size());
|
||||||
|
ArrayList<OctopusMessage> replayOMList = new ArrayList<>();
|
||||||
|
OMReplayContend omReplayContend = OMReplayContend
|
||||||
|
.builder()
|
||||||
|
.initTime(currentTime)
|
||||||
|
.countDownLatch(countDownLatch)
|
||||||
|
.replayOMList(replayOMList)
|
||||||
|
.replayMatchKey(
|
||||||
|
OMReplayContend.generateMatchKey(
|
||||||
|
CurrentAppOctopusMessageType,
|
||||||
|
currentTime
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.type(CurrentAppOctopusMessageType)
|
||||||
|
.build();
|
||||||
|
|
||||||
// todo 此处存在重大bug,会导致CPU占用飙升
|
// 调用后台接收处理所有的Replay信息
|
||||||
CompletableFuture<Void> getAllAgentVersionInfoFuture = waitCollectAllAgentVersionInfo(
|
asyncWaitOMResult.waitFor(omReplayContend);
|
||||||
|
|
||||||
|
//此处存在重大bug,会导致CPU占用飙升
|
||||||
|
/*CompletableFuture<Void> getAllAgentVersionInfoFuture = waitCollectAllAgentVersionInfo(
|
||||||
result,
|
result,
|
||||||
currentTime,
|
currentTime,
|
||||||
countDownLatch
|
countDownLatch
|
||||||
);
|
);*/
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 超时等待5秒钟, 或者所有的Agent均已经完成上报
|
// 超时等待5秒钟, 或者所有的Agent均已经完成上报
|
||||||
@@ -83,8 +110,29 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
|
|||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
log.warn("存在部分Agent没有上报 版本信息!");
|
log.warn("存在部分Agent没有上报 版本信息!");
|
||||||
} finally {
|
} finally {
|
||||||
// 必须关闭释放线程
|
// 超时,或者 全部信息已经收集
|
||||||
getAllAgentVersionInfoFuture.cancel(true);
|
|
||||||
|
// 此处调用,即可中断 异步任务的收集工作
|
||||||
|
REPLAY_CACHE_MAP.remove(
|
||||||
|
omReplayContend.getReplayMatchKey()
|
||||||
|
);
|
||||||
|
|
||||||
|
// 处理结果
|
||||||
|
omReplayContend
|
||||||
|
.getReplayOMList()
|
||||||
|
.stream()
|
||||||
|
.forEach(
|
||||||
|
mMessage -> {
|
||||||
|
result.put(
|
||||||
|
mMessage.getUuid(),
|
||||||
|
(String) mMessage.getResult()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
// help gc
|
||||||
|
omReplayContend = null;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
@@ -107,24 +155,41 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
|
|||||||
HashMap<String, ServerInfoVO> result = new HashMap<>();
|
HashMap<String, ServerInfoVO> result = new HashMap<>();
|
||||||
|
|
||||||
// 获取Agent的版本信息 -- 超时时间
|
// 获取Agent的版本信息 -- 超时时间
|
||||||
// 发送OctopusMessage-Agent
|
|
||||||
// 从OctopusToServer中收集到所有Agent的版本信息
|
// 从OctopusToServer中收集到所有Agent的版本信息
|
||||||
// 组装信息至集合中
|
// 组装信息至集合中
|
||||||
LocalDateTime currentTime = TimeUtils.currentFormatTime();
|
LocalDateTime currentTime = TimeUtils.currentFormatTime();
|
||||||
|
|
||||||
buildAndSendToAllAgent(
|
// 发送OctopusMessage-Agent
|
||||||
|
buildOMessageAndSendToAllHealthyAgent(
|
||||||
AgentOperationType.INFO,
|
AgentOperationType.INFO,
|
||||||
currentTime
|
currentTime
|
||||||
);
|
);
|
||||||
|
|
||||||
CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size());
|
CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size());
|
||||||
|
ArrayList<OctopusMessage> replayOMList = new ArrayList<>();
|
||||||
|
OMReplayContend omReplayContend = OMReplayContend
|
||||||
|
.builder()
|
||||||
|
.initTime(currentTime)
|
||||||
|
.countDownLatch(countDownLatch)
|
||||||
|
.replayOMList(replayOMList)
|
||||||
|
.replayMatchKey(
|
||||||
|
OMReplayContend.generateMatchKey(
|
||||||
|
CurrentAppOctopusMessageType,
|
||||||
|
currentTime
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.type(CurrentAppOctopusMessageType)
|
||||||
|
.build();
|
||||||
|
|
||||||
CompletableFuture<Void> getAllAgentCoreInfoFuture = waitCollectAllAgentCoreInfo(
|
// 调用后台接收处理所有的Replay信息
|
||||||
|
asyncWaitOMResult.waitFor(omReplayContend);
|
||||||
|
|
||||||
|
/* CompletableFuture<Void> getAllAgentCoreInfoFuture = waitCollectAllAgentCoreInfo(
|
||||||
result,
|
result,
|
||||||
currentTime,
|
currentTime,
|
||||||
countDownLatch
|
countDownLatch
|
||||||
);
|
);
|
||||||
|
*/
|
||||||
try {
|
try {
|
||||||
// 超时等待5秒钟, 或者所有的Agent均已经完成上报
|
// 超时等待5秒钟, 或者所有的Agent均已经完成上报
|
||||||
countDownLatch.await(
|
countDownLatch.await(
|
||||||
@@ -134,13 +199,123 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
|
|||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
log.warn("存在部分Agent没有上报 核心信息!");
|
log.warn("存在部分Agent没有上报 核心信息!");
|
||||||
} finally {
|
} finally {
|
||||||
// 必须关闭释放线程
|
// 超时,或者 全部信息已经收集
|
||||||
getAllAgentCoreInfoFuture.cancel(true);
|
|
||||||
|
// 此处调用,即可中断 异步任务的收集工作
|
||||||
|
REPLAY_CACHE_MAP.remove(
|
||||||
|
omReplayContend.getReplayMatchKey()
|
||||||
|
);
|
||||||
|
|
||||||
|
// 处理结果
|
||||||
|
omReplayContend
|
||||||
|
.getReplayOMList()
|
||||||
|
.stream()
|
||||||
|
.forEach(
|
||||||
|
mMessage -> {
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
// 解析当前信息
|
||||||
|
ServerInfoVO serverInfoVO = objectMapper.readValue(
|
||||||
|
(String) mMessage.getResult(),
|
||||||
|
new TypeReference<ServerInfoVO>() {
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
result.put(
|
||||||
|
mMessage.getUuid(),
|
||||||
|
serverInfoVO
|
||||||
|
);
|
||||||
|
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
// help gc
|
||||||
|
omReplayContend = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String shutdownAgentDanger(String agentTopicName) {
|
||||||
|
|
||||||
|
if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) {
|
||||||
|
log.error("agentTopicName Error !");
|
||||||
|
return "agentTopicName Error!";
|
||||||
|
}
|
||||||
|
|
||||||
|
LocalDateTime formatTime = TimeUtils.currentFormatTime();
|
||||||
|
buildOMessageAndSendToAllHealthyAgent(
|
||||||
|
AgentOperationType.SHUTDOWN,
|
||||||
|
formatTime
|
||||||
|
);
|
||||||
|
|
||||||
|
// 是否需要检查相应的状态
|
||||||
|
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private CompletableFuture<Void> waitCollectAllAgentVersionInfo(HashMap<String, String> result, LocalDateTime startTime, CountDownLatch countDownLatch) {
|
||||||
|
|
||||||
|
ExecutorService pool = ServerCommonPool.pool;
|
||||||
|
|
||||||
|
// 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息
|
||||||
|
return CompletableFuture.runAsync(
|
||||||
|
() -> {
|
||||||
|
while (true) {
|
||||||
|
if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) {
|
||||||
|
// 开始收集等待 200ms
|
||||||
|
try {
|
||||||
|
TimeUnit.MILLISECONDS.sleep(50);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
// 返回,继续死循环
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll();
|
||||||
|
|
||||||
|
// 获取到OM
|
||||||
|
// 判断信息是否是需要的类型
|
||||||
|
// 根据 init_time + OctopusMessageType + AgentOperationMessage
|
||||||
|
if (!judgyIsCurrentServerReplyMessage(
|
||||||
|
message,
|
||||||
|
startTime
|
||||||
|
)) {
|
||||||
|
|
||||||
|
// 不是当前应用需要的的OM,将信息放置与Cache队列的末尾
|
||||||
|
try {
|
||||||
|
OCTOPUS_MESSAGE_FROM_AGENT.put(message);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
// 返回,继续死循环
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 是当前需要的消息信息
|
||||||
|
result.put(
|
||||||
|
message.getUuid(),
|
||||||
|
(String) message.getResult()
|
||||||
|
);
|
||||||
|
countDownLatch.countDown();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
pool
|
||||||
|
);
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
private CompletableFuture<Void> waitCollectAllAgentCoreInfo(HashMap<String, ServerInfoVO> result, LocalDateTime startTime, CountDownLatch countDownLatch) {
|
private CompletableFuture<Void> waitCollectAllAgentCoreInfo(HashMap<String, ServerInfoVO> result, LocalDateTime startTime, CountDownLatch countDownLatch) {
|
||||||
|
|
||||||
ExecutorService pool = ServerCommonPool.pool;
|
ExecutorService pool = ServerCommonPool.pool;
|
||||||
@@ -207,79 +382,6 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public String shutdownAgentDanger(String agentTopicName) {
|
|
||||||
|
|
||||||
if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) {
|
|
||||||
log.error("agentTopicName Error !");
|
|
||||||
return "agentTopicName Error!";
|
|
||||||
}
|
|
||||||
|
|
||||||
LocalDateTime formatTime = TimeUtils.currentFormatTime();
|
|
||||||
buildAndSendToAllAgent(
|
|
||||||
AgentOperationType.SHUTDOWN,
|
|
||||||
formatTime
|
|
||||||
);
|
|
||||||
|
|
||||||
// 是否需要检查相应的状态
|
|
||||||
|
|
||||||
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private CompletableFuture<Void> waitCollectAllAgentVersionInfo(HashMap<String, String> result, LocalDateTime startTime, CountDownLatch countDownLatch) {
|
|
||||||
|
|
||||||
ExecutorService pool = ServerCommonPool.pool;
|
|
||||||
|
|
||||||
// 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息
|
|
||||||
return CompletableFuture.runAsync(
|
|
||||||
() -> {
|
|
||||||
while (true) {
|
|
||||||
if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) {
|
|
||||||
// 开始收集等待 200ms
|
|
||||||
try {
|
|
||||||
TimeUnit.MILLISECONDS.sleep(50);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
// 返回,继续死循环
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll();
|
|
||||||
|
|
||||||
// 获取到OM
|
|
||||||
// 判断信息是否是需要的类型
|
|
||||||
// 根据 init_time + OctopusMessageType + AgentOperationMessage
|
|
||||||
if (!judgyIsCurrentServerReplyMessage(
|
|
||||||
message,
|
|
||||||
startTime
|
|
||||||
)) {
|
|
||||||
|
|
||||||
// 不是当前应用需要的的OM,将信息放置与Cache队列的末尾
|
|
||||||
try {
|
|
||||||
OCTOPUS_MESSAGE_FROM_AGENT.put(message);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
// 返回,继续死循环
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 是当前需要的消息信息
|
|
||||||
result.put(
|
|
||||||
message.getUuid(),
|
|
||||||
(String) message.getResult()
|
|
||||||
);
|
|
||||||
countDownLatch.countDown();
|
|
||||||
}
|
|
||||||
},
|
|
||||||
pool
|
|
||||||
);
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 判断信息是否是需要的类型
|
* 判断信息是否是需要的类型
|
||||||
* 根据 init_time + OctopusMessageType + AgentOperationMessage
|
* 根据 init_time + OctopusMessageType + AgentOperationMessage
|
||||||
@@ -299,7 +401,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
|
|||||||
return startTimeEqual && OMTypeEqual;
|
return startTimeEqual && OMTypeEqual;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void buildAndSendToAllAgent(AgentOperationType operationType, LocalDateTime currentTime) {
|
private void buildOMessageAndSendToAllHealthyAgent(AgentOperationType operationType, LocalDateTime currentTime) {
|
||||||
|
|
||||||
List<OctopusMessage> octopusMessageList = ALL_HEALTHY_AGENT_TOPIC_NAME_LIST
|
List<OctopusMessage> octopusMessageList = ALL_HEALTHY_AGENT_TOPIC_NAME_LIST
|
||||||
.stream()
|
.stream()
|
||||||
|
|||||||
@@ -0,0 +1,97 @@
|
|||||||
|
package io.wdd.rpc.message.handler;
|
||||||
|
|
||||||
|
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||||
|
import io.wdd.server.config.ServerCommonPool;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
@Slf4j
|
||||||
|
public class AsyncWaitOMResult {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 为了避免线程不安全的问题,增加一层缓存,仅仅由当前类操作此部分
|
||||||
|
* KEY -> replayMatchKey
|
||||||
|
* VALUE -> OMReplayContend - 包含countDownLatch 和 result
|
||||||
|
*/
|
||||||
|
public static final HashMap<String, OMReplayContend> REPLAY_CACHE_MAP = new HashMap<>();
|
||||||
|
|
||||||
|
public void waitFor(OMReplayContend omReplayContend) {
|
||||||
|
|
||||||
|
// 向 REPLAY_CACHE_MAP中写入 Key
|
||||||
|
REPLAY_CACHE_MAP.put(omReplayContend.getReplayMatchKey(),
|
||||||
|
omReplayContend);
|
||||||
|
|
||||||
|
// 在调用线程的countDownLunch结束之后,关闭
|
||||||
|
// 清除 REPLAY_CACHE_MAP 中的队列
|
||||||
|
}
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void daemonHandleReplayOMFromAgent() {
|
||||||
|
|
||||||
|
// 异步任务启动
|
||||||
|
CompletableFuture.runAsync(
|
||||||
|
() -> doHandleReplayOMFromAgent(),
|
||||||
|
ServerCommonPool.pool
|
||||||
|
);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 操作 OCTOPUS_MESSAGE_FROM_AGENT 获取相应的Message放入内容中
|
||||||
|
*/
|
||||||
|
private void doHandleReplayOMFromAgent() {
|
||||||
|
|
||||||
|
// 死循环,不断的轮询 OCTOPUS_MESSAGE_FROM_AGENT
|
||||||
|
while (true) {
|
||||||
|
|
||||||
|
if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) {
|
||||||
|
// 开始收集等待 50 ms
|
||||||
|
try {
|
||||||
|
TimeUnit.MILLISECONDS.sleep(50);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
// 返回,继续死循环
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 拿到消息
|
||||||
|
OctopusMessage replayOMessage = OCTOPUS_MESSAGE_FROM_AGENT.poll();
|
||||||
|
|
||||||
|
// 构造 replayMatchKey
|
||||||
|
String matchKey = OMReplayContend.generateMatchKey(
|
||||||
|
replayOMessage.getType(),
|
||||||
|
replayOMessage.getInit_time()
|
||||||
|
);
|
||||||
|
if (!REPLAY_CACHE_MAP.containsKey(matchKey)) {
|
||||||
|
// 没有这个Key,说明等待结果已经超时了,直接丢弃,然后继续循环
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Map中包含有Key,那么放置进去
|
||||||
|
OMReplayContend replayContend = REPLAY_CACHE_MAP.get(matchKey);
|
||||||
|
replayContend
|
||||||
|
.getReplayOMList()
|
||||||
|
.add(replayOMessage);
|
||||||
|
|
||||||
|
// 需要操作countDown
|
||||||
|
replayContend
|
||||||
|
.getCountDownLatch()
|
||||||
|
.countDown();
|
||||||
|
|
||||||
|
// 结束操作,继续循环
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,59 @@
|
|||||||
|
package io.wdd.rpc.message.handler;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonFormat;
|
||||||
|
import io.swagger.annotations.ApiModel;
|
||||||
|
import io.swagger.annotations.ApiModelProperty;
|
||||||
|
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||||
|
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
import lombok.experimental.SuperBuilder;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@AllArgsConstructor
|
||||||
|
@NoArgsConstructor
|
||||||
|
@SuperBuilder(toBuilder = true)
|
||||||
|
@ApiModel("众多业务调用RPC,异步等待需要确定返回消息是谁的")
|
||||||
|
public class OMReplayContend {
|
||||||
|
|
||||||
|
@ApiModelProperty("rpc消息的类型")
|
||||||
|
OctopusMessageType type;
|
||||||
|
|
||||||
|
@ApiModelProperty("rpc消息发送的时间, 精确匹配,去掉毫秒")
|
||||||
|
@JsonFormat(pattern = "yyyy-MM-dd hh:MM:ss")
|
||||||
|
LocalDateTime initTime;
|
||||||
|
|
||||||
|
@ApiModelProperty("rpc消息-匹配唯一key")
|
||||||
|
String replayMatchKey;
|
||||||
|
|
||||||
|
@ApiModelProperty("需要等待的消息个数")
|
||||||
|
CountDownLatch countDownLatch;
|
||||||
|
|
||||||
|
@ApiModelProperty("回复的结果列表, 临时保存")
|
||||||
|
List<OctopusMessage> replayOMList;
|
||||||
|
|
||||||
|
|
||||||
|
protected static String generateMatchKey(OMReplayContend replayIdentifier) {
|
||||||
|
|
||||||
|
String relayMatchKey = replayIdentifier
|
||||||
|
.getType()
|
||||||
|
.toString() + replayIdentifier
|
||||||
|
.getInitTime()
|
||||||
|
.toString();
|
||||||
|
|
||||||
|
return relayMatchKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String generateMatchKey(OctopusMessageType messageType, LocalDateTime messageInitTime) {
|
||||||
|
|
||||||
|
String relayMatchKey = messageType.toString() + messageInitTime.toString();
|
||||||
|
|
||||||
|
return relayMatchKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -17,7 +17,7 @@ import java.util.concurrent.ArrayBlockingQueue;
|
|||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
@Slf4j(topic = "Octopus Message Handler")
|
@Slf4j(topic = "Octopus Message Handler")
|
||||||
public class OctopusMessageHandler {
|
public class OMessageHandlerServer {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Redis Key 用于保存Agent的最新版本
|
* Redis Key 用于保存Agent的最新版本
|
||||||
Reference in New Issue
Block a user