package io.wdd.rpc.status; import io.wdd.common.utils.TimeUtils; import io.wdd.server.beans.vo.ServerInfoVO; import io.wdd.server.coreService.CoreServerService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.*; import java.util.stream.Collectors; /** * Server启动或者运行的时候,需要初 缓存一系列的信息 *

* 所有Agent的TopicName ALL_AGENT_TOPIC_NAME_SET *

* 2023年7月10日 此部分应该初始化全部为 False状态 * Agent状态信息的两个Map STATUS_AGENT_LIST_MAP ALL_AGENT_STATUS_MAP *

* 2023年7月10日 -- 此部分作为Redis中存储的 二级缓存部分 应该严格遵循 */ @Service @Slf4j public class CommonAndStatusCache { /** * 存储所有的AgentTopicName的缓存 */ public static final Set ALL_AGENT_TOPIC_NAME_SET = new HashSet<>(); /** * 存储所有的AgentTopicName的缓存 */ public static final List ALL_AGENT_TOPIC_NAME_LIST = new ArrayList<>(); /** * 存储 状态对应Agent列表的Map * Agent的状态描述为 AgentHealthyStatusEnum * HEALTHY -> ["agentTopicName-1", "agentTopicName-2"] * FAILED -> ["agentTopicName-1", "agentTopicName-2"] */ public static final Map> STATUS_AGENT_LIST_MAP = new HashMap<>(); /** * 存储所有Agent状态的Map *

* 内容为 agentTopicName- True代表健康 False代表不健康 */ public static final Map ALL_AGENT_STATUS_MAP = new HashMap<>(); /** * 保存所有健康运行的Agent Topic Name */ public static final List ALL_HEALTHY_AGENT_TOPIC_NAME_LIST = new ArrayList<>(); /** * 记录状态信息缓存的更新时间 */ public static final String STATUS_UPDATE_TIME_KEY = "UPDATE_TIME"; /** * 记录状态信息缓存的初始化时间 */ public static final String STATUS_INIT_TIME_KEY = "INIT_TIME"; /** * AgentTopicName 在Redis中緩存的Key */ private static final String ALL_AGENT_TOPIC_NAME_REDIS_KEY = "ALL_AGENT_TOPIC_NAME"; @Resource CoreServerService coreServerService; @Resource RedisTemplate redisTemplate; @PostConstruct public void InitToGenerateAllStatusCache() { //所有Agent的TopicName ALL_AGENT_TOPIC_NAME_SET updateAllAgentTopicNameCache(); // Agent状态信息的两个Map // 初始化 默认创建全部失败的Map Map initAgentFalseStatusMap = ALL_AGENT_TOPIC_NAME_LIST .stream() .collect(Collectors.toMap( topicName -> topicName, topicName -> Boolean.FALSE )); updateAgentStatusCache(initAgentFalseStatusMap); } /** * 从数据库中获取所有注册过的Agent名称 *

* 2023年7月10日 写入Redis中保存一份 */ public void updateAllAgentTopicNameCache() { //查询DB List allAgentInfo = coreServerService.serverGetAll(); if (CollectionUtils.isEmpty(allAgentInfo)) { log.warn("[Serer Boot Up] Octopus Serer First Boot Up ! No Agent Registered Ever!"); return; } ALL_AGENT_TOPIC_NAME_LIST.clear(); ALL_AGENT_TOPIC_NAME_SET.clear(); List collect = allAgentInfo .stream() .map(ServerInfoVO::getTopicName) .collect(Collectors.toList()); ALL_AGENT_TOPIC_NAME_LIST.addAll(collect); ALL_AGENT_TOPIC_NAME_SET.addAll(collect); String[] all_agent_topic_name_array = new String[ALL_AGENT_TOPIC_NAME_LIST.size()]; ALL_AGENT_TOPIC_NAME_LIST.toArray(all_agent_topic_name_array); // 2023年7月10日 同步缓存至Redis中 redisTemplate .opsForSet() .add( ALL_AGENT_TOPIC_NAME_REDIS_KEY, all_agent_topic_name_array ); } /** * 根据传入的状态Map更新二级缓存的两个状态Map和健康主机的列表 * ALL_AGENT_STATUS_MAP * STATUS_AGENT_LIST_MAP *

* ALL_HEALTHY_AGENT_TOPIC_NAME_LIST */ public void updateAgentStatusCache(Map agentAliveStatusMap) { // 检查,排除没有节点的情况 if (CollectionUtils.isEmpty(ALL_AGENT_TOPIC_NAME_LIST)) { log.warn("[Agent Status Cache] No Agent Registered Ever! Return"); return; } // 2023年6月15日 更新状态缓存 ALL_AGENT_STATUS_MAP.clear(); ALL_AGENT_STATUS_MAP.putAll(agentAliveStatusMap); // 2023-01-16 // 更新 状态-Agent容器 内容为 // HEALTHY -> ["agentTopicName-1", "agentTopicName-2"] // FAILED -> ["agentTopicName-1", "agentTopicName-2"] Map> statusAgentListMap = agentAliveStatusMap .entrySet() .stream() .collect( Collectors.groupingBy( Map.Entry::getValue ) ) .entrySet() .stream() .collect( Collectors.toMap( entry -> entry.getKey() ? "HEALTHY" : "FAILED", entry -> entry .getValue() .stream() .map( Map.Entry::getKey ) .collect(Collectors.toList()) ) ); // 2023-2-3 bug fix STATUS_AGENT_LIST_MAP.clear(); STATUS_AGENT_LIST_MAP.putAll(statusAgentListMap); // 2023年2月21日,更新时间 String timeString = TimeUtils.currentFormatTimeString(); STATUS_AGENT_LIST_MAP.put( STATUS_UPDATE_TIME_KEY, Collections.singletonList(timeString) ); // 缓存相应的存活Agent List allHealthyAgentTopicNames = agentAliveStatusMap .entrySet() .stream() .filter( entry -> entry .getValue() .equals(Boolean.TRUE) ) .map( Map.Entry::getKey ) .collect(Collectors.toList()); ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.clear(); ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.addAll(allHealthyAgentTopicNames); log.debug( "[状态二级缓存] - ALL_HEALTHY_AGENT_TOPIC_NAME_LIST 为=> {},\n STATUS_AGENT_LIST_MAP 为=> {},\n ALL_AGENT_STATUS_MAP 为=> {}\n", ALL_HEALTHY_AGENT_TOPIC_NAME_LIST, STATUS_AGENT_LIST_MAP, ALL_AGENT_STATUS_MAP ); // help gc agentAliveStatusMap = null; statusAgentListMap = null; allHealthyAgentTopicNames = null; } private String uniformHealthyStatus(String agentStatus) { switch (agentStatus) { case "0": return AgentHealthyStatusEnum.FAILED.getStatus(); case "1": return AgentHealthyStatusEnum.HEALTHY.getStatus(); default: return AgentHealthyStatusEnum.UNKNOWN.getStatus(); } } }