diff --git a/.github/workflows/build-push-docker.yml b/.github/workflows/build-push-docker.yml index 4f9fdc9..d990417 100644 --- a/.github/workflows/build-push-docker.yml +++ b/.github/workflows/build-push-docker.yml @@ -173,7 +173,7 @@ jobs: # Name for the queue RABBIT_QUEUE_NAME: "OctopusToServer" # Message to be sent - MESSAGE: "{\"uuid\":\"Octopus-Server\",\"init_time\": ${{ env.AGENT_VERSION }},\"type\":\"AGENT\",\"content\":${{ env.AGENT_VERSION }},\"result\": ${{ env.AGENT_VERSION }},\"ac_time\": ${{ env.AGENT_VERSION }}}" + MESSAGE: "{\"uuid\":\"Octopus-Server\",\"init_time\": null,\"type\":\"AGENT\",\"content\": \"${{ env.AGENT_VERSION }}\",\"result\": \"${{ env.AGENT_VERSION }}\",\"ac_time\": null}" # Durability for the queue DURABLE: true diff --git a/agent/src/main/java/io/wdd/agent/agent/AgentOperationInfoService.java b/agent/src/main/java/io/wdd/agent/agent/AgentOperationInfoService.java index 619a09f..e5d65e0 100644 --- a/agent/src/main/java/io/wdd/agent/agent/AgentOperationInfoService.java +++ b/agent/src/main/java/io/wdd/agent/agent/AgentOperationInfoService.java @@ -19,7 +19,7 @@ public class AgentOperationInfoService { @Resource OMessageToServerSender oMessageToServerSender; - public void AgentOpInfo(OctopusMessage order){ + public void AgentOpInfo(OctopusMessage octopusMessage){ } diff --git a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerAgent.java b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerAgent.java index 7000904..49e9acc 100644 --- a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerAgent.java +++ b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerAgent.java @@ -57,11 +57,15 @@ public class OMHandlerAgent extends AbstractOctopusMessageHandler { // update agentRebootUpdateService.exAgentUpdate(operationMessage); } else if (opType.equals(AgentOperationType.VERSION)) { + // 收集Agent的版本信息 agentOperationInfoService.AgentOpVersion(octopusMessage); + } else if (opType.equals(AgentOperationType.INFO)) { + // 收集Agent的核心Info信息 agentOperationInfoService.AgentOpInfo(octopusMessage); + } else { // operation unknown log.error("Command Agent Operation Unknown! "); diff --git a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java index b6def43..f9e6815 100644 --- a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java @@ -11,6 +11,7 @@ import io.wdd.rpc.message.sender.OMessageToAgentSender; import io.wdd.server.beans.vo.ServerInfoVO; import io.wdd.server.config.ServerCommonPool; import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; @@ -25,6 +26,8 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_TOPIC_NAME_LIST; +import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; +import static io.wdd.rpc.message.handler.OctopusMessageHandler.AGENT_LATEST_VERSION; import static io.wdd.rpc.message.handler.OctopusMessageHandler.OCTOPUS_MESSAGE_FROM_AGENT; @Service @@ -37,6 +40,9 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { @Resource ObjectMapper objectMapper; + @Resource + RedisTemplate redisTemplate; + @Override public Map getAllAgentVersion() { HashMap result = new HashMap<>(); @@ -44,7 +50,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { // 查询获取到最新的Agent版本信息 result.put( "LATEST_VERSION", - "2023-02-06-09-23-00" + getRealAgentLatestVersion() ); // 获取Agent的版本信息 -- 超时时间 @@ -63,14 +69,27 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { ); try { - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { throw new RuntimeException(e); } + System.out.println("result = " + result); + return result; } + private String getRealAgentLatestVersion() { + + String latestVersion = (String) redisTemplate + .opsForValue() + .get( + AGENT_LATEST_VERSION + ); + + return latestVersion; + } + @Override public Map getAllAgentCoreInfo() { return null; @@ -87,7 +106,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { CompletableFuture getAllAgentVersionInfo = new CompletableFuture<>(); try { ExecutorService pool = ServerCommonPool.pool; - CountDownLatch countDownLatch = new CountDownLatch(ALL_AGENT_TOPIC_NAME_LIST.size()); + CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size()); // 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息 getAllAgentVersionInfo = CompletableFuture.runAsync( @@ -163,7 +182,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { private void buildAndSendToAllAgent(AgentOperationType operationType, LocalDateTime currentTime) { - List octopusMessageList = ALL_AGENT_TOPIC_NAME_LIST + List octopusMessageList = ALL_HEALTHY_AGENT_TOPIC_NAME_LIST .stream() .map( agentTopicName -> diff --git a/server/src/main/java/io/wdd/rpc/message/handler/OctopusMessageHandler.java b/server/src/main/java/io/wdd/rpc/message/handler/OctopusMessageHandler.java index 0398cc3..cc5fb68 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/OctopusMessageHandler.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/OctopusMessageHandler.java @@ -9,6 +9,7 @@ import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.core.RedisTemplate; import javax.annotation.Resource; import java.io.IOException; @@ -18,33 +19,68 @@ import java.util.concurrent.ArrayBlockingQueue; @Slf4j(topic = "Octopus Message Handler") public class OctopusMessageHandler { - + /** + * Redis Key 用于保存Agent的最新版本 + * 由 GitHubAction发送至 RabbitMQ中,然后此处获取处理,发送至Redis中 + */ + public static final String AGENT_LATEST_VERSION = "AGENT_LATEST_VERSION"; /** * 存储所有的从 Agent过来的 OctopusMessage - * 各个业务模块需要自己手动去获取自己需要的内容 + * 各个业务模块需要自己手动去获取自己需要的内容 + * TODO 数据一致性问题,当AgentShutDown可能有一些信息会消失 */ - public static ArrayBlockingQueue OCTOPUS_MESSAGE_FROM_AGENT = new ArrayBlockingQueue<>(128, true); - + public static ArrayBlockingQueue OCTOPUS_MESSAGE_FROM_AGENT = new ArrayBlockingQueue<>( + 128, + true + ); + @Resource + RedisTemplate redisTemplate; @Resource ObjectMapper objectMapper; @RabbitHandler @RabbitListener(queues = "${octopus.message.octopus_to_server}" ) - public void HandleOctopusMessageFromAgent(Message message){ + public void HandleOctopusMessageFromAgent(Message message) { OctopusMessage octopusMessage; try { - octopusMessage = objectMapper.readValue(message.getBody(), OctopusMessage.class); + octopusMessage = objectMapper.readValue( + message.getBody(), + OctopusMessage.class + ); } catch (IOException e) { throw new MyRuntimeException("Octopus Message Wrong !"); } // Octopus Message Handler - log.info("received from agent : {} ", octopusMessage); + log.info( + "received from agent : {} ", + octopusMessage + ); + // 获取Agent的版本信息 + if (octopusMessage + .getUuid() + .equals("Octopus-Server")) { + // 更新缓存Agent的最新版本信息 + String latestVersion = (String) octopusMessage.getResult(); + + log.info( + "开始向Redis中缓存Agent的最新版本 => {}", + latestVersion + ); + redisTemplate + .opsForValue() + .set( + AGENT_LATEST_VERSION, + latestVersion + ); + + } + // todo what to do after received the result log.debug("cache the octopus message to inner cache list !"); OCTOPUS_MESSAGE_FROM_AGENT.add(octopusMessage);