[ server ] [ scheduler ]- script scheduler - 1

This commit is contained in:
zeaslity
2023-01-17 12:05:04 +08:00
parent 4812756408
commit 8ef3b271b1
26 changed files with 709 additions and 109 deletions

View File

@@ -1,64 +0,0 @@
package io.wdd.rpc.status;
import io.wdd.common.beans.status.OctopusStatusMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;
import static io.wdd.common.beans.status.OctopusStatusMessage.METRIC_STATUS_MESSAGE_TYPE;
/**
* 收集OctopusAgent的运行Metric信息
* <p>
* CPU Memory AppStatus易变信息
*/
@Service
@Slf4j
public class AgentRuntimeMetricStatus {
public static List<String> ALL_HEALTHY_AGENT_TOPIC_NAMES;
public static final String METRIC_REPORT_TIME_PINCH = "metricRepeatPinch";
public static final String METRIC_REPORT_TIMES_COUNT = "metricRepeatCount";
@Resource
CollectAgentStatus collectAgentStatus;
public void collect(int metricRepeatCount, int metricRepeatPinch) {
// 检查基础信息
if (CollectionUtils.isEmpty(ALL_HEALTHY_AGENT_TOPIC_NAMES)) {
log.error("Metric Status Collect Failed ! no ALL_HEALTHY_AGENT_TOPIC_NAMES");
}
// 构建 OctopusMessage
// 只发送一次消息让Agent循环定时执行任务
buildMetricStatusMessageAndSend(metricRepeatCount, metricRepeatPinch);
//
}
private void buildMetricStatusMessageAndSend(int metricRepeatCount, int metricRepeatPinch) {
List<OctopusStatusMessage> collect = ALL_HEALTHY_AGENT_TOPIC_NAMES.stream()
.map(
agentTopicName -> {
return OctopusStatusMessage.builder()
.type(METRIC_STATUS_MESSAGE_TYPE)
.metricRepeatCount(metricRepeatCount)
.metricRepeatPinch(metricRepeatPinch)
.agentTopicName(agentTopicName)
.build();
}
).collect(Collectors.toList());
// send to the next level
collectAgentStatus.statusMessageToAgent(collect);
}
}

View File

