[Exec] modify async om handler structure

This commit is contained in:
zeaslity
2023-06-15 10:51:55 +08:00
parent ec3d5bba1e
commit 8574169150
5 changed files with 46 additions and 45 deletions

View File

@@ -6,8 +6,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.utils.TimeUtils; import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType; import io.wdd.rpc.message.OctopusMessageType;
import io.wdd.rpc.message.handler.AsyncWaitOMResult; import io.wdd.rpc.message.handler.async.AsyncWaitOMResultService;
import io.wdd.rpc.message.handler.OMReplayContend; import io.wdd.rpc.message.handler.async.OMAsyncReplayContend;
import io.wdd.rpc.message.sender.OMessageToAgentSender; import io.wdd.rpc.message.sender.OMessageToAgentSender;
import io.wdd.server.beans.vo.ServerInfoVO; import io.wdd.server.beans.vo.ServerInfoVO;
import io.wdd.server.config.ServerCommonPool; import io.wdd.server.config.ServerCommonPool;
@@ -28,8 +28,8 @@ import java.util.stream.Collectors;
import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_SET; import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_SET;
import static io.wdd.rpc.init.AgentStatusCacheService.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.init.AgentStatusCacheService.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
import static io.wdd.rpc.message.handler.OMessageHandlerServer.AGENT_LATEST_VERSION; import static io.wdd.rpc.message.handler.sync.OMessageHandlerServer.AGENT_LATEST_VERSION;
import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT; import static io.wdd.rpc.message.handler.sync.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT;
@Service @Service
@Slf4j @Slf4j
@@ -45,7 +45,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
RedisTemplate redisTemplate; RedisTemplate redisTemplate;
@Resource @Resource
AsyncWaitOMResult asyncWaitOMResult; AsyncWaitOMResultService asyncWaitOMResultService;
@Override @Override
public Map<String, String> getAllAgentVersion() { public Map<String, String> getAllAgentVersion() {
@@ -70,17 +70,17 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
); );
// 构造 异步结果监听内容 // 构造 异步结果监听内容
OMReplayContend omReplayContend = OMReplayContend.build( OMAsyncReplayContend OMAsyncReplayContend = OMAsyncReplayContend.build(
ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(), ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(),
CurrentAppOctopusMessageType, CurrentAppOctopusMessageType,
currentTime currentTime
); );
CountDownLatch countDownLatch = omReplayContend.getCountDownLatch(); CountDownLatch countDownLatch = OMAsyncReplayContend.getCountDownLatch();
// 调用后台接收处理所有的Replay信息 // 调用后台接收处理所有的Replay信息
asyncWaitOMResult.waitFor(omReplayContend); asyncWaitOMResultService.waitFor(OMAsyncReplayContend);
//此处存在重大bug,会导致CPU占用飙升 //此处存在重大bug,会导致CPU占用飙升
/*CompletableFuture<Void> getAllAgentVersionInfoFuture = waitCollectAllAgentVersionInfo( /*CompletableFuture<Void> getAllAgentVersionInfoFuture = waitCollectAllAgentVersionInfo(
@@ -106,10 +106,10 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
} }
// 此处调用,即可中断 异步任务的收集工作 // 此处调用,即可中断 异步任务的收集工作
asyncWaitOMResult.stopWaiting(omReplayContend); asyncWaitOMResultService.stopWaiting(OMAsyncReplayContend);
// 处理结果 // 处理结果
omReplayContend OMAsyncReplayContend
.getReplayOMList() .getReplayOMList()
.stream() .stream()
.forEach( .forEach(
@@ -122,7 +122,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
); );
// help gc // help gc
omReplayContend = null; OMAsyncReplayContend = null;
} }
return result; return result;
@@ -156,16 +156,16 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
); );
// 构造结果 // 构造结果
OMReplayContend omReplayContend = OMReplayContend.build( OMAsyncReplayContend OMAsyncReplayContend = OMAsyncReplayContend.build(
ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(), ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(),
CurrentAppOctopusMessageType, CurrentAppOctopusMessageType,
currentTime currentTime
); );
CountDownLatch countDownLatch = omReplayContend.getCountDownLatch(); CountDownLatch countDownLatch = OMAsyncReplayContend.getCountDownLatch();
// 调用后台接收处理所有的Replay信息 // 调用后台接收处理所有的Replay信息
asyncWaitOMResult.waitFor(omReplayContend); asyncWaitOMResultService.waitFor(OMAsyncReplayContend);
/* CompletableFuture<Void> getAllAgentCoreInfoFuture = waitCollectAllAgentCoreInfo( /* CompletableFuture<Void> getAllAgentCoreInfoFuture = waitCollectAllAgentCoreInfo(
result, result,
@@ -185,10 +185,10 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
// 超时,或者 全部信息已经收集 // 超时,或者 全部信息已经收集
// 此处调用,即可中断 异步任务的收集工作 // 此处调用,即可中断 异步任务的收集工作
asyncWaitOMResult.stopWaiting(omReplayContend); asyncWaitOMResultService.stopWaiting(OMAsyncReplayContend);
// 处理结果 // 处理结果
omReplayContend OMAsyncReplayContend
.getReplayOMList() .getReplayOMList()
.stream() .stream()
.forEach( .forEach(
@@ -216,7 +216,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
); );
// help gc // help gc
omReplayContend = null; OMAsyncReplayContend = null;
} }
return result; return result;

View File

@@ -2,8 +2,8 @@ package io.wdd.rpc.execute.service;
import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType; import io.wdd.rpc.message.OctopusMessageType;
import io.wdd.rpc.message.handler.AsyncWaitOMResult; import io.wdd.rpc.message.handler.async.AsyncWaitOMResultService;
import io.wdd.rpc.message.handler.OMReplayContend; import io.wdd.rpc.message.handler.async.OMAsyncReplayContend;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@@ -24,7 +24,7 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService {
private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.EXECUTOR; private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.EXECUTOR;
@Resource @Resource
AsyncWaitOMResult asyncWaitOMResult; AsyncWaitOMResultService asyncWaitOMResultService;
@Resource @Resource
SyncExecutionService asyncExecutionService; SyncExecutionService asyncExecutionService;
@@ -192,6 +192,7 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService {
LocalDateTime initTime = octopusMessage.getInit_time(); LocalDateTime initTime = octopusMessage.getInit_time();
// OM 中的result保存
ArrayList<String> result = new ArrayList<>(); ArrayList<String> result = new ArrayList<>();
// 构造消息等待对象 // 构造消息等待对象
@@ -203,16 +204,16 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService {
); );
} }
// 构造回复信息的内容
OMReplayContend omReplayContend = OMReplayContend.build( OMAsyncReplayContend OMAsyncReplayContend = OMAsyncReplayContend.build(
commandCount, commandCount,
CurrentAppOctopusMessageType, CurrentAppOctopusMessageType,
initTime initTime
); );
CountDownLatch countDownLatch = omReplayContend.getCountDownLatch(); CountDownLatch countDownLatch = OMAsyncReplayContend.getCountDownLatch();
// 开始等待结果 // 开始等待结果
asyncWaitOMResult.waitFor(omReplayContend); asyncWaitOMResultService.waitFor(OMAsyncReplayContend);
// 监听结果 // 监听结果
try { try {
@@ -227,10 +228,10 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService {
// 等待所有的结果返回 // 等待所有的结果返回
// 停止等待结果 // 停止等待结果
asyncWaitOMResult.stopWaiting(omReplayContend); asyncWaitOMResultService.stopWaiting(OMAsyncReplayContend);
// 解析结果 // 解析结果
omReplayContend OMAsyncReplayContend
.getReplayOMList() .getReplayOMList()
.stream() .stream()
.map( .map(
@@ -249,7 +250,7 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService {
} }
// 返回 // 返回 执行的结果
return result; return result;
} }
} }

View File

@@ -1,4 +1,4 @@
package io.wdd.rpc.message.handler; package io.wdd.rpc.message.handler.async;
import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessage;
import io.wdd.server.config.ServerCommonPool; import io.wdd.server.config.ServerCommonPool;
@@ -10,7 +10,7 @@ import java.util.HashMap;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT; import static io.wdd.rpc.message.handler.sync.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT;
/** /**
* 从Agent收集返回信息的统一处理地点 * 从Agent收集返回信息的统一处理地点
@@ -20,31 +20,31 @@ import static io.wdd.rpc.message.handler.OMessageHandlerServer.OCTOPUS_MESSAGE_F
*/ */
@Service @Service
@Slf4j @Slf4j
public class AsyncWaitOMResult { public class AsyncWaitOMResultService {
/** /**
* 为了避免线程不安全的问题,增加一层缓存,仅仅由当前类操作此部分 * 为了避免线程不安全的问题,增加一层缓存,仅仅由当前类操作此部分
* KEY -> replayMatchKey * KEY -> replayMatchKey
* VALUE -> OMReplayContend - 包含countDownLatch result * VALUE -> OMAsyncReplayContend - 包含countDownLatch result
*/ */
private static final HashMap<String, OMReplayContend> OM_REPLAY_WAITING_TARGET_MAP = new HashMap<>(); private static final HashMap<String, OMAsyncReplayContend> OM_REPLAY_WAITING_TARGET_MAP = new HashMap<>();
public void waitFor(OMReplayContend omReplayContend) { public void waitFor(OMAsyncReplayContend OMAsyncReplayContend) {
// REPLAY_CACHE_MAP中写入 Key // REPLAY_CACHE_MAP中写入 Key
OM_REPLAY_WAITING_TARGET_MAP.put( OM_REPLAY_WAITING_TARGET_MAP.put(
omReplayContend.getReplayMatchKey(), OMAsyncReplayContend.getReplayMatchKey(),
omReplayContend OMAsyncReplayContend
); );
// 在调用线程的countDownLunch结束之后,关闭 // 在调用线程的countDownLunch结束之后,关闭
// 清除 REPLAY_CACHE_MAP 中的队列 // 清除 REPLAY_CACHE_MAP 中的队列
} }
public void stopWaiting(OMReplayContend omReplayContend) { public void stopWaiting(OMAsyncReplayContend OMAsyncReplayContend) {
// 在调用线程的countDownLunch结束之后,关闭 清除 REPLAY_CACHE_MAP 中的队列 // 在调用线程的countDownLunch结束之后,关闭 清除 REPLAY_CACHE_MAP 中的队列
OM_REPLAY_WAITING_TARGET_MAP.remove(omReplayContend.getReplayMatchKey()); OM_REPLAY_WAITING_TARGET_MAP.remove(OMAsyncReplayContend.getReplayMatchKey());
} }
@@ -82,7 +82,7 @@ public class AsyncWaitOMResult {
OctopusMessage replayOMessage = OCTOPUS_MESSAGE_FROM_AGENT.poll(); OctopusMessage replayOMessage = OCTOPUS_MESSAGE_FROM_AGENT.poll();
// 构造 replayMatchKey // 构造 replayMatchKey
String matchKey = OMReplayContend.generateMatchKey( String matchKey = OMAsyncReplayContend.generateMatchKey(
replayOMessage.getType(), replayOMessage.getType(),
replayOMessage.getInit_time() replayOMessage.getInit_time()
); );
@@ -99,7 +99,7 @@ public class AsyncWaitOMResult {
} }
// Map中包含有Key,那么放置进去 // Map中包含有Key,那么放置进去
OMReplayContend replayContend = OM_REPLAY_WAITING_TARGET_MAP.get(matchKey); OMAsyncReplayContend replayContend = OM_REPLAY_WAITING_TARGET_MAP.get(matchKey);
replayContend replayContend
.getReplayOMList() .getReplayOMList()
.add(replayOMessage); .add(replayOMessage);

View File

@@ -1,4 +1,4 @@
package io.wdd.rpc.message.handler; package io.wdd.rpc.message.handler.async;
import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModel;
@@ -19,7 +19,7 @@ import java.util.concurrent.CountDownLatch;
@NoArgsConstructor @NoArgsConstructor
@SuperBuilder(toBuilder = true) @SuperBuilder(toBuilder = true)
@ApiModel("众多业务调用RPC,异步等待需要确定返回消息是谁的") @ApiModel("众多业务调用RPC,异步等待需要确定返回消息是谁的")
public class OMReplayContend { public class OMAsyncReplayContend {
@ApiModelProperty("rpc消息的类型") @ApiModelProperty("rpc消息的类型")
OctopusMessageType type; OctopusMessageType type;
@@ -37,7 +37,7 @@ public class OMReplayContend {
@ApiModelProperty("回复的结果列表, 临时保存") @ApiModelProperty("回复的结果列表, 临时保存")
ArrayList<OctopusMessage> replayOMList; ArrayList<OctopusMessage> replayOMList;
protected static String generateMatchKey(OMReplayContend replayIdentifier) { protected static String generateMatchKey(OMAsyncReplayContend replayIdentifier) {
String relayMatchKey = replayIdentifier String relayMatchKey = replayIdentifier
.getType() .getType()
@@ -65,9 +65,9 @@ public class OMReplayContend {
* *
* @return * @return
*/ */
public static OMReplayContend build(int waitForReplayNum, OctopusMessageType currentOMType, LocalDateTime currentTime) { public static OMAsyncReplayContend build(int waitForReplayNum, OctopusMessageType currentOMType, LocalDateTime currentTime) {
return new OMReplayContend( return new OMAsyncReplayContend(
currentOMType, currentOMType,
currentTime, currentTime,
generateMatchKey( generateMatchKey(

View File

@@ -1,4 +1,4 @@
package io.wdd.rpc.message.handler; package io.wdd.rpc.message.handler.sync;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;