From a2b6b01fd343abc01d54d50524ef959b4a256f07 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Fri, 11 Aug 2023 17:06:53 +0800 Subject: [PATCH] =?UTF-8?q?[=20Service=20]=20[=20Executor=20]=20=E9=87=8D?= =?UTF-8?q?=E6=9E=84Executor=E9=83=A8=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/OctopusObjectMapperConfig.java | 34 + .../rpc/agent/OctopusAgentServiceImpl.java | 839 +++++++++--------- .../wdd/rpc/execute/ExecutionServiceImpl.java | 76 +- .../execute/service/SyncExecutionService.java | 47 - .../service/SyncExecutionServiceImpl.java | 135 --- .../handler/{sync => }/OMessageHandler.java | 38 +- ...ontend.java => OMessageReplayContent.java} | 32 +- .../{sync => }/OMessageToServerListener.java | 13 +- .../AsyncWaitOctopusMessageResultService.java | 126 --- .../status/service/SyncStatusServiceImpl.java | 530 +++++------ server/src/main/resources/application.yml | 3 +- 11 files changed, 812 insertions(+), 1061 deletions(-) delete mode 100644 server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java delete mode 100644 server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java rename server/src/main/java/io/wdd/rpc/message/handler/{sync => }/OMessageHandler.java (74%) rename server/src/main/java/io/wdd/rpc/message/handler/{async/OctopusMessageSyncReplayContend.java => OMessageReplayContent.java} (60%) rename server/src/main/java/io/wdd/rpc/message/handler/{sync => }/OMessageToServerListener.java (88%) delete mode 100644 server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOctopusMessageResultService.java diff --git a/server/src/main/java/io/wdd/common/config/OctopusObjectMapperConfig.java b/server/src/main/java/io/wdd/common/config/OctopusObjectMapperConfig.java index b230c59..d92f092 100644 --- a/server/src/main/java/io/wdd/common/config/OctopusObjectMapperConfig.java +++ b/server/src/main/java/io/wdd/common/config/OctopusObjectMapperConfig.java @@ -1,6 +1,7 @@ package io.wdd.common.config; import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; @@ -32,6 +33,39 @@ public class OctopusObjectMapperConfig { public static ObjectMapper OctopusObjectMapper = null; + public static String WriteToString(Object data) { + + try { + return OctopusObjectMapper.writeValueAsString(data); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + public static String WriteToStringPretty(Object data) { + try { + + return OctopusObjectMapper + .writerWithDefaultPrettyPrinter() + .writeValueAsString(data); + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + public static byte[] WriteToBytes(Object data) { + + try { + + return OctopusObjectMapper.writeValueAsBytes(data); + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + } + @PostConstruct public void setOctopusObjectMapper() { diff --git a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java index de2de01..0134806 100644 --- a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java @@ -1,420 +1,419 @@ -package io.wdd.rpc.agent; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.wdd.common.utils.TimeUtils; -import io.wdd.rpc.message.OctopusMessage; -import io.wdd.rpc.message.OctopusMessageType; -import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService; -import io.wdd.rpc.message.handler.async.OctopusMessageSyncReplayContend; -import io.wdd.rpc.message.sender.OMessageToAgentSender; -import io.wdd.server.beans.vo.ServerInfoVO; -import io.wdd.server.config.ServerCommonPool; -import lombok.extern.slf4j.Slf4j; -import org.springframework.data.redis.core.RedisTemplate; - -import javax.annotation.Resource; -import java.time.LocalDateTime; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.LATEST_VERSION; -import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.OCTOPUS_MESSAGE_FROM_AGENT; -import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_SET; -import static io.wdd.rpc.status.CommonAndStatusCache.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; - -//@Service -@Slf4j -public class OctopusAgentServiceImpl implements OctopusAgentService { - - private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.AGENT; - @Resource - OMessageToAgentSender oMessageToAgentSender; - @Resource - ObjectMapper objectMapper; - - @Resource - RedisTemplate redisTemplate; - - @Resource - AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService; - - @Override - public Map getAllAgentVersion() { - HashMap result = new HashMap<>(); - - // 查询获取到最新的Agent版本信息 - result.put( - "LATEST_VERSION", - getRealAgentLatestVersion() - ); - - // 获取Agent的版本信息 -- 超时时间 - - // 从OctopusToServer中收集到所有Agent的版本信息 - // 组装信息至集合中 - LocalDateTime currentTime = TimeUtils.currentFormatTime(); - - // 发送OctopusMessage-Agent - buildOMessageAndSendToAllHealthyAgent( - AgentOperationType.VERSION, - currentTime - ); - - // 构造 异步结果监听内容 - OctopusMessageSyncReplayContend agentReplayContend = OctopusMessageSyncReplayContend.build( - ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(), - CurrentAppOctopusMessageType, - currentTime - ); - - CountDownLatch countDownLatch = agentReplayContend.getCountDownLatch(); - - // 调用后台接收处理所有的Replay信息 - asyncWaitOctopusMessageResultService.waitFor(agentReplayContend); - - boolean isAllHealthyAgentReport = false; - try { - // 超时等待5秒钟, 或者所有的Agent均已经完成上报 - isAllHealthyAgentReport = countDownLatch.await( - 5, - TimeUnit.SECONDS - ); - } catch (InterruptedException e) { - - } finally { - // 超时,或者 全部信息已经收集 - if (!isAllHealthyAgentReport) { - log.warn("存在部分Agent没有上报 版本信息!"); - } - - // 此处调用,即可中断 异步任务的收集工作 - asyncWaitOctopusMessageResultService.stopWaiting(agentReplayContend); - - // 处理结果 - agentReplayContend - .getReplayOMList() - .stream() - .forEach( - mMessage -> { - result.put( - mMessage.getUuid(), - (String) mMessage.getResult() - ); - } - ); - - // help gc - agentReplayContend = null; - } - - return result; - } - - private String getRealAgentLatestVersion() { - - String latestVersion = (String) redisTemplate - .opsForValue() - .get( - LATEST_VERSION - ); - - return latestVersion; - } - - @Override - public Map getAllAgentCoreInfo() { - - HashMap result = new HashMap<>(); - - // 获取Agent的版本信息 -- 超时时间 - // 从OctopusToServer中收集到所有Agent的版本信息 - // 组装信息至集合中 - LocalDateTime currentTime = TimeUtils.currentFormatTime(); - - // 发送OctopusMessage-Agent - buildOMessageAndSendToAllHealthyAgent( - AgentOperationType.INFO, - currentTime - ); - - // 构造结果 - OctopusMessageSyncReplayContend octopusMessageSyncReplayContend = OctopusMessageSyncReplayContend.build( - ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(), - CurrentAppOctopusMessageType, - currentTime - ); - - CountDownLatch countDownLatch = octopusMessageSyncReplayContend.getCountDownLatch(); - - // 调用后台接收处理所有的Replay信息 - asyncWaitOctopusMessageResultService.waitFor(octopusMessageSyncReplayContend); - - /* CompletableFuture getAllAgentCoreInfoFuture = waitCollectAllAgentCoreInfo( - result, - currentTime, - countDownLatch - ); -*/ - try { - // 超时等待5秒钟, 或者所有的Agent均已经完成上报ddo - countDownLatch.await( - 5, - TimeUnit.SECONDS - ); - } catch (InterruptedException e) { - log.warn("存在部分Agent没有上报 核心信息!"); - } finally { - // 超时,或者 全部信息已经收集 - - // 此处调用,即可中断 异步任务的收集工作 - asyncWaitOctopusMessageResultService.stopWaiting(octopusMessageSyncReplayContend); - - // 处理结果 - octopusMessageSyncReplayContend - .getReplayOMList() - .stream() - .forEach( - mMessage -> { - - try { - - // 解析当前信息 - ServerInfoVO serverInfoVO = objectMapper.readValue( - (String) mMessage.getResult(), - new TypeReference() { - } - ); - - result.put( - mMessage.getUuid(), - serverInfoVO - ); - - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - - } - ); - - // help gc - octopusMessageSyncReplayContend = null; - } - - return result; - } - - - @Override - public String shutdownAgentDanger(String agentTopicName) { - - if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { - log.error("agentTopicName Error !"); - return "agentTopicName Error!"; - } - - LocalDateTime formatTime = TimeUtils.currentFormatTime(); - buildOMessageAndSendToAllHealthyAgent( - AgentOperationType.SHUTDOWN, - formatTime - ); - - // 是否需要检查相应的状态 - - - return null; - } - - private CompletableFuture waitCollectAllAgentVersionInfo(HashMap result, LocalDateTime startTime, CountDownLatch countDownLatch) { - - ExecutorService pool = ServerCommonPool.pool; - - // 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息 - return CompletableFuture.runAsync( - () -> { - while (true) { - if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) { - // 开始收集等待 200ms - try { - TimeUnit.MILLISECONDS.sleep(50); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - // 返回,继续死循环 - continue; - } - - OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll(); - - // 获取到OM - // 判断信息是否是需要的类型 - // 根据 init_time + OctopusMessageType + AgentOperationMessage - if (!judgyIsCurrentServerReplyMessage( - message, - startTime - )) { - - // 不是当前应用需要的的OM,将信息放置与Cache队列的末尾 - - OCTOPUS_MESSAGE_FROM_AGENT.add(message); - - // 返回,继续死循环 - continue; - } - - // 是当前需要的消息信息 - result.put( - message.getUuid(), - (String) message.getResult() - ); - countDownLatch.countDown(); - } - }, - pool - ); - - - } - - private CompletableFuture waitCollectAllAgentCoreInfo(HashMap result, LocalDateTime startTime, CountDownLatch countDownLatch) { - - ExecutorService pool = ServerCommonPool.pool; - - // 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息 - return CompletableFuture.runAsync( - () -> { - while (true) { - if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) { - // 开始收集等待 200ms - try { - TimeUnit.MILLISECONDS.sleep(50); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - // 返回,继续死循环 - continue; - } - - OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll(); - - // 获取到OM - // 判断信息是否是需要的类型 - // 根据 init_time + OctopusMessageType + AgentOperationMessage - if (!judgyIsCurrentServerReplyMessage( - message, - startTime - )) { - - // 不是当前应用需要的的OM,将信息放置与Cache队列的末尾 - - OCTOPUS_MESSAGE_FROM_AGENT.offer(message); - - // 返回,继续死循环 - continue; - } - - // 是当前需要的消息信息 - try { - - // 解析当前信息 - ServerInfoVO serverInfoVO = objectMapper.readValue( - (String) message.getResult(), - new TypeReference() { - } - ); - - result.put( - message.getUuid(), - serverInfoVO - ); - - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - - countDownLatch.countDown(); - } - }, - pool - ); - - } - - /** - * 判断信息是否是需要的类型 - * 根据 init_time + OctopusMessageType + AgentOperationMessage - * - * @return - */ - private boolean judgyIsCurrentServerReplyMessage(OctopusMessage message, LocalDateTime startTime) { - - // init_time 时间判断 - boolean startTimeEqual = startTime.isEqual(message.getInit_time()); - - // OctopusMessageType判断 - boolean OMTypeEqual = message - .getOctopusMessageType() - .equals(CurrentAppOctopusMessageType); - - return startTimeEqual && OMTypeEqual; - } - - private void buildOMessageAndSendToAllHealthyAgent(AgentOperationType operationType, LocalDateTime currentTime) { - - List octopusMessageList = ALL_HEALTHY_AGENT_TOPIC_NAME_LIST - .stream() - .map( - agentTopicName -> - ConstructAgentOperationMessage( - agentTopicName, - operationType, - currentTime - ) - ) - .collect(Collectors.toList()); - - // 发送相应的消息 - oMessageToAgentSender.send(octopusMessageList); - } - - /** - * 专门构造 Agent 类型的 OctopusMessage - * 通常只能在此类中使用 - * - * @param agentTopicName - * @param operationType - * @param currentTime - * @return - */ - private OctopusMessage ConstructAgentOperationMessage(String agentTopicName, AgentOperationType operationType, LocalDateTime currentTime) { - - AgentOperationMessage operationMessage = AgentOperationMessage - .builder() - .opType(operationType) - .build(); - - String ops; - try { - ops = objectMapper.writeValueAsString(operationMessage); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - - return OctopusMessage - .builder() - .octopusMessageType(CurrentAppOctopusMessageType) - .uuid(agentTopicName) - .init_time(currentTime) - .content(ops) - .build(); - - } -} +//package io.wdd.rpc.agent; +// +//import com.fasterxml.jackson.core.JsonProcessingException; +//import com.fasterxml.jackson.core.type.TypeReference; +//import com.fasterxml.jackson.databind.ObjectMapper; +//import io.wdd.common.utils.TimeUtils; +//import io.wdd.rpc.message.OctopusMessage; +//import io.wdd.rpc.message.OctopusMessageType; +//import io.wdd.rpc.message.handler.OMessageReplayContent; +//import io.wdd.rpc.message.sender.OMessageToAgentSender; +//import io.wdd.server.beans.vo.ServerInfoVO; +//import io.wdd.server.config.ServerCommonPool; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.data.redis.core.RedisTemplate; +// +//import javax.annotation.Resource; +//import java.time.LocalDateTime; +//import java.util.HashMap; +//import java.util.List; +//import java.util.Map; +//import java.util.concurrent.CompletableFuture; +//import java.util.concurrent.CountDownLatch; +//import java.util.concurrent.ExecutorService; +//import java.util.concurrent.TimeUnit; +//import java.util.stream.Collectors; +// +//import static io.wdd.rpc.message.handler.OMessageToServerListener.LATEST_VERSION; +//import static io.wdd.rpc.message.handler.OMessageToServerListener.OCTOPUS_MESSAGE_FROM_AGENT; +//import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_SET; +//import static io.wdd.rpc.status.CommonAndStatusCache.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; +// +////@Service +//@Slf4j +//public class OctopusAgentServiceImpl implements OctopusAgentService { +// +// private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.AGENT; +// @Resource +// OMessageToAgentSender oMessageToAgentSender; +// @Resource +// ObjectMapper objectMapper; +// +// @Resource +// RedisTemplate redisTemplate; +// +// @Resource +// AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService; +// +// @Override +// public Map getAllAgentVersion() { +// HashMap result = new HashMap<>(); +// +// // 查询获取到最新的Agent版本信息 +// result.put( +// "LATEST_VERSION", +// getRealAgentLatestVersion() +// ); +// +// // 获取Agent的版本信息 -- 超时时间 +// +// // 从OctopusToServer中收集到所有Agent的版本信息 +// // 组装信息至集合中 +// LocalDateTime currentTime = TimeUtils.currentFormatTime(); +// +// // 发送OctopusMessage-Agent +// buildOMessageAndSendToAllHealthyAgent( +// AgentOperationType.VERSION, +// currentTime +// ); +// +// // 构造 异步结果监听内容 +// OMessageReplayContent agentReplayContend = OMessageReplayContent.build( +// ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(), +// CurrentAppOctopusMessageType, +// currentTime +// ); +// +// CountDownLatch countDownLatch = agentReplayContend.getCountDownLatch(); +// +// // 调用后台接收处理所有的Replay信息 +// asyncWaitOctopusMessageResultService.waitFor(agentReplayContend); +// +// boolean isAllHealthyAgentReport = false; +// try { +// // 超时等待5秒钟, 或者所有的Agent均已经完成上报 +// isAllHealthyAgentReport = countDownLatch.await( +// 5, +// TimeUnit.SECONDS +// ); +// } catch (InterruptedException e) { +// +// } finally { +// // 超时,或者 全部信息已经收集 +// if (!isAllHealthyAgentReport) { +// log.warn("存在部分Agent没有上报 版本信息!"); +// } +// +// // 此处调用,即可中断 异步任务的收集工作 +// asyncWaitOctopusMessageResultService.stopWaiting(agentReplayContend); +// +// // 处理结果 +// agentReplayContend +// .getReplayOMList() +// .stream() +// .forEach( +// mMessage -> { +// result.put( +// mMessage.getUuid(), +// (String) mMessage.getResult() +// ); +// } +// ); +// +// // help gc +// agentReplayContend = null; +// } +// +// return result; +// } +// +// private String getRealAgentLatestVersion() { +// +// String latestVersion = (String) redisTemplate +// .opsForValue() +// .get( +// LATEST_VERSION +// ); +// +// return latestVersion; +// } +// +// @Override +// public Map getAllAgentCoreInfo() { +// +// HashMap result = new HashMap<>(); +// +// // 获取Agent的版本信息 -- 超时时间 +// // 从OctopusToServer中收集到所有Agent的版本信息 +// // 组装信息至集合中 +// LocalDateTime currentTime = TimeUtils.currentFormatTime(); +// +// // 发送OctopusMessage-Agent +// buildOMessageAndSendToAllHealthyAgent( +// AgentOperationType.INFO, +// currentTime +// ); +// +// // 构造结果 +// OMessageReplayContent oMessageReplayContent = OMessageReplayContent.build( +// ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(), +// CurrentAppOctopusMessageType, +// currentTime +// ); +// +// CountDownLatch countDownLatch = oMessageReplayContent.getCountDownLatch(); +// +// // 调用后台接收处理所有的Replay信息 +// asyncWaitOctopusMessageResultService.waitFor(oMessageReplayContent); +// +// /* CompletableFuture getAllAgentCoreInfoFuture = waitCollectAllAgentCoreInfo( +// result, +// currentTime, +// countDownLatch +// ); +//*/ +// try { +// // 超时等待5秒钟, 或者所有的Agent均已经完成上报ddo +// countDownLatch.await( +// 5, +// TimeUnit.SECONDS +// ); +// } catch (InterruptedException e) { +// log.warn("存在部分Agent没有上报 核心信息!"); +// } finally { +// // 超时,或者 全部信息已经收集 +// +// // 此处调用,即可中断 异步任务的收集工作 +// asyncWaitOctopusMessageResultService.stopWaiting(oMessageReplayContent); +// +// // 处理结果 +// oMessageReplayContent +// .getReplayOMList() +// .stream() +// .forEach( +// mMessage -> { +// +// try { +// +// // 解析当前信息 +// ServerInfoVO serverInfoVO = objectMapper.readValue( +// (String) mMessage.getResult(), +// new TypeReference() { +// } +// ); +// +// result.put( +// mMessage.getUuid(), +// serverInfoVO +// ); +// +// } catch (JsonProcessingException e) { +// throw new RuntimeException(e); +// } +// +// } +// ); +// +// // help gc +// oMessageReplayContent = null; +// } +// +// return result; +// } +// +// +// @Override +// public String shutdownAgentDanger(String agentTopicName) { +// +// if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { +// log.error("agentTopicName Error !"); +// return "agentTopicName Error!"; +// } +// +// LocalDateTime formatTime = TimeUtils.currentFormatTime(); +// buildOMessageAndSendToAllHealthyAgent( +// AgentOperationType.SHUTDOWN, +// formatTime +// ); +// +// // 是否需要检查相应的状态 +// +// +// return null; +// } +// +// private CompletableFuture waitCollectAllAgentVersionInfo(HashMap result, LocalDateTime startTime, CountDownLatch countDownLatch) { +// +// ExecutorService pool = ServerCommonPool.pool; +// +// // 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息 +// return CompletableFuture.runAsync( +// () -> { +// while (true) { +// if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) { +// // 开始收集等待 200ms +// try { +// TimeUnit.MILLISECONDS.sleep(50); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// // 返回,继续死循环 +// continue; +// } +// +// OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll(); +// +// // 获取到OM +// // 判断信息是否是需要的类型 +// // 根据 init_time + OctopusMessageType + AgentOperationMessage +// if (!judgyIsCurrentServerReplyMessage( +// message, +// startTime +// )) { +// +// // 不是当前应用需要的的OM,将信息放置与Cache队列的末尾 +// +// OCTOPUS_MESSAGE_FROM_AGENT.add(message); +// +// // 返回,继续死循环 +// continue; +// } +// +// // 是当前需要的消息信息 +// result.put( +// message.getUuid(), +// (String) message.getResult() +// ); +// countDownLatch.countDown(); +// } +// }, +// pool +// ); +// +// +// } +// +// private CompletableFuture waitCollectAllAgentCoreInfo(HashMap result, LocalDateTime startTime, CountDownLatch countDownLatch) { +// +// ExecutorService pool = ServerCommonPool.pool; +// +// // 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息 +// return CompletableFuture.runAsync( +// () -> { +// while (true) { +// if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) { +// // 开始收集等待 200ms +// try { +// TimeUnit.MILLISECONDS.sleep(50); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// // 返回,继续死循环 +// continue; +// } +// +// OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll(); +// +// // 获取到OM +// // 判断信息是否是需要的类型 +// // 根据 init_time + OctopusMessageType + AgentOperationMessage +// if (!judgyIsCurrentServerReplyMessage( +// message, +// startTime +// )) { +// +// // 不是当前应用需要的的OM,将信息放置与Cache队列的末尾 +// +// OCTOPUS_MESSAGE_FROM_AGENT.offer(message); +// +// // 返回,继续死循环 +// continue; +// } +// +// // 是当前需要的消息信息 +// try { +// +// // 解析当前信息 +// ServerInfoVO serverInfoVO = objectMapper.readValue( +// (String) message.getResult(), +// new TypeReference() { +// } +// ); +// +// result.put( +// message.getUuid(), +// serverInfoVO +// ); +// +// } catch (JsonProcessingException e) { +// throw new RuntimeException(e); +// } +// +// countDownLatch.countDown(); +// } +// }, +// pool +// ); +// +// } +// +// /** +// * 判断信息是否是需要的类型 +// * 根据 init_time + OctopusMessageType + AgentOperationMessage +// * +// * @return +// */ +// private boolean judgyIsCurrentServerReplyMessage(OctopusMessage message, LocalDateTime startTime) { +// +// // init_time 时间判断 +// boolean startTimeEqual = startTime.isEqual(message.getInit_time()); +// +// // OctopusMessageType判断 +// boolean OMTypeEqual = message +// .getOctopusMessageType() +// .equals(CurrentAppOctopusMessageType); +// +// return startTimeEqual && OMTypeEqual; +// } +// +// private void buildOMessageAndSendToAllHealthyAgent(AgentOperationType operationType, LocalDateTime currentTime) { +// +// List octopusMessageList = ALL_HEALTHY_AGENT_TOPIC_NAME_LIST +// .stream() +// .map( +// agentTopicName -> +// ConstructAgentOperationMessage( +// agentTopicName, +// operationType, +// currentTime +// ) +// ) +// .collect(Collectors.toList()); +// +// // 发送相应的消息 +// oMessageToAgentSender.send(octopusMessageList); +// } +// +// /** +// * 专门构造 Agent 类型的 OctopusMessage +// * 通常只能在此类中使用 +// * +// * @param agentTopicName +// * @param operationType +// * @param currentTime +// * @return +// */ +// private OctopusMessage ConstructAgentOperationMessage(String agentTopicName, AgentOperationType operationType, LocalDateTime currentTime) { +// +// AgentOperationMessage operationMessage = AgentOperationMessage +// .builder() +// .opType(operationType) +// .build(); +// +// String ops; +// try { +// ops = objectMapper.writeValueAsString(operationMessage); +// } catch (JsonProcessingException e) { +// throw new RuntimeException(e); +// } +// +// return OctopusMessage +// .builder() +// .octopusMessageType(CurrentAppOctopusMessageType) +// .uuid(agentTopicName) +// .init_time(currentTime) +// .content(ops) +// .build(); +// +// } +//} diff --git a/server/src/main/java/io/wdd/rpc/execute/ExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/ExecutionServiceImpl.java index 4e7bbf5..fc17896 100644 --- a/server/src/main/java/io/wdd/rpc/execute/ExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/ExecutionServiceImpl.java @@ -3,6 +3,7 @@ package io.wdd.rpc.execute; import io.wdd.common.utils.TimeUtils; import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessageType; +import io.wdd.rpc.message.handler.OMessageReplayContent; import io.wdd.rpc.message.sender.OMessageToAgentSender; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -11,9 +12,10 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; -import static io.wdd.rpc.message.handler.sync.OMessageHandler.StopWaitingResult; -import static io.wdd.rpc.message.handler.sync.OMessageHandler.WaitFromAgent; +import static io.wdd.rpc.message.handler.OMessageHandler.*; import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_SET; @Service @@ -53,41 +55,53 @@ public class ExecutionServiceImpl implements ExecutionService { // send the message oMessageToAgentSender.send(octopusMessage); - System.out.println("originOctopusMessage = " + octopusMessage.hashCode()); + log.debug( + "发送的 matchKey为 {}, 内容为 {}", + GenerateOMessageMatchKey( + octopusMessage.getOctopusMessageType(), + octopusMessage.getInit_time() + ), + octopusMessage + ); // 需要返回结果 if (!durationTask) { // 等待结果 - WaitFromAgent(octopusMessage); - - synchronized (octopusMessage) { - - try { - octopusMessage.wait(10000); - - - log.debug("等待结束!"); - - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - } - // 转换结果 - commandResultLog = (ArrayList) octopusMessage.getResult(); - - // debug - log.debug( - "执行命令 {} 的结果为 {} 内容为 {}", - executionMessage.getSingleLineCommand() == null ? executionMessage.getMultiLineCommand() : executionMessage.getSingleLineCommand(), - octopusMessage.getResultCode(), - octopusMessage.getResult() + OMessageReplayContent replayContent = WaitFromAgent( + octopusMessage, + 1 ); - } + CountDownLatch replayLatch = replayContent.getCountDownLatch(); + boolean waitOK = false; - // 释放等待队列 - StopWaitingResult(octopusMessage); + try { + waitOK = replayLatch.await( + 10, + TimeUnit.SECONDS + ); + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + + // 释放等待队列 + StopWaitingResult(octopusMessage); + + // 转换结果 + commandResultLog = (ArrayList) octopusMessage.getResult(); + + // debug + log.debug( + "执行命令 {} {} 在规定时间内结束, 结果为 {} 返回内容为 {}", + executionMessage.getSingleLineCommand() == null ? executionMessage.getMultiLineCommand() : executionMessage.getSingleLineCommand(), + waitOK ? "已经" : "未", + octopusMessage.getResultCode(), + octopusMessage.getResult() + ); + } + + } return commandResultLog; } @@ -98,7 +112,7 @@ public class ExecutionServiceImpl implements ExecutionService { return OctopusMessage .builder() .octopusMessageType(OctopusMessageType.EXECUTOR) - .init_time(TimeUtils.currentTime()) + .init_time(TimeUtils.currentFormatTime()) .uuid(agentTopicName) .content( executionMessage diff --git a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java deleted file mode 100644 index 526015b..0000000 --- a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionService.java +++ /dev/null @@ -1,47 +0,0 @@ -package io.wdd.rpc.execute.service; - -import java.util.ArrayList; -import java.util.List; - -/** - * 同步命令执行的核心类 - * 需要等待命令执行完毕,完后返回相应的结果 - */ -public interface SyncExecutionService { - - List> SyncSendCommandToAgentComplete( - List agentTopicNameList, - String type, - List funcContent, - List commandList, - List> commandListComplete, - boolean needResultReplay, - String futureKey, - boolean durationTask - ); - - - /** - * 调用 完整脚本的 最底层函数 - * - * @param agentTopicName - * @param type - * @param funcContent - * @param commandList - * @param commandListComplete - * @param futureKey - * @param durationTask - * @return resultKey 本次操作在Redis中记录的结果Key - */ - ArrayList SyncSendCommandToAgentComplete( - String agentTopicName, - String type, - List funcContent, - List commandList, - List> commandListComplete, - boolean needResultReplay, - String futureKey, - boolean durationTask - ); - -} diff --git a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java deleted file mode 100644 index 2ebaf43..0000000 --- a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java +++ /dev/null @@ -1,135 +0,0 @@ -package io.wdd.rpc.execute.service; - -import io.wdd.rpc.message.OctopusMessage; -import io.wdd.rpc.message.OctopusMessageType; -import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService; -import io.wdd.rpc.message.handler.async.OctopusMessageSyncReplayContend; -import lombok.extern.slf4j.Slf4j; - -import javax.annotation.Resource; -import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -//@Service -@Slf4j -public class SyncExecutionServiceImpl implements SyncExecutionService { - - private static final boolean COMMAND_EXEC_NEED_REPLAY = true; - - private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.EXECUTOR; - @Resource - AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService; - @Resource - AsyncExecutionService asyncExecutionService; - - /** - * 一个命令执行的最长等待时间 - */ - int processMaxWaitSeconds = 10; - - @Override - public List> SyncSendCommandToAgentComplete(List agentTopicNameList, String type, List funcContent, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { - - return agentTopicNameList - .stream() - .map( - agentTopicName -> { - return this.SyncSendCommandToAgentComplete( - agentTopicName, - type, - null, - commandList, - commandListComplete, - needResultReplay, - futureKey, - durationTask - ); - } - ) - .collect(Collectors.toList()); - - } - - @Override - public ArrayList SyncSendCommandToAgentComplete(String agentTopicName, String type, List funcContent, List commandList, List> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) { - - // 异步访问 - OctopusMessage octopusMessage = asyncExecutionService.AsyncCallSendCommandToAgent( - agentTopicName, - type, - null, - commandList, - commandListComplete, - needResultReplay, - futureKey, - durationTask - ); - - LocalDateTime initTime = octopusMessage.getInit_time(); - - // OM 中的result保存 - ArrayList result = new ArrayList<>(); - - // 构造消息等待对象 - int commandCount = 1; - if (null != commandListComplete) { - commandCount = Math.max( - commandListComplete.size(), - 1 - ); - } - - // 构造回复信息的内容 - OctopusMessageSyncReplayContend executionReplayContent = OctopusMessageSyncReplayContend.build( - commandCount, - CurrentAppOctopusMessageType, - initTime - ); - CountDownLatch countDownLatch = executionReplayContent.getCountDownLatch(); - - // 开始等待结果 - asyncWaitOctopusMessageResultService.waitFor(executionReplayContent); - - // 监听结果 - try { - boolean await = countDownLatch.await( - processMaxWaitSeconds, - TimeUnit.SECONDS - ); - - } catch (InterruptedException e) { - throw new RuntimeException(e); - } finally { - - // 等待所有的结果返回 - // 停止等待结果 - asyncWaitOctopusMessageResultService.stopWaiting(executionReplayContent); - - // 解析结果 - executionReplayContent - .getReplayOMList() - .stream() - .map( - om -> { - log.debug( - "replay message is => {}", - om - ); - - return (ArrayList) om.getResult(); - } - ) - .forEachOrdered( - singleResult -> result.addAll(singleResult) - ); - - } - - // 返回 执行的结果 - return result; - } -} diff --git a/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageHandler.java b/server/src/main/java/io/wdd/rpc/message/handler/OMessageHandler.java similarity index 74% rename from server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageHandler.java rename to server/src/main/java/io/wdd/rpc/message/handler/OMessageHandler.java index 121cade..b73c99f 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageHandler.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/OMessageHandler.java @@ -1,4 +1,4 @@ -package io.wdd.rpc.message.handler.sync; +package io.wdd.rpc.message.handler; import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessageType; @@ -11,8 +11,8 @@ import java.time.LocalDateTime; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.FROM_AGENT_MATCH_TO_AGENT_MAP; -import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.OCTOPUS_MESSAGE_FROM_AGENT; +import static io.wdd.rpc.message.handler.OMessageToServerListener.FROM_AGENT_MATCH_TO_AGENT_MAP; +import static io.wdd.rpc.message.handler.OMessageToServerListener.OCTOPUS_MESSAGE_FROM_AGENT; @Service @Slf4j(topic = "Octopus Message Handler") @@ -44,18 +44,24 @@ public class OMessageHandler { } - public static void WaitFromAgent(OctopusMessage octopusMessage) { + public static OMessageReplayContent WaitFromAgent(OctopusMessage originOMessage, int waitFroReplayNum) { // 构建 MatchKey String matchKey = GenerateOMessageMatchKey( - octopusMessage.getOctopusMessageType(), - octopusMessage.getInit_time() + originOMessage.getOctopusMessageType(), + originOMessage.getInit_time() + ); + + // 构造等待对象 + OMessageReplayContent replayContent = OMessageReplayContent.build( + originOMessage, + waitFroReplayNum ); // 开始等待 FROM_AGENT_MATCH_TO_AGENT_MAP.put( matchKey, - octopusMessage + replayContent ); //debug @@ -63,6 +69,9 @@ public class OMessageHandler { "wait from agent map is => {}", FROM_AGENT_MATCH_TO_AGENT_MAP ); + + + return replayContent; } public static void StopWaitingResult(OctopusMessage octopusMessage) { @@ -108,10 +117,16 @@ public class OMessageHandler { replayOMessage.getInit_time() ); + log.info( + "接收到的 matchKey为 {}, 内容为 {}", + matchKey, + replayOMessage + ); + if (!FROM_AGENT_MATCH_TO_AGENT_MAP.containsKey(matchKey)) { // 没有这个Key,说明等待结果已经超时了,直接丢弃,然后继续循环 // todo 错误的数据需要放置于某处 - log.debug( + log.warn( "等待队列里面没有该回复的结果key =>", matchKey ); @@ -121,13 +136,16 @@ public class OMessageHandler { // 归还信息 // 拿到原始信息 - OctopusMessage originOMessage = FROM_AGENT_MATCH_TO_AGENT_MAP.get(matchKey); + OMessageReplayContent oMessageReplayContent = FROM_AGENT_MATCH_TO_AGENT_MAP.get(matchKey); + OctopusMessage originOMessage = oMessageReplayContent.getOriginOMessage(); originOMessage.setResultCode(replayOMessage.getResultCode()); originOMessage.setResult(replayOMessage.getResult()); // 通知等待线程 - originOMessage.notify(); + oMessageReplayContent + .getCountDownLatch() + .countDown(); } } diff --git a/server/src/main/java/io/wdd/rpc/message/handler/async/OctopusMessageSyncReplayContend.java b/server/src/main/java/io/wdd/rpc/message/handler/OMessageReplayContent.java similarity index 60% rename from server/src/main/java/io/wdd/rpc/message/handler/async/OctopusMessageSyncReplayContend.java rename to server/src/main/java/io/wdd/rpc/message/handler/OMessageReplayContent.java index f2ad53f..125ef62 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/async/OctopusMessageSyncReplayContend.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/OMessageReplayContent.java @@ -1,4 +1,4 @@ -package io.wdd.rpc.message.handler.async; +package io.wdd.rpc.message.handler; import com.fasterxml.jackson.annotation.JsonFormat; import io.swagger.annotations.ApiModel; @@ -11,20 +11,19 @@ import lombok.NoArgsConstructor; import lombok.experimental.SuperBuilder; import java.time.LocalDateTime; -import java.util.ArrayList; import java.util.concurrent.CountDownLatch; -import static io.wdd.rpc.message.handler.sync.OMessageHandler.GenerateOMessageMatchKey; +import static io.wdd.rpc.message.handler.OMessageHandler.GenerateOMessageMatchKey; @Data @AllArgsConstructor @NoArgsConstructor @SuperBuilder(toBuilder = true) @ApiModel("众多业务调用RPC,异步等待需要确定返回消息是谁的") -public class OctopusMessageSyncReplayContend { +public class OMessageReplayContent { @ApiModelProperty("rpc消息的类型") - OctopusMessageType type; + OctopusMessageType octopusMessageType; @ApiModelProperty("rpc消息发送的时间, 精确匹配,去掉毫秒") @JsonFormat(pattern = "yyyy-MM-dd hh:MM:ss") @@ -36,31 +35,26 @@ public class OctopusMessageSyncReplayContend { @ApiModelProperty("需要等待的消息个数") CountDownLatch countDownLatch; - @ApiModelProperty("回复的结果列表, 临时保存") - ArrayList replayOMList; + @ApiModelProperty("发送的OctopusMessage") + OctopusMessage originOMessage; - /** - * Execution模块使用的模板 - * - * @return - */ - public static OctopusMessageSyncReplayContend build(int waitForReplayNum, OctopusMessageType currentOMType, LocalDateTime currentTime) { + public static OMessageReplayContent build(OctopusMessage originOMessage, int waitForReplayNum) { CountDownLatch latch = null; if (waitForReplayNum != 0) { latch = new CountDownLatch(waitForReplayNum); } - return new OctopusMessageSyncReplayContend( - currentOMType, - currentTime, + return new OMessageReplayContent( + originOMessage.getOctopusMessageType(), + originOMessage.getInit_time(), GenerateOMessageMatchKey( - currentOMType, - currentTime + originOMessage.getOctopusMessageType(), + originOMessage.getInit_time() ), latch, - new ArrayList(16) + originOMessage ); } diff --git a/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageToServerListener.java b/server/src/main/java/io/wdd/rpc/message/handler/OMessageToServerListener.java similarity index 88% rename from server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageToServerListener.java rename to server/src/main/java/io/wdd/rpc/message/handler/OMessageToServerListener.java index c4fbb7a..fdaa429 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageToServerListener.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/OMessageToServerListener.java @@ -1,4 +1,4 @@ -package io.wdd.rpc.message.handler.sync; +package io.wdd.rpc.message.handler; import io.wdd.common.handler.MyRuntimeException; @@ -16,6 +16,7 @@ import java.util.ArrayDeque; import java.util.HashMap; import static io.wdd.common.config.OctopusObjectMapperConfig.OctopusObjectMapper; +import static io.wdd.common.config.OctopusObjectMapperConfig.WriteToString; @Configuration @Slf4j(topic = "Octopus Message Listener") @@ -39,10 +40,11 @@ public class OMessageToServerListener { /** * 发送出去的OctopusMessage需要和返回回来的内容对比 * 返回来的OM反序列化之后就不是原对象,需要进行 通过MatchKey比较 + * 2023年8月11日 修改为OMessageReplayContent *

- * omMatchKey -- OctopusMessage + * omMatchKey -- OMessageReplayContent */ - public static HashMap FROM_AGENT_MATCH_TO_AGENT_MAP = new HashMap<>(); + public static HashMap FROM_AGENT_MATCH_TO_AGENT_MAP = new HashMap<>(); @Resource RedisTemplate redisTemplate; @@ -67,12 +69,11 @@ public class OMessageToServerListener { } // Octopus Message Handler - log.debug( + log.info( "received from agent : {} ", - octopusMessage + WriteToString(octopusMessage) ); - System.out.println("receivedOctopusMessage = " + octopusMessage.hashCode()); // 获取Agent的版本信息 if (octopusMessage diff --git a/server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOctopusMessageResultService.java b/server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOctopusMessageResultService.java deleted file mode 100644 index d516908..0000000 --- a/server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOctopusMessageResultService.java +++ /dev/null @@ -1,126 +0,0 @@ -package io.wdd.rpc.message.handler.async; - -import io.wdd.rpc.message.OctopusMessage; -import io.wdd.server.config.ServerCommonPool; -import lombok.extern.slf4j.Slf4j; - -import javax.annotation.PostConstruct; -import java.util.HashMap; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -import static io.wdd.rpc.message.handler.sync.OMessageHandler.GenerateOMessageMatchKey; -import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.OCTOPUS_MESSAGE_FROM_AGENT; - -/** - * 从Agent收集返回信息的统一处理地点 - * 使用方法: 业务类构造 OMReplayContend对象,调用AsyncWaitOMResult.waitFor()方法 - *

- * 调用结束之后,需要从 REPLAY_WAITING_TARGET 中移除此部分内容 - */ -//@Service -@Slf4j -public class AsyncWaitOctopusMessageResultService { - - - @PostConstruct - public void daemonHandleReplayOMFromAgent() { - - // 异步任务启动 - CompletableFuture.runAsync( - () -> doHandleReplayOMFromAgent(), - ServerCommonPool.pool - ); - - } - - /** - * 为了避免线程不安全的问题,增加一层缓存,仅仅由当前类操作此部分 - * KEY -> replayMatchKey - * VALUE -> OctopusMessageSyncReplayContend - 包含countDownLatch 和 result - */ - private static final HashMap OM_REPLAY_WAITING_TARGET_MAP = new HashMap<>(); - - /** - * 操作 OCTOPUS_MESSAGE_FROM_AGENT 获取相应的Message放入内容中 - */ - private void doHandleReplayOMFromAgent() { - - // 死循环,不断的轮询 OCTOPUS_MESSAGE_FROM_AGENT - while (true) { - - if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) { - // 开始收集等待 50 ms - try { - TimeUnit.MILLISECONDS.sleep(50); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - // 返回,继续死循环 - continue; - } - - // 拿到消息 - OctopusMessage replayOMessage = OCTOPUS_MESSAGE_FROM_AGENT.poll(); - - // 构造 replayMatchKey - String matchKey = GenerateOMessageMatchKey( - replayOMessage.getOctopusMessageType(), - replayOMessage.getInit_time() - ); - if (!OM_REPLAY_WAITING_TARGET_MAP.containsKey(matchKey)) { - // 没有这个Key,说明等待结果已经超时了,直接丢弃,然后继续循环 - - // todo 错误的数据需要放置于某处 - log.debug( - "等待队列里面没有该回复的结果key =>", - matchKey - ); - - continue; - } - - // Map中包含有Key,那么放置进去 - OctopusMessageSyncReplayContend replayContend = OM_REPLAY_WAITING_TARGET_MAP.get(matchKey); - - replayContend - .getReplayOMList() - .add(replayOMessage); - - // 需要操作countDown - replayContend - .getCountDownLatch() - .countDown(); - - // 结束操作,继续循环 - } - - } - - public void waitFor(OctopusMessageSyncReplayContend OctopusMessageSyncReplayContend) { - - // 向 REPLAY_CACHE_MAP中写入 Key - OM_REPLAY_WAITING_TARGET_MAP.put( - OctopusMessageSyncReplayContend.getReplayMatchKey(), - OctopusMessageSyncReplayContend - ); - - // 在调用线程的countDownLunch结束之后,关闭 - // 清除 REPLAY_CACHE_MAP 中的队列 - } - - public void stopWaiting(OctopusMessageSyncReplayContend OctopusMessageSyncReplayContend) { - - // 在调用线程的countDownLunch结束之后,关闭 清除 REPLAY_CACHE_MAP 中的队列 - OctopusMessageSyncReplayContend contend = OM_REPLAY_WAITING_TARGET_MAP.get(OctopusMessageSyncReplayContend.getReplayMatchKey()); - - // 移除该内容 - OM_REPLAY_WAITING_TARGET_MAP.remove(OctopusMessageSyncReplayContend.getReplayMatchKey()); - - // help gc - contend = null; - - } - - -} diff --git a/server/src/main/java/io/wdd/rpc/status/service/SyncStatusServiceImpl.java b/server/src/main/java/io/wdd/rpc/status/service/SyncStatusServiceImpl.java index ab48e2d..3bc2482 100644 --- a/server/src/main/java/io/wdd/rpc/status/service/SyncStatusServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/status/service/SyncStatusServiceImpl.java @@ -1,265 +1,265 @@ -package io.wdd.rpc.status.service; - -import com.fasterxml.jackson.core.JsonProcessingException; -import io.wdd.common.utils.TimeUtils; -import io.wdd.rpc.beans.request.MetricQueryEntity; -import io.wdd.rpc.message.OctopusMessage; -import io.wdd.rpc.message.OctopusMessageType; -import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService; -import io.wdd.rpc.message.handler.async.OctopusMessageSyncReplayContend; -import io.wdd.rpc.message.sender.OMessageToAgentSender; -import io.wdd.rpc.status.beans.AgentStatus; -import lombok.extern.slf4j.Slf4j; -import org.springframework.data.redis.core.RedisTemplate; - -import javax.annotation.Resource; -import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import static io.wdd.common.config.OctopusObjectMapperConfig.OctopusObjectMapper; -import static io.wdd.rpc.status.OctopusStatusMessage.*; - -@Slf4j -//@Service -public class SyncStatusServiceImpl implements SyncStatusService { - - private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.STATUS; - - @Resource - OMessageToAgentSender oMessageToAgentSender; - - @Resource - AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService; - - @Resource - RedisTemplate redisTemplate; - - @Override - public Map SyncCollectAgentAliveStatus(List agentTopicNameList, int aliveStatusWaitMaxTime) { - - // 构造最后的结果Map - Map agentAliveStatusMap = agentTopicNameList - .stream() - .collect( - Collectors.toMap( - agentTopicName -> agentTopicName, - agentTopicName -> Boolean.FALSE - )); - - // 当前的时间 - LocalDateTime currentTime = TimeUtils.currentFormatTime(); - - // 构造OctopusMessage - StatusMessage结构体, 下发所有的消息 - buildAndSendAgentStatusOctopusMessage( - agentTopicNameList, - HEALTHY_STATUS_MESSAGE_TYPE, - currentTime - ); - - // 同步收集消息 - OctopusMessageSyncReplayContend statusSyncReplayContend = OctopusMessageSyncReplayContend.build( - agentTopicNameList.size(), - CurrentAppOctopusMessageType, - currentTime - ); - asyncWaitOctopusMessageResultService.waitFor(statusSyncReplayContend); - - // 解析结果 - CountDownLatch countDownLatch = statusSyncReplayContend.getCountDownLatch(); - - // 等待状态返回的结果 - boolean agentAliveStatusCollectResult = false; - try { - agentAliveStatusCollectResult = countDownLatch.await( - aliveStatusWaitMaxTime, - TimeUnit.SECONDS - ); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } finally { - if (!agentAliveStatusCollectResult) { - log.debug("Agent存活状态检查,没有检查到全部的Agent!"); - } - - // 移除等待队列 - asyncWaitOctopusMessageResultService.stopWaiting(statusSyncReplayContend); - - // 处理结果 - statusSyncReplayContend - .getReplayOMList() - .stream() - .forEach( - statusOMessage -> { - if (statusOMessage.getResult() != null) { - agentAliveStatusMap.put( - statusOMessage.getUuid(), - Boolean.TRUE - ); - } - } - ); - } - - // 返回Agent的存活状态内容 - return agentAliveStatusMap; - } - - @Override - public Map SyncCollectAgentMetricStatus(List agentTopicNameList, int collectMetricWaitMaxTime) { - - // 状态的结果Map - HashMap metricMap = new HashMap<>(); - - // 当前的时间 - LocalDateTime currentTime = TimeUtils.currentFormatTime(); - - // 构造所有的Metric的OM并且下发 - buildAndSendAgentStatusOctopusMessage( - agentTopicNameList, - METRIC_STATUS_MESSAGE_TYPE, - currentTime - ); - - // 同步等待结果, 并且解析结果 - OctopusMessageSyncReplayContend metricSyncReplayContend = OctopusMessageSyncReplayContend.build( - agentTopicNameList.size(), - CurrentAppOctopusMessageType, - currentTime - ); - asyncWaitOctopusMessageResultService.waitFor(metricSyncReplayContend); - - // 解析结果 - CountDownLatch countDownLatch = metricSyncReplayContend.getCountDownLatch(); - - // 等待状态返回的结果 - boolean agentAliveStatusCollectResult = false; - try { - agentAliveStatusCollectResult = countDownLatch.await( - collectMetricWaitMaxTime, - TimeUnit.SECONDS - ); - } catch (InterruptedException e) { - log.error("[Agent Metric] - 收集Agent的运行状态失败!"); - throw new RuntimeException(e); - } finally { - if (!agentAliveStatusCollectResult) { - log.debug("Agent存活状态检查,没有检查到全部的Agent!"); - } - - // 移除等待队列 - asyncWaitOctopusMessageResultService.stopWaiting(metricSyncReplayContend); - - // 处理结果 - metricSyncReplayContend - .getReplayOMList() - .stream() - .forEach( - statusOMessage -> { - if (statusOMessage.getResult() != null) { - - // 解析Result对象为 AgentStatus - try { - - AgentStatus agentStatus = OctopusObjectMapper.readValue( - (String) statusOMessage.getResult(), - AgentStatus.class - ); - - // 保存结果 - metricMap.put( - statusOMessage.getUuid(), - agentStatus - ); - - } catch (JsonProcessingException e) { - log.error("[Agent Metric] - 解析AgentStatus失败!"); - throw new RuntimeException(e); - } - - } - } - ); - } - - return metricMap; - } - - @Override - public ArrayList QueryMetricStatus(MetricQueryEntity metricQueryEntity) { - - String queryRedisKey = metricQueryEntity.getAgentTopicName() + "-Metric"; - - if (!redisTemplate.hasKey(queryRedisKey)) { - log.error( - "[Metric] - 查询到没有此Agent {} 的Metric信息,直接返回!", - metricQueryEntity.getAgentTopicName() - ); - } - - double start = metricQueryEntity.getMetricStartTimeStamp(); - double end = metricQueryEntity.getMetricEndTimeStamp(); - - ArrayList statusArrayList = new ArrayList<>(); - - redisTemplate - .opsForZSet() - .rangeByScore( - queryRedisKey, - start, - end - ) - .stream() - .forEachOrdered( - metricString -> { - try { - - AgentStatus agentStatus = OctopusObjectMapper.readValue( - (String) metricString, - AgentStatus.class - ); - - statusArrayList.add(agentStatus); - - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - ); - - - return statusArrayList; - } - - - /** - * 2023年7月10日 通用的底层构造方法 Status类型的Octopus Message - * - * @param agentTopicNameList - * @param statusType - * @param currentTime - */ - private void buildAndSendAgentStatusOctopusMessage(List agentTopicNameList, String statusType, LocalDateTime currentTime) { - - List octopusStatusMessageList = agentTopicNameList - .stream() - .map( - agentTopicName -> ConstructAgentStatusMessage( - statusType, - agentTopicName, - currentTime - ) - ) - .collect(Collectors.toList()); - - // 发送信息 - oMessageToAgentSender.send(octopusStatusMessageList); - } - - -} +//package io.wdd.rpc.status.service; +// +//import com.fasterxml.jackson.core.JsonProcessingException; +//import io.wdd.common.utils.TimeUtils; +//import io.wdd.rpc.beans.request.MetricQueryEntity; +//import io.wdd.rpc.message.OctopusMessage; +//import io.wdd.rpc.message.OctopusMessageType; +//import io.wdd.rpc.message.handler.OMessageReplayContent; +//import io.wdd.rpc.message.sender.OMessageToAgentSender; +//import io.wdd.rpc.status.beans.AgentStatus; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.data.redis.core.RedisTemplate; +// +//import javax.annotation.Resource; +//import java.time.LocalDateTime; +//import java.util.ArrayList; +//import java.util.HashMap; +//import java.util.List; +//import java.util.Map; +//import java.util.concurrent.CountDownLatch; +//import java.util.concurrent.TimeUnit; +//import java.util.stream.Collectors; +// +//import static io.wdd.common.config.OctopusObjectMapperConfig.OctopusObjectMapper; +//import static io.wdd.rpc.message.handler.OMessageHandler.WaitFromAgent; +//import static io.wdd.rpc.status.OctopusStatusMessage.*; +// +//@Slf4j +////@Service +//public class SyncStatusServiceImpl implements SyncStatusService { +// +// private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.STATUS; +// +// @Resource +// OMessageToAgentSender oMessageToAgentSender; +// +// @Resource +// RedisTemplate redisTemplate; +// +// @Override +// public Map SyncCollectAgentAliveStatus(List agentTopicNameList, int aliveStatusWaitMaxTime) { +// +// // 构造最后的结果Map +// Map agentAliveStatusMap = agentTopicNameList +// .stream() +// .collect( +// Collectors.toMap( +// agentTopicName -> agentTopicName, +// agentTopicName -> Boolean.FALSE +// )); +// +// // 当前的时间 +// LocalDateTime currentTime = TimeUtils.currentFormatTime(); +// +// // 构造OctopusMessage - StatusMessage结构体, 下发所有的消息 +// buildAndSendAgentStatusOctopusMessage( +// agentTopicNameList, +// HEALTHY_STATUS_MESSAGE_TYPE, +// currentTime +// ); +// +// // 同步收集消息 +// OMessageReplayContent statusSyncReplayContend = OMessageReplayContent.build( +// agentTopicNameList.size(), +// CurrentAppOctopusMessageType, +// currentTime +// ); +// WaitFromAgent(); +// +// // 解析结果 +// CountDownLatch countDownLatch = statusSyncReplayContend.getCountDownLatch(); +// +// // 等待状态返回的结果 +// boolean agentAliveStatusCollectResult = false; +// try { +// agentAliveStatusCollectResult = countDownLatch.await( +// aliveStatusWaitMaxTime, +// TimeUnit.SECONDS +// ); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } finally { +// if (!agentAliveStatusCollectResult) { +// log.debug("Agent存活状态检查,没有检查到全部的Agent!"); +// } +// +// // 移除等待队列 +// asyncWaitOctopusMessageResultService.stopWaiting(statusSyncReplayContend); +// +// // 处理结果 +// statusSyncReplayContend +// .getReplayOMList() +// .stream() +// .forEach( +// statusOMessage -> { +// if (statusOMessage.getResult() != null) { +// agentAliveStatusMap.put( +// statusOMessage.getUuid(), +// Boolean.TRUE +// ); +// } +// } +// ); +// } +// +// // 返回Agent的存活状态内容 +// return agentAliveStatusMap; +// } +// +// @Override +// public Map SyncCollectAgentMetricStatus(List agentTopicNameList, int collectMetricWaitMaxTime) { +// +// // 状态的结果Map +// HashMap metricMap = new HashMap<>(); +// +// // 当前的时间 +// LocalDateTime currentTime = TimeUtils.currentFormatTime(); +// +// // 构造所有的Metric的OM并且下发 +// buildAndSendAgentStatusOctopusMessage( +// agentTopicNameList, +// METRIC_STATUS_MESSAGE_TYPE, +// currentTime +// ); +// +// // 同步等待结果, 并且解析结果 +// OMessageReplayContent metricSyncReplayContend = OMessageReplayContent.build( +// agentTopicNameList.size(), +// CurrentAppOctopusMessageType, +// currentTime +// ); +// asyncWaitOctopusMessageResultService.waitFor(metricSyncReplayContend); +// +// // 解析结果 +// CountDownLatch countDownLatch = metricSyncReplayContend.getCountDownLatch(); +// +// // 等待状态返回的结果 +// boolean agentAliveStatusCollectResult = false; +// try { +// agentAliveStatusCollectResult = countDownLatch.await( +// collectMetricWaitMaxTime, +// TimeUnit.SECONDS +// ); +// } catch (InterruptedException e) { +// log.error("[Agent Metric] - 收集Agent的运行状态失败!"); +// throw new RuntimeException(e); +// } finally { +// if (!agentAliveStatusCollectResult) { +// log.debug("Agent存活状态检查,没有检查到全部的Agent!"); +// } +// +// // 移除等待队列 +// asyncWaitOctopusMessageResultService.stopWaiting(metricSyncReplayContend); +// +// // 处理结果 +// metricSyncReplayContend +// .getReplayOMList() +// .stream() +// .forEach( +// statusOMessage -> { +// if (statusOMessage.getResult() != null) { +// +// // 解析Result对象为 AgentStatus +// try { +// +// AgentStatus agentStatus = OctopusObjectMapper.readValue( +// (String) statusOMessage.getResult(), +// AgentStatus.class +// ); +// +// // 保存结果 +// metricMap.put( +// statusOMessage.getUuid(), +// agentStatus +// ); +// +// } catch (JsonProcessingException e) { +// log.error("[Agent Metric] - 解析AgentStatus失败!"); +// throw new RuntimeException(e); +// } +// +// } +// } +// ); +// } +// +// return metricMap; +// } +// +// @Override +// public ArrayList QueryMetricStatus(MetricQueryEntity metricQueryEntity) { +// +// String queryRedisKey = metricQueryEntity.getAgentTopicName() + "-Metric"; +// +// if (!redisTemplate.hasKey(queryRedisKey)) { +// log.error( +// "[Metric] - 查询到没有此Agent {} 的Metric信息,直接返回!", +// metricQueryEntity.getAgentTopicName() +// ); +// } +// +// double start = metricQueryEntity.getMetricStartTimeStamp(); +// double end = metricQueryEntity.getMetricEndTimeStamp(); +// +// ArrayList statusArrayList = new ArrayList<>(); +// +// redisTemplate +// .opsForZSet() +// .rangeByScore( +// queryRedisKey, +// start, +// end +// ) +// .stream() +// .forEachOrdered( +// metricString -> { +// try { +// +// AgentStatus agentStatus = OctopusObjectMapper.readValue( +// (String) metricString, +// AgentStatus.class +// ); +// +// statusArrayList.add(agentStatus); +// +// } catch (JsonProcessingException e) { +// throw new RuntimeException(e); +// } +// } +// ); +// +// +// return statusArrayList; +// } +// +// +// /** +// * 2023年7月10日 通用的底层构造方法 Status类型的Octopus Message +// * +// * @param agentTopicNameList +// * @param statusType +// * @param currentTime +// * @return +// */ +// private List buildAndSendAgentStatusOctopusMessage(List agentTopicNameList, String statusType, LocalDateTime currentTime) { +// +// List octopusStatusMessageList = agentTopicNameList +// .stream() +// .map( +// agentTopicName -> ConstructAgentStatusMessage( +// statusType, +// agentTopicName, +// currentTime +// ) +// ) +// .collect(Collectors.toList()); +// +// // 发送信息 +// oMessageToAgentSender.send(octopusStatusMessageList); +// +// return octopusStatusMessageList; +// } +// +// +//} diff --git a/server/src/main/resources/application.yml b/server/src/main/resources/application.yml index ad8ab21..fbfa6d0 100644 --- a/server/src/main/resources/application.yml +++ b/server/src/main/resources/application.yml @@ -164,6 +164,5 @@ oss: # 开启debug模式 logging: level: - io.wdd.rpc.execute: debug -debug: true + io.wdd.rpc: debug