From 8231dd21e4dfe3d90d981a869a3c5805b7896fa8 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Wed, 8 Feb 2023 17:32:02 +0800 Subject: [PATCH] =?UTF-8?q?[=20server]=20[=20agent=20]=20-=20=E7=89=88?= =?UTF-8?q?=E6=9C=AC=E4=BF=A1=E6=81=AF=E5=85=A8=E6=B5=81=E7=A8=8B-?= =?UTF-8?q?=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/io/wdd/common/utils/TimeUtils.java | 15 +++ .../rpc/agent/OctopusAgentServiceImpl.java | 121 +++++++++--------- .../handler/OctopusMessageHandler.java | 2 +- .../wdd/server/config/ServerCommonPool.java | 1 - 4 files changed, 77 insertions(+), 62 deletions(-) diff --git a/common/src/main/java/io/wdd/common/utils/TimeUtils.java b/common/src/main/java/io/wdd/common/utils/TimeUtils.java index 962948e..aadb0ef 100644 --- a/common/src/main/java/io/wdd/common/utils/TimeUtils.java +++ b/common/src/main/java/io/wdd/common/utils/TimeUtils.java @@ -70,6 +70,21 @@ public class TimeUtils { return LocalDateTime.now(SYSTEM_TIME_ZONE_ID); } + /** + * @return 格式化 去掉时间中的毫秒数 + */ + public static LocalDateTime currentFormatTime() { + + DateTimeFormatter ofPattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + String format = LocalDateTime + .now(SYSTEM_TIME_ZONE_ID) + .format(ofPattern); + + return LocalDateTime.parse(format, ofPattern); + + } + public static LocalDateTime cvFromDate(Date date) { // fix bug diff --git a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java index f9e6815..0b766fd 100644 --- a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java @@ -25,7 +25,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.message.handler.OctopusMessageHandler.AGENT_LATEST_VERSION; import static io.wdd.rpc.message.handler.OctopusMessageHandler.OCTOPUS_MESSAGE_FROM_AGENT; @@ -57,25 +56,34 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { // 发送OctopusMessage-Agent // 从OctopusToServer中收集到所有Agent的版本信息 // 组装信息至集合中 - LocalDateTime currentTime = TimeUtils.currentTime(); + LocalDateTime currentTime = TimeUtils.currentFormatTime(); + buildAndSendToAllAgent( AgentOperationType.VERSION, currentTime ); - waitCollectAllAgentVersionInfo( + CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size()); + + CompletableFuture getAllAgentVersionInfoFuture = waitCollectAllAgentVersionInfo( result, - currentTime + currentTime, + countDownLatch ); try { - TimeUnit.SECONDS.sleep(5); + // 超时等待5秒钟, 或者所有的Agent均已经完成上报 + countDownLatch.await( + 5, + TimeUnit.SECONDS + ); } catch (InterruptedException e) { - throw new RuntimeException(e); + log.warn("存在部分Agent没有上报 版本信息!"); + } finally { + // 必须关闭释放线程 + getAllAgentVersionInfoFuture.cancel(true); } - System.out.println("result = " + result); - return result; } @@ -101,63 +109,56 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { return null; } - private void waitCollectAllAgentVersionInfo(HashMap result, LocalDateTime startTime) { + private CompletableFuture waitCollectAllAgentVersionInfo(HashMap result, LocalDateTime startTime, CountDownLatch countDownLatch) { - CompletableFuture getAllAgentVersionInfo = new CompletableFuture<>(); - try { - ExecutorService pool = ServerCommonPool.pool; - CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size()); + ExecutorService pool = ServerCommonPool.pool; - // 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息 - getAllAgentVersionInfo = CompletableFuture.runAsync( - () -> { - while (true) { - if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) { - // 开始收集等待 200ms - try { - TimeUnit.MILLISECONDS.sleep(50); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + // 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息 + return CompletableFuture.runAsync( + () -> { + while (true) { + if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) { + // 开始收集等待 200ms + try { + TimeUnit.MILLISECONDS.sleep(50); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - - OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll(); - - // 获取到OM - // 判断信息是否是需要的类型 - // 根据 init_time + OctopusMessageType + AgentOperationMessage - if (!judgyIsCurrentServerReplyMessage( - message, - startTime - )) { - // 不是当前应用需要的的OM,将信息放置与Cache队列的末尾 - try { - OCTOPUS_MESSAGE_FROM_AGENT.put(message); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return; - } - - // 是当前需要的消息信息 - result.put( - message.getUuid(), - (String) message.getResult() - ); + // 返回,继续死循环 + continue; } - }, - pool - ); - // 超时等待5秒钟 - countDownLatch.await( - 5, - TimeUnit.SECONDS - ); - } catch (InterruptedException e) { - log.warn("存在部分Agent没有上报 版本信息!"); - getAllAgentVersionInfo.cancel(true); - } + OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll(); + + // 获取到OM + // 判断信息是否是需要的类型 + // 根据 init_time + OctopusMessageType + AgentOperationMessage + if (!judgyIsCurrentServerReplyMessage( + message, + startTime + )) { + + // 不是当前应用需要的的OM,将信息放置与Cache队列的末尾 + try { + OCTOPUS_MESSAGE_FROM_AGENT.put(message); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + // 返回,继续死循环 + continue; + } + + // 是当前需要的消息信息 + result.put( + message.getUuid(), + (String) message.getResult() + ); + countDownLatch.countDown(); + } + }, + pool + ); + } diff --git a/server/src/main/java/io/wdd/rpc/message/handler/OctopusMessageHandler.java b/server/src/main/java/io/wdd/rpc/message/handler/OctopusMessageHandler.java index cc5fb68..855a0b0 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/OctopusMessageHandler.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/OctopusMessageHandler.java @@ -83,7 +83,7 @@ public class OctopusMessageHandler { // todo what to do after received the result log.debug("cache the octopus message to inner cache list !"); - OCTOPUS_MESSAGE_FROM_AGENT.add(octopusMessage); + OCTOPUS_MESSAGE_FROM_AGENT.offer(octopusMessage); // collect all message from agent and log to somewhere diff --git a/server/src/main/java/io/wdd/server/config/ServerCommonPool.java b/server/src/main/java/io/wdd/server/config/ServerCommonPool.java index 2cb3902..86663fe 100644 --- a/server/src/main/java/io/wdd/server/config/ServerCommonPool.java +++ b/server/src/main/java/io/wdd/server/config/ServerCommonPool.java @@ -8,7 +8,6 @@ public class ServerCommonPool { public static ExecutorService pool; - static { ThreadFactory threadFactory = new ThreadFactoryBuilder()