diff --git a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java index beed801..271231a 100644 --- a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java @@ -6,8 +6,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.wdd.common.utils.TimeUtils; import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessageType; -import io.wdd.rpc.message.handler.AsyncWaitOMResult; -import io.wdd.rpc.message.handler.OMReplayContend; +import io.wdd.rpc.message.handler.async.AsyncWaitOMResultService; +import io.wdd.rpc.message.handler.async.OMAsyncReplayContend; import io.wdd.rpc.message.sender.OMessageToAgentSender; import io.wdd.server.beans.vo.ServerInfoVO; import io.wdd.server.config.ServerCommonPool; @@ -28,8 +28,8 @@ import java.util.stream.Collectors; import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_SET; import static io.wdd.rpc.init.AgentStatusCacheService.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; -import static io.wdd.rpc.message.handler.OMessageHandlerServer.AGENT_LATEST_VERSION; -import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT; +import static io.wdd.rpc.message.handler.sync.OMessageHandlerServer.AGENT_LATEST_VERSION; +import static io.wdd.rpc.message.handler.sync.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT; @Service @Slf4j @@ -45,7 +45,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { RedisTemplate redisTemplate; @Resource - AsyncWaitOMResult asyncWaitOMResult; + AsyncWaitOMResultService asyncWaitOMResultService; @Override public Map getAllAgentVersion() { @@ -70,17 +70,17 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { ); // 构造 异步结果监听内容 - OMReplayContend omReplayContend = OMReplayContend.build( + OMAsyncReplayContend OMAsyncReplayContend = OMAsyncReplayContend.build( ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(), CurrentAppOctopusMessageType, currentTime ); - CountDownLatch countDownLatch = omReplayContend.getCountDownLatch(); + CountDownLatch countDownLatch = OMAsyncReplayContend.getCountDownLatch(); // 调用后台接收处理所有的Replay信息 - asyncWaitOMResult.waitFor(omReplayContend); + asyncWaitOMResultService.waitFor(OMAsyncReplayContend); //此处存在重大bug,会导致CPU占用飙升 /*CompletableFuture getAllAgentVersionInfoFuture = waitCollectAllAgentVersionInfo( @@ -106,10 +106,10 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { } // 此处调用,即可中断 异步任务的收集工作 - asyncWaitOMResult.stopWaiting(omReplayContend); + asyncWaitOMResultService.stopWaiting(OMAsyncReplayContend); // 处理结果 - omReplayContend + OMAsyncReplayContend .getReplayOMList() .stream() .forEach( @@ -122,7 +122,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { ); // help gc - omReplayContend = null; + OMAsyncReplayContend = null; } return result; @@ -156,16 +156,16 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { ); // 构造结果 - OMReplayContend omReplayContend = OMReplayContend.build( + OMAsyncReplayContend OMAsyncReplayContend = OMAsyncReplayContend.build( ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(), CurrentAppOctopusMessageType, currentTime ); - CountDownLatch countDownLatch = omReplayContend.getCountDownLatch(); + CountDownLatch countDownLatch = OMAsyncReplayContend.getCountDownLatch(); // 调用后台接收处理所有的Replay信息 - asyncWaitOMResult.waitFor(omReplayContend); + asyncWaitOMResultService.waitFor(OMAsyncReplayContend); /* CompletableFuture getAllAgentCoreInfoFuture = waitCollectAllAgentCoreInfo( result, @@ -185,10 +185,10 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { // 超时,或者 全部信息已经收集 // 此处调用,即可中断 异步任务的收集工作 - asyncWaitOMResult.stopWaiting(omReplayContend); + asyncWaitOMResultService.stopWaiting(OMAsyncReplayContend); // 处理结果 - omReplayContend + OMAsyncReplayContend .getReplayOMList() .stream() .forEach( @@ -216,7 +216,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { ); // help gc - omReplayContend = null; + OMAsyncReplayContend = null; } return result; diff --git a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java index a3686f6..8a567c3 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java @@ -2,8 +2,8 @@ package io.wdd.rpc.execute.service; import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessageType; -import io.wdd.rpc.message.handler.AsyncWaitOMResult; -import io.wdd.rpc.message.handler.OMReplayContend; +import io.wdd.rpc.message.handler.async.AsyncWaitOMResultService; +import io.wdd.rpc.message.handler.async.OMAsyncReplayContend; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -24,7 +24,7 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService { private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.EXECUTOR; @Resource - AsyncWaitOMResult asyncWaitOMResult; + AsyncWaitOMResultService asyncWaitOMResultService; @Resource SyncExecutionService asyncExecutionService; @@ -192,6 +192,7 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService { LocalDateTime initTime = octopusMessage.getInit_time(); + // OM 中的result保存 ArrayList result = new ArrayList<>(); // 构造消息等待对象 @@ -203,16 +204,16 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService { ); } - - OMReplayContend omReplayContend = OMReplayContend.build( + // 构造回复信息的内容 + OMAsyncReplayContend OMAsyncReplayContend = OMAsyncReplayContend.build( commandCount, CurrentAppOctopusMessageType, initTime ); - CountDownLatch countDownLatch = omReplayContend.getCountDownLatch(); + CountDownLatch countDownLatch = OMAsyncReplayContend.getCountDownLatch(); // 开始等待结果 - asyncWaitOMResult.waitFor(omReplayContend); + asyncWaitOMResultService.waitFor(OMAsyncReplayContend); // 监听结果 try { @@ -227,10 +228,10 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService { // 等待所有的结果返回 // 停止等待结果 - asyncWaitOMResult.stopWaiting(omReplayContend); + asyncWaitOMResultService.stopWaiting(OMAsyncReplayContend); // 解析结果 - omReplayContend + OMAsyncReplayContend .getReplayOMList() .stream() .map( @@ -249,7 +250,7 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService { } - // 返回 + // 返回 执行的结果 return result; } } diff --git a/server/src/main/java/io/wdd/rpc/message/handler/AsyncWaitOMResult.java b/server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOMResultService.java similarity index 77% rename from server/src/main/java/io/wdd/rpc/message/handler/AsyncWaitOMResult.java rename to server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOMResultService.java index 9ac3d53..171b847 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/AsyncWaitOMResult.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOMResultService.java @@ -1,4 +1,4 @@ -package io.wdd.rpc.message.handler; +package io.wdd.rpc.message.handler.async; import io.wdd.rpc.message.OctopusMessage; import io.wdd.server.config.ServerCommonPool; @@ -10,7 +10,7 @@ import java.util.HashMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT; +import static io.wdd.rpc.message.handler.sync.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT; /** * 从Agent收集返回信息的统一处理地点 @@ -20,31 +20,31 @@ import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_F */ @Service @Slf4j -public class AsyncWaitOMResult { +public class AsyncWaitOMResultService { /** * 为了避免线程不安全的问题,增加一层缓存,仅仅由当前类操作此部分 * KEY -> replayMatchKey - * VALUE -> OMReplayContend - 包含countDownLatch 和 result + * VALUE -> OMAsyncReplayContend - 包含countDownLatch 和 result */ - private static final HashMap OM_REPLAY_WAITING_TARGET_MAP = new HashMap<>(); + private static final HashMap OM_REPLAY_WAITING_TARGET_MAP = new HashMap<>(); - public void waitFor(OMReplayContend omReplayContend) { + public void waitFor(OMAsyncReplayContend OMAsyncReplayContend) { // 向 REPLAY_CACHE_MAP中写入 Key OM_REPLAY_WAITING_TARGET_MAP.put( - omReplayContend.getReplayMatchKey(), - omReplayContend + OMAsyncReplayContend.getReplayMatchKey(), + OMAsyncReplayContend ); // 在调用线程的countDownLunch结束之后,关闭 // 清除 REPLAY_CACHE_MAP 中的队列 } - public void stopWaiting(OMReplayContend omReplayContend) { + public void stopWaiting(OMAsyncReplayContend OMAsyncReplayContend) { // 在调用线程的countDownLunch结束之后,关闭 清除 REPLAY_CACHE_MAP 中的队列 - OM_REPLAY_WAITING_TARGET_MAP.remove(omReplayContend.getReplayMatchKey()); + OM_REPLAY_WAITING_TARGET_MAP.remove(OMAsyncReplayContend.getReplayMatchKey()); } @@ -82,7 +82,7 @@ public class AsyncWaitOMResult { OctopusMessage replayOMessage = OCTOPUS_MESSAGE_FROM_AGENT.poll(); // 构造 replayMatchKey - String matchKey = OMReplayContend.generateMatchKey( + String matchKey = OMAsyncReplayContend.generateMatchKey( replayOMessage.getType(), replayOMessage.getInit_time() ); @@ -99,7 +99,7 @@ public class AsyncWaitOMResult { } // Map中包含有Key,那么放置进去 - OMReplayContend replayContend = OM_REPLAY_WAITING_TARGET_MAP.get(matchKey); + OMAsyncReplayContend replayContend = OM_REPLAY_WAITING_TARGET_MAP.get(matchKey); replayContend .getReplayOMList() .add(replayOMessage); diff --git a/server/src/main/java/io/wdd/rpc/message/handler/OMReplayContend.java b/server/src/main/java/io/wdd/rpc/message/handler/async/OMAsyncReplayContend.java similarity index 86% rename from server/src/main/java/io/wdd/rpc/message/handler/OMReplayContend.java rename to server/src/main/java/io/wdd/rpc/message/handler/async/OMAsyncReplayContend.java index d7a8fe2..abf42e0 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/OMReplayContend.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/async/OMAsyncReplayContend.java @@ -1,4 +1,4 @@ -package io.wdd.rpc.message.handler; +package io.wdd.rpc.message.handler.async; import com.fasterxml.jackson.annotation.JsonFormat; import io.swagger.annotations.ApiModel; @@ -19,7 +19,7 @@ import java.util.concurrent.CountDownLatch; @NoArgsConstructor @SuperBuilder(toBuilder = true) @ApiModel("众多业务调用RPC,异步等待需要确定返回消息是谁的") -public class OMReplayContend { +public class OMAsyncReplayContend { @ApiModelProperty("rpc消息的类型") OctopusMessageType type; @@ -37,7 +37,7 @@ public class OMReplayContend { @ApiModelProperty("回复的结果列表, 临时保存") ArrayList replayOMList; - protected static String generateMatchKey(OMReplayContend replayIdentifier) { + protected static String generateMatchKey(OMAsyncReplayContend replayIdentifier) { String relayMatchKey = replayIdentifier .getType() @@ -65,9 +65,9 @@ public class OMReplayContend { * * @return */ - public static OMReplayContend build(int waitForReplayNum, OctopusMessageType currentOMType, LocalDateTime currentTime) { + public static OMAsyncReplayContend build(int waitForReplayNum, OctopusMessageType currentOMType, LocalDateTime currentTime) { - return new OMReplayContend( + return new OMAsyncReplayContend( currentOMType, currentTime, generateMatchKey( diff --git a/server/src/main/java/io/wdd/rpc/message/handler/OMessageHandlerServer.java b/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageHandlerServer.java similarity index 98% rename from server/src/main/java/io/wdd/rpc/message/handler/OMessageHandlerServer.java rename to server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageHandlerServer.java index 6d8053e..35314c1 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/OMessageHandlerServer.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageHandlerServer.java @@ -1,4 +1,4 @@ -package io.wdd.rpc.message.handler; +package io.wdd.rpc.message.handler.sync; import com.fasterxml.jackson.databind.ObjectMapper;