diff --git a/common/src/main/java/io/wdd/common/beans/executor/ExecutorFunctionMessage.java b/common/src/main/java/io/wdd/common/beans/executor/ExecutorFunctionMessage.java index cc124e8..75b7b27 100644 --- a/common/src/main/java/io/wdd/common/beans/executor/ExecutorFunctionMessage.java +++ b/common/src/main/java/io/wdd/common/beans/executor/ExecutorFunctionMessage.java @@ -11,7 +11,6 @@ import lombok.experimental.SuperBuilder; @SuperBuilder(toBuilder = true) public class ExecutorFunctionMessage { - String functionName; String functionContent; diff --git a/common/src/main/java/io/wdd/common/beans/status/OctopusStatusMessage.java b/common/src/main/java/io/wdd/common/beans/status/OctopusStatusMessage.java new file mode 100644 index 0000000..cbf730c --- /dev/null +++ b/common/src/main/java/io/wdd/common/beans/status/OctopusStatusMessage.java @@ -0,0 +1,25 @@ +package io.wdd.common.beans.status; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@SuperBuilder(toBuilder = true) +public class OctopusStatusMessage { + + /** + * which kind of status should be return + * short => short time message + * all => all agent status message + * healthy => check for healthy + * */ + String type; + + String agentTopicName; + + +} diff --git a/common/src/main/java/io/wdd/common/utils/TimeUtils.java b/common/src/main/java/io/wdd/common/utils/TimeUtils.java index 1715d20..478a28e 100644 --- a/common/src/main/java/io/wdd/common/utils/TimeUtils.java +++ b/common/src/main/java/io/wdd/common/utils/TimeUtils.java @@ -12,35 +12,12 @@ import java.util.concurrent.TimeUnit; public class TimeUtils { - public static ByteBuffer currentTimeByteBuffer() { - - byte[] timeBytes = LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).getBytes(StandardCharsets.UTF_8); - - return ByteBuffer.wrap(timeBytes); - } - - - /** - * @return UTC+8 [ yyyy-MM-dd HH:mm:ss ] Time String - */ - public static String currentTimeString() { - - return LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); - } - - - public static String localDateTimeString(LocalDateTime time) { - return time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); - } - - /** * https://memorynotfound.com/calculate-relative-time-time-ago-java/ - * - * calculate relative time from now on - * like 5 days, 3 hours, 16 minutes level 3 - * - * */ + *