@@ -1,84 +0,0 @@
package io.wdd.rpc.status;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.beans.status.AgentStatus;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
@Getter
@Setter
@Slf4j
public class AgentStatusStreamReader implements StreamListener<String, MapRecord<String,String, String >> {
// https://medium.com/nerd-for-tech/event-driven-architecture-with-redis-streams-using-spring-boot-a81a1c9a4cde
//https://segmentfault.com/a/1190000040946712
//https://docs.spring.io/spring-data/redis/docs/2.5.5/reference/html/#redis.streams.receive.containers
/**
* 消费者类型:独立消费、消费组消费
*/
private String consumerType;
/**
* 消费组
*/
private String group;
/**
* 消费组中的某个消费者
*/
private String consumerName;
public AgentStatusStreamReader(String consumerType, String group, String consumerName) {
this.consumerType = consumerType;
this.group = group;
this.consumerName = consumerName;
}
@Override
public void onMessage(MapRecord<String, String, String> message) {
String streamKey = message.getStream();
RecordId messageId = message.getId();
String key = (String) message.getValue().keySet().toArray()[0];
String value = message.getValue().get(key);
log.info("Octopus Agent [ {} ] status of [ {} ] Time is [ {} ] stream recordId is [{}]", streamKey, key, key, messageId);
// print to console
printPrettyAgentStatus(value);
}
private void printPrettyAgentStatus(String valueString){
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
try {
String tmp = objectMapper.readValue(valueString, new TypeReference<String>() {
});
AgentStatus agentStatus = objectMapper.readValue(tmp, new TypeReference<AgentStatus>() {
});
System.out.println(objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(agentStatus));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -1,72 +0,0 @@
package io.wdd.rpc.status;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
import io.wdd.common.beans.status.OctopusStatusMessage;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.message.sender.ToAgentMessageSender;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;
/**
* 1. 定时任务
* 2. 向RabbitMQ中发送消息STATUS类型的消息
* 3. 然后开始监听相应的Result StreamKey
*/
@Service
public class CollectAgentStatus {
@Resource
ToAgentMessageSender toAgentMessageSender;
@Resource
ObjectMapper objectMapper;
public void collectAgentStatus(OctopusStatusMessage statusMessage) {
this.statusMessageToAgent(List.of(statusMessage));
}
public void statusMessageToAgent(List<OctopusStatusMessage> statusMessageList) {
// build all the OctopusMessage
List<OctopusMessage> octopusMessageList = statusMessageList.stream().map(
statusMessage -> {
OctopusMessage octopusMessage = buildOctopusMessageStatus(statusMessage);
return octopusMessage;
}
).collect(Collectors.toList());
// batch send all messages to RabbitMQ
toAgentMessageSender.send(octopusMessageList);
// todo how to get result ?
}
private OctopusMessage buildOctopusMessageStatus(OctopusStatusMessage octopusStatusMessage) {
// must be like this or it will be deserialized as LinkedHashMap
String s;
try {
s = objectMapper.writeValueAsString(octopusStatusMessage);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return OctopusMessage.builder()
.uuid(octopusStatusMessage.getAgentTopicName())
.type(OctopusMessageType.STATUS)
.init_time(TimeUtils.currentTime())
.content(s)
.build();
}
}

View File

@@ -1,261 +0,0 @@
package io.wdd.rpc.status;
import io.wdd.common.beans.status.AgentHealthyStatusEnum;
import io.wdd.common.beans.status.OctopusStatusMessage;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.scheduler.service.BuildStatusScheduleTask;
import io.wdd.server.beans.vo.ServerInfoVO;
import io.wdd.server.coreService.CoreServerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static io.wdd.common.beans.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY;
import static io.wdd.common.beans.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE;
import static io.wdd.rpc.status.AgentRuntimeMetricStatus.ALL_HEALTHY_AGENT_TOPIC_NAMES;
/**
* 更新频率被类 BuildStatusScheduleTask.class控制
* <p>
* <p>
* 获取所有注册的Agent
* <p>
* 发送状态检查信息, agent需要update相应的HashMap的值
* redis --> all-agent-health-map agent-topic-name : 1
* todo 分布式问题弱网环境多线程操作同一个hashMap会不会出现冲突
* <p>
* 休眠 MAX_WAIT_AGENT_REPORT_STATUS_TIME 秒 等待agent的状态上报
* <p>
* 检查相应的 状态HashMap然后全部置为零
*/
@Service
@Slf4j
public class MonitorAllAgentStatus {
/**
* 存储 状态对应Agent列表的Map
* Agent的状态描述为 AgentHealthyStatusEnum
* HEALTHY -> ["agentTopicName-1" "agentTopicName-2"]
* FAILED -> ["agentTopicName-1" "agentTopicName-2"]
*/
public static final Map<String, List<String>> HEALTHY_STATUS_AGENT_LIST_MAP = new HashMap<>();
/**
* 存储所有Agent状态的Map
* <p>
* 内容为 agentTopicName-健康状态
*/
public static final Map<String, String> ALL_AGENT_HEALTHY_STATUS_MAP = new HashMap<>();
/**
* 存储所有的AgentTopicName的缓存
*/
public static final Set<String> ALL_AGENT_TOPIC_NAME_SET = new HashSet<>();
private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5;
@Resource
RedisTemplate redisTemplate;
@Resource
CollectAgentStatus collectAgentStatus;
@Resource
CoreServerService coreServerService;
@Resource
BuildStatusScheduleTask buildStatusScheduleTask;
private List<String> ALL_AGENT_TOPIC_NAME_LIST;
private HashMap<String, String> AGENT_HEALTHY_INIT_MAP;
public void go() {
try {
// 1. 获取所有注册的Agent
// todo need to cache this
List<ServerInfoVO> allAgentInfo = coreServerService.serverGetAll();
Assert.notEmpty(
allAgentInfo,
"not agent registered ! skip the agent healthy status check !"
);
ALL_AGENT_TOPIC_NAME_LIST = allAgentInfo
.stream()
.map(ServerInfoVO::getTopicName)
.collect(Collectors.toList());
// 2023-01-16
ALL_AGENT_TOPIC_NAME_SET.clear();
ALL_AGENT_TOPIC_NAME_SET.addAll(ALL_AGENT_TOPIC_NAME_LIST);
// 1.1 检查 Agent状态保存数据结构是否正常
checkOrCreateRedisHealthyKey();
// 2.发送状态检查信息, agent需要update相应的HashMap的值
buildAndSendAgentHealthMessage();
// 3. 休眠 MAX_WAIT_AGENT_REPORT_STATUS_TIME 秒 等待agent的状态上报
TimeUnit.SECONDS.sleep(MAX_WAIT_AGENT_REPORT_STATUS_TIME);
// 4.检查相应的 状态HashMap然后全部置为零
// todo 存储到某个地方,目前只是打印日志
updateAllAgentHealthyStatus();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private void checkOrCreateRedisHealthyKey() {
// must init the cached map && make sure the redis key existed!
if (null == AGENT_HEALTHY_INIT_MAP || !redisTemplate.hasKey(ALL_AGENT_STATUS_REDIS_KEY)) {
log.info("ALL_AGENT_STATUS_REDIS_KEY not existed , start to create");
// build the redis all agent healthy map struct
HashMap<String, String> initMap = new HashMap<>(32);
ALL_AGENT_TOPIC_NAME_LIST
.stream()
.forEach(
agentTopicName -> {
initMap.put(
agentTopicName,
"0"
);
}
);
initMap.put(
"updateTime",
TimeUtils.currentTimeString()
);
// cache this map struct
AGENT_HEALTHY_INIT_MAP = initMap;
// create the healthy redis structure
redisTemplate
.opsForHash()
.putAll(
ALL_AGENT_STATUS_REDIS_KEY,
initMap
);
}
}
private void buildAndSendAgentHealthMessage() {
List<OctopusStatusMessage> collect = ALL_AGENT_TOPIC_NAME_LIST
.stream()
.map(
agentTopicName -> OctopusStatusMessage
.builder()
.agentTopicName(agentTopicName)
.type(HEALTHY_STATUS_MESSAGE_TYPE)
.build()
)
.collect(Collectors.toList());
collectAgentStatus.statusMessageToAgent(collect);
}
private void updateAllAgentHealthyStatus() {
List statusList = redisTemplate
.opsForHash()
.multiGet(
ALL_AGENT_STATUS_REDIS_KEY,
ALL_AGENT_TOPIC_NAME_LIST
);
// current log to console is ok
// agent-topic-name : STATUS(healthy, failed, unknown)
HashMap<String, String> agentStatusMap = new HashMap<>(32);
for (int i = 0; i < ALL_AGENT_TOPIC_NAME_LIST.size(); i++) {
agentStatusMap.put(
ALL_AGENT_TOPIC_NAME_LIST.get(i),
uniformHealthyStatus(String.valueOf(statusList.get(i)))
);
}
String currentTimeString = TimeUtils.currentTimeString();
log.info(
"[ AGENT HEALTHY CHECK ] time is {} , result are => {}",
currentTimeString,
agentStatusMap
);
// 2023-01-16
ALL_AGENT_HEALTHY_STATUS_MAP.clear();
ALL_AGENT_HEALTHY_STATUS_MAP.putAll(agentStatusMap);
// 2023-01-16
Map<String, List<String>> statusAgentListMap = agentStatusMap
.entrySet()
.stream()
.collect(
Collectors.groupingBy(
Map.Entry::getValue
)
)
.entrySet()
.stream()
.collect(
Collectors.toMap(
entry -> entry.getKey(),
entry -> entry
.getValue()
.stream()
.map(
Map.Entry::getKey
)
.collect(Collectors.toList())
)
);
HEALTHY_STATUS_AGENT_LIST_MAP.putAll(statusAgentListMap);
log.debug("Agent存活状态 状态-Agent名称-Map 已经更新了");
// help gc
agentStatusMap = null;
// Trigger调用Agent Metric 任务
ArrayList<String> allHealthyAgentTopicNames = new ArrayList<>(32);
for (int i = 0; i < statusList.size(); i++) {
if (statusList
.get(i)
.equals("1")) {
allHealthyAgentTopicNames.add(ALL_AGENT_TOPIC_NAME_LIST.get(i));
}
}
ALL_HEALTHY_AGENT_TOPIC_NAMES = allHealthyAgentTopicNames;
// 执行Metric上报任务
buildStatusScheduleTask.buildAgentMetricScheduleTask();
// update time
AGENT_HEALTHY_INIT_MAP.put(
"updateTime",
currentTimeString
);
// init the healthy map
redisTemplate
.opsForHash()
.putAll(
ALL_AGENT_STATUS_REDIS_KEY,
AGENT_HEALTHY_INIT_MAP
);
}
private String uniformHealthyStatus(String agentStatus) {
switch (agentStatus) {
case "0":
return AgentHealthyStatusEnum.FAILED.getStatus();
case "1":
return AgentHealthyStatusEnum.HEALTHY.getStatus();
default:
return AgentHealthyStatusEnum.UNKNOWN.getStatus();
}
}
}