From 93638bf7c6982f87d42c768dbfc2f207f46a7e5f Mon Sep 17 00:00:00 2001 From: zeaslity Date: Wed, 8 Feb 2023 10:32:41 +0800 Subject: [PATCH] =?UTF-8?q?[=20server=20]=20[=20agent=20]-=20=20=E6=94=B6?= =?UTF-8?q?=E9=9B=86Agent=E7=9A=84=E7=89=88=E6=9C=AC=E4=BF=A1=E6=81=AF=20?= =?UTF-8?q?=E5=88=9D=E6=AD=A5=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../message/handler/OMHandlerAgent.java | 6 +- .../rpc/agent/OctopusAgentServiceImpl.java | 122 ++++++++++++++++-- ...Server.java => OctopusMessageHandler.java} | 11 +- 3 files changed, 125 insertions(+), 14 deletions(-) rename server/src/main/java/io/wdd/rpc/message/handler/{OctopusMessageHandlerServer.java => OctopusMessageHandler.java} (74%) diff --git a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerAgent.java b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerAgent.java index 733d7a6..7000904 100644 --- a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerAgent.java +++ b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerAgent.java @@ -57,9 +57,11 @@ public class OMHandlerAgent extends AbstractOctopusMessageHandler { // update agentRebootUpdateService.exAgentUpdate(operationMessage); } else if (opType.equals(AgentOperationType.VERSION)) { - agentOperationInfoService.AgentOpInfo(octopusMessage); - } else if (opType.equals(AgentOperationType.INFO)) { + // 收集Agent的版本信息 agentOperationInfoService.AgentOpVersion(octopusMessage); + } else if (opType.equals(AgentOperationType.INFO)) { + // 收集Agent的核心Info信息 + agentOperationInfoService.AgentOpInfo(octopusMessage); } else { // operation unknown log.error("Command Agent Operation Unknown! "); diff --git a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java index 4b25c5d..40b8c73 100644 --- a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java @@ -8,24 +8,31 @@ import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessageType; import io.wdd.common.utils.TimeUtils; import io.wdd.rpc.message.sender.OMessageToAgentSender; +import io.wdd.server.config.ServerCommonPool; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.time.LocalDateTime; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_TOPIC_NAME_LIST; +import static io.wdd.rpc.message.handler.OctopusMessageHandler.OCTOPUS_MESSAGE_FROM_AGENT; @Service @Slf4j -public class OctopusAgentServiceImpl implements OctopusAgentService{ +public class OctopusAgentServiceImpl implements OctopusAgentService { + private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.AGENT; @Resource OMessageToAgentSender oMessageToAgentSender; - @Resource ObjectMapper objectMapper; @@ -34,24 +41,115 @@ public class OctopusAgentServiceImpl implements OctopusAgentService{ HashMap result = new HashMap<>(); // 查询获取到最新的Agent版本信息 - result.put("LATEST_VERSION","2023-02-06-09-23-00"); + result.put( + "LATEST_VERSION", + "2023-02-06-09-23-00" + ); // 获取Agent的版本信息 -- 超时时间 // 发送OctopusMessage-Agent // 从OctopusToServer中收集到所有Agent的版本信息 // 组装信息至集合中 - buildAndSendToAllAgent(AgentOperationType.VERSION); + LocalDateTime currentTime = TimeUtils.currentTime(); + buildAndSendToAllAgent( + AgentOperationType.VERSION, + currentTime + ); - waitCollectAllAgentVersionInfo(result); + waitCollectAllAgentVersionInfo( + result, + currentTime + ); + + try { + TimeUnit.SECONDS.sleep(3); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } return result; } - private void waitCollectAllAgentVersionInfo(HashMap result) { + private void waitCollectAllAgentVersionInfo(HashMap result, LocalDateTime startTime) { + + CompletableFuture getAllAgentVersionInfo = new CompletableFuture<>(); + try { + ExecutorService pool = ServerCommonPool.pool; + CountDownLatch countDownLatch = new CountDownLatch(ALL_AGENT_TOPIC_NAME_LIST.size()); + + // 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息 + getAllAgentVersionInfo = CompletableFuture.runAsync( + () -> { + while (true) { + if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) { + // 开始收集等待 200ms + try { + TimeUnit.MILLISECONDS.sleep(50); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll(); + + // 获取到OM + // 判断信息是否是需要的类型 + // 根据 init_time + OctopusMessageType + AgentOperationMessage + if (!judgyIsCurrentServerReplyMessage( + message, + startTime + )) { + // 不是当前应用需要的的OM,将信息放置与Cache队列的末尾 + try { + OCTOPUS_MESSAGE_FROM_AGENT.put(message); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return; + } + + // 是当前需要的消息信息 + result.put( + message.getUuid(), + (String) message.getResult() + ); + } + }, + pool + ); + + // 超时等待5秒钟 + countDownLatch.await( + 5, + TimeUnit.SECONDS + ); + } catch (InterruptedException e) { + log.warn("存在部分Agent没有上报 版本信息!"); + getAllAgentVersionInfo.cancel(true); + } } - private void buildAndSendToAllAgent(AgentOperationType operationType) { + /** + * 判断信息是否是需要的类型 + * 根据 init_time + OctopusMessageType + AgentOperationMessage + * + * @return + */ + private boolean judgyIsCurrentServerReplyMessage(OctopusMessage message, LocalDateTime startTime) { + + // init_time 时间判断 + boolean startTimeEqual = startTime.isEqual(message.getInit_time()); + + // OctopusMessageType判断 + boolean OMTypeEqual = message + .getType() + .equals(CurrentAppOctopusMessageType); + + return startTimeEqual && OMTypeEqual; + } + + private void buildAndSendToAllAgent(AgentOperationType operationType, LocalDateTime currentTime) { List octopusMessageList = ALL_AGENT_TOPIC_NAME_LIST .stream() @@ -59,7 +157,8 @@ public class OctopusAgentServiceImpl implements OctopusAgentService{ agentTopicName -> ConstructAgentOperationMessage( agentTopicName, - operationType + operationType, + currentTime ) ) .collect(Collectors.toList()); @@ -74,9 +173,10 @@ public class OctopusAgentServiceImpl implements OctopusAgentService{ * * @param agentTopicName * @param operationType + * @param currentTime * @return */ - private OctopusMessage ConstructAgentOperationMessage(String agentTopicName, AgentOperationType operationType) { + private OctopusMessage ConstructAgentOperationMessage(String agentTopicName, AgentOperationType operationType, LocalDateTime currentTime) { AgentOperationMessage operationMessage = AgentOperationMessage .builder() @@ -92,9 +192,9 @@ public class OctopusAgentServiceImpl implements OctopusAgentService{ return OctopusMessage .builder() - .type(OctopusMessageType.AGENT) + .type(CurrentAppOctopusMessageType) .uuid(agentTopicName) - .init_time(TimeUtils.currentTime()) + .init_time(currentTime) .content(ops) .build(); diff --git a/server/src/main/java/io/wdd/rpc/message/handler/OctopusMessageHandlerServer.java b/server/src/main/java/io/wdd/rpc/message/handler/OctopusMessageHandler.java similarity index 74% rename from server/src/main/java/io/wdd/rpc/message/handler/OctopusMessageHandlerServer.java rename to server/src/main/java/io/wdd/rpc/message/handler/OctopusMessageHandler.java index 442b3da..0398cc3 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/OctopusMessageHandlerServer.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/OctopusMessageHandler.java @@ -12,12 +12,19 @@ import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; import java.io.IOException; +import java.util.concurrent.ArrayBlockingQueue; @Configuration @Slf4j(topic = "Octopus Message Handler") -public class OctopusMessageHandlerServer { +public class OctopusMessageHandler { + /** + * 存储所有的从 Agent过来的 OctopusMessage + * 各个业务模块需要自己手动去获取自己需要的内容 + */ + public static ArrayBlockingQueue OCTOPUS_MESSAGE_FROM_AGENT = new ArrayBlockingQueue<>(128, true); + @Resource ObjectMapper objectMapper; @@ -39,6 +46,8 @@ public class OctopusMessageHandlerServer { // todo what to do after received the result + log.debug("cache the octopus message to inner cache list !"); + OCTOPUS_MESSAGE_FROM_AGENT.add(octopusMessage); // collect all message from agent and log to somewhere