+ * calculate relative time from now on + * like 5 days, 3 hours, 16 minutes level 3 + */ private static final Map times = new LinkedHashMap<>(); @@ -54,12 +31,36 @@ public class TimeUtils { times.put("second", TimeUnit.SECONDS.toMillis(1)); } + public static ByteBuffer currentTimeByteBuffer() { + + byte[] timeBytes = LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).getBytes(StandardCharsets.UTF_8); + + return ByteBuffer.wrap(timeBytes); + } + + public static LocalDateTime currentTime() { + + return LocalDateTime.now(ZoneId.of("UTC+8")); + } + + /** + * @return UTC+8 [ yyyy-MM-dd HH:mm:ss ] Time String + */ + public static String currentTimeString() { + + return LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + } + + public static String localDateTimeString(LocalDateTime time) { + return time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + } + public static String toRelative(long duration, int maxLevel) { StringBuilder res = new StringBuilder(); int level = 0; - for (Map.Entry time : times.entrySet()){ + for (Map.Entry time : times.entrySet()) { long timeDelta = duration / time.getValue(); - if (timeDelta > 0){ + if (timeDelta > 0) { res.append(timeDelta) .append(" ") .append(time.getKey()) @@ -68,7 +69,7 @@ public class TimeUtils { duration -= time.getValue() * timeDelta; level++; } - if (level == maxLevel){ + if (level == maxLevel) { break; } } @@ -85,12 +86,12 @@ public class TimeUtils { return toRelative(duration, times.size()); } - public static String toRelative(Date start, Date end){ + public static String toRelative(Date start, Date end) { assert start.after(end); return toRelative(end.getTime() - start.getTime()); } - public static String toRelative(Date start, Date end, int level){ + public static String toRelative(Date start, Date end, int level) { assert start.after(end); return toRelative(end.getTime() - start.getTime(), level); } diff --git a/server/src/main/java/io/wdd/rpc/execute/result/CreateStreamReader.java b/server/src/main/java/io/wdd/rpc/execute/result/CreateStreamReader.java index 273c337..1e5e870 100644 --- a/server/src/main/java/io/wdd/rpc/execute/result/CreateStreamReader.java +++ b/server/src/main/java/io/wdd/rpc/execute/result/CreateStreamReader.java @@ -1,5 +1,6 @@ package io.wdd.rpc.execute.result; +import io.wdd.server.utils.SpringUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.AutowireCapableBeanFactory; @@ -15,11 +16,7 @@ import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.COMMAND_RESULT_R @Component @Slf4j -public class CreateStreamReader implements ApplicationContextAware { - - private ApplicationContext applicationContext; - - private AutowireCapableBeanFactory beanFactory; +public class CreateStreamReader { private RedisStreamReaderConfig redisStreamReaderConfig; @@ -27,11 +24,6 @@ public class CreateStreamReader implements ApplicationContextAware { private final HashMap REDIS_STREAM_LISTENER_CONTAINER_CACHE = new HashMap<>(16); - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - this.applicationContext = applicationContext; - } - public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey) { // prepare the environment @@ -56,27 +48,22 @@ public class CreateStreamReader implements ApplicationContextAware { private void prepareEnv() { - getBeanFactory(); - getRedisStreamConfig(); } private void getRedisStreamConfig() { - this.redisStreamReaderConfig = applicationContext.getBean("redisStreamReaderConfig", RedisStreamReaderConfig.class); + this.redisStreamReaderConfig = SpringUtils.getBean("redisStreamReaderConfig", RedisStreamReaderConfig.class); } - private void getBeanFactory() { - this.beanFactory = applicationContext.getAutowireCapableBeanFactory(); - } private void createStreamReader(String redisStreamListenerContainerBeanName, String streamKey) { log.debug("start to create the redis stream listener container"); // create the lazy bean - StreamMessageListenerContainer streamMessageListenerContainer = applicationContext.getBean(redisStreamListenerContainerBeanName, StreamMessageListenerContainer.class); + StreamMessageListenerContainer streamMessageListenerContainer = SpringUtils.getBean(redisStreamListenerContainerBeanName, StreamMessageListenerContainer.class); REDIS_STREAM_LISTENER_CONTAINER_CACHE.put(streamKey, streamMessageListenerContainer); @@ -112,7 +99,7 @@ public class CreateStreamReader implements ApplicationContextAware { // double destroy - beanFactory.destroyBean(streamMessageListenerContainer); + SpringUtils.destroyBean(streamMessageListenerContainer); streamMessageListenerContainer.stop(); // help gc streamMessageListenerContainer = null; diff --git a/server/src/main/java/io/wdd/rpc/message/sender/ToAgentMessageSender.java b/server/src/main/java/io/wdd/rpc/message/sender/ToAgentMessageSender.java index 794b0d7..563d5e5 100644 --- a/server/src/main/java/io/wdd/rpc/message/sender/ToAgentMessageSender.java +++ b/server/src/main/java/io/wdd/rpc/message/sender/ToAgentMessageSender.java @@ -12,10 +12,11 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.util.List; /** * adaptor - * provide override method to convert Object and send to rabbitmq + * provide override method to convert Object and send to rabbitmq */ @Component @Slf4j(topic = "Send Message To Octopus Agent ") @@ -31,12 +32,11 @@ public class ToAgentMessageSender { ObjectMapper objectMapper; /** - * - * send to Queue -- InitFromServer + * send to Queue -- InitFromServer * * @param message octopus message */ - public void sendINIT(OctopusMessage message){ + public void sendINIT(OctopusMessage message) { // only accept INIT type message if (!OctopusMessageType.INIT.equals(message.getType())) { @@ -55,16 +55,26 @@ public class ToAgentMessageSender { log.info("OctopusMessage {} send to agent {}", octopusMessage, octopusMessage.getUuid()); - rabbitTemplate.convertAndSend( initRabbitMQConfig.OCTOPUS_EXCHANGE, - octopusMessage.getUuid()+"*", + octopusMessage.getUuid() + "*", writeData(octopusMessage)); } + + public void send(List octopusMessageList) { + + octopusMessageList.stream().forEach( + octopusMessage -> { + this.send(octopusMessage); + } + ); + + } + @SneakyThrows - private byte[] writeData(Object data){ + private byte[] writeData(Object data) { return objectMapper.writeValueAsBytes(data); } diff --git a/server/src/main/java/io/wdd/rpc/scheduler/beans/OctopusQuartzJob.java b/server/src/main/java/io/wdd/rpc/scheduler/beans/OctopusQuartzJob.java new file mode 100644 index 0000000..ee06b54 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/scheduler/beans/OctopusQuartzJob.java @@ -0,0 +1,43 @@ +package io.wdd.rpc.scheduler.beans; + +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.io.Serializable; + +@Data +public class OctopusQuartzJob implements Serializable { + + public static final String JOB_KEY = "JOB_KEY"; + + @ApiModelProperty(value = "ID") + private Long id; + + @ApiModelProperty(value = "用于子任务唯一标识", hidden = true) + private String uuid; + + @ApiModelProperty(value = "任务名称") + private String jobName; + + @ApiModelProperty(value = "Bean名称") + private String beanName; + + @ApiModelProperty(value = "方法名称") + private String methodName; + + @ApiModelProperty(value = "参数") + private String params; + + @ApiModelProperty(value = "cron表达式") + private String cronExpression; + + @ApiModelProperty(value = "状态,暂时或启动") + private Boolean isPause = false; + + @ApiModelProperty(value = "子任务") + private String subTask; + + @ApiModelProperty(value = "失败后暂停") + private Boolean pauseAfterFailure; + +} diff --git a/server/src/main/java/io/wdd/rpc/scheduler/beans/OctopusQuartzLog.java b/server/src/main/java/io/wdd/rpc/scheduler/beans/OctopusQuartzLog.java new file mode 100644 index 0000000..55bb1a8 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/scheduler/beans/OctopusQuartzLog.java @@ -0,0 +1,47 @@ +package io.wdd.rpc.scheduler.beans; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.io.Serializable; +import java.time.LocalDateTime; + +@Data +public class OctopusQuartzLog implements Serializable { + + @ApiModelProperty(value = "ID", hidden = true) + @TableId(type = IdType.ASSIGN_ID) + private Long id; + + @ApiModelProperty(value = "任务名称", hidden = true) + private String jobName; + + @ApiModelProperty(value = "bean名称", hidden = true) + private String beanName; + + @ApiModelProperty(value = "方法名称", hidden = true) + private String methodName; + + @ApiModelProperty(value = "参数", hidden = true) + private String params; + + @ApiModelProperty(value = "cron表达式", hidden = true) + private String cronExpression; + + @ApiModelProperty(value = "状态", hidden = true) + private Boolean isSuccess; + + @ApiModelProperty(value = "异常详情", hidden = true) + private String exceptionDetail; + + @ApiModelProperty(value = "执行耗时", hidden = true) + private Long time; + + @ApiModelProperty(value = "创建时间", hidden = true) + private LocalDateTime createTime; + + +} diff --git a/server/src/main/java/io/wdd/rpc/scheduler/call/CallAgentQuartzService.java b/server/src/main/java/io/wdd/rpc/scheduler/call/CallAgentQuartzService.java new file mode 100644 index 0000000..b93b84c --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/scheduler/call/CallAgentQuartzService.java @@ -0,0 +1,18 @@ +package io.wdd.rpc.scheduler.call; + +import io.wdd.common.beans.status.OctopusStatusMessage; +import io.wdd.rpc.status.CollectAgentStatus; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +@Service +public class CallAgentQuartzService { + + @Resource + CollectAgentStatus collectAgentStatus; + + + + +} diff --git a/server/src/main/java/io/wdd/rpc/scheduler/config/ExecutionJob.java b/server/src/main/java/io/wdd/rpc/scheduler/config/ExecutionJob.java new file mode 100644 index 0000000..635f127 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/scheduler/config/ExecutionJob.java @@ -0,0 +1,61 @@ +package io.wdd.rpc.scheduler.config; + +import io.wdd.common.handler.MyRuntimeException; +import io.wdd.rpc.scheduler.beans.OctopusQuartzJob; +import io.wdd.server.utils.SpringUtils; +import org.apache.commons.lang3.StringUtils; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.quartz.QuartzJobBean; +import org.springframework.util.ReflectionUtils; + +import java.lang.reflect.Method; + + +@Async +public class ExecutionJob extends QuartzJobBean { + + @Override + protected void executeInternal(JobExecutionContext context) throws JobExecutionException { + + //通过JobExecutionContext对象得到OctopusQuartzJob实例。 + OctopusQuartzJob octopusQuartzJob = (OctopusQuartzJob) + context.getMergedJobDataMap().get( + io.wdd.rpc.scheduler.beans.OctopusQuartzJob.JOB_KEY); + + //反射获取到方法,并执行。 + runMethod(octopusQuartzJob.getBeanName(), octopusQuartzJob.getMethodName(), octopusQuartzJob.getParams()); + } + + /*** + * description:反射执行方法 + * + * @author: zeaslity + */ + public void runMethod(String beanName, String methodName, String params) { + Object target = SpringUtils.getBean(beanName); + Method method = null; + + try { + //执行的方法只能有两种,有String参数或者无参数,毕竟前端只能传字符串参数给后端。 + if (StringUtils.isNotBlank(params)) { + //反射获取到方法 两个参数 分别是方法名和参数类型 + method = target.getClass().getDeclaredMethod(methodName, String.class); + } else { + method = target.getClass().getDeclaredMethod(methodName); + } + + //反射执行方法 + ReflectionUtils.makeAccessible(method); + + if (StringUtils.isNotBlank(params)) { + method.invoke(target, params); + } else { + method.invoke(target); + } + } catch (Exception e) { + throw new MyRuntimeException("定时任务执行失败"); + } + } +} diff --git a/server/src/main/java/io/wdd/rpc/scheduler/config/QuartzRunnable.java b/server/src/main/java/io/wdd/rpc/scheduler/config/QuartzRunnable.java new file mode 100644 index 0000000..4ec96f4 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/scheduler/config/QuartzRunnable.java @@ -0,0 +1,49 @@ +package io.wdd.rpc.scheduler.config; + +import io.wdd.server.utils.SpringUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.util.ReflectionUtils; + +import java.lang.reflect.Method; +import java.util.concurrent.Callable; + +public class QuartzRunnable implements Callable { + + private final Object target; + private final Method method; + private final String params; + + QuartzRunnable(String beanName, String methodName, String params) throws NoSuchMethodException, ClassNotFoundException { + + //获取到bean对象 + this.target = SpringUtils.getBean(beanName); + //获取到参数 + this.params = params; + //如果参数不为空 + if (StringUtils.isNotBlank(params)) { + //反射获取到方法 两个参数 分别是方法名和参数类型 + this.method = target.getClass().getDeclaredMethod(methodName, String.class); + } else { + this.method = target.getClass().getDeclaredMethod(methodName); + } + } + + /*** + * description: 线程回调函数 反射执行方法 + * + * @author: lixiangxiang + */ + @Override + public Object call() throws Exception { + + ReflectionUtils.makeAccessible(method); + + if (StringUtils.isNotBlank(params)) { + method.invoke(target, params); + } else { + method.invoke(target); + } + return null; + } + +} diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzService.java b/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzService.java index 01b765c..aafaa2d 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzService.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzService.java @@ -1,5 +1,6 @@ package io.wdd.rpc.scheduler.service; +import io.wdd.rpc.scheduler.beans.OctopusQuartzJob; import org.springframework.scheduling.quartz.QuartzJobBean; import java.util.List; import java.util.Map; @@ -7,6 +8,8 @@ import java.util.Map; public interface OctopusQuartzService { + boolean addJob(OctopusQuartzJob quartzJob); + /** * 增加一个任务job * @param jobClass 任务job实现类 @@ -16,8 +19,7 @@ public interface OctopusQuartzService { * @param jobTimes 任务运行次数(若<0,则不限次数) * @param jobData 任务参数 */ - void addJob(Class jobClass, String jobName, String jobGroupName, int jobTime, - int jobTimes, Map jobData); + void addJob(Class jobClass, String jobName, String jobGroupName, int jobTime, int jobTimes, Map jobData); /** * 增加一个任务job @@ -37,6 +39,8 @@ public interface OctopusQuartzService { */ void updateJob(String jobName, String jobGroupName, String jobTime); + + /** * 删除一个任务job * @param jobName diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzServiceImpl.java b/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzServiceImpl.java index 18cac23..3611034 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/OctopusQuartzServiceImpl.java @@ -1,10 +1,12 @@ package io.wdd.rpc.scheduler.service; import io.wdd.common.handler.MyRuntimeException; +import io.wdd.rpc.scheduler.beans.OctopusQuartzJob; import lombok.extern.slf4j.Slf4j; import org.quartz.*; import org.quartz.DateBuilder.IntervalUnit; import org.quartz.impl.matchers.GroupMatcher; +import org.quartz.impl.triggers.CronTriggerImpl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.quartz.QuartzJobBean; import org.springframework.stereotype.Service; @@ -12,6 +14,8 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.*; +import static org.quartz.TriggerBuilder.newTrigger; + /** * @author Andya * @date 2021/4/01 @@ -32,6 +36,39 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { } } + @Override + public boolean addJob(OctopusQuartzJob quartzJob) { + try { + // 构建jobDetail,并与PrintHelloJob类绑定(Job执行内容) + JobDetail jobDetail = JobBuilder + .newJob(PrintHelloJob.class) + .withIdentity(quartzJob.getUuid()) + .build(); + + //通过触发器名和cron表达式创建Trigger + Trigger cronTrigger = newTrigger() + .withIdentity(quartzJob.getUuid()) + .startNow() + .withSchedule(CronScheduleBuilder.cronSchedule(quartzJob.getCronExpression())) + .build(); + + //把job信息放入jobDataMap中 job_key为标识 + cronTrigger.getJobDataMap().put(OctopusQuartzJob.JOB_KEY, quartzJob); + + //重置启动时间 + ((CronTriggerImpl)cronTrigger).setStartTime(new Date()); + + //执行定时任务 + scheduler.scheduleJob(jobDetail, cronTrigger); + + } catch (Exception e) { + log.error("【创建定时任务失败】 定时任务id:{}", quartzJob.getId()); + return false; + } + + return true; + } + /** * 增加一个job * @@ -62,12 +99,11 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { // 使用simpleTrigger规则 Trigger trigger = null; if (jobTimes < 0) { - trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName) + trigger = newTrigger().withIdentity(jobName, jobGroupName) .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime)) .startNow().build(); } else { - trigger = TriggerBuilder - .newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder + trigger = newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder .repeatSecondlyForever(1).withIntervalInSeconds(jobTime).withRepeatCount(jobTimes)) .startNow().build(); } @@ -108,7 +144,7 @@ public class OctopusQuartzServiceImpl implements OctopusQuartzService { // 定义调度触发规则 // 使用cornTrigger规则 // 触发器key - Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName) + Trigger trigger = newTrigger().withIdentity(jobName, jobGroupName) .startAt(DateBuilder.futureDate(1, IntervalUnit.SECOND)) .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).startNow().build(); // 把作业和触发器注册到任务调度中 diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/PrintHelloJob.java b/server/src/main/java/io/wdd/rpc/scheduler/service/PrintHelloJob.java new file mode 100644 index 0000000..1e8e54a --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/PrintHelloJob.java @@ -0,0 +1,16 @@ +package io.wdd.rpc.scheduler.service; + +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; + +public class PrintHelloJob implements Job { + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + System.out.println(); + System.out.println("PrintHelloJob被执行了!"); + System.out.println("context = " + context); + System.out.println(); + } +} diff --git a/server/src/main/java/io/wdd/rpc/status/CollectAgentStatus.java b/server/src/main/java/io/wdd/rpc/status/CollectAgentStatus.java new file mode 100644 index 0000000..565b9be --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/status/CollectAgentStatus.java @@ -0,0 +1,59 @@ +package io.wdd.rpc.status; + +import io.wdd.common.beans.rabbitmq.OctopusMessage; +import io.wdd.common.beans.rabbitmq.OctopusMessageType; +import io.wdd.common.beans.status.OctopusStatusMessage; +import io.wdd.common.utils.TimeUtils; +import io.wdd.rpc.message.sender.ToAgentMessageSender; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.List; +import java.util.stream.Collectors; + +/** + * 1. 定时任务 + * 2. 向RabbitMQ中发送消息,STATUS类型的消息 + * 3. 然后开始监听相应的Result StreamKey + */ +@Service +public class CollectAgentStatus { + + @Resource + ToAgentMessageSender toAgentMessageSender; + + + public void collectAgentStatus(OctopusStatusMessage statusMessage) { + + this.collectAgentStatusList(List.of(statusMessage)); + } + + + public void collectAgentStatusList(List statusMessageList) { + + // build all the OctopusMessage + List octopusMessageList = statusMessageList.stream().map( + statusMessage -> { + OctopusMessage octopusMessage = buildOctopusMessageStatus(statusMessage); + return octopusMessage; + } + ).collect(Collectors.toList()); + + // batch send all messages to RabbitMQ + toAgentMessageSender.send(octopusMessageList); + + // todo how to get result ? + + } + + private OctopusMessage buildOctopusMessageStatus(OctopusStatusMessage octopusStatusMessage) { + return OctopusMessage.builder() + .uuid(octopusStatusMessage.getAgentTopicName()) + .type(OctopusMessageType.STATUS) + .init_time(TimeUtils.currentTime()) + .content(octopusStatusMessage) + .build(); + } + + +} diff --git a/server/src/main/java/io/wdd/rpc/status/MonitorAllAgentStatus.java b/server/src/main/java/io/wdd/rpc/status/MonitorAllAgentStatus.java new file mode 100644 index 0000000..efa54db --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/status/MonitorAllAgentStatus.java @@ -0,0 +1,140 @@ +package io.wdd.rpc.status; + +import io.wdd.common.beans.status.OctopusStatusMessage; +import io.wdd.common.utils.TimeUtils; +import io.wdd.server.beans.vo.ServerInfoVO; +import io.wdd.server.coreService.CoreServerService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Service; +import org.springframework.util.Assert; + +import javax.annotation.Resource; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * 获取所有注册的Agent + *

+ * 发送状态检查信息, agent需要update相应的HashMap的值 + * redis --> all-agent-health-map agent-topic-name : 1 + * todo 分布式问题,弱网环境,多线程操作同一个hashMap会不会出现冲突 + *

+ * 休眠 MAX_WAIT_AGENT_REPORT_STATUS_TIME 秒 等待agent的状态上报 + *

+ * 检查相应的 状态HashMap,然后全部置为零 + */ +@Service +@Slf4j +public class MonitorAllAgentStatus { + + private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5; + private static final String ALL_AGENT_STATUS_REDIS_KEY = "ALL_AGENT_STATUS"; + private static final String STATUS_MESSAGE_TYPE = "ping"; + + private HashMap AGENT_HEALTHY_INIT_MAP; + private List ALL_AGENT_TOPICNAME_LIST; + + @Resource + RedisTemplate redisTemplate; + @Resource + CollectAgentStatus collectAgentStatus; + @Resource + CoreServerService coreServerService; + + public void go() { + + try { + + // 1. 获取所有注册的Agent + // todo need to cache this + List allAgentInfo = coreServerService.serverGetAll(); + Assert.notEmpty(allAgentInfo,"not agent registered ! skip the agent healthy status check !"); + ALL_AGENT_TOPICNAME_LIST = allAgentInfo.stream().map(ServerInfoVO::getTopicName).collect(Collectors.toList()); + + // 1.1 检查 Agent状态保存数据结构是否正常 + checkOrCreateRedisHealthyKey(); + + // 2.发送状态检查信息, agent需要update相应的HashMap的值 + buildAndSendAgentHealthMessage(); + + // 3. 休眠 MAX_WAIT_AGENT_REPORT_STATUS_TIME 秒 等待agent的状态上报 + TimeUnit.SECONDS.sleep(MAX_WAIT_AGENT_REPORT_STATUS_TIME); + + // 4.检查相应的 状态HashMap,然后全部置为零 + // todo 存储到某个地方,目前只是打印日志 + updateAllAgentHealthyStatus(); + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private void checkOrCreateRedisHealthyKey() { + + if (!redisTemplate.hasKey(ALL_AGENT_STATUS_REDIS_KEY)) { + log.info("ALL_AGENT_STATUS_REDIS_KEY not existed , start to create"); + + // build the redis all agent healthy map struct + HashMap initMap = new HashMap<>(32); + ALL_AGENT_TOPICNAME_LIST.stream().forEach( + agentTopicName -> { + initMap.put(agentTopicName, "0"); + } + ); + initMap.put("updateTime", TimeUtils.currentTimeString()); + + // cache this map struct + AGENT_HEALTHY_INIT_MAP = initMap; + + redisTemplate.opsForHash().putAll(ALL_AGENT_STATUS_REDIS_KEY, initMap); + } + } + + private void buildAndSendAgentHealthMessage() { + List collect = ALL_AGENT_TOPICNAME_LIST.stream().map( + agentTopicName -> OctopusStatusMessage.builder() + .agentTopicName(agentTopicName) + .type(STATUS_MESSAGE_TYPE) + .build() + ).collect(Collectors.toList()); + collectAgentStatus.collectAgentStatusList(collect); + } + + private void updateAllAgentHealthyStatus() { + List statusList = redisTemplate.opsForHash().multiGet( + ALL_AGENT_STATUS_REDIS_KEY, + ALL_AGENT_TOPICNAME_LIST); + + HashMap tmp = new HashMap<>(32); + for (int i = 0; i < ALL_AGENT_TOPICNAME_LIST.size(); i++) { + tmp.put(ALL_AGENT_TOPICNAME_LIST.get(i), + uniformHealthyStatus(String.valueOf(statusList.get(i))) + ); + } + // current log to console is ok + + String currentTimeString = TimeUtils.currentTimeString(); + log.info("[ AGENT HEALTHY CHECK ] time is {} , result are => {}", currentTimeString, tmp); + + // update time + AGENT_HEALTHY_INIT_MAP.put("updateTime", currentTimeString); + // init the healthy map + redisTemplate.opsForHash().putAll(ALL_AGENT_STATUS_REDIS_KEY, AGENT_HEALTHY_INIT_MAP); + } + + private String uniformHealthyStatus(String agentStatus) { + switch (agentStatus) { + case "0": + return "FAILED"; + case "1": + return "HEALTHY"; + default: + return "UNKNOWN"; + } + } + + +} diff --git a/server/src/main/java/io/wdd/server/utils/SpringUtils.java b/server/src/main/java/io/wdd/server/utils/SpringUtils.java new file mode 100644 index 0000000..4a0f514 --- /dev/null +++ b/server/src/main/java/io/wdd/server/utils/SpringUtils.java @@ -0,0 +1,143 @@ +package io.wdd.server.utils; + +import org.springframework.aop.framework.AopContext; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; +import org.springframework.beans.factory.config.BeanFactoryPostProcessor; +import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.data.redis.stream.StreamMessageListenerContainer; +import org.springframework.stereotype.Component; + + +@Component +public class SpringUtils implements ApplicationContextAware, BeanFactoryPostProcessor { + + private static ApplicationContext applicationContext; + + private static ConfigurableListableBeanFactory beanFactory; + + + /** + * 获取对象 + * + * @param name + * @return Object 一个以所给名字注册的bean的实例 + * @throws org.springframework.beans.BeansException + */ + @SuppressWarnings("unchecked") + public static T getBean(String name) throws BeansException { + return (T) beanFactory.getBean(name); + } + + /** + * 获取类型为requiredType的对象 + * + * @param clz + * @return + * @throws org.springframework.beans.BeansException + */ + public static T getBean(Class clz) throws BeansException { + T result = beanFactory.getBean(clz); + return result; + } + + /** + * 获取类型为requiredType的对象 + * + * @param clz + * @return + * @throws org.springframework.beans.BeansException + */ + public static T getBean(String beanName, Class clz) throws BeansException { + T result = beanFactory.getBean(beanName, clz); + return result; + } + + /** + * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true + * + * @param name + * @return boolean + */ + public static boolean containsBean(String name) { + return beanFactory.containsBean(name); + } + + /** + * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException) + * + * @param name + * @return boolean + * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException + */ + public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException { + return beanFactory.isSingleton(name); + } + + /** + * @param name + * @return Class 注册对象的类型 + * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException + */ + public static Class getType(String name) throws NoSuchBeanDefinitionException { + return beanFactory.getType(name); + } + + /** + * 如果给定的bean名字在bean定义中有别名,则返回这些别名 + * + * @param name + * @return + * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException + */ + public static String[] getAliases(String name) throws NoSuchBeanDefinitionException { + return beanFactory.getAliases(name); + } + + /** + * 获取aop代理对象 + * + * @param invoker + * @return + */ + @SuppressWarnings("unchecked") + public static T getAopProxy(T invoker) { + return (T) AopContext.currentProxy(); + } + + /** + * 获取当前的环境配置,无配置返回null + * + * @return 当前的环境配置 + */ + public static String[] getActiveProfiles() { + return applicationContext.getEnvironment().getActiveProfiles(); + } + + public static void destroyBean(Object bean) { + beanFactory.destroyBean(bean); + } + +// /** +// * 获取当前的环境配置,当有多个环境配置时,只获取第一个 +// * +// * @return 当前的环境配置 +// */ +// public static String getActiveProfile() +// { +// final String[] activeProfiles = getActiveProfiles(); +// return StringUtils.isNotEmpty(activeProfiles) ? activeProfiles[0] : null; +// } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + SpringUtils.applicationContext = applicationContext; + } + + @Override + public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { + SpringUtils.beanFactory = beanFactory; + } +}