diff --git a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java index 1f4158c..b2e1648 100644 --- a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java @@ -8,6 +8,8 @@ import io.wdd.common.beans.agent.AgentOperationType; import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessageType; import io.wdd.common.utils.TimeUtils; +import io.wdd.rpc.message.handler.AsyncWaitOMResult; +import io.wdd.rpc.message.handler.OMReplayContend; import io.wdd.rpc.message.sender.OMessageToAgentSender; import io.wdd.server.beans.vo.ServerInfoVO; import io.wdd.server.config.ServerCommonPool; @@ -17,6 +19,7 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -28,8 +31,9 @@ import java.util.stream.Collectors; import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_TOPIC_NAME_SET; import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; -import static io.wdd.rpc.message.handler.OctopusMessageHandler.AGENT_LATEST_VERSION; -import static io.wdd.rpc.message.handler.OctopusMessageHandler.OCTOPUS_MESSAGE_FROM_AGENT; +import static io.wdd.rpc.message.handler.AsyncWaitOMResult.REPLAY_CACHE_MAP; +import static io.wdd.rpc.message.handler.OMessageHandlerServer.AGENT_LATEST_VERSION; +import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT; @Service @Slf4j @@ -44,6 +48,9 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { @Resource RedisTemplate redisTemplate; + @Resource + AsyncWaitOMResult asyncWaitOMResult; + @Override public Map getAllAgentVersion() { HashMap result = new HashMap<>(); @@ -55,24 +62,44 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { ); // 获取Agent的版本信息 -- 超时时间 - // 发送OctopusMessage-Agent + // 从OctopusToServer中收集到所有Agent的版本信息 // 组装信息至集合中 LocalDateTime currentTime = TimeUtils.currentFormatTime(); - buildAndSendToAllAgent( + + // 发送OctopusMessage-Agent + buildOMessageAndSendToAllHealthyAgent( AgentOperationType.VERSION, currentTime ); + // 构造 异步结果监听内容 CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size()); + ArrayList replayOMList = new ArrayList<>(); + OMReplayContend omReplayContend = OMReplayContend + .builder() + .initTime(currentTime) + .countDownLatch(countDownLatch) + .replayOMList(replayOMList) + .replayMatchKey( + OMReplayContend.generateMatchKey( + CurrentAppOctopusMessageType, + currentTime + ) + ) + .type(CurrentAppOctopusMessageType) + .build(); - // todo 此处存在重大bug,会导致CPU占用飙升 - CompletableFuture getAllAgentVersionInfoFuture = waitCollectAllAgentVersionInfo( + // 调用后台接收处理所有的Replay信息 + asyncWaitOMResult.waitFor(omReplayContend); + + //此处存在重大bug,会导致CPU占用飙升 + /*CompletableFuture getAllAgentVersionInfoFuture = waitCollectAllAgentVersionInfo( result, currentTime, countDownLatch - ); + );*/ try { // 超时等待5秒钟, 或者所有的Agent均已经完成上报 @@ -83,8 +110,29 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { } catch (InterruptedException e) { log.warn("存在部分Agent没有上报 版本信息!"); } finally { - // 必须关闭释放线程 - getAllAgentVersionInfoFuture.cancel(true); + // 超时,或者 全部信息已经收集 + + // 此处调用,即可中断 异步任务的收集工作 + REPLAY_CACHE_MAP.remove( + omReplayContend.getReplayMatchKey() + ); + + // 处理结果 + omReplayContend + .getReplayOMList() + .stream() + .forEach( + mMessage -> { + result.put( + mMessage.getUuid(), + (String) mMessage.getResult() + ); + } + ); + + // help gc + omReplayContend = null; + } return result; @@ -107,24 +155,41 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { HashMap result = new HashMap<>(); // 获取Agent的版本信息 -- 超时时间 - // 发送OctopusMessage-Agent // 从OctopusToServer中收集到所有Agent的版本信息 // 组装信息至集合中 LocalDateTime currentTime = TimeUtils.currentFormatTime(); - buildAndSendToAllAgent( + // 发送OctopusMessage-Agent + buildOMessageAndSendToAllHealthyAgent( AgentOperationType.INFO, currentTime ); CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size()); + ArrayList replayOMList = new ArrayList<>(); + OMReplayContend omReplayContend = OMReplayContend + .builder() + .initTime(currentTime) + .countDownLatch(countDownLatch) + .replayOMList(replayOMList) + .replayMatchKey( + OMReplayContend.generateMatchKey( + CurrentAppOctopusMessageType, + currentTime + ) + ) + .type(CurrentAppOctopusMessageType) + .build(); - CompletableFuture getAllAgentCoreInfoFuture = waitCollectAllAgentCoreInfo( + // 调用后台接收处理所有的Replay信息 + asyncWaitOMResult.waitFor(omReplayContend); + + /* CompletableFuture getAllAgentCoreInfoFuture = waitCollectAllAgentCoreInfo( result, currentTime, countDownLatch ); - +*/ try { // 超时等待5秒钟, 或者所有的Agent均已经完成上报 countDownLatch.await( @@ -134,13 +199,123 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { } catch (InterruptedException e) { log.warn("存在部分Agent没有上报 核心信息!"); } finally { - // 必须关闭释放线程 - getAllAgentCoreInfoFuture.cancel(true); + // 超时,或者 全部信息已经收集 + + // 此处调用,即可中断 异步任务的收集工作 + REPLAY_CACHE_MAP.remove( + omReplayContend.getReplayMatchKey() + ); + + // 处理结果 + omReplayContend + .getReplayOMList() + .stream() + .forEach( + mMessage -> { + + try { + + // 解析当前信息 + ServerInfoVO serverInfoVO = objectMapper.readValue( + (String) mMessage.getResult(), + new TypeReference() { + } + ); + + result.put( + mMessage.getUuid(), + serverInfoVO + ); + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + } + ); + + // help gc + omReplayContend = null; } return result; } + + + @Override + public String shutdownAgentDanger(String agentTopicName) { + + if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { + log.error("agentTopicName Error !"); + return "agentTopicName Error!"; + } + + LocalDateTime formatTime = TimeUtils.currentFormatTime(); + buildOMessageAndSendToAllHealthyAgent( + AgentOperationType.SHUTDOWN, + formatTime + ); + + // 是否需要检查相应的状态 + + + return null; + } + + private CompletableFuture waitCollectAllAgentVersionInfo(HashMap result, LocalDateTime startTime, CountDownLatch countDownLatch) { + + ExecutorService pool = ServerCommonPool.pool; + + // 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息 + return CompletableFuture.runAsync( + () -> { + while (true) { + if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) { + // 开始收集等待 200ms + try { + TimeUnit.MILLISECONDS.sleep(50); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + // 返回,继续死循环 + continue; + } + + OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll(); + + // 获取到OM + // 判断信息是否是需要的类型 + // 根据 init_time + OctopusMessageType + AgentOperationMessage + if (!judgyIsCurrentServerReplyMessage( + message, + startTime + )) { + + // 不是当前应用需要的的OM,将信息放置与Cache队列的末尾 + try { + OCTOPUS_MESSAGE_FROM_AGENT.put(message); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + // 返回,继续死循环 + continue; + } + + // 是当前需要的消息信息 + result.put( + message.getUuid(), + (String) message.getResult() + ); + countDownLatch.countDown(); + } + }, + pool + ); + + + } + private CompletableFuture waitCollectAllAgentCoreInfo(HashMap result, LocalDateTime startTime, CountDownLatch countDownLatch) { ExecutorService pool = ServerCommonPool.pool; @@ -207,79 +382,6 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { } - @Override - public String shutdownAgentDanger(String agentTopicName) { - - if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { - log.error("agentTopicName Error !"); - return "agentTopicName Error!"; - } - - LocalDateTime formatTime = TimeUtils.currentFormatTime(); - buildAndSendToAllAgent( - AgentOperationType.SHUTDOWN, - formatTime - ); - - // 是否需要检查相应的状态 - - - return null; - } - - private CompletableFuture waitCollectAllAgentVersionInfo(HashMap result, LocalDateTime startTime, CountDownLatch countDownLatch) { - - ExecutorService pool = ServerCommonPool.pool; - - // 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息 - return CompletableFuture.runAsync( - () -> { - while (true) { - if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) { - // 开始收集等待 200ms - try { - TimeUnit.MILLISECONDS.sleep(50); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - // 返回,继续死循环 - continue; - } - - OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll(); - - // 获取到OM - // 判断信息是否是需要的类型 - // 根据 init_time + OctopusMessageType + AgentOperationMessage - if (!judgyIsCurrentServerReplyMessage( - message, - startTime - )) { - - // 不是当前应用需要的的OM,将信息放置与Cache队列的末尾 - try { - OCTOPUS_MESSAGE_FROM_AGENT.put(message); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - // 返回,继续死循环 - continue; - } - - // 是当前需要的消息信息 - result.put( - message.getUuid(), - (String) message.getResult() - ); - countDownLatch.countDown(); - } - }, - pool - ); - - - } - /** * 判断信息是否是需要的类型 * 根据 init_time + OctopusMessageType + AgentOperationMessage @@ -299,7 +401,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { return startTimeEqual && OMTypeEqual; } - private void buildAndSendToAllAgent(AgentOperationType operationType, LocalDateTime currentTime) { + private void buildOMessageAndSendToAllHealthyAgent(AgentOperationType operationType, LocalDateTime currentTime) { List octopusMessageList = ALL_HEALTHY_AGENT_TOPIC_NAME_LIST .stream() diff --git a/server/src/main/java/io/wdd/rpc/message/handler/AsyncWaitOMResult.java b/server/src/main/java/io/wdd/rpc/message/handler/AsyncWaitOMResult.java new file mode 100644 index 0000000..ceca3ec --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/message/handler/AsyncWaitOMResult.java @@ -0,0 +1,97 @@ +package io.wdd.rpc.message.handler; + +import io.wdd.common.beans.rabbitmq.OctopusMessage; +import io.wdd.server.config.ServerCommonPool; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.HashMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT; + +@Service +@Slf4j +public class AsyncWaitOMResult { + + /** + * 为了避免线程不安全的问题,增加一层缓存,仅仅由当前类操作此部分 + * KEY -> replayMatchKey + * VALUE -> OMReplayContend - 包含countDownLatch 和 result + */ + public static final HashMap REPLAY_CACHE_MAP = new HashMap<>(); + + public void waitFor(OMReplayContend omReplayContend) { + + // 向 REPLAY_CACHE_MAP中写入 Key + REPLAY_CACHE_MAP.put(omReplayContend.getReplayMatchKey(), + omReplayContend); + + // 在调用线程的countDownLunch结束之后,关闭 + // 清除 REPLAY_CACHE_MAP 中的队列 + } + + @PostConstruct + public void daemonHandleReplayOMFromAgent() { + + // 异步任务启动 + CompletableFuture.runAsync( + () -> doHandleReplayOMFromAgent(), + ServerCommonPool.pool + ); + + } + + /** + * 操作 OCTOPUS_MESSAGE_FROM_AGENT 获取相应的Message放入内容中 + */ + private void doHandleReplayOMFromAgent() { + + // 死循环,不断的轮询 OCTOPUS_MESSAGE_FROM_AGENT + while (true) { + + if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) { + // 开始收集等待 50 ms + try { + TimeUnit.MILLISECONDS.sleep(50); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + // 返回,继续死循环 + continue; + } + + // 拿到消息 + OctopusMessage replayOMessage = OCTOPUS_MESSAGE_FROM_AGENT.poll(); + + // 构造 replayMatchKey + String matchKey = OMReplayContend.generateMatchKey( + replayOMessage.getType(), + replayOMessage.getInit_time() + ); + if (!REPLAY_CACHE_MAP.containsKey(matchKey)) { + // 没有这个Key,说明等待结果已经超时了,直接丢弃,然后继续循环 + continue; + } + + // Map中包含有Key,那么放置进去 + OMReplayContend replayContend = REPLAY_CACHE_MAP.get(matchKey); + replayContend + .getReplayOMList() + .add(replayOMessage); + + // 需要操作countDown + replayContend + .getCountDownLatch() + .countDown(); + + // 结束操作,继续循环 + } + + + } + + +} diff --git a/server/src/main/java/io/wdd/rpc/message/handler/OMReplayContend.java b/server/src/main/java/io/wdd/rpc/message/handler/OMReplayContend.java new file mode 100644 index 0000000..2f024e3 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/message/handler/OMReplayContend.java @@ -0,0 +1,59 @@ +package io.wdd.rpc.message.handler; + +import com.fasterxml.jackson.annotation.JsonFormat; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import io.wdd.common.beans.rabbitmq.OctopusMessage; +import io.wdd.common.beans.rabbitmq.OctopusMessageType; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@SuperBuilder(toBuilder = true) +@ApiModel("众多业务调用RPC,异步等待需要确定返回消息是谁的") +public class OMReplayContend { + + @ApiModelProperty("rpc消息的类型") + OctopusMessageType type; + + @ApiModelProperty("rpc消息发送的时间, 精确匹配,去掉毫秒") + @JsonFormat(pattern = "yyyy-MM-dd hh:MM:ss") + LocalDateTime initTime; + + @ApiModelProperty("rpc消息-匹配唯一key") + String replayMatchKey; + + @ApiModelProperty("需要等待的消息个数") + CountDownLatch countDownLatch; + + @ApiModelProperty("回复的结果列表, 临时保存") + List replayOMList; + + + protected static String generateMatchKey(OMReplayContend replayIdentifier) { + + String relayMatchKey = replayIdentifier + .getType() + .toString() + replayIdentifier + .getInitTime() + .toString(); + + return relayMatchKey; + } + + public static String generateMatchKey(OctopusMessageType messageType, LocalDateTime messageInitTime) { + + String relayMatchKey = messageType.toString() + messageInitTime.toString(); + + return relayMatchKey; + } + +} diff --git a/server/src/main/java/io/wdd/rpc/message/handler/OctopusMessageHandler.java b/server/src/main/java/io/wdd/rpc/message/handler/OMessageHandlerServer.java similarity index 98% rename from server/src/main/java/io/wdd/rpc/message/handler/OctopusMessageHandler.java rename to server/src/main/java/io/wdd/rpc/message/handler/OMessageHandlerServer.java index d37386d..73ed84d 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/OctopusMessageHandler.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/OMessageHandlerServer.java @@ -17,7 +17,7 @@ import java.util.concurrent.ArrayBlockingQueue; @Configuration @Slf4j(topic = "Octopus Message Handler") -public class OctopusMessageHandler { +public class OMessageHandlerServer { /** * Redis Key 用于保存Agent的最新版本