From 24471df4f80d711de3c26b611d9fb37ac9434b56 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Thu, 5 Jan 2023 10:57:58 +0800 Subject: [PATCH] [ agent ] [ status ] - accomplish agent status - 3 --- .../java/io/wdd/agent/AgentApplication.java | 1 + .../agent/status/AgentStatusCollector.java | 34 ++++++++----- .../io/wdd/rpc/init/AcceptAgentInitInfo.java | 51 ++++++++++++++----- .../server/coreService/CoreServerService.java | 2 + .../impl/CoreServerServiceImpl.java | 7 +++ .../server/utils/DaemonDatabaseOperator.java | 2 +- 6 files changed, 70 insertions(+), 27 deletions(-) diff --git a/agent/src/main/java/io/wdd/agent/AgentApplication.java b/agent/src/main/java/io/wdd/agent/AgentApplication.java index 1834cd2..89532d9 100644 --- a/agent/src/main/java/io/wdd/agent/AgentApplication.java +++ b/agent/src/main/java/io/wdd/agent/AgentApplication.java @@ -5,6 +5,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication +@EnableScheduling public class AgentApplication { public static void main(String[] args) { diff --git a/agent/src/main/java/io/wdd/agent/status/AgentStatusCollector.java b/agent/src/main/java/io/wdd/agent/status/AgentStatusCollector.java index 2cb7359..7879d42 100644 --- a/agent/src/main/java/io/wdd/agent/status/AgentStatusCollector.java +++ b/agent/src/main/java/io/wdd/agent/status/AgentStatusCollector.java @@ -7,31 +7,24 @@ import io.wdd.agent.config.utils.TimeUtils; import io.wdd.agent.status.hardware.CpuInfo; import io.wdd.agent.status.hardware.MemoryInfo; import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.connection.stream.StreamRecords; +import org.springframework.data.redis.connection.stream.StringRecord; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import oshi.SystemInfo; import oshi.hardware.HardwareAbstractionLayer; import oshi.software.os.OperatingSystem; import javax.annotation.Resource; -import java.time.LocalDateTime; import java.util.Collections; import java.util.List; +import java.util.Map; @Service @Slf4j public class AgentStatusCollector { - @Resource - RedisTemplate redisTemplate; - - @Resource - ObjectMapper objectMapper; - - @Resource - AgentServerInfo agentServerInfo; - - private static final SystemInfo systemInfo; /** * 硬件信息 @@ -41,7 +34,6 @@ public class AgentStatusCollector { * 系统信息 */ private static final OperatingSystem os; - private static final List AgentStatusCache = Collections.singletonList(new AgentStatus()); static { @@ -50,6 +42,12 @@ public class AgentStatusCollector { os = systemInfo.getOperatingSystem(); } + @Resource + RedisTemplate redisTemplate; + @Resource + ObjectMapper objectMapper; + @Resource + AgentServerInfo agentServerInfo; public AgentStatus collect() { @@ -81,11 +79,21 @@ public class AgentStatusCollector { } + // agent boot up 60s then start to report its status + // at the fix rate of 15s + @Scheduled(initialDelay = 60000, fixedRate = 5000) public void sendAgentStatusToRedis() { try { - log.info("time is [{}] , and agent status are [{}]", LocalDateTime.now(), objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(collect())); + String statusStreamKey = agentServerInfo.getServerName() + "-status"; + + Map map = Map.of(TimeUtils.currentTimeString(), objectMapper.writeValueAsString(collect())); + + StringRecord stringRecord = StreamRecords.string(map).withStreamKey(statusStreamKey); + + log.debug("Agent Status is ==> {}",map); + redisTemplate.opsForStream().add(stringRecord); } catch (JsonProcessingException e) { throw new RuntimeException(e); diff --git a/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java b/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java index 0f4fb94..b492a8f 100644 --- a/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java +++ b/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java @@ -14,6 +14,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Service; @@ -25,6 +26,7 @@ import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * The type Accept boot up info message. @@ -33,10 +35,6 @@ import java.util.concurrent.TimeUnit; @Slf4j(topic = "octopus agent init ") public class AcceptAgentInitInfo { - @Resource - InitRabbitMQConfig initRabbitMQConfig; - - public static Set ALL_SERVER_CITY_INFO = new HashSet<>( Arrays.asList( "HongKong", "Tokyo", "Seoul", "Phoenix", "London", "Shanghai", "Chengdu" @@ -47,6 +45,10 @@ public class AcceptAgentInitInfo { "amd64", "arm64", "arm32", "xia32", "miples" ) ); + @Resource + InitRabbitMQConfig initRabbitMQConfig; + @Resource + RedisTemplate redisTemplate; /** * The Database operator. */ @@ -101,17 +103,21 @@ public class AcceptAgentInitInfo { serverInfoVO.setTopicName(agentQueueTopic); // cache enabled for agent re-register - if (!checkAgentAlreadyRegister(agentQueueTopic)) { - // 3. save the agent info into database - // backend fixed thread daemon to operate the database ensuring the operation is correct ! - log.info("[AGENT INIT] - agent not exist ! start to register !"); - if (!databaseOperator.saveInitOctopusAgentInfo(serverInfoVO)) { - throw new MyRuntimeException("database save agent info error !"); - } - +// if (!checkAgentAlreadyRegister(agentQueueTopic)) { +// log.info("[AGENT INIT] - agent not exist ! start to register !"); +// } + // whether agent is registered already + // save or update the octopus agent server info + // 3. save the agent info into database + // backend fixed thread daemon to operate the database ensuring the operation is correct ! + if (!databaseOperator.saveInitOctopusAgentInfo(serverInfoVO)) { + throw new MyRuntimeException("database save agent info error !"); } - // 4. send InitMessage to agent + // 4. generate the Octopus Agent Status Redis Stream Key & Consumer-Group + generateAgentStatusRedisStreamConsumerGroup(serverInfoVO.getServerName()); + + // 5. send InitMessage to agent sendInitMessageToAgent(serverInfoVO); @@ -156,6 +162,25 @@ public class AcceptAgentInitInfo { channel.basicAck(deliveryTag, false); } + private void generateAgentStatusRedisStreamConsumerGroup(String serverName) { + + String statusStreamKey = serverName + "-status"; + + // check for octopus-server consumer group + if (redisTemplate.opsForStream().groups(statusStreamKey) + .stream() + .filter( + group -> group.groupName().startsWith("Octopus") + ).collect(Collectors.toSet()).contains(Boolean.FALSE)) { + + log.debug(" not find the group, recreate"); + // not find the group, recreate + redisTemplate.opsForStream().createGroup(statusStreamKey, "OctopusServer"); + } + + log.debug("octopus agent [ {} ] status report stream key [ {} ] has been created !", serverName, statusStreamKey); + } + private boolean checkAgentAlreadyRegister(String agentQueueTopic) { Optional first = databaseOperator.getAllServerName().stream(). diff --git a/server/src/main/java/io/wdd/server/coreService/CoreServerService.java b/server/src/main/java/io/wdd/server/coreService/CoreServerService.java index 0696284..b38a115 100644 --- a/server/src/main/java/io/wdd/server/coreService/CoreServerService.java +++ b/server/src/main/java/io/wdd/server/coreService/CoreServerService.java @@ -19,6 +19,8 @@ public interface CoreServerService { boolean serverCreate(ServerInfoVO serverInfoVO); + boolean serverCreateOrUpdate(ServerInfoVO serverInfoVO); + boolean serverUpdate(ServerInfoPO serverInfoPO); boolean serverDelete(Long serverId, String serverName); diff --git a/server/src/main/java/io/wdd/server/coreService/impl/CoreServerServiceImpl.java b/server/src/main/java/io/wdd/server/coreService/impl/CoreServerServiceImpl.java index a9f75f0..854f0c7 100644 --- a/server/src/main/java/io/wdd/server/coreService/impl/CoreServerServiceImpl.java +++ b/server/src/main/java/io/wdd/server/coreService/impl/CoreServerServiceImpl.java @@ -82,6 +82,13 @@ public class CoreServerServiceImpl implements CoreServerService { return serverInfoService.save(serverInfoPO); } + @Override + public boolean serverCreateOrUpdate(ServerInfoVO serverInfoVO) { + + ServerInfoPO serverInfoPO = EntityUtils.cvToTarget(serverInfoVO, ServerInfoPO.class); + return serverInfoService.saveOrUpdate(serverInfoPO); + } + @Override public boolean serverUpdate(ServerInfoPO serverInfoPO) { diff --git a/server/src/main/java/io/wdd/server/utils/DaemonDatabaseOperator.java b/server/src/main/java/io/wdd/server/utils/DaemonDatabaseOperator.java index af097ff..4224ecb 100644 --- a/server/src/main/java/io/wdd/server/utils/DaemonDatabaseOperator.java +++ b/server/src/main/java/io/wdd/server/utils/DaemonDatabaseOperator.java @@ -42,7 +42,7 @@ public class DaemonDatabaseOperator { // log.info("simulate store the Octopus Agent Server info"); // return true; - return coreServerService.serverCreate(serverInfoVO); + return coreServerService.serverCreateOrUpdate(serverInfoVO); }