[ Server ] accomplish alive status precedure
This commit is contained in:
@@ -1,9 +1,11 @@
|
||||
package io.wdd.rpc.init;
|
||||
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import io.wdd.common.handler.MyRuntimeException;
|
||||
import io.wdd.common.utils.TimeUtils;
|
||||
import io.wdd.rpc.message.OctopusMessage;
|
||||
import io.wdd.rpc.message.OctopusMessageType;
|
||||
import io.wdd.rpc.message.sender.OMessageToAgentSender;
|
||||
@@ -22,10 +24,11 @@ import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.io.IOException;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static io.wdd.common.utils.OctopusObjectMapperConfig.OctopusObjectMapper;
|
||||
|
||||
/**
|
||||
* The type Accept boot up info message.
|
||||
*/
|
||||
@@ -108,8 +111,13 @@ public class AcceptAgentInitInfo {
|
||||
|
||||
try {
|
||||
|
||||
serverInfoVO = objectMapper.readValue(
|
||||
OctopusMessage initOctopusMessageFromAgent = OctopusObjectMapper.readValue(
|
||||
message.getBody(),
|
||||
OctopusMessage.class
|
||||
);
|
||||
|
||||
serverInfoVO = OctopusObjectMapper.readValue(
|
||||
(String) initOctopusMessageFromAgent.getContent(),
|
||||
ServerInfoVO.class
|
||||
);
|
||||
|
||||
@@ -126,6 +134,7 @@ public class AcceptAgentInitInfo {
|
||||
// if (!checkAgentAlreadyRegister(agentQueueTopic)) {
|
||||
// log.info("[AGENT INIT] - agent not exist ! start to register !");
|
||||
// }
|
||||
|
||||
// whether agent is registered already
|
||||
// save or update the octopus agent server info
|
||||
// 3. save the agent info into database
|
||||
@@ -135,7 +144,7 @@ public class AcceptAgentInitInfo {
|
||||
}
|
||||
|
||||
// 4. generate the Octopus Agent Status Redis Stream Key & Consumer-Group
|
||||
generateAgentStatusRedisStreamConsumerGroup(serverInfoVO.getTopicName());
|
||||
//generateAgentStatusRedisStreamConsumerGroup(serverInfoVO.getTopicName());
|
||||
|
||||
// 5. send InitMessage to agent
|
||||
sendInitMessageToAgent(serverInfoVO);
|
||||
@@ -169,7 +178,7 @@ public class AcceptAgentInitInfo {
|
||||
*/
|
||||
|
||||
|
||||
throw new MyRuntimeException(" Octopus Server Initialization Error, please check !");
|
||||
throw new MyRuntimeException("Octopus Server Initialization Error, please check !");
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -208,18 +217,6 @@ public class AcceptAgentInitInfo {
|
||||
);
|
||||
}
|
||||
|
||||
// check for octopus-server consumer group
|
||||
/*if (redisTemplate.opsForStream().groups(statusStreamKey)
|
||||
.stream()
|
||||
.filter(
|
||||
group -> group.groupName().startsWith("Octopus")
|
||||
).collect(Collectors.toSet()).contains(Boolean.FALSE)) {
|
||||
|
||||
|
||||
|
||||
redisTemplate.opsForStream().createGroup(statusStreamKey, "OctopusServer");
|
||||
}*/
|
||||
|
||||
log.debug(
|
||||
"octopus agent [ {} ] status report stream key [ {} ] has been created !",
|
||||
agentTopicName,
|
||||
@@ -240,16 +237,24 @@ public class AcceptAgentInitInfo {
|
||||
|
||||
private boolean sendInitMessageToAgent(ServerInfoVO serverInfoVO) {
|
||||
|
||||
OctopusMessage octopusMessage = OctopusMessage
|
||||
.builder()
|
||||
.type(OctopusMessageType.INIT)
|
||||
// should be the OctopusExchange Name
|
||||
.content(String.valueOf(initRabbitMQConfig.OCTOPUS_EXCHANGE))
|
||||
.init_time(LocalDateTime.now())
|
||||
.uuid(serverInfoVO.getTopicName())
|
||||
.build();
|
||||
try {
|
||||
String serverInfoContent = OctopusObjectMapper.writeValueAsString(serverInfoVO);
|
||||
|
||||
OctopusMessage octopusMessage = OctopusMessage
|
||||
.builder()
|
||||
.type(OctopusMessageType.INIT)
|
||||
// should be the OctopusExchange Name
|
||||
.content(serverInfoContent)
|
||||
.init_time(TimeUtils.currentFormatTime())
|
||||
.uuid(serverInfoVO.getTopicName())
|
||||
.build();
|
||||
|
||||
oMessageToAgentSender.sendINIT(octopusMessage);
|
||||
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
oMessageToAgentSender.sendINIT(octopusMessage);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -44,32 +44,46 @@ public class OMessageToAgentSender {
|
||||
}
|
||||
|
||||
// send to Queue -- InitFromServer
|
||||
log.info("send INIT OrderCommand to Agent = {}", message);
|
||||
log.info(
|
||||
"send INIT OrderCommand to Agent = {}",
|
||||
message
|
||||
);
|
||||
|
||||
rabbitTemplate.convertAndSend(initRabbitMQConfig.INIT_EXCHANGE, initRabbitMQConfig.INIT_FROM_SERVER_KEY, writeData(message));
|
||||
rabbitTemplate.convertAndSend(
|
||||
initRabbitMQConfig.INIT_EXCHANGE,
|
||||
initRabbitMQConfig.INIT_FROM_SERVER_KEY,
|
||||
writeData(message)
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
|
||||
public void send(OctopusMessage octopusMessage) {
|
||||
|
||||
log.debug("OctopusMessage {} send to agent {}", octopusMessage, octopusMessage.getUuid());
|
||||
log.debug(
|
||||
"OctopusMessage {} send to agent {}",
|
||||
octopusMessage,
|
||||
octopusMessage.getUuid()
|
||||
);
|
||||
|
||||
rabbitTemplate.convertAndSend(
|
||||
initRabbitMQConfig.OCTOPUS_EXCHANGE,
|
||||
octopusMessage.getUuid() + "*",
|
||||
writeData(octopusMessage));
|
||||
writeData(octopusMessage)
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
|
||||
public void send(List<OctopusMessage> octopusMessageList) {
|
||||
|
||||
octopusMessageList.stream().forEach(
|
||||
octopusMessage -> {
|
||||
this.send(octopusMessage);
|
||||
}
|
||||
);
|
||||
octopusMessageList
|
||||
.stream()
|
||||
.forEach(
|
||||
octopusMessage -> {
|
||||
this.send(octopusMessage);
|
||||
}
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -43,7 +43,7 @@ public class BuildStatusScheduleTask {
|
||||
private void buildAll() {
|
||||
|
||||
// Agent存活健康状态检查
|
||||
buildMonitorAllAgentStatusScheduleTask();
|
||||
buildMonitorAllAgentAliveStatusScheduleTask();
|
||||
|
||||
// Agent运行信息检查 Metric
|
||||
|
||||
@@ -99,12 +99,12 @@ public class BuildStatusScheduleTask {
|
||||
* 延迟触发时间 healthyCheckStartDelaySeconds
|
||||
* 定时任务间隔 healthyCronTimeExpress
|
||||
*/
|
||||
private void buildMonitorAllAgentStatusScheduleTask() {
|
||||
private void buildMonitorAllAgentAliveStatusScheduleTask() {
|
||||
|
||||
// build the Job
|
||||
octopusQuartzService.addMission(
|
||||
AgentAliveStatusMonitorJob.class,
|
||||
"monitorAllAgentStatusJob",
|
||||
"monitorAllAgentAliveStatusJob",
|
||||
JOB_GROUP_NAME,
|
||||
healthyCheckStartDelaySeconds,
|
||||
healthyCronTimeExpress,
|
||||
|
||||
@@ -49,8 +49,7 @@ public class AgentAliveStatusMonitorService {
|
||||
@Resource
|
||||
AsyncStatusService asyncStatusService;
|
||||
|
||||
|
||||
private HashMap<String, String> AGENT_HEALTHY_INIT_MAP;
|
||||
private HashMap<String, Boolean> AGENT_HEALTHY_INIT_MAP;
|
||||
|
||||
public void go() {
|
||||
|
||||
@@ -77,29 +76,27 @@ public class AgentAliveStatusMonitorService {
|
||||
updateAllAgentHealthyStatus(agentAliveStatusMap);
|
||||
}
|
||||
|
||||
/**
|
||||
* 初始化Agent存活状态的Redis缓存的信息,全部设置为False,然后等待存活状态检测
|
||||
*/
|
||||
private void checkOrCreateRedisHealthyKey() {
|
||||
|
||||
// 检查开始的时候 需要手动将所有Agent的状态置为0
|
||||
// Agent如果存活,那么就可以将其自身状态修改为1
|
||||
|
||||
// build the redis all agent healthy map struct
|
||||
HashMap<String, String> initMap = new HashMap<>(32);
|
||||
HashMap<String, Boolean> initMap = new HashMap<>(32);
|
||||
ALL_AGENT_TOPIC_NAME_LIST
|
||||
.stream()
|
||||
.forEach(
|
||||
agentTopicName -> {
|
||||
initMap.put(
|
||||
agentTopicName,
|
||||
"0"
|
||||
Boolean.FALSE
|
||||
);
|
||||
}
|
||||
);
|
||||
|
||||
initMap.put(
|
||||
"updateTime",
|
||||
TimeUtils.currentTimeString()
|
||||
);
|
||||
|
||||
// cache this map struct
|
||||
AGENT_HEALTHY_INIT_MAP = initMap;
|
||||
|
||||
@@ -111,6 +108,14 @@ public class AgentAliveStatusMonitorService {
|
||||
initMap
|
||||
);
|
||||
|
||||
redisTemplate
|
||||
.opsForHash()
|
||||
.put(
|
||||
ALL_AGENT_STATUS_REDIS_KEY,
|
||||
"initTime",
|
||||
TimeUtils.currentTimeString()
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
public void updateAllAgentHealthyStatus(Map<String, Boolean> agentAliveStatusMap) {
|
||||
@@ -123,6 +128,12 @@ public class AgentAliveStatusMonitorService {
|
||||
// 执行Metric上报定时任务
|
||||
// buildStatusScheduleTask.buildAgentMetricScheduleTask();
|
||||
|
||||
log.debug(
|
||||
"[存活状态] - 当前时间为 [ %s ] , 所有的Agent存活状态为=> %s",
|
||||
currentTimeString,
|
||||
agentAliveStatusMap
|
||||
);
|
||||
|
||||
// 这里仅仅是更新时间
|
||||
redisTemplate
|
||||
.opsForHash()
|
||||
|
||||
@@ -10,7 +10,7 @@ spring:
|
||||
port: 20672
|
||||
username: boge
|
||||
password: boge8tingH
|
||||
virtual-host: /
|
||||
virtual-host: /wdd
|
||||
listener:
|
||||
simple:
|
||||
retry:
|
||||
@@ -118,7 +118,7 @@ octopus:
|
||||
name: octopus-agent
|
||||
healthy:
|
||||
type: cron
|
||||
cron: 10 */1 * * * ? *
|
||||
cron: 10 * * * * ? *
|
||||
start-delay: 30
|
||||
metric:
|
||||
pinch: 20
|
||||
|
||||
Reference in New Issue
Block a user