diff --git a/agent/src/main/java/io/wdd/agent/agent/AgentOperationInfoService.java b/agent/src/main/java/io/wdd/agent/agent/AgentOperationInfoService.java index e5d65e0..c20e6ec 100644 --- a/agent/src/main/java/io/wdd/agent/agent/AgentOperationInfoService.java +++ b/agent/src/main/java/io/wdd/agent/agent/AgentOperationInfoService.java @@ -21,6 +21,13 @@ public class AgentOperationInfoService { public void AgentOpInfo(OctopusMessage octopusMessage){ + // 构造结果OM + octopusMessage.setAc_time(TimeUtils.currentTime()); + octopusMessage.setResult(agentServerInfo); + + // 发送相应的OM至 OctopusToServer 中 + oMessageToServerSender.send(octopusMessage); + } public void AgentOpVersion(OctopusMessage octopusMessage){ diff --git a/agent/src/main/java/io/wdd/agent/agent/AgentRebootUpdateService.java b/agent/src/main/java/io/wdd/agent/agent/AgentRebootUpdateService.java index 21e29b5..4806b74 100644 --- a/agent/src/main/java/io/wdd/agent/agent/AgentRebootUpdateService.java +++ b/agent/src/main/java/io/wdd/agent/agent/AgentRebootUpdateService.java @@ -1,12 +1,26 @@ package io.wdd.agent.agent; +import io.wdd.agent.config.utils.AgentCommonThreadPool; +import io.wdd.agent.executor.CommandExecutor; import io.wdd.agent.executor.FunctionExecutor; +import io.wdd.agent.message.OMessageToServerSender; import io.wdd.common.beans.agent.AgentOperationMessage; +import io.wdd.common.beans.rabbitmq.OctopusMessage; +import io.wdd.common.utils.TimeUtils; import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import static io.wdd.common.beans.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY; @Service @Slf4j @@ -15,6 +29,15 @@ public class AgentRebootUpdateService { @Resource FunctionExecutor functionExecutor; + @Resource + CommandExecutor commandExecutor; + + @Resource + OMessageToServerSender toServerSender; + + @Resource + RedisTemplate redisTemplate; + public void exAgentReboot(AgentOperationMessage operationMessage) { } @@ -22,4 +45,91 @@ public class AgentRebootUpdateService { public void exAgentUpdate(AgentOperationMessage operationMessage) { } + + public void exAgentShutdown(OctopusMessage octopusMessage, AgentOperationMessage operationMessage) { + + LocalDateTime now = TimeUtils.currentTime(); + LocalDateTime operationTime = operationMessage.getOperationTime(); + Duration duration = Duration.between( + now, + operationTime + ); + + long seconds = duration.toSeconds(); + + // 发送消息至RabbitMQ中 + octopusMessage.setAc_time(now); + octopusMessage.setResult( + String.format( + "OctopusAgent [ %s ] 将会在 [ %s ] 秒之后关机", + octopusMessage.getUuid(), + seconds + ) + ); + toServerSender.send( + octopusMessage + ); + + ExecutorService pool = AgentCommonThreadPool.pool; + + // 休眠,阻塞当前线程 + if (seconds > 0) { + + pool.submit( + () -> { + + // keep this to local + OctopusMessage message = octopusMessage; + + try { + TimeUnit.SECONDS.sleep(seconds); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + // 执行实际的关机操作 + doShutdownAgent(message); + } + ); + + } else { + // 不需要等待,直接关机 + doShutdownAgent(octopusMessage); + } + + + } + + private void doShutdownAgent(OctopusMessage octopusMessage) { + + ArrayList shutdownCommand = new ArrayList<>( + List.of( + "systemctl", + "poweroff" + ) + ); + + //发送消息至Redis中, 修改Agent的状态 + redisTemplate + .opsForHash() + .put( + ALL_AGENT_STATUS_REDIS_KEY, + octopusMessage.getUuid(), + "0" + ); + + + // 执行关机操作 + log.error( + "开始关闭OctopusAgent! 时间为 [ {} ]", + TimeUtils.currentTimeString() + ); + String streamKey = octopusMessage.getUuid() + "-Status"; + + // 最终执行关机操作 + commandExecutor.execute( + streamKey, + shutdownCommand + ); + } } diff --git a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerAgent.java b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerAgent.java index 49e9acc..6698669 100644 --- a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerAgent.java +++ b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerAgent.java @@ -66,6 +66,10 @@ public class OMHandlerAgent extends AbstractOctopusMessageHandler { // 收集Agent的核心Info信息 agentOperationInfoService.AgentOpInfo(octopusMessage); + } else if (opType.equals(AgentOperationType.SHUTDOWN)) { + + // 关闭Agent的接口 + agentRebootUpdateService.exAgentShutdown(octopusMessage, operationMessage); } else { // operation unknown log.error("Command Agent Operation Unknown! "); diff --git a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerStatus.java b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerStatus.java index c2f6e53..a196385 100644 --- a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerStatus.java +++ b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerStatus.java @@ -29,9 +29,6 @@ public class OMHandlerStatus extends AbstractOctopusMessageHandler { @Resource AgentStatusCollector agentStatusCollector; - - - @Override public boolean handle(OctopusMessage octopusMessage) { diff --git a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java index 2e1d428..b5da215 100644 --- a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java @@ -75,7 +75,7 @@ public class CommandExecutor { } - public int processExecute(String streamKey, ProcessBuilder processBuilder) { + private int processExecute(String streamKey, ProcessBuilder processBuilder) { processBuilder.redirectErrorStream(true); // processBuilder.inheritIO(); diff --git a/common/src/main/java/io/wdd/common/beans/agent/AgentOperationMessage.java b/common/src/main/java/io/wdd/common/beans/agent/AgentOperationMessage.java index 3ecb7ff..787dc54 100644 --- a/common/src/main/java/io/wdd/common/beans/agent/AgentOperationMessage.java +++ b/common/src/main/java/io/wdd/common/beans/agent/AgentOperationMessage.java @@ -23,7 +23,6 @@ public class AgentOperationMessage { * 需要执行的目标时间, */ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") - @Deprecated private LocalDateTime operationTime; diff --git a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java index 0b766fd..3173ffa 100644 --- a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java @@ -1,6 +1,7 @@ package io.wdd.rpc.agent; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import io.wdd.common.beans.agent.AgentOperationMessage; import io.wdd.common.beans.agent.AgentOperationType; @@ -25,6 +26,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_TOPIC_NAME_SET; import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.message.handler.OctopusMessageHandler.AGENT_LATEST_VERSION; import static io.wdd.rpc.message.handler.OctopusMessageHandler.OCTOPUS_MESSAGE_FROM_AGENT; @@ -100,12 +102,125 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { @Override public Map getAllAgentCoreInfo() { - return null; + + HashMap result = new HashMap<>(); + + // 获取Agent的版本信息 -- 超时时间 + // 发送OctopusMessage-Agent + // 从OctopusToServer中收集到所有Agent的版本信息 + // 组装信息至集合中 + LocalDateTime currentTime = TimeUtils.currentFormatTime(); + + buildAndSendToAllAgent( + AgentOperationType.INFO, + currentTime + ); + + CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size()); + + CompletableFuture getAllAgentCoreInfoFuture = waitCollectAllAgentCoreInfo( + result, + currentTime, + countDownLatch + ); + + try { + // 超时等待5秒钟, 或者所有的Agent均已经完成上报 + countDownLatch.await( + 5, + TimeUnit.SECONDS + ); + } catch (InterruptedException e) { + log.warn("存在部分Agent没有上报 核心信息!"); + } finally { + // 必须关闭释放线程 + getAllAgentCoreInfoFuture.cancel(true); + } + + return result; + } + + private CompletableFuture waitCollectAllAgentCoreInfo(HashMap result, LocalDateTime startTime, CountDownLatch countDownLatch) { + + ExecutorService pool = ServerCommonPool.pool; + + // 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息 + return CompletableFuture.runAsync( + () -> { + while (true) { + if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) { + // 开始收集等待 200ms + try { + TimeUnit.MILLISECONDS.sleep(50); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + // 返回,继续死循环 + continue; + } + + OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll(); + + // 获取到OM + // 判断信息是否是需要的类型 + // 根据 init_time + OctopusMessageType + AgentOperationMessage + if (!judgyIsCurrentServerReplyMessage( + message, + startTime + )) { + + // 不是当前应用需要的的OM,将信息放置与Cache队列的末尾 + try { + OCTOPUS_MESSAGE_FROM_AGENT.put(message); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + // 返回,继续死循环 + continue; + } + + // 是当前需要的消息信息 + try { + ServerInfoVO serverInfoVO = objectMapper.readValue( + (String) message.getResult(), + new TypeReference() { + } + ); + + result.put( + message.getUuid(), + serverInfoVO + ); + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + countDownLatch.countDown(); + } + }, + pool + ); } @Override public String shutdownAgentDanger(String agentTopicName) { + + if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) { + log.error("agentTopicName Error !"); + return "agentTopicName Error!"; + } + + LocalDateTime formatTime = TimeUtils.currentFormatTime(); + buildAndSendToAllAgent( + AgentOperationType.SHUTDOWN, + formatTime + ); + + // 是否需要检查相应的状态 + + return null; } diff --git a/server/src/main/java/io/wdd/rpc/controller/AgentController.java b/server/src/main/java/io/wdd/rpc/controller/AgentController.java index 5f7d01c..7e6ece0 100644 --- a/server/src/main/java/io/wdd/rpc/controller/AgentController.java +++ b/server/src/main/java/io/wdd/rpc/controller/AgentController.java @@ -4,6 +4,7 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.wdd.common.beans.response.R; import io.wdd.rpc.agent.OctopusAgentService; +import io.wdd.server.beans.vo.ServerInfoVO; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -26,4 +27,12 @@ public class AgentController { return R.ok(octopusAgentService.getAllAgentVersion()); } + @GetMapping("/coreInfo") + @ApiOperation("获取所有OctopusAgent的核心信息") + public R> getAllAgentCoreInfo(){ + + return R.ok(octopusAgentService.getAllAgentCoreInfo()); + + } + }