[ server ] - monitor all agent status

This commit is contained in:
zeaslity
2023-01-10 17:28:26 +08:00
parent 0c4531dccf
commit b7958f5c78
16 changed files with 702 additions and 64 deletions

View File

@@ -11,7 +11,6 @@ import lombok.experimental.SuperBuilder;
@SuperBuilder(toBuilder = true)
public class ExecutorFunctionMessage {
String functionName;
String functionContent;

View File

@@ -0,0 +1,25 @@
package io.wdd.common.beans.status;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@Data
@AllArgsConstructor
@NoArgsConstructor
@SuperBuilder(toBuilder = true)
public class OctopusStatusMessage {
/**
* which kind of status should be return
* short => short time message
* all => all agent status message
* healthy => check for healthy
* */
String type;
String agentTopicName;
}

View File

@@ -12,35 +12,12 @@ import java.util.concurrent.TimeUnit;
public class TimeUtils {
public static ByteBuffer currentTimeByteBuffer() {
byte[] timeBytes = LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).getBytes(StandardCharsets.UTF_8);
return ByteBuffer.wrap(timeBytes);
}
/**
* @return UTC+8 [ yyyy-MM-dd HH:mm:ss ] Time String
*/
public static String currentTimeString() {
return LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}
public static String localDateTimeString(LocalDateTime time) {
return time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}
/**
* https://memorynotfound.com/calculate-relative-time-time-ago-java/
*
* calculate relative time from now on
* like 5 days, 3 hours, 16 minutes level 3
*
* */
* <p>
* calculate relative time from now on
* like 5 days, 3 hours, 16 minutes level 3
*/
private static final Map<String, Long> times = new LinkedHashMap<>();
@@ -54,12 +31,36 @@ public class TimeUtils {
times.put("second", TimeUnit.SECONDS.toMillis(1));
}
public static ByteBuffer currentTimeByteBuffer() {
byte[] timeBytes = LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).getBytes(StandardCharsets.UTF_8);
return ByteBuffer.wrap(timeBytes);
}
public static LocalDateTime currentTime() {
return LocalDateTime.now(ZoneId.of("UTC+8"));
}
/**
* @return UTC+8 [ yyyy-MM-dd HH:mm:ss ] Time String
*/
public static String currentTimeString() {
return LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}
public static String localDateTimeString(LocalDateTime time) {
return time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}
public static String toRelative(long duration, int maxLevel) {
StringBuilder res = new StringBuilder();
int level = 0;
for (Map.Entry<String, Long> time : times.entrySet()){
for (Map.Entry<String, Long> time : times.entrySet()) {
long timeDelta = duration / time.getValue();
if (timeDelta > 0){
if (timeDelta > 0) {
res.append(timeDelta)
.append(" ")
.append(time.getKey())
@@ -68,7 +69,7 @@ public class TimeUtils {
duration -= time.getValue() * timeDelta;
level++;
}
if (level == maxLevel){
if (level == maxLevel) {
break;
}
}
@@ -85,12 +86,12 @@ public class TimeUtils {
return toRelative(duration, times.size());
}
public static String toRelative(Date start, Date end){
public static String toRelative(Date start, Date end) {
assert start.after(end);
return toRelative(end.getTime() - start.getTime());
}
public static String toRelative(Date start, Date end, int level){
public static String toRelative(Date start, Date end, int level) {
assert start.after(end);
return toRelative(end.getTime() - start.getTime(), level);
}

View File

@@ -1,5 +1,6 @@
package io.wdd.rpc.execute.result;
import io.wdd.server.utils.SpringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
@@ -15,11 +16,7 @@ import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.COMMAND_RESULT_R
@Component
@Slf4j
public class CreateStreamReader implements ApplicationContextAware {
private ApplicationContext applicationContext;
private AutowireCapableBeanFactory beanFactory;
public class CreateStreamReader {
private RedisStreamReaderConfig redisStreamReaderConfig;
@@ -27,11 +24,6 @@ public class CreateStreamReader implements ApplicationContextAware {
private final HashMap<String, StreamMessageListenerContainer> REDIS_STREAM_LISTENER_CONTAINER_CACHE = new HashMap<>(16);
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey) {
// prepare the environment
@@ -56,27 +48,22 @@ public class CreateStreamReader implements ApplicationContextAware {
private void prepareEnv() {
getBeanFactory();
getRedisStreamConfig();
}
private void getRedisStreamConfig() {
this.redisStreamReaderConfig = applicationContext.getBean("redisStreamReaderConfig", RedisStreamReaderConfig.class);
this.redisStreamReaderConfig = SpringUtils.getBean("redisStreamReaderConfig", RedisStreamReaderConfig.class);
}
private void getBeanFactory() {
this.beanFactory = applicationContext.getAutowireCapableBeanFactory();
}
private void createStreamReader(String redisStreamListenerContainerBeanName, String streamKey) {
log.debug("start to create the redis stream listener container");
// create the lazy bean
StreamMessageListenerContainer streamMessageListenerContainer = applicationContext.getBean(redisStreamListenerContainerBeanName, StreamMessageListenerContainer.class);
StreamMessageListenerContainer streamMessageListenerContainer = SpringUtils.getBean(redisStreamListenerContainerBeanName, StreamMessageListenerContainer.class);
REDIS_STREAM_LISTENER_CONTAINER_CACHE.put(streamKey, streamMessageListenerContainer);
@@ -112,7 +99,7 @@ public class CreateStreamReader implements ApplicationContextAware {
// double destroy
beanFactory.destroyBean(streamMessageListenerContainer);
SpringUtils.destroyBean(streamMessageListenerContainer);
streamMessageListenerContainer.stop();
// help gc
streamMessageListenerContainer = null;

View File

@@ -12,10 +12,11 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
/**
* adaptor
* provide override method to convert Object and send to rabbitmq
* provide override method to convert Object and send to rabbitmq
*/
@Component
@Slf4j(topic = "Send Message To Octopus Agent ")
@@ -31,12 +32,11 @@ public class ToAgentMessageSender {
ObjectMapper objectMapper;
/**
*
* send to Queue -- InitFromServer
* send to Queue -- InitFromServer
*
* @param message octopus message
*/
public void sendINIT(OctopusMessage message){
public void sendINIT(OctopusMessage message) {
// only accept INIT type message
if (!OctopusMessageType.INIT.equals(message.getType())) {
@@ -55,16 +55,26 @@ public class ToAgentMessageSender {
log.info("OctopusMessage {} send to agent {}", octopusMessage, octopusMessage.getUuid());
rabbitTemplate.convertAndSend(
initRabbitMQConfig.OCTOPUS_EXCHANGE,
octopusMessage.getUuid()+"*",
octopusMessage.getUuid() + "*",
writeData(octopusMessage));
}
public void send(List<OctopusMessage> octopusMessageList) {
octopusMessageList.stream().forEach(
octopusMessage -> {
this.send(octopusMessage);
}
);
}
@SneakyThrows
private byte[] writeData(Object data){
private byte[] writeData(Object data) {
return objectMapper.writeValueAsBytes(data);
}

View File

@@ -0,0 +1,43 @@
package io.wdd.rpc.scheduler.beans;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.io.Serializable;
@Data
public class OctopusQuartzJob implements Serializable {
public static final String JOB_KEY = "JOB_KEY";
@ApiModelProperty(value = "ID")
private Long id;
@ApiModelProperty(value = "用于子任务唯一标识", hidden = true)
private String uuid;
@ApiModelProperty(value = "任务名称")
private String jobName;
@ApiModelProperty(value = "Bean名称")
private String beanName;
@ApiModelProperty(value = "方法名称")
private String methodName;
@ApiModelProperty(value = "参数")
private String params;
@ApiModelProperty(value = "cron表达式")
private String cronExpression;
@ApiModelProperty(value = "状态,暂时或启动")
private Boolean isPause = false;
@ApiModelProperty(value = "子任务")
private String subTask;
@ApiModelProperty(value = "失败后暂停")
private Boolean pauseAfterFailure;
}

View File

@@ -0,0 +1,47 @@
package io.wdd.rpc.scheduler.beans;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;
@Data
public class OctopusQuartzLog implements Serializable {
@ApiModelProperty(value = "ID", hidden = true)
@TableId(type = IdType.ASSIGN_ID)
private Long id;
@ApiModelProperty(value = "任务名称", hidden = true)
private String jobName;
@ApiModelProperty(value = "bean名称", hidden = true)
private String beanName;
@ApiModelProperty(value = "方法名称", hidden = true)
private String methodName;
@ApiModelProperty(value = "参数", hidden = true)
private String params;
@ApiModelProperty(value = "cron表达式", hidden = true)
private String cronExpression;
@ApiModelProperty(value = "状态", hidden = true)
private Boolean isSuccess;
@ApiModelProperty(value = "异常详情", hidden = true)
private String exceptionDetail;
@ApiModelProperty(value = "执行耗时", hidden = true)
private Long time;
@ApiModelProperty(value = "创建时间", hidden = true)
private LocalDateTime createTime;
}

View File

@@ -0,0 +1,18 @@
package io.wdd.rpc.scheduler.call;
import io.wdd.common.beans.status.OctopusStatusMessage;
import io.wdd.rpc.status.CollectAgentStatus;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class CallAgentQuartzService {
@Resource
CollectAgentStatus collectAgentStatus;
}

View File

@@ -0,0 +1,61 @@
package io.wdd.rpc.scheduler.config;
import io.wdd.common.handler.MyRuntimeException;
import io.wdd.rpc.scheduler.beans.OctopusQuartzJob;
import io.wdd.server.utils.SpringUtils;
import org.apache.commons.lang3.StringUtils;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method;
@Async
public class ExecutionJob extends QuartzJobBean {
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
//通过JobExecutionContext对象得到OctopusQuartzJob实例。
OctopusQuartzJob octopusQuartzJob = (OctopusQuartzJob)
context.getMergedJobDataMap().get(
io.wdd.rpc.scheduler.beans.OctopusQuartzJob.JOB_KEY);
//反射获取到方法,并执行。
runMethod(octopusQuartzJob.getBeanName(), octopusQuartzJob.getMethodName(), octopusQuartzJob.getParams());
}
/***
* description:反射执行方法
*
* @author: zeaslity
*/
public void runMethod(String beanName, String methodName, String params) {
Object target = SpringUtils.getBean(beanName);
Method method = null;
try {
//执行的方法只能有两种有String参数或者无参数毕竟前端只能传字符串参数给后端。
if (StringUtils.isNotBlank(params)) {
//反射获取到方法 两个参数 分别是方法名和参数类型
method = target.getClass().getDeclaredMethod(methodName, String.class);
} else {
method = target.getClass().getDeclaredMethod(methodName);
}
//反射执行方法
ReflectionUtils.makeAccessible(method);
if (StringUtils.isNotBlank(params)) {
method.invoke(target, params);
} else {
method.invoke(target);
}
} catch (Exception e) {
throw new MyRuntimeException("定时任务执行失败");
}
}
}

View File

@@ -0,0 +1,49 @@
package io.wdd.rpc.scheduler.config;
import io.wdd.server.utils.SpringUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method;
import java.util.concurrent.Callable;
public class QuartzRunnable implements Callable<Object> {
private final Object target;
private final Method method;
private final String params;
QuartzRunnable(String beanName, String methodName, String params) throws NoSuchMethodException, ClassNotFoundException {
//获取到bean对象
this.target = SpringUtils.getBean(beanName);
//获取到参数
this.params = params;
//如果参数不为空
if (StringUtils.isNotBlank(params)) {
//反射获取到方法 两个参数 分别是方法名和参数类型
this.method = target.getClass().getDeclaredMethod(methodName, String.class);
} else {
this.method = target.getClass().getDeclaredMethod(methodName);
}
}
/***
* description: 线程回调函数 反射执行方法
*
* @author: lixiangxiang
*/
@Override
public Object call() throws Exception {
ReflectionUtils.makeAccessible(method);
if (StringUtils.isNotBlank(params)) {
method.invoke(target, params);
} else {
method.invoke(target);
}
return null;
}
}

View File

@@ -1,5 +1,6 @@
package io.wdd.rpc.scheduler.service;
import io.wdd.rpc.scheduler.beans.OctopusQuartzJob;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.util.List;
import java.util.Map;
@@ -7,6 +8,8 @@ import java.util.Map;
public interface OctopusQuartzService {
boolean addJob(OctopusQuartzJob quartzJob);
/**
* 增加一个任务job
* @param jobClass 任务job实现类
@@ -16,8 +19,7 @@ public interface OctopusQuartzService {
* @param jobTimes 任务运行次数(若<0则不限次数
* @param jobData 任务参数
*/
void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobTime,
int jobTimes, Map jobData);
void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobTime, int jobTimes, Map jobData);
/**
* 增加一个任务job
@@ -37,6 +39,8 @@ public interface OctopusQuartzService {
*/
void updateJob(String jobName, String jobGroupName, String jobTime);
/**
* 删除一个任务job
* @param jobName

View File

@@ -1,10 +1,12 @@
package io.wdd.rpc.scheduler.service;
import io.wdd.common.handler.MyRuntimeException;
import io.wdd.rpc.scheduler.beans.OctopusQuartzJob;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.quartz.DateBuilder.IntervalUnit;
import org.quartz.impl.matchers.GroupMatcher;
import org.quartz.impl.triggers.CronTriggerImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Service;
@@ -12,6 +14,8 @@ import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.*;
import static org.quartz.TriggerBuilder.newTrigger;
/**
* @author Andya
* @date 2021/4/01
@@ -32,6 +36,39 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
}
}
@Override
public boolean addJob(OctopusQuartzJob quartzJob) {
try {
// 构建jobDetail,并与PrintHelloJob类绑定(Job执行内容)
JobDetail jobDetail = JobBuilder
.newJob(PrintHelloJob.class)
.withIdentity(quartzJob.getUuid())
.build();
//通过触发器名和cron表达式创建Trigger
Trigger cronTrigger = newTrigger()
.withIdentity(quartzJob.getUuid())
.startNow()
.withSchedule(CronScheduleBuilder.cronSchedule(quartzJob.getCronExpression()))
.build();
//把job信息放入jobDataMap中 job_key为标识
cronTrigger.getJobDataMap().put(OctopusQuartzJob.JOB_KEY, quartzJob);
//重置启动时间
((CronTriggerImpl)cronTrigger).setStartTime(new Date());
//执行定时任务
scheduler.scheduleJob(jobDetail, cronTrigger);
} catch (Exception e) {
log.error("【创建定时任务失败】 定时任务id{}", quartzJob.getId());
return false;
}
return true;
}
/**
* 增加一个job
*
@@ -62,12 +99,11 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
// 使用simpleTrigger规则
Trigger trigger = null;
if (jobTimes < 0) {
trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
trigger = newTrigger().withIdentity(jobName, jobGroupName)
.withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime))
.startNow().build();
} else {
trigger = TriggerBuilder
.newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder
trigger = newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder
.repeatSecondlyForever(1).withIntervalInSeconds(jobTime).withRepeatCount(jobTimes))
.startNow().build();
}
@@ -108,7 +144,7 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService {
// 定义调度触发规则
// 使用cornTrigger规则
// 触发器key
Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
Trigger trigger = newTrigger().withIdentity(jobName, jobGroupName)
.startAt(DateBuilder.futureDate(1, IntervalUnit.SECOND))
.withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).startNow().build();
// 把作业和触发器注册到任务调度中

View File

@@ -0,0 +1,16 @@
package io.wdd.rpc.scheduler.service;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
public class PrintHelloJob implements Job {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
System.out.println();
System.out.println("PrintHelloJob被执行了");
System.out.println("context = " + context);
System.out.println();
}
}

View File

@@ -0,0 +1,59 @@
package io.wdd.rpc.status;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
import io.wdd.common.beans.status.OctopusStatusMessage;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.message.sender.ToAgentMessageSender;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;
/**
* 1. 定时任务
* 2. 向RabbitMQ中发送消息STATUS类型的消息
* 3. 然后开始监听相应的Result StreamKey
*/
@Service
public class CollectAgentStatus {
@Resource
ToAgentMessageSender toAgentMessageSender;
public void collectAgentStatus(OctopusStatusMessage statusMessage) {
this.collectAgentStatusList(List.of(statusMessage));
}
public void collectAgentStatusList(List<OctopusStatusMessage> statusMessageList) {
// build all the OctopusMessage
List<OctopusMessage> octopusMessageList = statusMessageList.stream().map(
statusMessage -> {
OctopusMessage octopusMessage = buildOctopusMessageStatus(statusMessage);
return octopusMessage;
}
).collect(Collectors.toList());
// batch send all messages to RabbitMQ
toAgentMessageSender.send(octopusMessageList);
// todo how to get result ?
}
private OctopusMessage buildOctopusMessageStatus(OctopusStatusMessage octopusStatusMessage) {
return OctopusMessage.builder()
.uuid(octopusStatusMessage.getAgentTopicName())
.type(OctopusMessageType.STATUS)
.init_time(TimeUtils.currentTime())
.content(octopusStatusMessage)
.build();
}
}

View File

@@ -0,0 +1,140 @@
package io.wdd.rpc.status;
import io.wdd.common.beans.status.OctopusStatusMessage;
import io.wdd.common.utils.TimeUtils;
import io.wdd.server.beans.vo.ServerInfoVO;
import io.wdd.server.coreService.CoreServerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 获取所有注册的Agent
* <p>
* 发送状态检查信息, agent需要update相应的HashMap的值
* redis --> all-agent-health-map agent-topic-name : 1
* todo 分布式问题弱网环境多线程操作同一个hashMap会不会出现冲突
* <p>
* 休眠 MAX_WAIT_AGENT_REPORT_STATUS_TIME 秒 等待agent的状态上报
* <p>
* 检查相应的 状态HashMap然后全部置为零
*/
@Service
@Slf4j
public class MonitorAllAgentStatus {
private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5;
private static final String ALL_AGENT_STATUS_REDIS_KEY = "ALL_AGENT_STATUS";
private static final String STATUS_MESSAGE_TYPE = "ping";
private HashMap<String, String> AGENT_HEALTHY_INIT_MAP;
private List<String> ALL_AGENT_TOPICNAME_LIST;
@Resource
RedisTemplate redisTemplate;
@Resource
CollectAgentStatus collectAgentStatus;
@Resource
CoreServerService coreServerService;
public void go() {
try {
// 1. 获取所有注册的Agent
// todo need to cache this
List<ServerInfoVO> allAgentInfo = coreServerService.serverGetAll();
Assert.notEmpty(allAgentInfo,"not agent registered ! skip the agent healthy status check !");
ALL_AGENT_TOPICNAME_LIST = allAgentInfo.stream().map(ServerInfoVO::getTopicName).collect(Collectors.toList());
// 1.1 检查 Agent状态保存数据结构是否正常
checkOrCreateRedisHealthyKey();
// 2.发送状态检查信息, agent需要update相应的HashMap的值
buildAndSendAgentHealthMessage();
// 3. 休眠 MAX_WAIT_AGENT_REPORT_STATUS_TIME 秒 等待agent的状态上报
TimeUnit.SECONDS.sleep(MAX_WAIT_AGENT_REPORT_STATUS_TIME);
// 4.检查相应的 状态HashMap然后全部置为零
// todo 存储到某个地方,目前只是打印日志
updateAllAgentHealthyStatus();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private void checkOrCreateRedisHealthyKey() {
if (!redisTemplate.hasKey(ALL_AGENT_STATUS_REDIS_KEY)) {
log.info("ALL_AGENT_STATUS_REDIS_KEY not existed , start to create");
// build the redis all agent healthy map struct
HashMap<String, String> initMap = new HashMap<>(32);
ALL_AGENT_TOPICNAME_LIST.stream().forEach(
agentTopicName -> {
initMap.put(agentTopicName, "0");
}
);
initMap.put("updateTime", TimeUtils.currentTimeString());
// cache this map struct
AGENT_HEALTHY_INIT_MAP = initMap;
redisTemplate.opsForHash().putAll(ALL_AGENT_STATUS_REDIS_KEY, initMap);
}
}
private void buildAndSendAgentHealthMessage() {
List<OctopusStatusMessage> collect = ALL_AGENT_TOPICNAME_LIST.stream().map(
agentTopicName -> OctopusStatusMessage.builder()
.agentTopicName(agentTopicName)
.type(STATUS_MESSAGE_TYPE)
.build()
).collect(Collectors.toList());
collectAgentStatus.collectAgentStatusList(collect);
}
private void updateAllAgentHealthyStatus() {
List statusList = redisTemplate.opsForHash().multiGet(
ALL_AGENT_STATUS_REDIS_KEY,
ALL_AGENT_TOPICNAME_LIST);
HashMap<String, String> tmp = new HashMap<>(32);
for (int i = 0; i < ALL_AGENT_TOPICNAME_LIST.size(); i++) {
tmp.put(ALL_AGENT_TOPICNAME_LIST.get(i),
uniformHealthyStatus(String.valueOf(statusList.get(i)))
);
}
// current log to console is ok
String currentTimeString = TimeUtils.currentTimeString();
log.info("[ AGENT HEALTHY CHECK ] time is {} , result are => {}", currentTimeString, tmp);
// update time
AGENT_HEALTHY_INIT_MAP.put("updateTime", currentTimeString);
// init the healthy map
redisTemplate.opsForHash().putAll(ALL_AGENT_STATUS_REDIS_KEY, AGENT_HEALTHY_INIT_MAP);
}
private String uniformHealthyStatus(String agentStatus) {
switch (agentStatus) {
case "0":
return "FAILED";
case "1":
return "HEALTHY";
default:
return "UNKNOWN";
}
}
}

View File

@@ -0,0 +1,143 @@
package io.wdd.server.utils;
import org.springframework.aop.framework.AopContext;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.stereotype.Component;
@Component
public class SpringUtils implements ApplicationContextAware, BeanFactoryPostProcessor {
private static ApplicationContext applicationContext;
private static ConfigurableListableBeanFactory beanFactory;
/**
* 获取对象
*
* @param name
* @return Object 一个以所给名字注册的bean的实例
* @throws org.springframework.beans.BeansException
*/
@SuppressWarnings("unchecked")
public static <T> T getBean(String name) throws BeansException {
return (T) beanFactory.getBean(name);
}
/**
* 获取类型为requiredType的对象
*
* @param clz
* @return
* @throws org.springframework.beans.BeansException
*/
public static <T> T getBean(Class<T> clz) throws BeansException {
T result = beanFactory.getBean(clz);
return result;
}
/**
* 获取类型为requiredType的对象
*
* @param clz
* @return
* @throws org.springframework.beans.BeansException
*/
public static <T> T getBean(String beanName, Class<T> clz) throws BeansException {
T result = beanFactory.getBean(beanName, clz);
return result;
}
/**
* 如果BeanFactory包含一个与所给名称匹配的bean定义则返回true
*
* @param name
* @return boolean
*/
public static boolean containsBean(String name) {
return beanFactory.containsBean(name);
}
/**
* 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到将会抛出一个异常NoSuchBeanDefinitionException
*
* @param name
* @return boolean
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
*/
public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
return beanFactory.isSingleton(name);
}
/**
* @param name
* @return Class 注册对象的类型
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
*/
public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
return beanFactory.getType(name);
}
/**
* 如果给定的bean名字在bean定义中有别名则返回这些别名
*
* @param name
* @return
* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
*/
public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
return beanFactory.getAliases(name);
}
/**
* 获取aop代理对象
*
* @param invoker
* @return
*/
@SuppressWarnings("unchecked")
public static <T> T getAopProxy(T invoker) {
return (T) AopContext.currentProxy();
}
/**
* 获取当前的环境配置无配置返回null
*
* @return 当前的环境配置
*/
public static String[] getActiveProfiles() {
return applicationContext.getEnvironment().getActiveProfiles();
}
public static void destroyBean(Object bean) {
beanFactory.destroyBean(bean);
}
// /**
// * 获取当前的环境配置,当有多个环境配置时,只获取第一个
// *
// * @return 当前的环境配置
// */
// public static String getActiveProfile()
// {
// final String[] activeProfiles = getActiveProfiles();
// return StringUtils.isNotEmpty(activeProfiles) ? activeProfiles[0] : null;
// }
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringUtils.applicationContext = applicationContext;
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
SpringUtils.beanFactory = beanFactory;
}
}