[ agent ] [ status ] - accomplish agent status - 3

This commit is contained in:
zeaslity
2023-01-05 10:57:58 +08:00
parent 3326196ebc
commit 24471df4f8
6 changed files with 70 additions and 27 deletions

View File

@@ -5,6 +5,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication @SpringBootApplication
@EnableScheduling
public class AgentApplication { public class AgentApplication {
public static void main(String[] args) { public static void main(String[] args) {

View File

@@ -7,31 +7,24 @@ import io.wdd.agent.config.utils.TimeUtils;
import io.wdd.agent.status.hardware.CpuInfo; import io.wdd.agent.status.hardware.CpuInfo;
import io.wdd.agent.status.hardware.MemoryInfo; import io.wdd.agent.status.hardware.MemoryInfo;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.connection.stream.StringRecord;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import oshi.SystemInfo; import oshi.SystemInfo;
import oshi.hardware.HardwareAbstractionLayer; import oshi.hardware.HardwareAbstractionLayer;
import oshi.software.os.OperatingSystem; import oshi.software.os.OperatingSystem;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
@Service @Service
@Slf4j @Slf4j
public class AgentStatusCollector { public class AgentStatusCollector {
@Resource
RedisTemplate redisTemplate;
@Resource
ObjectMapper objectMapper;
@Resource
AgentServerInfo agentServerInfo;
private static final SystemInfo systemInfo; private static final SystemInfo systemInfo;
/** /**
* 硬件信息 * 硬件信息
@@ -41,7 +34,6 @@ public class AgentStatusCollector {
* 系统信息 * 系统信息
*/ */
private static final OperatingSystem os; private static final OperatingSystem os;
private static final List<AgentStatus> AgentStatusCache = Collections.singletonList(new AgentStatus()); private static final List<AgentStatus> AgentStatusCache = Collections.singletonList(new AgentStatus());
static { static {
@@ -50,6 +42,12 @@ public class AgentStatusCollector {
os = systemInfo.getOperatingSystem(); os = systemInfo.getOperatingSystem();
} }
@Resource
RedisTemplate redisTemplate;
@Resource
ObjectMapper objectMapper;
@Resource
AgentServerInfo agentServerInfo;
public AgentStatus collect() { public AgentStatus collect() {
@@ -81,11 +79,21 @@ public class AgentStatusCollector {
} }
// agent boot up 60s then start to report its status
// at the fix rate of 15s
@Scheduled(initialDelay = 60000, fixedRate = 5000)
public void sendAgentStatusToRedis() { public void sendAgentStatusToRedis() {
try { try {
log.info("time is [{}] , and agent status are [{}]", LocalDateTime.now(), objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(collect())); String statusStreamKey = agentServerInfo.getServerName() + "-status";
Map<String, String> map = Map.of(TimeUtils.currentTimeString(), objectMapper.writeValueAsString(collect()));
StringRecord stringRecord = StreamRecords.string(map).withStreamKey(statusStreamKey);
log.debug("Agent Status is ==> {}",map);
redisTemplate.opsForStream().add(stringRecord);
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@@ -14,6 +14,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders; import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@@ -25,6 +26,7 @@ import java.util.HashSet;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/** /**
* The type Accept boot up info message. * The type Accept boot up info message.
@@ -33,10 +35,6 @@ import java.util.concurrent.TimeUnit;
@Slf4j(topic = "octopus agent init ") @Slf4j(topic = "octopus agent init ")
public class AcceptAgentInitInfo { public class AcceptAgentInitInfo {
@Resource
InitRabbitMQConfig initRabbitMQConfig;
public static Set<String> ALL_SERVER_CITY_INFO = new HashSet<>( public static Set<String> ALL_SERVER_CITY_INFO = new HashSet<>(
Arrays.asList( Arrays.asList(
"HongKong", "Tokyo", "Seoul", "Phoenix", "London", "Shanghai", "Chengdu" "HongKong", "Tokyo", "Seoul", "Phoenix", "London", "Shanghai", "Chengdu"
@@ -47,6 +45,10 @@ public class AcceptAgentInitInfo {
"amd64", "arm64", "arm32", "xia32", "miples" "amd64", "arm64", "arm32", "xia32", "miples"
) )
); );
@Resource
InitRabbitMQConfig initRabbitMQConfig;
@Resource
RedisTemplate redisTemplate;
/** /**
* The Database operator. * The Database operator.
*/ */
@@ -101,17 +103,21 @@ public class AcceptAgentInitInfo {
serverInfoVO.setTopicName(agentQueueTopic); serverInfoVO.setTopicName(agentQueueTopic);
// cache enabled for agent re-register // cache enabled for agent re-register
if (!checkAgentAlreadyRegister(agentQueueTopic)) { // if (!checkAgentAlreadyRegister(agentQueueTopic)) {
// 3. save the agent info into database // log.info("[AGENT INIT] - agent not exist ! start to register !");
// backend fixed thread daemon to operate the database ensuring the operation is correct ! // }
log.info("[AGENT INIT] - agent not exist ! start to register !"); // whether agent is registered already
if (!databaseOperator.saveInitOctopusAgentInfo(serverInfoVO)) { // save or update the octopus agent server info
throw new MyRuntimeException("database save agent info error !"); // 3. save the agent info into database
} // backend fixed thread daemon to operate the database ensuring the operation is correct !
if (!databaseOperator.saveInitOctopusAgentInfo(serverInfoVO)) {
throw new MyRuntimeException("database save agent info error !");
} }
// 4. send InitMessage to agent // 4. generate the Octopus Agent Status Redis Stream Key & Consumer-Group
generateAgentStatusRedisStreamConsumerGroup(serverInfoVO.getServerName());
// 5. send InitMessage to agent
sendInitMessageToAgent(serverInfoVO); sendInitMessageToAgent(serverInfoVO);
@@ -156,6 +162,25 @@ public class AcceptAgentInitInfo {
channel.basicAck(deliveryTag, false); channel.basicAck(deliveryTag, false);
} }
private void generateAgentStatusRedisStreamConsumerGroup(String serverName) {
String statusStreamKey = serverName + "-status";
// check for octopus-server consumer group
if (redisTemplate.opsForStream().groups(statusStreamKey)
.stream()
.filter(
group -> group.groupName().startsWith("Octopus")
).collect(Collectors.toSet()).contains(Boolean.FALSE)) {
log.debug(" not find the group, recreate");
// not find the group, recreate
redisTemplate.opsForStream().createGroup(statusStreamKey, "OctopusServer");
}
log.debug("octopus agent [ {} ] status report stream key [ {} ] has been created !", serverName, statusStreamKey);
}
private boolean checkAgentAlreadyRegister(String agentQueueTopic) { private boolean checkAgentAlreadyRegister(String agentQueueTopic) {
Optional<String> first = databaseOperator.getAllServerName().stream(). Optional<String> first = databaseOperator.getAllServerName().stream().

View File

@@ -19,6 +19,8 @@ public interface CoreServerService {
boolean serverCreate(ServerInfoVO serverInfoVO); boolean serverCreate(ServerInfoVO serverInfoVO);
boolean serverCreateOrUpdate(ServerInfoVO serverInfoVO);
boolean serverUpdate(ServerInfoPO serverInfoPO); boolean serverUpdate(ServerInfoPO serverInfoPO);
boolean serverDelete(Long serverId, String serverName); boolean serverDelete(Long serverId, String serverName);

View File

@@ -82,6 +82,13 @@ public class CoreServerServiceImpl implements CoreServerService {
return serverInfoService.save(serverInfoPO); return serverInfoService.save(serverInfoPO);
} }
@Override
public boolean serverCreateOrUpdate(ServerInfoVO serverInfoVO) {
ServerInfoPO serverInfoPO = EntityUtils.cvToTarget(serverInfoVO, ServerInfoPO.class);
return serverInfoService.saveOrUpdate(serverInfoPO);
}
@Override @Override
public boolean serverUpdate(ServerInfoPO serverInfoPO) { public boolean serverUpdate(ServerInfoPO serverInfoPO) {

View File

@@ -42,7 +42,7 @@ public class DaemonDatabaseOperator {
// log.info("simulate store the Octopus Agent Server info"); // log.info("simulate store the Octopus Agent Server info");
// return true; // return true;
return coreServerService.serverCreate(serverInfoVO); return coreServerService.serverCreateOrUpdate(serverInfoVO);
} }