package io.wdd.rpc.agent; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import io.wdd.common.beans.agent.AgentOperationMessage; import io.wdd.common.beans.agent.AgentOperationType; import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessageType; import io.wdd.common.utils.TimeUtils; import io.wdd.rpc.message.sender.OMessageToAgentSender; import io.wdd.server.beans.vo.ServerInfoVO; import io.wdd.server.config.ServerCommonPool; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.time.LocalDateTime; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_TOPIC_NAME_SET; import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.message.handler.OctopusMessageHandler.AGENT_LATEST_VERSION; import static io.wdd.rpc.message.handler.OctopusMessageHandler.OCTOPUS_MESSAGE_FROM_AGENT; @Service @Slf4j public class OctopusAgentServiceImpl implements OctopusAgentService { private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.AGENT; @Resource OMessageToAgentSender oMessageToAgentSender; @Resource ObjectMapper objectMapper; @Resource RedisTemplate redisTemplate; @Override public Map getAllAgentVersion() { HashMap result = new HashMap<>(); // 查询获取到最新的Agent版本信息 result.put( "LATEST_VERSION", getRealAgentLatestVersion() ); // 获取Agent的版本信息 -- 超时时间 // 发送OctopusMessage-Agent // 从OctopusToServer中收集到所有Agent的版本信息 // 组装信息至集合中 LocalDateTime currentTime = TimeUtils.currentFormatTime(); buildAndSendToAllAgent( AgentOperationType.VERSION, currentTime ); CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size()); // todo 此处存在重大bug,会导致CPU占用飙升 CompletableFuture getAllAgentVersionInfoFuture = waitCollectAllAgentVersionInfo( result, currentTime, countDownLatch ); try { // 超时等待5秒钟, 或者所有的Agent均已经完成上报 countDownLatch.await( 5, TimeUnit.SECONDS ); } catch (InterruptedException e) { log.warn("存在部分Agent没有上报 版本信息!"); } finally { // 必须关闭释放线程 getAllAgentVersionInfoFuture.cancel(true); } return result; } private String getRealAgentLatestVersion() { String latestVersion = (String) redisTemplate .opsForValue() .get( AGENT_LATEST_VERSION ); return latestVersion; } @Override public Map getAllAgentCoreInfo() { HashMap result = new HashMap<>(); // 获取Agent的版本信息 -- 超时时间 // 发送OctopusMessage-Agent // 从OctopusToServer中收集到所有Agent的版本信息 // 组装信息至集合中 LocalDateTime currentTime = TimeUtils.currentFormatTime(); buildAndSendToAllAgent( AgentOperationType.INFO, currentTime ); CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size()); CompletableFuture getAllAgentCoreInfoFuture = waitCollectAllAgentCoreInfo( result, currentTime, countDownLatch ); try { // 超时等待5秒钟, 或者所有的Agent均已经完成上报 countDownLatch.await( 5, TimeUnit.SECONDS ); } catch (InterruptedException e) { log.warn("存在部分Agent没有上报 核心信息!"); } finally { // 必须关闭释放线程 getAllAgentCoreInfoFuture.cancel(true); } return result; } private CompletableFuture waitCollectAllAgentCoreInfo(HashMap result, LocalDateTime startTime, CountDownLatch countDownLatch) { ExecutorService pool = ServerCommonPool.pool; // 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息 return CompletableFuture.runAsync( () -> { while (true) { if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) { // 开始收集等待 200ms try { TimeUnit.MILLISECONDS.sleep(50); } catch (InterruptedException e) { throw new RuntimeException(e); } // 返回,继续死循环 continue; } OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll(); // 获取到OM // 判断信息是否是需要的类型 // 根据 init_time + OctopusMessageType + AgentOperationMessage if (!judgyIsCurrentServerReplyMessage( message, startTime )) { // 不是当前应用需要的的OM,将信息放置与Cache队列的末尾 try { OCTOPUS_MESSAGE_FROM_AGENT.put(message); } catch (InterruptedException e) { throw new RuntimeException(e); } // 返回,继续死循环 continue; } // 是当前需要的消息信息 try { // 解析当前信息 ServerInfoVO serverInfoVO = objectMapper.readValue( (String) message.getResult(), new TypeReference() { } ); result.put( message.getUuid(), serverInfoVO ); } catch (JsonProcessingException e) { throw new RuntimeException(e); } countDownLatch.countDown(); } }, pool ); } @Override public String shutdownAgentDanger(String agentTopicName) { if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { log.error("agentTopicName Error !"); return "agentTopicName Error!"; } LocalDateTime formatTime = TimeUtils.currentFormatTime(); buildAndSendToAllAgent( AgentOperationType.SHUTDOWN, formatTime ); // 是否需要检查相应的状态 return null; } private CompletableFuture waitCollectAllAgentVersionInfo(HashMap result, LocalDateTime startTime, CountDownLatch countDownLatch) { ExecutorService pool = ServerCommonPool.pool; // 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息 return CompletableFuture.runAsync( () -> { while (true) { if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) { // 开始收集等待 200ms try { TimeUnit.MILLISECONDS.sleep(50); } catch (InterruptedException e) { throw new RuntimeException(e); } // 返回,继续死循环 continue; } OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll(); // 获取到OM // 判断信息是否是需要的类型 // 根据 init_time + OctopusMessageType + AgentOperationMessage if (!judgyIsCurrentServerReplyMessage( message, startTime )) { // 不是当前应用需要的的OM,将信息放置与Cache队列的末尾 try { OCTOPUS_MESSAGE_FROM_AGENT.put(message); } catch (InterruptedException e) { throw new RuntimeException(e); } // 返回,继续死循环 continue; } // 是当前需要的消息信息 result.put( message.getUuid(), (String) message.getResult() ); countDownLatch.countDown(); } }, pool ); } /** * 判断信息是否是需要的类型 * 根据 init_time + OctopusMessageType + AgentOperationMessage * * @return */ private boolean judgyIsCurrentServerReplyMessage(OctopusMessage message, LocalDateTime startTime) { // init_time 时间判断 boolean startTimeEqual = startTime.isEqual(message.getInit_time()); // OctopusMessageType判断 boolean OMTypeEqual = message .getType() .equals(CurrentAppOctopusMessageType); return startTimeEqual && OMTypeEqual; } private void buildAndSendToAllAgent(AgentOperationType operationType, LocalDateTime currentTime) { List octopusMessageList = ALL_HEALTHY_AGENT_TOPIC_NAME_LIST .stream() .map( agentTopicName -> ConstructAgentOperationMessage( agentTopicName, operationType, currentTime ) ) .collect(Collectors.toList()); // 发送相应的消息 oMessageToAgentSender.send(octopusMessageList); } /** * 专门构造 Agent 类型的 OctopusMessage * 通常只能在此类中使用 * * @param agentTopicName * @param operationType * @param currentTime * @return */ private OctopusMessage ConstructAgentOperationMessage(String agentTopicName, AgentOperationType operationType, LocalDateTime currentTime) { AgentOperationMessage operationMessage = AgentOperationMessage .builder() .opType(operationType) .build(); String ops; try { ops = objectMapper.writeValueAsString(operationMessage); } catch (JsonProcessingException e) { throw new RuntimeException(e); } return OctopusMessage .builder() .type(CurrentAppOctopusMessageType) .uuid(agentTopicName) .init_time(currentTime) .content(ops) .build(); } }