[ Status ] add the async way to get agent status

This commit is contained in:
zeaslity
2023-06-15 16:29:26 +08:00
parent 8574169150
commit 4b3f7be1dd
13 changed files with 292 additions and 155 deletions

View File

@@ -6,8 +6,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.utils.TimeUtils; import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType; import io.wdd.rpc.message.OctopusMessageType;
import io.wdd.rpc.message.handler.async.AsyncWaitOMResultService; import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService;
import io.wdd.rpc.message.handler.async.OMAsyncReplayContend; import io.wdd.rpc.message.handler.async.OctopusMessageAsyncReplayContend;
import io.wdd.rpc.message.sender.OMessageToAgentSender; import io.wdd.rpc.message.sender.OMessageToAgentSender;
import io.wdd.server.beans.vo.ServerInfoVO; import io.wdd.server.beans.vo.ServerInfoVO;
import io.wdd.server.config.ServerCommonPool; import io.wdd.server.config.ServerCommonPool;
@@ -45,7 +45,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
RedisTemplate redisTemplate; RedisTemplate redisTemplate;
@Resource @Resource
AsyncWaitOMResultService asyncWaitOMResultService; AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService;
@Override @Override
public Map<String, String> getAllAgentVersion() { public Map<String, String> getAllAgentVersion() {
@@ -70,17 +70,17 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
); );
// 构造 异步结果监听内容 // 构造 异步结果监听内容
OMAsyncReplayContend OMAsyncReplayContend = OMAsyncReplayContend.build( OctopusMessageAsyncReplayContend agentReplayContend = OctopusMessageAsyncReplayContend.build(
ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(), ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(),
CurrentAppOctopusMessageType, CurrentAppOctopusMessageType,
currentTime currentTime
); );
CountDownLatch countDownLatch = OMAsyncReplayContend.getCountDownLatch(); CountDownLatch countDownLatch = agentReplayContend.getCountDownLatch();
// 调用后台接收处理所有的Replay信息 // 调用后台接收处理所有的Replay信息
asyncWaitOMResultService.waitFor(OMAsyncReplayContend); asyncWaitOctopusMessageResultService.waitFor(agentReplayContend);
//此处存在重大bug,会导致CPU占用飙升 //此处存在重大bug,会导致CPU占用飙升
/*CompletableFuture<Void> getAllAgentVersionInfoFuture = waitCollectAllAgentVersionInfo( /*CompletableFuture<Void> getAllAgentVersionInfoFuture = waitCollectAllAgentVersionInfo(
@@ -106,10 +106,10 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
} }
// 此处调用,即可中断 异步任务的收集工作 // 此处调用,即可中断 异步任务的收集工作
asyncWaitOMResultService.stopWaiting(OMAsyncReplayContend); asyncWaitOctopusMessageResultService.stopWaiting(agentReplayContend);
// 处理结果 // 处理结果
OMAsyncReplayContend agentReplayContend
.getReplayOMList() .getReplayOMList()
.stream() .stream()
.forEach( .forEach(
@@ -122,7 +122,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
); );
// help gc // help gc
OMAsyncReplayContend = null; agentReplayContend = null;
} }
return result; return result;
@@ -156,16 +156,16 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
); );
// 构造结果 // 构造结果
OMAsyncReplayContend OMAsyncReplayContend = OMAsyncReplayContend.build( OctopusMessageAsyncReplayContend OctopusMessageAsyncReplayContend = io.wdd.rpc.message.handler.async.OctopusMessageAsyncReplayContend.build(
ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(), ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(),
CurrentAppOctopusMessageType, CurrentAppOctopusMessageType,
currentTime currentTime
); );
CountDownLatch countDownLatch = OMAsyncReplayContend.getCountDownLatch(); CountDownLatch countDownLatch = OctopusMessageAsyncReplayContend.getCountDownLatch();
// 调用后台接收处理所有的Replay信息 // 调用后台接收处理所有的Replay信息
asyncWaitOMResultService.waitFor(OMAsyncReplayContend); asyncWaitOctopusMessageResultService.waitFor(OctopusMessageAsyncReplayContend);
/* CompletableFuture<Void> getAllAgentCoreInfoFuture = waitCollectAllAgentCoreInfo( /* CompletableFuture<Void> getAllAgentCoreInfoFuture = waitCollectAllAgentCoreInfo(
result, result,
@@ -185,10 +185,10 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
// 超时,或者 全部信息已经收集 // 超时,或者 全部信息已经收集
// 此处调用,即可中断 异步任务的收集工作 // 此处调用,即可中断 异步任务的收集工作
asyncWaitOMResultService.stopWaiting(OMAsyncReplayContend); asyncWaitOctopusMessageResultService.stopWaiting(OctopusMessageAsyncReplayContend);
// 处理结果 // 处理结果
OMAsyncReplayContend OctopusMessageAsyncReplayContend
.getReplayOMList() .getReplayOMList()
.stream() .stream()
.forEach( .forEach(
@@ -216,7 +216,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
); );
// help gc // help gc
OMAsyncReplayContend = null; OctopusMessageAsyncReplayContend = null;
} }
return result; return result;

View File

@@ -76,7 +76,7 @@ public class StatusController {
public R<Map<String, List<String>>> ManualUpdateAgentStatus() { public R<Map<String, List<String>>> ManualUpdateAgentStatus() {
// 手动调用更新 // 手动调用更新
agentStatusCacheService.updateAgentStatusMapCache(); agentStatusCacheService.updateAgentStatusMapCache(agentAliveStatusMap);
return R.ok(STATUS_AGENT_LIST_MAP); return R.ok(STATUS_AGENT_LIST_MAP);
} }

View File

@@ -2,8 +2,8 @@ package io.wdd.rpc.execute.service;
import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType; import io.wdd.rpc.message.OctopusMessageType;
import io.wdd.rpc.message.handler.async.AsyncWaitOMResultService; import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService;
import io.wdd.rpc.message.handler.async.OMAsyncReplayContend; import io.wdd.rpc.message.handler.async.OctopusMessageAsyncReplayContend;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@@ -24,7 +24,7 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService {
private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.EXECUTOR; private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.EXECUTOR;
@Resource @Resource
AsyncWaitOMResultService asyncWaitOMResultService; AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService;
@Resource @Resource
SyncExecutionService asyncExecutionService; SyncExecutionService asyncExecutionService;
@@ -205,15 +205,15 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService {
} }
// 构造回复信息的内容 // 构造回复信息的内容
OMAsyncReplayContend OMAsyncReplayContend = OMAsyncReplayContend.build( OctopusMessageAsyncReplayContend OctopusMessageAsyncReplayContend = OctopusMessageAsyncReplayContend.build(
commandCount, commandCount,
CurrentAppOctopusMessageType, CurrentAppOctopusMessageType,
initTime initTime
); );
CountDownLatch countDownLatch = OMAsyncReplayContend.getCountDownLatch(); CountDownLatch countDownLatch = OctopusMessageAsyncReplayContend.getCountDownLatch();
// 开始等待结果 // 开始等待结果
asyncWaitOMResultService.waitFor(OMAsyncReplayContend); asyncWaitOctopusMessageResultService.waitFor(OctopusMessageAsyncReplayContend);
// 监听结果 // 监听结果
try { try {
@@ -228,10 +228,10 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService {
// 等待所有的结果返回 // 等待所有的结果返回
// 停止等待结果 // 停止等待结果
asyncWaitOMResultService.stopWaiting(OMAsyncReplayContend); asyncWaitOctopusMessageResultService.stopWaiting(OctopusMessageAsyncReplayContend);
// 解析结果 // 解析结果
OMAsyncReplayContend OctopusMessageAsyncReplayContend
.getReplayOMList() .getReplayOMList()
.stream() .stream()
.map( .map(

View File

@@ -7,7 +7,6 @@ import io.wdd.server.beans.vo.ServerInfoVO;
import io.wdd.server.coreService.CoreServerService; import io.wdd.server.coreService.CoreServerService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@@ -15,7 +14,6 @@ import javax.annotation.Resource;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static io.wdd.rpc.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY;
/** /**
@@ -50,9 +48,9 @@ public class AgentStatusCacheService {
/** /**
* 存储所有Agent状态的Map * 存储所有Agent状态的Map
* <p> * <p>
* 内容为 agentTopicName-健康状态 * 内容为 agentTopicName- True代表健康 False代表不健康
*/ */
public static final Map<String, String> ALL_AGENT_STATUS_MAP = new HashMap<>(); public static final Map<String, Boolean> ALL_AGENT_STATUS_MAP = new HashMap<>();
/** /**
* 保存所有健康运行的Agent Topic Name * 保存所有健康运行的Agent Topic Name
@@ -67,8 +65,6 @@ public class AgentStatusCacheService {
@Resource @Resource
CoreServerService coreServerService; CoreServerService coreServerService;
@Resource
RedisTemplate redisTemplate;
@PostConstruct @PostConstruct
public void GenerateAllCache() { public void GenerateAllCache() {
@@ -77,7 +73,7 @@ public class AgentStatusCacheService {
updateAllAgentTopicNameCache(); updateAllAgentTopicNameCache();
// Agent状态信息的两个Map // Agent状态信息的两个Map
updateAgentStatusMapCache(); // updateAgentStatusMapCache(agentAliveStatusMap);
} }
@@ -118,7 +114,7 @@ public class AgentStatusCacheService {
* 由定时任务或者初始化服务触发 * 由定时任务或者初始化服务触发
* 2023-02-21 前端接口,手动更新 * 2023-02-21 前端接口,手动更新
*/ */
public void updateAgentStatusMapCache() { public void updateAgentStatusMapCache(Map<String, Boolean> agentAliveStatusMap) {
// 检查,排除没有节点的情况 // 检查,排除没有节点的情况
if (CollectionUtils.isEmpty(ALL_AGENT_TOPIC_NAME_LIST)) { if (CollectionUtils.isEmpty(ALL_AGENT_TOPIC_NAME_LIST)) {
@@ -126,47 +122,17 @@ public class AgentStatusCacheService {
return; return;
} }
// 从redis中获取所有节点的当前状态
List statusList = redisTemplate
.opsForHash()
.multiGet(
ALL_AGENT_STATUS_REDIS_KEY,
ALL_AGENT_TOPIC_NAME_LIST
);
// 初始话 还没有状态的情况直接return // 2023年6月15日 更新状态缓存
if (CollectionUtils.isEmpty(statusList)) {
log.warn("agent status from redis is empty !");
return;
}
// 增加更新时间 2023年2月21日
String timeString = TimeUtils.currentTimeString();
// 结构保存为agentStatusMap ==> agent-topic-name : STATUS(healthy, failed, unknown)
HashMap<String, String> agentStatusMap = new HashMap<>(32);
for (int i = 0; i < ALL_AGENT_TOPIC_NAME_LIST.size(); i++) {
agentStatusMap.put(
ALL_AGENT_TOPIC_NAME_LIST.get(i),
uniformHealthyStatus(String.valueOf(statusList.get(i)))
);
}
// 2023-01-16
ALL_AGENT_STATUS_MAP.clear(); ALL_AGENT_STATUS_MAP.clear();
ALL_AGENT_STATUS_MAP.putAll(agentStatusMap); ALL_AGENT_STATUS_MAP.putAll(agentAliveStatusMap);
ALL_AGENT_STATUS_MAP.put(
STATUS_UPDATE_TIME_KEY,
timeString
);
// 2023-01-16 // 2023-01-16
// 更新 状态-Agent容器 内容为 // 更新 状态-Agent容器 内容为
// HEALTHY -> ["agentTopicName-1" "agentTopicName-2"] // HEALTHY -> ["agentTopicName-1" "agentTopicName-2"]
// FAILED -> ["agentTopicName-1" "agentTopicName-2"] // FAILED -> ["agentTopicName-1" "agentTopicName-2"]
Map<String, List<String>> statusAgentListMap = agentStatusMap Map<String, List<String>> statusAgentListMap = agentAliveStatusMap
.entrySet() .entrySet()
.stream() .stream()
.collect( .collect(
@@ -178,7 +144,7 @@ public class AgentStatusCacheService {
.stream() .stream()
.collect( .collect(
Collectors.toMap( Collectors.toMap(
entry -> entry.getKey(), entry -> entry.getKey() ? "HEALTHY" : "FAILED",
entry -> entry entry -> entry
.getValue() .getValue()
.stream() .stream()
@@ -192,7 +158,9 @@ public class AgentStatusCacheService {
// 2023-2-3 bug fix // 2023-2-3 bug fix
STATUS_AGENT_LIST_MAP.clear(); STATUS_AGENT_LIST_MAP.clear();
STATUS_AGENT_LIST_MAP.putAll(statusAgentListMap); STATUS_AGENT_LIST_MAP.putAll(statusAgentListMap);
// 2023年2月21日更新时间 // 2023年2月21日更新时间
String timeString = TimeUtils.currentFormatTimeString();
STATUS_AGENT_LIST_MAP.put( STATUS_AGENT_LIST_MAP.put(
STATUS_UPDATE_TIME_KEY, STATUS_UPDATE_TIME_KEY,
Collections.singletonList(timeString) Collections.singletonList(timeString)
@@ -200,22 +168,26 @@ public class AgentStatusCacheService {
log.debug("Agent存活状态 状态-Agent名称-Map 已经更新了"); log.debug("Agent存活状态 状态-Agent名称-Map 已经更新了");
// Trigger调用Agent Metric 任务
ArrayList<String> allHealthyAgentTopicNames = new ArrayList<>(32);
for (int i = 0; i < statusList.size(); i++) {
if (null !=statusList.get(i) && statusList
.get(i)
.equals("1")) {
allHealthyAgentTopicNames.add(ALL_AGENT_TOPIC_NAME_LIST.get(i));
}
}
// 缓存相应的存活Agent // 缓存相应的存活Agent
List<String> allHealthyAgentTopicNames = agentAliveStatusMap
.entrySet()
.stream()
.filter(
entry -> entry
.getKey()
.equals(Boolean.TRUE)
)
.map(
Map.Entry::getKey
)
.collect(Collectors.toList());
ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.clear(); ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.clear();
ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.addAll(allHealthyAgentTopicNames); ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.addAll(allHealthyAgentTopicNames);
// help gc // help gc
agentStatusMap = null; agentAliveStatusMap = null;
statusAgentListMap = null; statusAgentListMap = null;
allHealthyAgentTopicNames = null; allHealthyAgentTopicNames = null;
} }

View File

@@ -20,31 +20,37 @@ import static io.wdd.rpc.message.handler.sync.OMessageHandlerServer.OCTOPUS_MESS
*/ */
@Service @Service
@Slf4j @Slf4j
public class AsyncWaitOMResultService { public class AsyncWaitOctopusMessageResultService {
/** /**
* 为了避免线程不安全的问题,增加一层缓存,仅仅由当前类操作此部分 * 为了避免线程不安全的问题,增加一层缓存,仅仅由当前类操作此部分
* KEY -> replayMatchKey * KEY -> replayMatchKey
* VALUE -> OMAsyncReplayContend - 包含countDownLatch result * VALUE -> OctopusMessageAsyncReplayContend - 包含countDownLatch result
*/ */
private static final HashMap<String, OMAsyncReplayContend> OM_REPLAY_WAITING_TARGET_MAP = new HashMap<>(); private static final HashMap<String, OctopusMessageAsyncReplayContend> OM_REPLAY_WAITING_TARGET_MAP = new HashMap<>();
public void waitFor(OMAsyncReplayContend OMAsyncReplayContend) { public void waitFor(OctopusMessageAsyncReplayContend OctopusMessageAsyncReplayContend) {
// REPLAY_CACHE_MAP中写入 Key // REPLAY_CACHE_MAP中写入 Key
OM_REPLAY_WAITING_TARGET_MAP.put( OM_REPLAY_WAITING_TARGET_MAP.put(
OMAsyncReplayContend.getReplayMatchKey(), OctopusMessageAsyncReplayContend.getReplayMatchKey(),
OMAsyncReplayContend OctopusMessageAsyncReplayContend
); );
// 在调用线程的countDownLunch结束之后,关闭 // 在调用线程的countDownLunch结束之后,关闭
// 清除 REPLAY_CACHE_MAP 中的队列 // 清除 REPLAY_CACHE_MAP 中的队列
} }
public void stopWaiting(OMAsyncReplayContend OMAsyncReplayContend) { public void stopWaiting(OctopusMessageAsyncReplayContend OctopusMessageAsyncReplayContend) {
// 在调用线程的countDownLunch结束之后,关闭 清除 REPLAY_CACHE_MAP 中的队列 // 在调用线程的countDownLunch结束之后,关闭 清除 REPLAY_CACHE_MAP 中的队列
OM_REPLAY_WAITING_TARGET_MAP.remove(OMAsyncReplayContend.getReplayMatchKey()); OctopusMessageAsyncReplayContend contend = OM_REPLAY_WAITING_TARGET_MAP.get(OctopusMessageAsyncReplayContend.getReplayMatchKey());
// 移除该内容
OM_REPLAY_WAITING_TARGET_MAP.remove(OctopusMessageAsyncReplayContend.getReplayMatchKey());
// help gc
contend = null;
} }
@@ -82,7 +88,7 @@ public class AsyncWaitOMResultService {
OctopusMessage replayOMessage = OCTOPUS_MESSAGE_FROM_AGENT.poll(); OctopusMessage replayOMessage = OCTOPUS_MESSAGE_FROM_AGENT.poll();
// 构造 replayMatchKey // 构造 replayMatchKey
String matchKey = OMAsyncReplayContend.generateMatchKey( String matchKey = OctopusMessageAsyncReplayContend.generateMatchKey(
replayOMessage.getType(), replayOMessage.getType(),
replayOMessage.getInit_time() replayOMessage.getInit_time()
); );
@@ -99,11 +105,12 @@ public class AsyncWaitOMResultService {
} }
// Map中包含有Key,那么放置进去 // Map中包含有Key,那么放置进去
OMAsyncReplayContend replayContend = OM_REPLAY_WAITING_TARGET_MAP.get(matchKey); OctopusMessageAsyncReplayContend replayContend = OM_REPLAY_WAITING_TARGET_MAP.get(matchKey);
replayContend replayContend
.getReplayOMList() .getReplayOMList()
.add(replayOMessage); .add(replayOMessage);
// 需要操作countDown // 需要操作countDown
replayContend replayContend
.getCountDownLatch() .getCountDownLatch()

View File

@@ -19,7 +19,7 @@ import java.util.concurrent.CountDownLatch;
@NoArgsConstructor @NoArgsConstructor
@SuperBuilder(toBuilder = true) @SuperBuilder(toBuilder = true)
@ApiModel("众多业务调用RPC,异步等待需要确定返回消息是谁的") @ApiModel("众多业务调用RPC,异步等待需要确定返回消息是谁的")
public class OMAsyncReplayContend { public class OctopusMessageAsyncReplayContend {
@ApiModelProperty("rpc消息的类型") @ApiModelProperty("rpc消息的类型")
OctopusMessageType type; OctopusMessageType type;
@@ -37,17 +37,6 @@ public class OMAsyncReplayContend {
@ApiModelProperty("回复的结果列表, 临时保存") @ApiModelProperty("回复的结果列表, 临时保存")
ArrayList<OctopusMessage> replayOMList; ArrayList<OctopusMessage> replayOMList;
protected static String generateMatchKey(OMAsyncReplayContend replayIdentifier) {
String relayMatchKey = replayIdentifier
.getType()
.toString() + replayIdentifier
.getInitTime()
.toString();
return relayMatchKey;
}
/** /**
* @param messageType * @param messageType
* @param messageInitTime 必须使用 TimeUtils.currentFormatTime(); * @param messageInitTime 必须使用 TimeUtils.currentFormatTime();
@@ -61,21 +50,26 @@ public class OMAsyncReplayContend {
} }
/** /**
* 方便使用的一个构造方法 * Execution模块使用的模板
* *
* @return * @return
*/ */
public static OMAsyncReplayContend build(int waitForReplayNum, OctopusMessageType currentOMType, LocalDateTime currentTime) { public static OctopusMessageAsyncReplayContend build(int waitForReplayNum, OctopusMessageType currentOMType, LocalDateTime currentTime) {
return new OMAsyncReplayContend( CountDownLatch latch = null;
if (waitForReplayNum != 0) {
latch = new CountDownLatch(waitForReplayNum);
}
return new OctopusMessageAsyncReplayContend(
currentOMType, currentOMType,
currentTime, currentTime,
generateMatchKey( generateMatchKey(
currentOMType, currentOMType,
currentTime currentTime
), ),
new CountDownLatch(waitForReplayNum), latch,
new ArrayList<>() new ArrayList<OctopusMessage>(16)
); );
} }

View File

@@ -1,17 +1,17 @@
package io.wdd.rpc.scheduler.job; package io.wdd.rpc.scheduler.job;
import io.wdd.rpc.scheduler.config.QuartzLogOperator; import io.wdd.rpc.scheduler.config.QuartzLogOperator;
import io.wdd.rpc.scheduler.service.status.CheckAgentAliveStatus; import io.wdd.rpc.scheduler.service.status.AgentAliveStatusMonitorService;
import org.quartz.JobExecutionContext; import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException; import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean; import org.springframework.scheduling.quartz.QuartzJobBean;
import javax.annotation.Resource; import javax.annotation.Resource;
public class AgentStatusMonitorJob extends QuartzJobBean { public class AgentAliveStatusMonitorJob extends QuartzJobBean {
@Resource @Resource
CheckAgentAliveStatus checkAgentAliveStatus; AgentAliveStatusMonitorService agentAliveStatusMonitorService;
@Resource @Resource
QuartzLogOperator quartzLogOperator; QuartzLogOperator quartzLogOperator;
@@ -23,7 +23,7 @@ public class AgentStatusMonitorJob extends QuartzJobBean {
//JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap(); //JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
// actually execute the monitor service // actually execute the monitor service
checkAgentAliveStatus.go(); agentAliveStatusMonitorService.go();
// log to somewhere // log to somewhere
quartzLogOperator.save(); quartzLogOperator.save();

View File

@@ -1,8 +1,8 @@
package io.wdd.rpc.scheduler.service; package io.wdd.rpc.scheduler.service;
import io.wdd.rpc.scheduler.job.AgentAliveStatusMonitorJob;
import io.wdd.rpc.scheduler.job.AgentRunMetricStatusJob; import io.wdd.rpc.scheduler.job.AgentRunMetricStatusJob;
import io.wdd.rpc.scheduler.job.AgentStatusMonitorJob;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.quartz.CronExpression; import org.quartz.CronExpression;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@@ -103,7 +103,7 @@ public class BuildStatusScheduleTask {
// build the Job // build the Job
octopusQuartzService.addMission( octopusQuartzService.addMission(
AgentStatusMonitorJob.class, AgentAliveStatusMonitorJob.class,
"monitorAllAgentStatusJob", "monitorAllAgentStatusJob",
JOB_GROUP_NAME, JOB_GROUP_NAME,
healthyCheckStartDelaySeconds, healthyCheckStartDelaySeconds,

View File

@@ -4,6 +4,7 @@ import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.init.AgentStatusCacheService; import io.wdd.rpc.init.AgentStatusCacheService;
import io.wdd.rpc.scheduler.service.BuildStatusScheduleTask; import io.wdd.rpc.scheduler.service.BuildStatusScheduleTask;
import io.wdd.rpc.status.OctopusStatusMessage; import io.wdd.rpc.status.OctopusStatusMessage;
import io.wdd.rpc.status.service.AsyncStatusService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
@@ -13,8 +14,7 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_LIST;
@@ -38,7 +38,7 @@ import static io.wdd.rpc.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE
@Service @Service
@Slf4j @Slf4j
@Lazy @Lazy
public class CheckAgentAliveStatus { public class AgentAliveStatusMonitorService {
private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5; private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5;
@Resource @Resource
@@ -52,11 +52,14 @@ public class CheckAgentAliveStatus {
@Resource @Resource
BuildStatusScheduleTask buildStatusScheduleTask; BuildStatusScheduleTask buildStatusScheduleTask;
@Resource
AsyncStatusService asyncStatusService;
private HashMap<String, String> AGENT_HEALTHY_INIT_MAP; private HashMap<String, String> AGENT_HEALTHY_INIT_MAP;
public void go() { public void go() {
try {
// 1. 获取所有注册的Agent 手动更新 // 1. 获取所有注册的Agent 手动更新
agentStatusCacheService.updateAllAgentTopicNameCache(); agentStatusCacheService.updateAllAgentTopicNameCache();
if (CollectionUtils.isEmpty(ALL_AGENT_TOPIC_NAME_LIST)) { if (CollectionUtils.isEmpty(ALL_AGENT_TOPIC_NAME_LIST)) {
@@ -69,21 +72,15 @@ public class CheckAgentAliveStatus {
// 2.发送状态检查信息 agent需要update相应的HashMap的值 // 2.发送状态检查信息 agent需要update相应的HashMap的值
// 2023年6月14日 2. 发送ping等待所有的Agent返回PONG, 然后进行redis的状态修改 // 2023年6月14日 2. 发送ping等待所有的Agent返回PONG, 然后进行redis的状态修改
CountDownLatch aliveStatusCDL = new CountDownLatch(ALL_AGENT_TOPIC_NAME_LIST.size());
// 使用同步更新的策略
Map<String, Boolean> agentAliveStatusMap = asyncStatusService.AsyncCollectAgentAliveStatus(
ALL_AGENT_TOPIC_NAME_LIST,
5
);
buildAndSendAgentHealthMessage(); // 更新Agent的状态
updateAllAgentHealthyStatus(agentAliveStatusMap);
// 3. 休眠 MAX_WAIT_AGENT_REPORT_STATUS_TIME 等待agent的状态上报
TimeUnit.SECONDS.sleep(MAX_WAIT_AGENT_REPORT_STATUS_TIME);
// 4.检查相应的 状态HashMap然后全部置为零
// todo 存储到某个地方目前只是打印日志
updateAllAgentHealthyStatus();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} }
private void checkOrCreateRedisHealthyKey() { private void checkOrCreateRedisHealthyKey() {
@@ -129,8 +126,7 @@ public class CheckAgentAliveStatus {
.map( .map(
agentTopicName -> OctopusStatusMessage agentTopicName -> OctopusStatusMessage
.builder() .builder()
.agentTopicName(agentTopicName) .statusType(HEALTHY_STATUS_MESSAGE_TYPE)
.type(HEALTHY_STATUS_MESSAGE_TYPE)
.build() .build()
) )
.collect(Collectors.toList()); .collect(Collectors.toList());
@@ -139,15 +135,15 @@ public class CheckAgentAliveStatus {
collectAgentStatus.statusMessageToAgent(collect); collectAgentStatus.statusMessageToAgent(collect);
} }
private void updateAllAgentHealthyStatus() { private void updateAllAgentHealthyStatus(Map<String, Boolean> agentAliveStatusMap) {
String currentTimeString = TimeUtils.currentTimeString(); String currentTimeString = TimeUtils.currentTimeString();
// 更新所有的缓存状态 // 更新所有的缓存状态
agentStatusCacheService.updateAgentStatusMapCache(); agentStatusCacheService.updateAgentStatusMapCache(agentAliveStatusMap);
// 执行Metric上报定时任务 // 执行Metric上报定时任务
buildStatusScheduleTask.buildAgentMetricScheduleTask(); // buildStatusScheduleTask.buildAgentMetricScheduleTask();
// 这里仅仅是更新时间 // 这里仅仅是更新时间
redisTemplate redisTemplate
@@ -158,6 +154,14 @@ public class CheckAgentAliveStatus {
currentTimeString currentTimeString
); );
// 更新所有的Agent状态
redisTemplate
.opsForHash()
.putAll(
ALL_AGENT_STATUS_REDIS_KEY,
agentAliveStatusMap
);
} }

View File

@@ -52,7 +52,7 @@ public class AgentRuntimeMetricStatus {
agentTopicName -> { agentTopicName -> {
return OctopusStatusMessage return OctopusStatusMessage
.builder() .builder()
.type(METRIC_STATUS_MESSAGE_TYPE) .statusType(METRIC_STATUS_MESSAGE_TYPE)
.metricRepeatCount(metricRepeatCount) .metricRepeatCount(metricRepeatCount)
.metricRepeatPinch(metricRepeatPinch) .metricRepeatPinch(metricRepeatPinch)
.agentTopicName(agentTopicName) .agentTopicName(agentTopicName)

View File

@@ -14,20 +14,18 @@ public class OctopusStatusMessage {
// below two will be used by both server and agent // below two will be used by both server and agent
// 存储所有Agent的实时健康状态 1代表健康 0代表失败 // 存储所有Agent的实时健康状态 1代表健康 0代表失败
public static final String ALL_AGENT_STATUS_REDIS_KEY = "ALL_AGENT_HEALTHY_STATUS"; public static final String ALL_AGENT_STATUS_REDIS_KEY = "ALL_AGENT_HEALTHY_STATUS";
public static final String HEALTHY_STATUS_MESSAGE_TYPE = "ping"; public static final String HEALTHY_STATUS_MESSAGE_TYPE = "PING";
public static final String ALL_STATUS_MESSAGE_TYPE = "all"; public static final String ALL_STATUS_MESSAGE_TYPE = "ALL";
public static final String METRIC_STATUS_MESSAGE_TYPE = "metric"; public static final String METRIC_STATUS_MESSAGE_TYPE = "METRIC";
public static final String APP_STATUS_MESSAGE_TYPE = "app"; public static final String APP_STATUS_MESSAGE_TYPE = "APP";
/** /**
* which kind of status should be return * which kind of status should be return
* metric => short time message * metric => short time message
* all => all agent status message * all => all agent status message
* healthy => check for healthy * healthy => check for healthy
* */ */
String type; String statusType;
String agentTopicName;
int metricRepeatCount; int metricRepeatCount;

View File

@@ -0,0 +1,16 @@
package io.wdd.rpc.status.service;
import java.util.List;
import java.util.Map;
public interface AsyncStatusService {
/**
* 应该是同步收集 agentTopicNameList 的节点的存活状态,并返回所有的状态存活结果
*
* @param agentTopicNameList
* @param aliveStatusWaitMaxTime
* @return
*/
Map<String, Boolean> AsyncCollectAgentAliveStatus(List<String> agentTopicNameList, int aliveStatusWaitMaxTime);
}

View File

@@ -0,0 +1,146 @@
package io.wdd.rpc.status.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType;
import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService;
import io.wdd.rpc.message.handler.async.OctopusMessageAsyncReplayContend;
import io.wdd.rpc.message.sender.OMessageToAgentSender;
import io.wdd.rpc.status.OctopusStatusMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_LIST;
import static io.wdd.rpc.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE;
@Slf4j
@Service
public class AsyncStatusServiceImpl implements AsyncStatusService {
private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.STATUS;
@Resource
OMessageToAgentSender oMessageToAgentSender;
@Resource
ObjectMapper objectMapper;
@Resource
AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService;
@Override
public Map<String, Boolean> AsyncCollectAgentAliveStatus(List<String> agentTopicNameList, int aliveStatusWaitMaxTime) {
// 构造最后的结果Map
Map<String, Boolean> agentAliveStatusMap = agentTopicNameList
.stream()
.collect(
Collectors.toMap(
agentTopicName -> agentTopicName,
agentTopicName -> Boolean.FALSE
));
LocalDateTime currentTime = TimeUtils.currentFormatTime();
// 构造OctopusMessage - StatusMessage结构体, 下发所有的消息
buildAndSendAgentAliveOctopusMessage(currentTime);
// 异步收集消息
OctopusMessageAsyncReplayContend statusAsyncReplayContend = OctopusMessageAsyncReplayContend.build(
agentTopicNameList.size(),
CurrentAppOctopusMessageType,
currentTime
);
asyncWaitOctopusMessageResultService.waitFor(statusAsyncReplayContend);
// 解析结果
CountDownLatch countDownLatch = statusAsyncReplayContend.getCountDownLatch();
// 等待状态返回的结果
boolean agentAliveStatusCollectResult = false;
try {
agentAliveStatusCollectResult = countDownLatch.await(
aliveStatusWaitMaxTime,
TimeUnit.SECONDS
);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
if (!agentAliveStatusCollectResult) {
log.debug("Agent存活状态检查没有检查到全部的Agent");
}
// 移除等待队列
asyncWaitOctopusMessageResultService.stopWaiting(statusAsyncReplayContend);
// 处理结果
statusAsyncReplayContend
.getReplayOMList()
.stream()
.forEach(
statusOMessage -> {
if (statusOMessage.getResult() != null) {
agentAliveStatusMap.put(
statusOMessage.getUuid(),
Boolean.TRUE
);
}
}
);
}
// 返回Agent的存活状态内容
return agentAliveStatusMap;
}
private void buildAndSendAgentAliveOctopusMessage(LocalDateTime currentTime) {
List<OctopusMessage> octopusStatusMessageList = ALL_AGENT_TOPIC_NAME_LIST
.stream()
.map(
agentTopicName -> ConstructAgentStatusMessage(
HEALTHY_STATUS_MESSAGE_TYPE,
agentTopicName,
currentTime
)
)
.collect(Collectors.toList());
// 发送信息
oMessageToAgentSender.send(octopusStatusMessageList);
}
private OctopusMessage ConstructAgentStatusMessage(String statusType, String agentTopicName, LocalDateTime currentTime) {
OctopusStatusMessage statusMessage = OctopusStatusMessage
.builder()
.statusType(statusType)
.build();
String ops;
try {
ops = objectMapper.writeValueAsString(statusMessage);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return OctopusMessage
.builder()
.type(CurrentAppOctopusMessageType)
.uuid(agentTopicName)
.init_time(currentTime)
.content(ops)
.build();
}
}