diff --git a/.fastRequest/config/fastRequestCurrentProjectConfig.json b/.fastRequest/config/fastRequestCurrentProjectConfig.json index 7d39a50..b7c7f6f 100644 --- a/.fastRequest/config/fastRequestCurrentProjectConfig.json +++ b/.fastRequest/config/fastRequestCurrentProjectConfig.json @@ -8,6 +8,15 @@ } ], "name": "OctpusGO" + }, + { + "hostGroup": [ + { + "env": "local", + "url": "http://localhost:9999" + } + ], + "name": "server" } ], "envList": [ @@ -17,7 +26,8 @@ "postScript": "", "preScript": "", "projectList": [ - "OctpusGO" + "OctpusGO", + "server" ], "syncModel": { "branch": "master", diff --git a/agent-go/g/global.go b/agent-go/g/global.go index e070da7..b31ad7e 100644 --- a/agent-go/g/global.go +++ b/agent-go/g/global.go @@ -23,7 +23,14 @@ const ( BaseFuncOssUrlPrefix = "https://b2.107421.xyz/" ) -var pool, _ = ants.NewPool(100, ants.WithNonblocking(false), ants.WithLogger(logger2.Log), ants.WithMaxBlockingTasks(30), ants.WithDisablePurge(true)) +// 创建协程池子 +var pool, _ = ants.NewPool( + 100, + ants.WithNonblocking(false), + ants.WithLogger(logger2.Log), + ants.WithMaxBlockingTasks(30), + ants.WithDisablePurge(true), +) var G = NewGlobal( pool, diff --git a/agent-go/status/Network.go b/agent-go/status/Network.go index 69186d4..06b67a3 100644 --- a/agent-go/status/Network.go +++ b/agent-go/status/Network.go @@ -56,7 +56,6 @@ func GetNetworkStatus() (*NetworkStatus, error) { } // 休眠3秒 - time.Sleep(3 * time.Second) var sentAfter uint64 diff --git a/agent-go/status/Status.go b/agent-go/status/Status.go index 8ab5626..36f14ed 100644 --- a/agent-go/status/Status.go +++ b/agent-go/status/Status.go @@ -43,6 +43,7 @@ func Ping() string { return "PONG" } +// todo change to go model func ReportAppStatus() *AgentStatus { cpuStatus, cpuerr := GetCPUStatus() diff --git a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java index 730b595..d99fc97 100644 --- a/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/agent/OctopusAgentServiceImpl.java @@ -7,7 +7,7 @@ import io.wdd.common.utils.TimeUtils; import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessageType; import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService; -import io.wdd.rpc.message.handler.async.OctopusMessageAsyncReplayContend; +import io.wdd.rpc.message.handler.async.OctopusMessageSynScReplayContend; import io.wdd.rpc.message.sender.OMessageToAgentSender; import io.wdd.server.beans.vo.ServerInfoVO; import io.wdd.server.config.ServerCommonPool; @@ -26,10 +26,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_SET; -import static io.wdd.rpc.init.AgentStatusCacheService.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.message.handler.sync.OMessageHandlerServer.LATEST_VERSION; import static io.wdd.rpc.message.handler.sync.OMessageHandlerServer.OCTOPUS_MESSAGE_FROM_AGENT; +import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_SET; +import static io.wdd.rpc.status.CommonAndStatusCache.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; @Service @Slf4j @@ -70,7 +70,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { ); // 构造 异步结果监听内容 - OctopusMessageAsyncReplayContend agentReplayContend = OctopusMessageAsyncReplayContend.build( + OctopusMessageSynScReplayContend agentReplayContend = OctopusMessageSynScReplayContend.build( ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(), CurrentAppOctopusMessageType, currentTime @@ -147,16 +147,16 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { ); // 构造结果 - OctopusMessageAsyncReplayContend OctopusMessageAsyncReplayContend = io.wdd.rpc.message.handler.async.OctopusMessageAsyncReplayContend.build( + OctopusMessageSynScReplayContend OctopusMessageSynScReplayContend = OctopusMessageSynScReplayContend.build( ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(), CurrentAppOctopusMessageType, currentTime ); - CountDownLatch countDownLatch = OctopusMessageAsyncReplayContend.getCountDownLatch(); + CountDownLatch countDownLatch = OctopusMessageSynScReplayContend.getCountDownLatch(); // 调用后台接收处理所有的Replay信息 - asyncWaitOctopusMessageResultService.waitFor(OctopusMessageAsyncReplayContend); + asyncWaitOctopusMessageResultService.waitFor(OctopusMessageSynScReplayContend); /* CompletableFuture getAllAgentCoreInfoFuture = waitCollectAllAgentCoreInfo( result, @@ -176,10 +176,10 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { // 超时,或者 全部信息已经收集 // 此处调用,即可中断 异步任务的收集工作 - asyncWaitOctopusMessageResultService.stopWaiting(OctopusMessageAsyncReplayContend); + asyncWaitOctopusMessageResultService.stopWaiting(OctopusMessageSynScReplayContend); // 处理结果 - OctopusMessageAsyncReplayContend + OctopusMessageSynScReplayContend .getReplayOMList() .stream() .forEach( @@ -207,7 +207,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService { ); // help gc - OctopusMessageAsyncReplayContend = null; + OctopusMessageSynScReplayContend = null; } return result; diff --git a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java index b768099..622050a 100644 --- a/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java +++ b/server/src/main/java/io/wdd/rpc/controller/ExecutionController.java @@ -17,8 +17,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_LIST; -import static io.wdd.rpc.init.AgentStatusCacheService.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; +import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_LIST; +import static io.wdd.rpc.status.CommonAndStatusCache.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; @RestController @RequestMapping("/octopus/server/executor") diff --git a/server/src/main/java/io/wdd/rpc/controller/StatusController.java b/server/src/main/java/io/wdd/rpc/controller/StatusController.java index 17f3c89..aaff09e 100644 --- a/server/src/main/java/io/wdd/rpc/controller/StatusController.java +++ b/server/src/main/java/io/wdd/rpc/controller/StatusController.java @@ -5,7 +5,7 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.wdd.common.response.R; import io.wdd.rpc.scheduler.service.status.AgentAliveStatusMonitorService; -import io.wdd.rpc.status.service.AsyncStatusService; +import io.wdd.rpc.status.service.SyncStatusService; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; @@ -15,7 +15,7 @@ import javax.annotation.Resource; import java.util.List; import java.util.Map; -import static io.wdd.rpc.init.AgentStatusCacheService.*; +import static io.wdd.rpc.status.CommonAndStatusCache.*; @RestController @@ -24,7 +24,7 @@ import static io.wdd.rpc.init.AgentStatusCacheService.*; public class StatusController { @Resource - AsyncStatusService asyncStatusService; + SyncStatusService syncStatusService; @Resource AgentAliveStatusMonitorService agentAliveStatusMonitorService; @@ -80,7 +80,10 @@ public class StatusController { public R>> ManualUpdateAgentStatus() { // 手动调用更新 - Map agentAliveStatusMap = asyncStatusService.AsyncCollectAgentAliveStatus(ALL_AGENT_TOPIC_NAME_LIST, 5); + Map agentAliveStatusMap = syncStatusService.SyncCollectAgentAliveStatus( + ALL_AGENT_TOPIC_NAME_LIST, + 5 + ); agentAliveStatusMonitorService.updateAllAgentHealthyStatus(agentAliveStatusMap); diff --git a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java index d7b55cb..9044bd2 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java @@ -3,7 +3,7 @@ package io.wdd.rpc.execute.service; import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessageType; import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService; -import io.wdd.rpc.message.handler.async.OctopusMessageAsyncReplayContend; +import io.wdd.rpc.message.handler.async.OctopusMessageSynScReplayContend; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -205,7 +205,7 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService { } // 构造回复信息的内容 - OctopusMessageAsyncReplayContend executionReplayContent = OctopusMessageAsyncReplayContend.build( + OctopusMessageSynScReplayContend executionReplayContent = OctopusMessageSynScReplayContend.build( commandCount, CurrentAppOctopusMessageType, initTime diff --git a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java index 8b18c60..23ccbd4 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/SyncExecutionServiceImpl.java @@ -19,7 +19,7 @@ import java.util.HashMap; import java.util.List; import java.util.stream.Collectors; -import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_SET; +import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_SET; @Service diff --git a/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java b/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java index 501e4b7..bb8241e 100644 --- a/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java +++ b/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java @@ -9,7 +9,7 @@ import io.wdd.common.utils.TimeUtils; import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessageType; import io.wdd.rpc.message.sender.OMessageToAgentSender; -import io.wdd.rpc.status.AgentStatus; +import io.wdd.rpc.status.deprecate.AgentStatus; import io.wdd.server.beans.vo.ServerInfoVO; import io.wdd.server.utils.DaemonDatabaseOperator; import lombok.SneakyThrows; @@ -53,7 +53,9 @@ public class AcceptAgentInitInfo { "London", 7, "LosAngeles", - 7 + 7, + "Beijing", + 8 ) ); public static Set ALL_SERVER_ARCH_INFO = new HashSet<>( diff --git a/server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOctopusMessageResultService.java b/server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOctopusMessageResultService.java index 5fdd822..1be1284 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOctopusMessageResultService.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/async/AsyncWaitOctopusMessageResultService.java @@ -25,29 +25,29 @@ public class AsyncWaitOctopusMessageResultService { /** * 为了避免线程不安全的问题,增加一层缓存,仅仅由当前类操作此部分 * KEY -> replayMatchKey - * VALUE -> OctopusMessageAsyncReplayContend - 包含countDownLatch 和 result + * VALUE -> OctopusMessageSynScReplayContend - 包含countDownLatch 和 result */ - private static final HashMap OM_REPLAY_WAITING_TARGET_MAP = new HashMap<>(); + private static final HashMap OM_REPLAY_WAITING_TARGET_MAP = new HashMap<>(); - public void waitFor(OctopusMessageAsyncReplayContend OctopusMessageAsyncReplayContend) { + public void waitFor(OctopusMessageSynScReplayContend OctopusMessageSynScReplayContend) { // 向 REPLAY_CACHE_MAP中写入 Key OM_REPLAY_WAITING_TARGET_MAP.put( - OctopusMessageAsyncReplayContend.getReplayMatchKey(), - OctopusMessageAsyncReplayContend + OctopusMessageSynScReplayContend.getReplayMatchKey(), + OctopusMessageSynScReplayContend ); // 在调用线程的countDownLunch结束之后,关闭 // 清除 REPLAY_CACHE_MAP 中的队列 } - public void stopWaiting(OctopusMessageAsyncReplayContend OctopusMessageAsyncReplayContend) { + public void stopWaiting(OctopusMessageSynScReplayContend OctopusMessageSynScReplayContend) { // 在调用线程的countDownLunch结束之后,关闭 清除 REPLAY_CACHE_MAP 中的队列 - OctopusMessageAsyncReplayContend contend = OM_REPLAY_WAITING_TARGET_MAP.get(OctopusMessageAsyncReplayContend.getReplayMatchKey()); + OctopusMessageSynScReplayContend contend = OM_REPLAY_WAITING_TARGET_MAP.get(OctopusMessageSynScReplayContend.getReplayMatchKey()); // 移除该内容 - OM_REPLAY_WAITING_TARGET_MAP.remove(OctopusMessageAsyncReplayContend.getReplayMatchKey()); + OM_REPLAY_WAITING_TARGET_MAP.remove(OctopusMessageSynScReplayContend.getReplayMatchKey()); // help gc contend = null; @@ -88,7 +88,7 @@ public class AsyncWaitOctopusMessageResultService { OctopusMessage replayOMessage = OCTOPUS_MESSAGE_FROM_AGENT.poll(); // 构造 replayMatchKey - String matchKey = OctopusMessageAsyncReplayContend.generateMatchKey( + String matchKey = OctopusMessageSynScReplayContend.generateMatchKey( replayOMessage.getType(), replayOMessage.getInit_time() ); @@ -105,7 +105,7 @@ public class AsyncWaitOctopusMessageResultService { } // Map中包含有Key,那么放置进去 - OctopusMessageAsyncReplayContend replayContend = OM_REPLAY_WAITING_TARGET_MAP.get(matchKey); + OctopusMessageSynScReplayContend replayContend = OM_REPLAY_WAITING_TARGET_MAP.get(matchKey); replayContend .getReplayOMList() .add(replayOMessage); diff --git a/server/src/main/java/io/wdd/rpc/message/handler/async/OctopusMessageAsyncReplayContend.java b/server/src/main/java/io/wdd/rpc/message/handler/async/OctopusMessageSynScReplayContend.java similarity index 92% rename from server/src/main/java/io/wdd/rpc/message/handler/async/OctopusMessageAsyncReplayContend.java rename to server/src/main/java/io/wdd/rpc/message/handler/async/OctopusMessageSynScReplayContend.java index 5864190..25ddc4e 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/async/OctopusMessageAsyncReplayContend.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/async/OctopusMessageSynScReplayContend.java @@ -19,7 +19,7 @@ import java.util.concurrent.CountDownLatch; @NoArgsConstructor @SuperBuilder(toBuilder = true) @ApiModel("众多业务调用RPC,异步等待需要确定返回消息是谁的") -public class OctopusMessageAsyncReplayContend { +public class OctopusMessageSynScReplayContend { @ApiModelProperty("rpc消息的类型") OctopusMessageType type; @@ -54,14 +54,14 @@ public class OctopusMessageAsyncReplayContend { * * @return */ - public static OctopusMessageAsyncReplayContend build(int waitForReplayNum, OctopusMessageType currentOMType, LocalDateTime currentTime) { + public static OctopusMessageSynScReplayContend build(int waitForReplayNum, OctopusMessageType currentOMType, LocalDateTime currentTime) { CountDownLatch latch = null; if (waitForReplayNum != 0) { latch = new CountDownLatch(waitForReplayNum); } - return new OctopusMessageAsyncReplayContend( + return new OctopusMessageSynScReplayContend( currentOMType, currentTime, generateMatchKey( diff --git a/server/src/main/java/io/wdd/rpc/scheduler/job/ScheduleUpdateDBInfoJob.java b/server/src/main/java/io/wdd/rpc/scheduler/dto/ScheduleUpdateDBInfoJob.java similarity index 93% rename from server/src/main/java/io/wdd/rpc/scheduler/job/ScheduleUpdateDBInfoJob.java rename to server/src/main/java/io/wdd/rpc/scheduler/dto/ScheduleUpdateDBInfoJob.java index a3df0a7..13e4bc9 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/job/ScheduleUpdateDBInfoJob.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/dto/ScheduleUpdateDBInfoJob.java @@ -1,4 +1,4 @@ -package io.wdd.rpc.scheduler.job; +package io.wdd.rpc.scheduler.dto; import org.quartz.JobExecutionContext; diff --git a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentAliveStatusMonitorJob.java b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentAliveStatusMonitorJob.java index a72a37b..fba9ab6 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentAliveStatusMonitorJob.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentAliveStatusMonitorJob.java @@ -23,7 +23,7 @@ public class AgentAliveStatusMonitorJob extends QuartzJobBean { //JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap(); // actually execute the monitor service - agentAliveStatusMonitorService.go(); + agentAliveStatusMonitorService.collectAllAgentAliveStatus(); // log to somewhere quartzLogOperator.save(); diff --git a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentMetricStatusJob.java b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentMetricStatusJob.java new file mode 100644 index 0000000..1c89f42 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentMetricStatusJob.java @@ -0,0 +1,29 @@ +package io.wdd.rpc.scheduler.job; + +import io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.springframework.scheduling.quartz.QuartzJobBean; + +import javax.annotation.Resource; + + +public class AgentMetricStatusJob extends QuartzJobBean { + + @Resource + AgentMetricStatusCollectService agentMetricStatusCollectService; + + @Override + protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { + + // 从JobDetailContext中获取相应的信息 +// JobDataMap jobDataMap = jobExecutionContext +// .getJobDetail() +// .getJobDataMap(); + + // 执行Agent Metric 状态收集任务 + agentMetricStatusCollectService.collectHealthyAgentMetric(); + + } + +} diff --git a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentRunMetricStatusJob.java b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentRunMetricStatusJob.java deleted file mode 100644 index e34ad2b..0000000 --- a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentRunMetricStatusJob.java +++ /dev/null @@ -1,38 +0,0 @@ -package io.wdd.rpc.scheduler.job; - -import io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService; -import org.quartz.JobDataMap; -import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; -import org.springframework.scheduling.quartz.QuartzJobBean; - -import javax.annotation.Resource; - -import static io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService.METRIC_REPORT_TIMES_COUNT; -import static io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService.METRIC_REPORT_TIME_PINCH; - -public class AgentRunMetricStatusJob extends QuartzJobBean { - - @Resource - AgentMetricStatusCollectService agentMetricStatusCollectService; - - @Override - protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { - - // 从JobDetailContext中获取相应的信息 - JobDataMap jobDataMap = jobExecutionContext - .getJobDetail() - .getJobDataMap(); - - // 执行Agent Metric 状态收集任务 - agentMetricStatusCollectService.collect( - (Integer) jobDataMap.get(METRIC_REPORT_TIMES_COUNT), - (Integer) jobDataMap.get(METRIC_REPORT_TIME_PINCH) - ); - - // todo 机构设计状态会被存储至 Redis Stream Key 中 - // AgentTopicName-Metric - - } - -} diff --git a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentScriptSchedulerJob.java b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentScriptSchedulerJob.java index 3c2da55..ad4fe87 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentScriptSchedulerJob.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentScriptSchedulerJob.java @@ -18,6 +18,7 @@ import static io.wdd.rpc.scheduler.service.QuartzSchedulerServiceImpl.SCRIPT_SCH * 定时脚本任务核心类,Quartz框架定时调用该类 */ @Slf4j +@Deprecated public class AgentScriptSchedulerJob extends QuartzJobBean { @Resource diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java b/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java index ae2353d..8977ac0 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java @@ -2,52 +2,39 @@ package io.wdd.rpc.scheduler.service; import io.wdd.rpc.scheduler.job.AgentAliveStatusMonitorJob; -import io.wdd.rpc.scheduler.job.AgentRunMetricStatusJob; +import io.wdd.rpc.scheduler.job.AgentMetricStatusJob; import lombok.extern.slf4j.Slf4j; -import org.quartz.CronExpression; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; -import java.text.ParseException; -import java.util.Date; -import java.util.HashMap; -import static io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService.METRIC_REPORT_TIMES_COUNT; -import static io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService.METRIC_REPORT_TIME_PINCH; @Component @Slf4j public class BuildStatusScheduleTask { + public static final String JOB_GROUP_NAME = "OctopusAgent"; @Resource QuartzSchedulerService octopusQuartzService; - @Value(value = "${octopus.status.healthy.cron}") String healthyCronTimeExpress; - - // todo 此时间可以更新Nacos配置更新 自动进行任务更新 @Value(value = "${octopus.status.healthy.start-delay}") int healthyCheckStartDelaySeconds; - -// @Value(value = "${octopus.status.metric.cron}") -// int metricReportCronExpress; - - @Value(value = "${octopus.status.metric.pinch}") - int metricReportTimePinch; - - public static final String JOB_GROUP_NAME = "OctopusAgent"; + @Value(value = "${octopus.status.metric.cron}") + int metricReportCronExpress; + @Value(value = "${octopus.status.metric.start-delay}") + int metricReportStartDelaySeconds; @PostConstruct - private void buildAll() { + private void buildAllPreScheduledTask() { // Agent存活健康状态检查 buildMonitorAllAgentAliveStatusScheduleTask(); // Agent运行信息检查 Metric - - // Agent全部信息检查 All + buildAgentMetricScheduleTask(); } @@ -55,11 +42,23 @@ public class BuildStatusScheduleTask { * Agent运行信息检查 Metric * 【调用】应该由 健康状态检查结果 调用 ==> 所有存活节点需要进行Metric信息汇报 * 【间隔】存活间隔内,间隔一定的时间汇报Metric + *

+ * 2023年7月10日 更改为按照cron表达式进行执行 */ public void buildAgentMetricScheduleTask() { + // 2023年7月10日 更改为按照cron表达式进行执行 + octopusQuartzService.addMission( + AgentMetricStatusJob.class, + "agentRunMetricStatusJob", + JOB_GROUP_NAME, + metricReportStartDelaySeconds, + metricReportCronExpress, + null + ); + // 计算 Metric检测的时间间隔 - int metricReportTimesCount = 19; + /*int metricReportTimesCount = 19; try { CronExpression cronExpression = new CronExpression(healthyCronTimeExpress); @@ -68,8 +67,8 @@ public class BuildStatusScheduleTask { long totalSeconds = (nextValidTime.getTime() - now.getTime()) / 1000; metricReportTimesCount = (int) (totalSeconds / metricReportTimePinch) - 1; - /*System.out.println("totalSeconds = " + totalSeconds); - System.out.println("metricReportTimesCount = " + metricReportTimesCount);*/ + *//*System.out.println("totalSeconds = " + totalSeconds); + System.out.println("metricReportTimesCount = " + metricReportTimesCount);*//* } catch (ParseException e) { throw new RuntimeException(e); @@ -77,18 +76,10 @@ public class BuildStatusScheduleTask { HashMap map = new HashMap(); map.put(METRIC_REPORT_TIME_PINCH, metricReportTimePinch); - map.put(METRIC_REPORT_TIMES_COUNT, metricReportTimesCount); + map.put(METRIC_REPORT_TIMES_COUNT, metricReportTimesCount);*/ + + // - // build the Job 只发送一次消息,然后让Agent获取消息 (重复间隔,重复次数) 进行相应的处理! - // todo 解决创建太多对象的问题,需要缓存相应的内容 - octopusQuartzService.addMission( - AgentRunMetricStatusJob.class, - "agentRunMetricStatusJob", - JOB_GROUP_NAME, - metricReportTimePinch, - 1, - map - ); } diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java b/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java index 48682bb..4e084ac 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/script/AgentApplyScheduledScript.java @@ -17,6 +17,7 @@ import java.util.List; */ @Service @Slf4j +@Deprecated public class AgentApplyScheduledScript { @Resource diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentAliveStatusMonitorService.java b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentAliveStatusMonitorService.java index fa423df..9c768cc 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentAliveStatusMonitorService.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentAliveStatusMonitorService.java @@ -1,9 +1,8 @@ package io.wdd.rpc.scheduler.service.status; import io.wdd.common.utils.TimeUtils; -import io.wdd.rpc.init.AgentStatusCacheService; -import io.wdd.rpc.scheduler.service.BuildStatusScheduleTask; -import io.wdd.rpc.status.service.AsyncStatusService; +import io.wdd.rpc.status.CommonAndStatusCache; +import io.wdd.rpc.status.service.SyncStatusService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.context.annotation.Lazy; @@ -14,47 +13,36 @@ import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; -import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_LIST; +import static io.wdd.rpc.status.CommonAndStatusCache.*; import static io.wdd.rpc.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY; /** - * 更新频率被类 BuildStatusScheduleTask.class控制 - *

- *

- * 获取所有注册的Agent - *

- * 发送状态检查信息, agent需要update相应的HashMap的值 - * redis --> all-agent-health-map agent-topic-name : 1 - * todo 分布式问题,弱网环境,多线程操作同一个hashMap会不会出现冲突 - *

- * 休眠 MAX_WAIT_AGENT_REPORT_STATUS_TIME 秒 等待agent的状态上报 - *

- * 检查相应的 状态HashMap,然后全部置为零 + * 定时任务 检测所有的Agent的 存活状态 的实际执行类 */ @Service @Slf4j @Lazy public class AgentAliveStatusMonitorService { - private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5; @Resource RedisTemplate redisTemplate; @Resource - AgentStatusCacheService agentStatusCacheService; + CommonAndStatusCache commonAndStatusCache; @Resource - BuildStatusScheduleTask buildStatusScheduleTask; - - @Resource - AsyncStatusService asyncStatusService; + SyncStatusService syncStatusService; private HashMap AGENT_HEALTHY_INIT_MAP; - public void go() { + /** + * 收集所有Agent的存活状态 + * 实际的定时任务的执行类, + */ + public void collectAllAgentAliveStatus() { // 1. 获取所有注册的Agent 手动更新 - agentStatusCacheService.updateAllAgentTopicNameCache(); + commonAndStatusCache.updateAllAgentTopicNameCache(); if (CollectionUtils.isEmpty(ALL_AGENT_TOPIC_NAME_LIST)) { log.warn("[Scheduler] No Agent Registered ! End Up Status Monitor !"); return; @@ -63,17 +51,16 @@ public class AgentAliveStatusMonitorService { // 1.1 检查 Agent状态保存数据结构是否正常 checkOrCreateRedisHealthyKey(); - // 2.发送状态检查信息, agent需要update相应的HashMap的值 // 2023年6月14日 2. 发送ping等待所有的Agent返回PONG, 然后进行redis的状态修改 - - // 使用同步更新的策略 - Map agentAliveStatusMap = asyncStatusService.AsyncCollectAgentAliveStatus( + // 同步的方法, 超时等待所有主机的存活状态 + Map agentAliveStatusMap = syncStatusService.SyncCollectAgentAliveStatus( ALL_AGENT_TOPIC_NAME_LIST, 5 ); - // 更新Agent的状态 - updateAllAgentHealthyStatus(agentAliveStatusMap); + // 更新Agent的状态 一级和二级缓存 同时更新 write-through的方式 + updateAllAgentStatusCache(agentAliveStatusMap); + } /** @@ -112,21 +99,18 @@ public class AgentAliveStatusMonitorService { .opsForHash() .put( ALL_AGENT_STATUS_REDIS_KEY, - "initTime", + STATUS_INIT_TIME_KEY, TimeUtils.currentTimeString() ); } - public void updateAllAgentHealthyStatus(Map agentAliveStatusMap) { + public void updateAllAgentStatusCache(Map agentAliveStatusMap) { String currentTimeString = TimeUtils.currentTimeString(); - // 更新所有的缓存状态 - agentStatusCacheService.updateAgentStatusMapCache(agentAliveStatusMap); - - // 执行Metric上报定时任务 -// buildStatusScheduleTask.buildAgentMetricScheduleTask(); + // 更新 二级缓存 + commonAndStatusCache.updateAgentStatusCache(agentAliveStatusMap); log.debug( "[存活状态] - 当前时间为 [ %s ] , 所有的Agent存活状态为=> %s", @@ -134,12 +118,13 @@ public class AgentAliveStatusMonitorService { agentAliveStatusMap ); + // 更新 一级缓存 // 这里仅仅是更新时间 redisTemplate .opsForHash() .put( ALL_AGENT_STATUS_REDIS_KEY, - "updateTime", + STATUS_UPDATE_TIME_KEY, currentTimeString ); diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentMetricStatusCollectService.java b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentMetricStatusCollectService.java index bcaec69..191b810 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentMetricStatusCollectService.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentMetricStatusCollectService.java @@ -1,74 +1,55 @@ package io.wdd.rpc.scheduler.service.status; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.wdd.common.utils.TimeUtils; -import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.sender.OMessageToAgentSender; +import io.wdd.rpc.status.beans.AgentStatus; +import io.wdd.rpc.status.service.SyncStatusService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; -import java.time.LocalDateTime; -import java.util.List; -import java.util.stream.Collectors; +import java.util.Map; -import static io.wdd.rpc.init.AgentStatusCacheService.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; -import static io.wdd.rpc.status.OctopusStatusMessage.ConstructAgentStatusMessage; -import static io.wdd.rpc.status.OctopusStatusMessage.METRIC_STATUS_MESSAGE_TYPE; +import static io.wdd.rpc.status.CommonAndStatusCache.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; /** - * 收集OctopusAgent的运行Metric信息 - *

- * CPU Memory AppStatus易变信息 + * 定时任务 收集Agent的运行Metric的实际执行类 */ @Service @Slf4j public class AgentMetricStatusCollectService { - public static final String METRIC_REPORT_TIME_PINCH = "metricRepeatPinch"; - public static final String METRIC_REPORT_TIMES_COUNT = "metricRepeatCount"; - - @Resource OMessageToAgentSender oMessageToAgentSender; @Resource - ObjectMapper objectMapper; + SyncStatusService syncStatusService; - public void collect(int metricRepeatCount, int metricRepeatPinch) { + /** + * 收集所有健康主机的运行数据 + */ + public void collectHealthyAgentMetric() { - // 检查基础信息 + // 检查是否存在健康的主机 if (CollectionUtils.isEmpty(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST)) { log.error("Metric Status Collect Failed ! no ALL_HEALTHY_AGENT_TOPIC_NAMES"); + return; } - buildMetricStatusMessageAndSend( - metricRepeatCount, - metricRepeatPinch + // 调用核心的服务 + Map agentMetricStatusMap = syncStatusService.SyncCollectAgentMetricStatus( + ALL_HEALTHY_AGENT_TOPIC_NAME_LIST, + 10 ); - } + // todo 需要进行存储或者咋滴 + log.info( + "[Agent Metric] - 所有主机的状态为 => %s", + agentMetricStatusMap + ); - private void buildMetricStatusMessageAndSend(int metricRepeatCount, int metricRepeatPinch) { - - LocalDateTime currentTime = TimeUtils.currentFormatTime(); - - List octopusStatusMessageList = ALL_HEALTHY_AGENT_TOPIC_NAME_LIST - .stream() - .map( - agentTopicName -> ConstructAgentStatusMessage( - METRIC_STATUS_MESSAGE_TYPE, - agentTopicName, - currentTime - ) - ) - .collect(Collectors.toList()); - - // batch send all messages to RabbitMQ - oMessageToAgentSender.send(octopusStatusMessageList); } diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentStatusStreamReader.java b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentStatusStreamReader.java index f0edfa4..ab638ef 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentStatusStreamReader.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentStatusStreamReader.java @@ -4,7 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; -import io.wdd.rpc.status.AgentStatus; +import io.wdd.rpc.status.deprecate.AgentStatus; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -15,6 +15,7 @@ import org.springframework.data.redis.stream.StreamListener; @Getter @Setter @Slf4j +@Deprecated public class AgentStatusStreamReader implements StreamListener> { // https://medium.com/nerd-for-tech/event-driven-architecture-with-redis-streams-using-spring-boot-a81a1c9a4cde diff --git a/server/src/main/java/io/wdd/rpc/scheduler/定时任务说明.md b/server/src/main/java/io/wdd/rpc/scheduler/定时任务说明.md new file mode 100644 index 0000000..215979f --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/scheduler/定时任务说明.md @@ -0,0 +1,17 @@ +# 定时框架Quartz的说明 + +# 核心为 QuartzSchedulerService + +1. addMission()方法 +2. Scheduler.scheduleJob()方法就可以设置一个定时任务 + +# 项目创建 固定-定时任务的入口为 BuildStatusScheduleTask + +1. 创建的方法为 buildAllPreScheduledTask() + +# 需要将定时任务包装为一个个的Job + +1. 需要继承 QuartzJobBean +2. 然后调用实际定时任务的Service + 1. AgentAliveStatusMonitorJob extends QuartzJobBean + 2. \ No newline at end of file diff --git a/server/src/main/java/io/wdd/rpc/status/AgentHealthyStatusEnum.java b/server/src/main/java/io/wdd/rpc/status/AgentHealthyStatusEnum.java index ef794e7..88e0b1a 100644 --- a/server/src/main/java/io/wdd/rpc/status/AgentHealthyStatusEnum.java +++ b/server/src/main/java/io/wdd/rpc/status/AgentHealthyStatusEnum.java @@ -26,6 +26,4 @@ public enum AgentHealthyStatusEnum { } - - } diff --git a/server/src/main/java/io/wdd/rpc/init/AgentStatusCacheService.java b/server/src/main/java/io/wdd/rpc/status/CommonAndStatusCache.java similarity index 78% rename from server/src/main/java/io/wdd/rpc/init/AgentStatusCacheService.java rename to server/src/main/java/io/wdd/rpc/status/CommonAndStatusCache.java index c4e5b09..8aa0b9c 100644 --- a/server/src/main/java/io/wdd/rpc/init/AgentStatusCacheService.java +++ b/server/src/main/java/io/wdd/rpc/status/CommonAndStatusCache.java @@ -1,12 +1,12 @@ -package io.wdd.rpc.init; +package io.wdd.rpc.status; import io.wdd.common.utils.TimeUtils; -import io.wdd.rpc.status.AgentHealthyStatusEnum; import io.wdd.server.beans.vo.ServerInfoVO; import io.wdd.server.coreService.CoreServerService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; @@ -15,17 +15,19 @@ import java.util.*; import java.util.stream.Collectors; - /** * Server启动或者运行的时候,需要初 缓存一系列的信息 *

* 所有Agent的TopicName ALL_AGENT_TOPIC_NAME_SET *

+ * 2023年7月10日 此部分应该初始化全部为 False状态 * Agent状态信息的两个Map STATUS_AGENT_LIST_MAP ALL_AGENT_STATUS_MAP + *

+ * 2023年7月10日 -- 此部分作为Redis中存储的 二级缓存部分 应该严格遵循 */ @Service @Slf4j -public class AgentStatusCacheService { +public class CommonAndStatusCache { /** * 存储所有的AgentTopicName的缓存 @@ -36,8 +38,6 @@ public class AgentStatusCacheService { * 存储所有的AgentTopicName的缓存 */ public static final List ALL_AGENT_TOPIC_NAME_LIST = new ArrayList<>(); - - /** * 存储 状态对应Agent列表的Map * Agent的状态描述为 AgentHealthyStatusEnum @@ -51,34 +51,52 @@ public class AgentStatusCacheService { * 内容为 agentTopicName- True代表健康 False代表不健康 */ public static final Map ALL_AGENT_STATUS_MAP = new HashMap<>(); - /** * 保存所有健康运行的Agent Topic Name */ public static final List ALL_HEALTHY_AGENT_TOPIC_NAME_LIST = new ArrayList<>(); - /** * 记录状态信息缓存的更新时间 */ - private static final String STATUS_UPDATE_TIME_KEY = "UPDATE_TIME"; + public static final String STATUS_UPDATE_TIME_KEY = "UPDATE_TIME"; + /** + * 记录状态信息缓存的初始化时间 + */ + public static final String STATUS_INIT_TIME_KEY = "INIT_TIME"; + /** + * AgentTopicName 在Redis中緩存的Key + */ + private static final String ALL_AGENT_TOPIC_NAME_REDIS_KEY = "ALL_AGENT_TOPIC_NAME"; @Resource CoreServerService coreServerService; + @Resource + RedisTemplate redisTemplate; @PostConstruct - public void GenerateAllCache() { + public void InitToGenerateAllStatusCache() { //所有Agent的TopicName ALL_AGENT_TOPIC_NAME_SET updateAllAgentTopicNameCache(); // Agent状态信息的两个Map - // updateAgentStatusMapCache(agentAliveStatusMap); + // 初始化 默认创建全部失败的Map + Map initAgentFalseStatusMap = ALL_AGENT_TOPIC_NAME_LIST + .stream() + .collect(Collectors.toMap( + topicName -> topicName, + topicName -> Boolean.FALSE + )); + + updateAgentStatusCache(initAgentFalseStatusMap); } /** * 从数据库中获取所有注册过的Agent名称 + *

+ * 2023年7月10日 写入Redis中保存一份 */ public void updateAllAgentTopicNameCache() { @@ -98,23 +116,28 @@ public class AgentStatusCacheService { .map(ServerInfoVO::getTopicName) .collect(Collectors.toList()); - ALL_AGENT_TOPIC_NAME_LIST.addAll(collect); ALL_AGENT_TOPIC_NAME_SET.addAll(collect); + // 2023年7月10日 同步缓存至Redis中 + redisTemplate + .opsForSet() + .add( + ALL_AGENT_TOPIC_NAME_REDIS_KEY, + ALL_AGENT_TOPIC_NAME_SET + ); + } /** - * 从redis中获取信息,更新Agent状态信息的全局缓存 + * 根据传入的状态Map更新二级缓存的两个状态Map和健康主机的列表 * ALL_AGENT_STATUS_MAP - * ALL_HEALTHY_AGENT_TOPIC_NAME_LIST * STATUS_AGENT_LIST_MAP *

- * 由定时任务或者初始化服务触发 - * 2023-02-21 前端接口,手动更新 + * ALL_HEALTHY_AGENT_TOPIC_NAME_LIST */ - public void updateAgentStatusMapCache(Map agentAliveStatusMap) { + public void updateAgentStatusCache(Map agentAliveStatusMap) { // 检查,排除没有节点的情况 if (CollectionUtils.isEmpty(ALL_AGENT_TOPIC_NAME_LIST)) { @@ -122,12 +145,10 @@ public class AgentStatusCacheService { return; } - // 2023年6月15日 更新状态缓存 ALL_AGENT_STATUS_MAP.clear(); ALL_AGENT_STATUS_MAP.putAll(agentAliveStatusMap); - // 2023-01-16 // 更新 状态-Agent容器 内容为 // HEALTHY -> ["agentTopicName-1", "agentTopicName-2"] diff --git a/server/src/main/java/io/wdd/rpc/status/OctopusStatusMessage.java b/server/src/main/java/io/wdd/rpc/status/OctopusStatusMessage.java index e6e1a14..70e9d8f 100644 --- a/server/src/main/java/io/wdd/rpc/status/OctopusStatusMessage.java +++ b/server/src/main/java/io/wdd/rpc/status/OctopusStatusMessage.java @@ -34,9 +34,6 @@ public class OctopusStatusMessage { */ String statusType; - int metricRepeatCount; - - int metricRepeatPinch; public static OctopusMessage ConstructAgentStatusMessage(String statusType, String agentTopicName, LocalDateTime currentTime) { diff --git a/server/src/main/java/io/wdd/rpc/status/AgentStatus.java b/server/src/main/java/io/wdd/rpc/status/deprecate/AgentStatus.java similarity index 95% rename from server/src/main/java/io/wdd/rpc/status/AgentStatus.java rename to server/src/main/java/io/wdd/rpc/status/deprecate/AgentStatus.java index 83a6a90..98e741f 100644 --- a/server/src/main/java/io/wdd/rpc/status/AgentStatus.java +++ b/server/src/main/java/io/wdd/rpc/status/deprecate/AgentStatus.java @@ -1,4 +1,4 @@ -package io.wdd.rpc.status; +package io.wdd.rpc.status.deprecate; import lombok.AllArgsConstructor; diff --git a/server/src/main/java/io/wdd/rpc/status/AgentSystemInfo.java b/server/src/main/java/io/wdd/rpc/status/deprecate/AgentSystemInfo.java similarity index 96% rename from server/src/main/java/io/wdd/rpc/status/AgentSystemInfo.java rename to server/src/main/java/io/wdd/rpc/status/deprecate/AgentSystemInfo.java index 89f5abc..a5b734d 100644 --- a/server/src/main/java/io/wdd/rpc/status/AgentSystemInfo.java +++ b/server/src/main/java/io/wdd/rpc/status/deprecate/AgentSystemInfo.java @@ -1,4 +1,4 @@ -package io.wdd.rpc.status; +package io.wdd.rpc.status.deprecate; import io.wdd.common.utils.TimeUtils; import lombok.AllArgsConstructor; diff --git a/server/src/main/java/io/wdd/rpc/status/AppStatusEnum.java b/server/src/main/java/io/wdd/rpc/status/deprecate/AppStatusEnum.java similarity index 92% rename from server/src/main/java/io/wdd/rpc/status/AppStatusEnum.java rename to server/src/main/java/io/wdd/rpc/status/deprecate/AppStatusEnum.java index e627994..bba99f1 100644 --- a/server/src/main/java/io/wdd/rpc/status/AppStatusEnum.java +++ b/server/src/main/java/io/wdd/rpc/status/deprecate/AppStatusEnum.java @@ -1,4 +1,4 @@ -package io.wdd.rpc.status; +package io.wdd.rpc.status.deprecate; public enum AppStatusEnum { diff --git a/server/src/main/java/io/wdd/rpc/status/AppStatusInfo.java b/server/src/main/java/io/wdd/rpc/status/deprecate/AppStatusInfo.java similarity index 90% rename from server/src/main/java/io/wdd/rpc/status/AppStatusInfo.java rename to server/src/main/java/io/wdd/rpc/status/deprecate/AppStatusInfo.java index fd3abb0..fd1b359 100644 --- a/server/src/main/java/io/wdd/rpc/status/AppStatusInfo.java +++ b/server/src/main/java/io/wdd/rpc/status/deprecate/AppStatusInfo.java @@ -1,4 +1,4 @@ -package io.wdd.rpc.status; +package io.wdd.rpc.status.deprecate; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/server/src/main/java/io/wdd/rpc/status/CpuInfo.java b/server/src/main/java/io/wdd/rpc/status/deprecate/CpuInfo.java similarity index 99% rename from server/src/main/java/io/wdd/rpc/status/CpuInfo.java rename to server/src/main/java/io/wdd/rpc/status/deprecate/CpuInfo.java index 43cb652..22a2715 100644 --- a/server/src/main/java/io/wdd/rpc/status/CpuInfo.java +++ b/server/src/main/java/io/wdd/rpc/status/deprecate/CpuInfo.java @@ -1,4 +1,4 @@ -package io.wdd.rpc.status; +package io.wdd.rpc.status.deprecate; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/server/src/main/java/io/wdd/rpc/status/CpuTicks.java b/server/src/main/java/io/wdd/rpc/status/deprecate/CpuTicks.java similarity index 98% rename from server/src/main/java/io/wdd/rpc/status/CpuTicks.java rename to server/src/main/java/io/wdd/rpc/status/deprecate/CpuTicks.java index 2065ee7..2c55ab2 100644 --- a/server/src/main/java/io/wdd/rpc/status/CpuTicks.java +++ b/server/src/main/java/io/wdd/rpc/status/deprecate/CpuTicks.java @@ -1,4 +1,4 @@ -package io.wdd.rpc.status; +package io.wdd.rpc.status.deprecate; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/server/src/main/java/io/wdd/rpc/status/DiskInfo.java b/server/src/main/java/io/wdd/rpc/status/deprecate/DiskInfo.java similarity index 98% rename from server/src/main/java/io/wdd/rpc/status/DiskInfo.java rename to server/src/main/java/io/wdd/rpc/status/deprecate/DiskInfo.java index 291617a..6acf1c9 100644 --- a/server/src/main/java/io/wdd/rpc/status/DiskInfo.java +++ b/server/src/main/java/io/wdd/rpc/status/deprecate/DiskInfo.java @@ -1,4 +1,4 @@ -package io.wdd.rpc.status; +package io.wdd.rpc.status.deprecate; import io.wdd.common.utils.FormatUtils; import lombok.AllArgsConstructor; diff --git a/server/src/main/java/io/wdd/rpc/status/MemoryInfo.java b/server/src/main/java/io/wdd/rpc/status/deprecate/MemoryInfo.java similarity index 97% rename from server/src/main/java/io/wdd/rpc/status/MemoryInfo.java rename to server/src/main/java/io/wdd/rpc/status/deprecate/MemoryInfo.java index 1afe27b..e0e8963 100644 --- a/server/src/main/java/io/wdd/rpc/status/MemoryInfo.java +++ b/server/src/main/java/io/wdd/rpc/status/deprecate/MemoryInfo.java @@ -1,4 +1,4 @@ -package io.wdd.rpc.status; +package io.wdd.rpc.status.deprecate; import io.wdd.common.utils.FormatUtils; import lombok.AllArgsConstructor; diff --git a/server/src/main/java/io/wdd/rpc/status/MetricStatus.java b/server/src/main/java/io/wdd/rpc/status/deprecate/MetricStatus.java similarity index 85% rename from server/src/main/java/io/wdd/rpc/status/MetricStatus.java rename to server/src/main/java/io/wdd/rpc/status/deprecate/MetricStatus.java index 2f7bae5..df0c04d 100644 --- a/server/src/main/java/io/wdd/rpc/status/MetricStatus.java +++ b/server/src/main/java/io/wdd/rpc/status/deprecate/MetricStatus.java @@ -1,4 +1,4 @@ -package io.wdd.rpc.status; +package io.wdd.rpc.status.deprecate; import lombok.Data; diff --git a/server/src/main/java/io/wdd/rpc/status/NetworkInfo.java b/server/src/main/java/io/wdd/rpc/status/deprecate/NetworkInfo.java similarity index 98% rename from server/src/main/java/io/wdd/rpc/status/NetworkInfo.java rename to server/src/main/java/io/wdd/rpc/status/deprecate/NetworkInfo.java index d540e28..f82d7aa 100644 --- a/server/src/main/java/io/wdd/rpc/status/NetworkInfo.java +++ b/server/src/main/java/io/wdd/rpc/status/deprecate/NetworkInfo.java @@ -1,4 +1,4 @@ -package io.wdd.rpc.status; +package io.wdd.rpc.status.deprecate; import io.wdd.common.utils.FormatUtils; diff --git a/server/src/main/java/io/wdd/rpc/status/service/AsyncStatusService.java b/server/src/main/java/io/wdd/rpc/status/service/AsyncStatusService.java deleted file mode 100644 index 844035b..0000000 --- a/server/src/main/java/io/wdd/rpc/status/service/AsyncStatusService.java +++ /dev/null @@ -1,16 +0,0 @@ -package io.wdd.rpc.status.service; - -import java.util.List; -import java.util.Map; - -public interface AsyncStatusService { - - /** - * 应该是同步收集 agentTopicNameList 的节点的存活状态,并返回所有的状态存活结果 - * - * @param agentTopicNameList - * @param aliveStatusWaitMaxTime - * @return - */ - Map AsyncCollectAgentAliveStatus(List agentTopicNameList, int aliveStatusWaitMaxTime); -} diff --git a/server/src/main/java/io/wdd/rpc/status/service/AsyncStatusServiceImpl.java b/server/src/main/java/io/wdd/rpc/status/service/AsyncStatusServiceImpl.java deleted file mode 100644 index a9cb661..0000000 --- a/server/src/main/java/io/wdd/rpc/status/service/AsyncStatusServiceImpl.java +++ /dev/null @@ -1,119 +0,0 @@ -package io.wdd.rpc.status.service; - -import io.wdd.common.utils.TimeUtils; -import io.wdd.rpc.message.OctopusMessage; -import io.wdd.rpc.message.OctopusMessageType; -import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService; -import io.wdd.rpc.message.handler.async.OctopusMessageAsyncReplayContend; -import io.wdd.rpc.message.sender.OMessageToAgentSender; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; - -import javax.annotation.Resource; -import java.time.LocalDateTime; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_LIST; -import static io.wdd.rpc.status.OctopusStatusMessage.ConstructAgentStatusMessage; -import static io.wdd.rpc.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE; - -@Slf4j -@Service -public class AsyncStatusServiceImpl implements AsyncStatusService { - - private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.STATUS; - - @Resource - OMessageToAgentSender oMessageToAgentSender; - - @Resource - AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService; - - @Override - public Map AsyncCollectAgentAliveStatus(List agentTopicNameList, int aliveStatusWaitMaxTime) { - - // 构造最后的结果Map - Map agentAliveStatusMap = agentTopicNameList - .stream() - .collect( - Collectors.toMap( - agentTopicName -> agentTopicName, - agentTopicName -> Boolean.FALSE - )); - - LocalDateTime currentTime = TimeUtils.currentFormatTime(); - // 构造OctopusMessage - StatusMessage结构体, 下发所有的消息 - buildAndSendAgentAliveOctopusMessage(currentTime); - - // 异步收集消息 - OctopusMessageAsyncReplayContend statusAsyncReplayContend = OctopusMessageAsyncReplayContend.build( - agentTopicNameList.size(), - CurrentAppOctopusMessageType, - currentTime - ); - asyncWaitOctopusMessageResultService.waitFor(statusAsyncReplayContend); - - // 解析结果 - CountDownLatch countDownLatch = statusAsyncReplayContend.getCountDownLatch(); - - // 等待状态返回的结果 - boolean agentAliveStatusCollectResult = false; - try { - agentAliveStatusCollectResult = countDownLatch.await( - aliveStatusWaitMaxTime, - TimeUnit.SECONDS - ); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } finally { - if (!agentAliveStatusCollectResult) { - log.debug("Agent存活状态检查,没有检查到全部的Agent!"); - } - - // 移除等待队列 - asyncWaitOctopusMessageResultService.stopWaiting(statusAsyncReplayContend); - - // 处理结果 - statusAsyncReplayContend - .getReplayOMList() - .stream() - .forEach( - statusOMessage -> { - if (statusOMessage.getResult() != null) { - agentAliveStatusMap.put( - statusOMessage.getUuid(), - Boolean.TRUE - ); - } - } - ); - } - - // 返回Agent的存活状态内容 - return agentAliveStatusMap; - } - - private void buildAndSendAgentAliveOctopusMessage(LocalDateTime currentTime) { - - List octopusStatusMessageList = ALL_AGENT_TOPIC_NAME_LIST - .stream() - .map( - agentTopicName -> ConstructAgentStatusMessage( - HEALTHY_STATUS_MESSAGE_TYPE, - agentTopicName, - currentTime - ) - ) - .collect(Collectors.toList()); - - // 发送信息 - oMessageToAgentSender.send(octopusStatusMessageList); - - } - - -} diff --git a/server/src/main/java/io/wdd/rpc/status/service/SyncStatusService.java b/server/src/main/java/io/wdd/rpc/status/service/SyncStatusService.java new file mode 100644 index 0000000..ab9d69b --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/status/service/SyncStatusService.java @@ -0,0 +1,27 @@ +package io.wdd.rpc.status.service; + +import io.wdd.rpc.status.beans.AgentStatus; + +import java.util.List; +import java.util.Map; + +public interface SyncStatusService { + + /** + * 同步收集 agentTopicNameList 的节点的存活状态,并返回所有的状态存活结果 + * + * @param agentTopicNameList + * @param aliveStatusWaitMaxTime + * @return + */ + Map SyncCollectAgentAliveStatus(List agentTopicNameList, int aliveStatusWaitMaxTime); + + /** + * 同步收集 节点的运行状态 + * + * @param agentTopicNameList + * @param collectMetricWaitMaxTime + * @return + */ + Map SyncCollectAgentMetricStatus(List agentTopicNameList, int collectMetricWaitMaxTime); +} diff --git a/server/src/main/java/io/wdd/rpc/status/service/SyncStatusServiceImpl.java b/server/src/main/java/io/wdd/rpc/status/service/SyncStatusServiceImpl.java new file mode 100644 index 0000000..6079932 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/status/service/SyncStatusServiceImpl.java @@ -0,0 +1,214 @@ +package io.wdd.rpc.status.service; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.wdd.common.utils.TimeUtils; +import io.wdd.rpc.message.OctopusMessage; +import io.wdd.rpc.message.OctopusMessageType; +import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService; +import io.wdd.rpc.message.handler.async.OctopusMessageSynScReplayContend; +import io.wdd.rpc.message.sender.OMessageToAgentSender; +import io.wdd.rpc.status.beans.AgentStatus; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static io.wdd.common.utils.OctopusObjectMapperConfig.OctopusObjectMapper; +import static io.wdd.rpc.status.OctopusStatusMessage.*; + +@Slf4j +@Service +public class SyncStatusServiceImpl implements SyncStatusService { + + private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.STATUS; + + @Resource + OMessageToAgentSender oMessageToAgentSender; + + @Resource + AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService; + + @Override + public Map SyncCollectAgentAliveStatus(List agentTopicNameList, int aliveStatusWaitMaxTime) { + + // 构造最后的结果Map + Map agentAliveStatusMap = agentTopicNameList + .stream() + .collect( + Collectors.toMap( + agentTopicName -> agentTopicName, + agentTopicName -> Boolean.FALSE + )); + + // 当前的时间 + LocalDateTime currentTime = TimeUtils.currentFormatTime(); + + // 构造OctopusMessage - StatusMessage结构体, 下发所有的消息 + buildAndSendAgentStatusOctopusMessage( + agentTopicNameList, + HEALTHY_STATUS_MESSAGE_TYPE, + currentTime + ); + + // 同步收集消息 + OctopusMessageSynScReplayContend statusSyncReplayContend = OctopusMessageSynScReplayContend.build( + agentTopicNameList.size(), + CurrentAppOctopusMessageType, + currentTime + ); + asyncWaitOctopusMessageResultService.waitFor(statusSyncReplayContend); + + // 解析结果 + CountDownLatch countDownLatch = statusSyncReplayContend.getCountDownLatch(); + + // 等待状态返回的结果 + boolean agentAliveStatusCollectResult = false; + try { + agentAliveStatusCollectResult = countDownLatch.await( + aliveStatusWaitMaxTime, + TimeUnit.SECONDS + ); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + if (!agentAliveStatusCollectResult) { + log.debug("Agent存活状态检查,没有检查到全部的Agent!"); + } + + // 移除等待队列 + asyncWaitOctopusMessageResultService.stopWaiting(statusSyncReplayContend); + + // 处理结果 + statusSyncReplayContend + .getReplayOMList() + .stream() + .forEach( + statusOMessage -> { + if (statusOMessage.getResult() != null) { + agentAliveStatusMap.put( + statusOMessage.getUuid(), + Boolean.TRUE + ); + } + } + ); + } + + // 返回Agent的存活状态内容 + return agentAliveStatusMap; + } + + @Override + public Map SyncCollectAgentMetricStatus(List agentTopicNameList, int collectMetricWaitMaxTime) { + + // 状态的结果Map + HashMap metricMap = new HashMap<>(); + + // 当前的时间 + LocalDateTime currentTime = TimeUtils.currentFormatTime(); + + // 构造所有的Metric的OM并且下发 + buildAndSendAgentStatusOctopusMessage( + agentTopicNameList, + METRIC_STATUS_MESSAGE_TYPE, + currentTime + ); + + // 同步等待结果, 并且解析结果 + OctopusMessageSynScReplayContend metricSyncReplayContend = OctopusMessageSynScReplayContend.build( + agentTopicNameList.size(), + CurrentAppOctopusMessageType, + currentTime + ); + asyncWaitOctopusMessageResultService.waitFor(metricSyncReplayContend); + + // 解析结果 + CountDownLatch countDownLatch = metricSyncReplayContend.getCountDownLatch(); + + // 等待状态返回的结果 + boolean agentAliveStatusCollectResult = false; + try { + agentAliveStatusCollectResult = countDownLatch.await( + collectMetricWaitMaxTime, + TimeUnit.SECONDS + ); + } catch (InterruptedException e) { + log.error("[Agent Metric] - 收集Agent的运行状态失败!"); + throw new RuntimeException(e); + } finally { + if (!agentAliveStatusCollectResult) { + log.debug("Agent存活状态检查,没有检查到全部的Agent!"); + } + + // 移除等待队列 + asyncWaitOctopusMessageResultService.stopWaiting(metricSyncReplayContend); + + // 处理结果 + metricSyncReplayContend + .getReplayOMList() + .stream() + .forEach( + statusOMessage -> { + if (statusOMessage.getResult() != null) { + + // 解析Result对象为 AgentStatus + try { + + AgentStatus agentStatus = OctopusObjectMapper.readValue( + (String) statusOMessage.getResult(), + AgentStatus.class + ); + + // 保存结果 + metricMap.put( + statusOMessage.getUuid(), + agentStatus + ); + + } catch (JsonProcessingException e) { + log.error("[Agent Metric] - 解析AgentStatus失败!"); + throw new RuntimeException(e); + } + + } + } + ); + } + + return metricMap; + } + + + /** + * 2023年7月10日 通用的底层构造方法 Status类型的Octopus Message + * + * @param agentTopicNameList + * @param statusType + * @param currentTime + */ + private void buildAndSendAgentStatusOctopusMessage(List agentTopicNameList, String statusType, LocalDateTime currentTime) { + + List octopusStatusMessageList = agentTopicNameList + .stream() + .map( + agentTopicName -> ConstructAgentStatusMessage( + statusType, + agentTopicName, + currentTime + ) + ) + .collect(Collectors.toList()); + + // 发送信息 + oMessageToAgentSender.send(octopusStatusMessageList); + } + + +} diff --git a/server/src/main/resources/application.yml b/server/src/main/resources/application.yml index 2954356..750ef92 100644 --- a/server/src/main/resources/application.yml +++ b/server/src/main/resources/application.yml @@ -121,7 +121,9 @@ octopus: cron: 10 * * * * ? * start-delay: 30 metric: - pinch: 20 + type: cron + cron: 30 * * * * ? * + start-delay: 40 oss: # 这里只是因为需要一个层级,不一定下面的都是oracle diff --git a/source/src/main/java/io/wdd/source/shell/lib/wdd-lib-env.sh b/source/src/main/java/io/wdd/source/shell/lib/wdd-lib-env.sh index 19d274d..aa592ec 100644 --- a/source/src/main/java/io/wdd/source/shell/lib/wdd-lib-env.sh +++ b/source/src/main/java/io/wdd/source/shell/lib/wdd-lib-env.sh @@ -257,47 +257,42 @@ GetIpv4Info() { country="$(wget -q -T10 -O- ipinfo.io/country)" region="$(wget -q -T10 -O- ipinfo.io/region)" public_ipv4="$(wget -q -T10 -O- ipinfo.io/ip)" - public_ipv6="$(curl --max-time 5 -6 https://ifconfig.co/ip)" + public_ipv6="$(curl -q --max-time 5 -6 https://ifconfig.co/ip)" if [ -z "$public_ipv4" ] ; then - public_ipv4="" + public_ipv4=" " fi if [ -z "$public_ipv6" ] ; then - public_ipv6="" + public_ipv6=" " fi - if [ -z "$public_ipv4" ] ; then - public_ipv4="" - fi - - # inner ipinfo - interface_prefix=(" eth[0-9]{1,2}" " ens[0-9]{1,3}" " eno[0-9]{1,3}" " enp[0-9]{1,2}") - real_interface="eth90" + export interface_prefix=("[[:space:]]eth[0-9]{1,2}" "[[:space:]]ens[0-9]{1,3}" "[[:space:]]eno[0-9]{1,3}" "[[:space:]]enp[0-9]{1,2}") + export real_interface="eth90" for interface in "${interface_prefix[@]}"; do - echo $(ip link show) | grep -oE ${interface} + echo $(ip link show) | grep -oE ${interface} | head -1 if [[ $? -eq 0 ]]; then - real_interface=$(echo $(ip link show) | grep -oE ${interface} | cut -d" " -f2) - echo "当前主机的真实内网网卡为 => $real_interface" - return + real_interface=$(echo $(ip link show) | grep -oE ${interface} | head -1 | cut -d" " -f2) + echo "当前主机的真实内网网卡为 => [$real_interface]" + break fi done # 提取IPv4地址(CIDR格式) - ipv4_regex="inet (25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\/[0-9]{1,2}" + ipv4_regex="inet[[:space:]](25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\/[0-9]{1,2}" # 提取IPv6地址(CIDR格式) - ipv6_regex="inet6 ([0-9a-fA-F]{0,4}(:[0-9a-fA-F]{0,4}){1,7})\/[0-9]{1,3}" + ipv6_regex="inet6[[:space:]]([0-9a-fA-F]{0,4}(:[0-9a-fA-F]{0,4}){1,7})\/[0-9]{1,3}" # 查找IPv4地址 inner_ipv4=$(echo $(ip addr show $real_interface) | grep -oE $ipv4_regex | cut -d" " -f2) - echo "Interface: $interface, IPv4 Address: $inner_ipv4" + echo "Interface: $real_interface, IPv4 Address: $inner_ipv4" # 查找IPv6地址 inner_ipv6=$(echo $(ip addr show $real_interface) | grep -oE $ipv6_regex | cut -d" " -f2) - echo "Interface: $interface, IPv4 Address: $inner_ipv6" + echo "Interface: $real_interface, IPv4 Address: $inner_ipv6" } @@ -379,7 +374,7 @@ if [[ $(cat /etc/hostname | cut -d"-" -f 3 | grep -c '^[0-9][0-9]') -gt 0 ]]; th else machineNumber=99 fi -agentServerInfoFile="/octopus-agent/octopus-agent.conf " +agentServerInfoFile="/octopus-agent/octopus-agent.conf" #cat >/etc/environment.d/octopus-agent.conf <"$agentServerInfoFile"<