[ server ] [ status ]- update some code - 1

This commit is contained in:
zeaslity
2023-02-03 11:30:20 +08:00
parent 0bd0c9da53
commit fd5f2607b9
12 changed files with 362 additions and 223 deletions

View File

@@ -188,20 +188,20 @@ jobs:
push: true push: true
tags: ${{ steps.docker_tags_server.outputs.tags }} tags: ${{ steps.docker_tags_server.outputs.tags }}
- name: Build and push Docker images - [ Agent ] # - name: Build and push Docker images - [ Agent ]
id: docker_build_agent # id: docker_build_agent
# You may pin to the exact commit or the version. # # You may pin to the exact commit or the version.
# uses: docker/build-push-action@c56af957549030174b10d6867f20e78cfd7debc5 # # uses: docker/build-push-action@c56af957549030174b10d6867f20e78cfd7debc5
uses: docker/build-push-action@v3.2.0 # uses: docker/build-push-action@v3.2.0
with: # with:
context: ./ # context: ./
# Path to the Dockerfile # # Path to the Dockerfile
file: ./agent/Dockerfile # file: ./agent/Dockerfile
# List of target platforms for build # # List of target platforms for build
platforms: linux/amd64,linux/arm64 # platforms: linux/amd64,linux/arm64
# Always attempt to pull all referenced images # # Always attempt to pull all referenced images
pull: false # pull: false
# Push is a shorthand for --output=type=registry # # Push is a shorthand for --output=type=registry
push: true # push: true
tags: ${{ steps.docker_tags_agent.outputs.tags }} # tags: ${{ steps.docker_tags_agent.outputs.tags }}

View File

@@ -18,11 +18,9 @@ public enum AgentHealthyStatusEnum {
String status; String status;
String description; String description;
AgentHealthyStatusEnum(String status, String description) { AgentHealthyStatusEnum(String status, String description) {
this.description = description; this.description = description;
this.status = status; this.status = status;

View File

@@ -12,7 +12,7 @@ import lombok.experimental.SuperBuilder;
public class OctopusStatusMessage { public class OctopusStatusMessage {
// below two will be used by both server and agent // below two will be used by both server and agent
public static final String ALL_AGENT_STATUS_REDIS_KEY = "ALL_AGENT_STATUS"; public static final String ALL_AGENT_STATUS_REDIS_KEY = "ALL_AGENT_HEALTHY_STATUS";
public static final String HEALTHY_STATUS_MESSAGE_TYPE = "ping"; public static final String HEALTHY_STATUS_MESSAGE_TYPE = "ping";
public static final String ALL_STATUS_MESSAGE_TYPE = "all"; public static final String ALL_STATUS_MESSAGE_TYPE = "all";
public static final String METRIC_STATUS_MESSAGE_TYPE = "metric"; public static final String METRIC_STATUS_MESSAGE_TYPE = "metric";

View File

@@ -15,8 +15,6 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static io.wdd.rpc.scheduler.service.status.MonitorAllAgentStatus.ALL_AGENT_TOPIC_NAME_SET;
@RestController @RestController
@Api(value = "定时任务控制中心的Controller") @Api(value = "定时任务控制中心的Controller")
@RequestMapping(value = "/octopus/server/scheduler") @RequestMapping(value = "/octopus/server/scheduler")
@@ -38,18 +36,27 @@ public class SchedulerController {
@ApiParam(name = "scheduleScript") @RequestBody() ScriptSchedulerVO scriptSchedulerVO @ApiParam(name = "scheduleScript") @RequestBody() ScriptSchedulerVO scriptSchedulerVO
) { ) {
ALL_AGENT_TOPIC_NAME_SET.add("Chengdu-amd64-98-98066f");
HashMap<String, String> map = octopusQuartzService.createScriptScheduledMission(scriptSchedulerVO); HashMap<String, String> map = octopusQuartzService.createScriptScheduledMission(scriptSchedulerVO);
return R.ok(map); return R.ok(map);
} }
/**
 * Endpoint intended to return every scheduled script mission
 * (HTTP GET on {@code /script/getAll}, relative to the controller's request mapping).
 * <p>
 * NOTE(review): this is still a placeholder. No service is queried and the
 * literal {@code null} is handed to {@code R.ok}, so the response carries no
 * mission data yet.
 *
 * @return the result of {@code R.ok(null)}
 */
@ApiOperation(value = "查询所有的定时脚本任务")
@GetMapping(value = "/script/getAll")
public R<HashMap<String, String>> getAllScriptScheduler(
) {
// TODO: replace the null payload with the real lookup once it exists
return R.ok(null);
}
/** /**
* -------------------------------------------------------------- * --------------------------------------------------------------
* 普通的定时任务查询功能 * 普通的定时任务查询功能
* */ */
@ApiOperation(value = "查询所有mission") @ApiOperation(value = "查询所有mission")
@GetMapping(value = "/queryAllMission") @GetMapping(value = "/queryAllMission")
@@ -68,7 +75,6 @@ public class SchedulerController {
} }
@ApiOperation(value = "删除一个mission") @ApiOperation(value = "删除一个mission")
@PostMapping(value = "/deleteMission/") @PostMapping(value = "/deleteMission/")
public R<String> deleteMission( public R<String> deleteMission(

View File

@@ -10,8 +10,9 @@ import org.springframework.web.bind.annotation.RestController;
import java.util.Map; import java.util.Map;
import static io.wdd.rpc.scheduler.service.status.MonitorAllAgentStatus.ALL_AGENT_HEALTHY_STATUS_MAP; import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_STATUS_MAP;
import static io.wdd.rpc.scheduler.service.status.MonitorAllAgentStatus.HEALTHY_STATUS_AGENT_LIST_MAP; import static io.wdd.rpc.init.ServerBootUpEnvironment.STATUS_AGENT_LIST_MAP;
@RestController @RestController
@Api("Agent运行状态Controller") @Api("Agent运行状态Controller")
@@ -23,14 +24,14 @@ public class StatusController {
@GetMapping("/agentStatus") @GetMapping("/agentStatus")
public R<Map> GetAllAgentHealthyStatus() { public R<Map> GetAllAgentHealthyStatus() {
return R.ok(ALL_AGENT_HEALTHY_STATUS_MAP); return R.ok(ALL_AGENT_STATUS_MAP);
} }
@ApiOperation("获取 状态-Agent Map") @ApiOperation("获取 状态-Agent Map")
@GetMapping("/statusAgent") @GetMapping("/statusAgent")
public R<Map> GetHealthyStatusAgentList() { public R<Map> GetHealthyStatusAgentList() {
return R.ok(HEALTHY_STATUS_AGENT_LIST_MAP); return R.ok(STATUS_AGENT_LIST_MAP);
} }
} }

View File

@@ -10,7 +10,6 @@ import io.wdd.rpc.execute.config.ExecutionLog;
import io.wdd.rpc.execute.result.BuildStreamReader; import io.wdd.rpc.execute.result.BuildStreamReader;
import io.wdd.rpc.message.sender.ToAgentMessageSender; import io.wdd.rpc.message.sender.ToAgentMessageSender;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@@ -21,7 +20,7 @@ import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static io.wdd.rpc.execute.service.ExecutionResultDaemonHandler.WAIT_EXECUTION_RESULT_LIST; import static io.wdd.rpc.execute.service.ExecutionResultDaemonHandler.WAIT_EXECUTION_RESULT_LIST;
import static io.wdd.rpc.scheduler.service.status.MonitorAllAgentStatus.ALL_AGENT_TOPIC_NAME_SET; import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_TOPIC_NAME_SET;
@Service @Service
@Slf4j @Slf4j
@@ -71,7 +70,13 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
@Override @Override
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete) { public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList, List<List<String>> commandListComplete) {
return this.SendCommandToAgent(agentTopicName, type, commandList, commandListComplete, null); return this.SendCommandToAgent(
agentTopicName,
type,
commandList,
commandListComplete,
null
);
} }
@Override @Override
@@ -94,7 +99,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
// 构造 Execution Command对应的消息体 // 构造 Execution Command对应的消息体
ExecutionMessage executionMessage = this ExecutionMessage executionMessage = this
.generateExecutionMessage( .generateExecutionMessage(
agentTopicName, type,
commandList, commandList,
resultKey, resultKey,
commandListComplete commandListComplete

View File

@@ -75,8 +75,10 @@ public class ExecutionResultDaemonHandler {
.keys() .keys()
.nextElement(); .nextElement();
log.info("current result key is [{}]", log.debug(
resultKey); "current result key is [{}]",
resultKey
);
CompletableFuture<ArrayList<String>> executionResultFuture = CompletableFuture<ArrayList<String>> executionResultFuture =
@@ -93,6 +95,7 @@ public class ExecutionResultDaemonHandler {
// 构造 resultKey对应的 Redis Stream Listener Container // 构造 resultKey对应的 Redis Stream Listener Container
buildStreamReader buildStreamReader
.buildStreamReader(commandReaderConfig); .buildStreamReader(commandReaderConfig);
// 获得结果 // 获得结果
ArrayList<String> s = new ArrayList<>( ArrayList<String> s = new ArrayList<>(
List.of("no no no") List.of("no no no")
@@ -116,8 +119,11 @@ public class ExecutionResultDaemonHandler {
} }
} }
) )
.get(MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT, // 获取相应的结果
TimeUnit.SECONDS); .get(
MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT,
TimeUnit.SECONDS
);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} catch (ExecutionException e) { } catch (ExecutionException e) {
@@ -145,15 +151,19 @@ public class ExecutionResultDaemonHandler {
// 获取结果然后销毁Stream Listener Container // 获取结果然后销毁Stream Listener Container
CompletableFuture<Object> complete = CompletableFuture CompletableFuture<Object> complete = CompletableFuture
.anyOf(falloutTimeFuture, .anyOf(
executionResultFuture); falloutTimeFuture,
executionResultFuture
);
complete complete
.whenComplete( .whenComplete(
(result, e) -> { (result, e) -> {
log.info("execution result are => {}", log.debug(
result); "execution result are => {}",
result
);
// 持久化存储对应的结果 // 持久化存储对应的结果
ExecutionLog executionLog = WAIT_EXECUTION_RESULT_LIST.get(resultKey); ExecutionLog executionLog = WAIT_EXECUTION_RESULT_LIST.get(resultKey);
@@ -163,18 +173,24 @@ public class ExecutionResultDaemonHandler {
CollectionUtils.isEmpty((Collection) result) ? 1 : 0 CollectionUtils.isEmpty((Collection) result) ? 1 : 0
); );
executionLog.setRecordId(commandReaderConfig.getRecordId()); executionLog.setRecordId(commandReaderConfig.getRecordId());
executionLogService.save(executionLog);
// 保存操作
executionLogService.save(executionLog);
// 清除此次任务的内容 // 清除此次任务的内容
WAIT_EXECUTION_RESULT_LIST.remove(resultKey); WAIT_EXECUTION_RESULT_LIST.remove(resultKey);
log.info(
log.info("[Execution] - whole process are complete !"); "[Execution] - command {} result are {} result code is {} ,whole process are complete !",
executionLog.getCommandList(),
executionLog.getResultContent(),
executionLog.getResultCode()
);
} }
); );
// very important // very important
// stuck the main thread , otherwise it will create a dead loop , really bad // stuck the main thread , otherwise it will create a dead loop
complete.join(); complete.join();
} }

View File

@@ -0,0 +1,196 @@
package io.wdd.rpc.init;
import io.wdd.common.beans.status.AgentHealthyStatusEnum;
import io.wdd.common.utils.TimeUtils;
import io.wdd.server.beans.vo.ServerInfoVO;
import io.wdd.server.coreService.CoreServerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
import static io.wdd.common.beans.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY;
/**
 * Holds the agent caches that the server fills once at start-up and that other
 * components refresh afterwards through the public {@code update*} methods.
 * <p>
 * Cached data:
 * <ul>
 *   <li>all agent topic names: {@link #ALL_AGENT_TOPIC_NAME_SET} and {@link #ALL_AGENT_TOPIC_NAME_LIST}</li>
 *   <li>agent status views: {@link #ALL_AGENT_STATUS_MAP} and {@link #STATUS_AGENT_LIST_MAP}</li>
 *   <li>topic names of agents that reported healthy: {@link #ALL_HEALTHY_AGENT_TOPIC_NAME_LIST}</li>
 * </ul>
 * NOTE(review): the caches are plain static collections without synchronisation.
 * Readers on other threads can observe a map between {@code clear()} and
 * {@code putAll()}. Confirm whether that window matters for the callers.
 */
@Service
@Slf4j
public class ServerBootUpEnvironment {

    /**
     * Topic names of all registered agents, as a set.
     */
    public static final Set<String> ALL_AGENT_TOPIC_NAME_SET = new HashSet<>();

    /**
     * Topic names of all registered agents, as a list. The list order is the
     * field order used when the agent statuses are fetched from Redis.
     */
    public static final List<String> ALL_AGENT_TOPIC_NAME_LIST = new ArrayList<>();

    /**
     * Status -> topic names of the agents currently in that status. Keys are the
     * status strings of {@link AgentHealthyStatusEnum}, for example:
     * <pre>
     * HEALTHY -> ["agentTopicName-1", "agentTopicName-2"]
     * FAILED  -> ["agentTopicName-3"]
     * </pre>
     */
    public static final Map<String, List<String>> STATUS_AGENT_LIST_MAP = new HashMap<>();

    /**
     * Agent topic name -> status string of {@link AgentHealthyStatusEnum}.
     */
    public static final Map<String, String> ALL_AGENT_STATUS_MAP = new HashMap<>();

    /**
     * Topic names of the agents whose stored status was "1" at the last refresh.
     * Starts as an empty list so that readers never see {@code null} before the
     * first refresh; each refresh swaps in a freshly built list.
     */
    public static List<String> ALL_HEALTHY_AGENT_TOPIC_NAME_LIST = new ArrayList<>();

    @Resource
    CoreServerService coreServerService;

    @Resource
    RedisTemplate redisTemplate;

    /**
     * Builds every cache once the bean has been constructed.
     * <p>
     * When no agent is registered yet, the topic-name refresh rejects the empty
     * result. That must not abort server start-up, so it is logged and the
     * status refresh is skipped; the caches simply stay empty.
     */
    @PostConstruct
    public void GenerateAllCache() {

        try {
            // all agent topic names
            updateAllAgentTopicNameSetCache();
        } catch (IllegalArgumentException e) {
            log.warn(
                    "[ SERVER BOOT ] agent caches were not initialised => {}",
                    e.getMessage()
            );
            return;
        }

        // the agent status views
        updateAgentStatusMapCache();
    }

    /**
     * Reloads the topic names of all registered agents into
     * {@link #ALL_AGENT_TOPIC_NAME_LIST} and {@link #ALL_AGENT_TOPIC_NAME_SET}.
     *
     * @throws IllegalArgumentException when the server service returns no agent;
     *                                  the caches are left untouched in that case
     */
    public void updateAllAgentTopicNameSetCache() {

        List<ServerInfoVO> allAgentInfo = coreServerService.serverGetAll();

        Assert.notEmpty(
                allAgentInfo,
                "not agent registered ! skip the agent healthy status check !"
        );

        List<String> topicNames = allAgentInfo
                .stream()
                .map(ServerInfoVO::getTopicName)
                .collect(Collectors.toList());

        ALL_AGENT_TOPIC_NAME_LIST.clear();
        ALL_AGENT_TOPIC_NAME_LIST.addAll(topicNames);

        ALL_AGENT_TOPIC_NAME_SET.clear();
        ALL_AGENT_TOPIC_NAME_SET.addAll(topicNames);
    }

    /**
     * Reads the stored status of every cached agent from the Redis hash
     * {@code ALL_AGENT_STATUS_REDIS_KEY} and rebuilds
     * {@link #ALL_AGENT_STATUS_MAP}, {@link #STATUS_AGENT_LIST_MAP} and
     * {@link #ALL_HEALTHY_AGENT_TOPIC_NAME_LIST} from it.
     * <p>
     * An agent without a stored value is reported with the UNKNOWN status and is
     * not counted as healthy.
     */
    public void updateAgentStatusMapCache() {

        List<String> topicNames = ALL_AGENT_TOPIC_NAME_LIST;

        List<?> statusList = redisTemplate
                .opsForHash()
                .multiGet(
                        ALL_AGENT_STATUS_REDIS_KEY,
                        topicNames
                );
        if (statusList == null) {
            // NOTE(review): presumably only possible inside a pipeline / transaction - confirm
            statusList = Collections.emptyList();
        }

        // agentStatusMap ==> agent-topic-name : STATUS(healthy, failed, unknown)
        Map<String, String> agentStatusMap = new HashMap<>(32);
        // topic names of the agents whose stored status is "1"
        List<String> healthyTopicNames = new ArrayList<>(32);

        for (int i = 0; i < topicNames.size(); i++) {
            String topicName = topicNames.get(i);

            // a missing hash field (or a short reply) is treated as "no status"
            Object storedStatus = i < statusList.size() ? statusList.get(i) : null;
            String rawStatus = String.valueOf(storedStatus);

            agentStatusMap.put(
                    topicName,
                    uniformHealthyStatus(rawStatus)
            );

            // same normalised value as above, so both caches always agree
            if ("1".equals(rawStatus)) {
                healthyTopicNames.add(topicName);
            }
        }

        String currentTimeString = TimeUtils.currentTimeString();
        log.info(
                "[ AGENT HEALTHY CHECK ] time is {} , result are => {}",
                currentTimeString,
                agentStatusMap
        );

        ALL_AGENT_STATUS_MAP.clear();
        ALL_AGENT_STATUS_MAP.putAll(agentStatusMap);

        // status -> [agentTopicName, ...]
        Map<String, List<String>> statusAgentListMap = agentStatusMap
                .entrySet()
                .stream()
                .collect(
                        Collectors.groupingBy(
                                Map.Entry::getValue,
                                Collectors.mapping(
                                        Map.Entry::getKey,
                                        Collectors.toList()
                                )
                        )
                );

        // clear first so that a status nobody is in any more does not linger
        STATUS_AGENT_LIST_MAP.clear();
        STATUS_AGENT_LIST_MAP.putAll(statusAgentListMap);

        log.debug("Agent存活状态 状态-Agent名称-Map 已经更新了");

        // publish the healthy agents by swapping the reference, so the field is
        // never dereferenced while null and readers never see a half-filled list
        ALL_HEALTHY_AGENT_TOPIC_NAME_LIST = healthyTopicNames;
    }

    /**
     * Maps the raw status stored in Redis to the status string of
     * {@link AgentHealthyStatusEnum}.
     *
     * @param agentStatus raw status text; "0" means failed, "1" means healthy
     * @return the FAILED or HEALTHY status string, or the UNKNOWN one for any other value
     */
    private String uniformHealthyStatus(String agentStatus) {
        switch (agentStatus) {
            case "0":
                return AgentHealthyStatusEnum.FAILED.getStatus();
            case "1":
                return AgentHealthyStatusEnum.HEALTHY.getStatus();
            default:
                return AgentHealthyStatusEnum.UNKNOWN.getStatus();
        }
    }
}

View File

@@ -11,6 +11,7 @@ import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static io.wdd.common.beans.status.OctopusStatusMessage.METRIC_STATUS_MESSAGE_TYPE; import static io.wdd.common.beans.status.OctopusStatusMessage.METRIC_STATUS_MESSAGE_TYPE;
import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
/** /**
* 收集OctopusAgent的运行Metric信息 * 收集OctopusAgent的运行Metric信息
@@ -21,8 +22,6 @@ import static io.wdd.common.beans.status.OctopusStatusMessage.METRIC_STATUS_MESS
@Slf4j @Slf4j
public class AgentRuntimeMetricStatus { public class AgentRuntimeMetricStatus {
public static List<String> ALL_HEALTHY_AGENT_TOPIC_NAMES;
public static final String METRIC_REPORT_TIME_PINCH = "metricRepeatPinch"; public static final String METRIC_REPORT_TIME_PINCH = "metricRepeatPinch";
public static final String METRIC_REPORT_TIMES_COUNT = "metricRepeatCount"; public static final String METRIC_REPORT_TIMES_COUNT = "metricRepeatCount";
@@ -32,29 +31,35 @@ public class AgentRuntimeMetricStatus {
public void collect(int metricRepeatCount, int metricRepeatPinch) { public void collect(int metricRepeatCount, int metricRepeatPinch) {
// 检查基础信息 // 检查基础信息
if (CollectionUtils.isEmpty(ALL_HEALTHY_AGENT_TOPIC_NAMES)) { if (CollectionUtils.isEmpty(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST)) {
log.error("Metric Status Collect Failed ! no ALL_HEALTHY_AGENT_TOPIC_NAMES"); log.error("Metric Status Collect Failed ! no ALL_HEALTHY_AGENT_TOPIC_NAMES");
} }
// 构建 OctopusMessage // 构建 OctopusMessage
// 只发送一次消息让Agent循环定时执行任务 // 只发送一次消息让Agent循环定时执行任务
buildMetricStatusMessageAndSend(metricRepeatCount, metricRepeatPinch); buildMetricStatusMessageAndSend(
metricRepeatCount,
metricRepeatPinch
);
// //
} }
private void buildMetricStatusMessageAndSend(int metricRepeatCount, int metricRepeatPinch) { private void buildMetricStatusMessageAndSend(int metricRepeatCount, int metricRepeatPinch) {
List<OctopusStatusMessage> collect = ALL_HEALTHY_AGENT_TOPIC_NAMES.stream() List<OctopusStatusMessage> collect = ALL_HEALTHY_AGENT_TOPIC_NAME_LIST
.stream()
.map( .map(
agentTopicName -> { agentTopicName -> {
return OctopusStatusMessage.builder() return OctopusStatusMessage
.builder()
.type(METRIC_STATUS_MESSAGE_TYPE) .type(METRIC_STATUS_MESSAGE_TYPE)
.metricRepeatCount(metricRepeatCount) .metricRepeatCount(metricRepeatCount)
.metricRepeatPinch(metricRepeatPinch) .metricRepeatPinch(metricRepeatPinch)
.agentTopicName(agentTopicName) .agentTopicName(agentTopicName)
.build(); .build();
} }
).collect(Collectors.toList()); )
.collect(Collectors.toList());
// send to the next level // send to the next level
collectAgentStatus.statusMessageToAgent(collect); collectAgentStatus.statusMessageToAgent(collect);

View File

@@ -3,22 +3,24 @@ package io.wdd.rpc.scheduler.service.status;
import io.wdd.common.beans.status.AgentHealthyStatusEnum; import io.wdd.common.beans.status.AgentHealthyStatusEnum;
import io.wdd.common.beans.status.OctopusStatusMessage; import io.wdd.common.beans.status.OctopusStatusMessage;
import io.wdd.common.utils.TimeUtils; import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.init.ServerBootUpEnvironment;
import io.wdd.rpc.scheduler.service.BuildStatusScheduleTask; import io.wdd.rpc.scheduler.service.BuildStatusScheduleTask;
import io.wdd.server.beans.vo.ServerInfoVO;
import io.wdd.server.coreService.CoreServerService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.Assert; import org.springframework.util.CollectionUtils;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.*; import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static io.wdd.common.beans.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY; import static io.wdd.common.beans.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY;
import static io.wdd.common.beans.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE; import static io.wdd.common.beans.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE;
import static io.wdd.rpc.scheduler.service.status.AgentRuntimeMetricStatus.ALL_HEALTHY_AGENT_TOPIC_NAMES; import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_TOPIC_NAME_LIST;
import static io.wdd.rpc.init.ServerBootUpEnvironment.STATUS_AGENT_LIST_MAP;
/** /**
* 更新频率被类 BuildStatusScheduleTask.class控制 * 更新频率被类 BuildStatusScheduleTask.class控制
@@ -36,58 +38,27 @@ import static io.wdd.rpc.scheduler.service.status.AgentRuntimeMetricStatus.ALL_H
*/ */
@Service @Service
@Slf4j @Slf4j
@Lazy
public class MonitorAllAgentStatus { public class MonitorAllAgentStatus {
/**
* 存储 状态对应Agent列表的Map
* Agent的状态描述为 AgentHealthyStatusEnum
* HEALTHY -> ["agentTopicName-1" "agentTopicName-2"]
* FAILED -> ["agentTopicName-1" "agentTopicName-2"]
*/
public static final Map<String, List<String>> HEALTHY_STATUS_AGENT_LIST_MAP = new HashMap<>();
/**
* 存储所有Agent状态的Map
* <p>
* 内容为 agentTopicName-健康状态
*/
public static final Map<String, String> ALL_AGENT_HEALTHY_STATUS_MAP = new HashMap<>();
/**
* 存储所有的AgentTopicName的缓存
*/
public static final Set<String> ALL_AGENT_TOPIC_NAME_SET = new HashSet<>();
private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5; private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5;
@Resource @Resource
RedisTemplate redisTemplate; RedisTemplate redisTemplate;
@Resource @Resource
CollectAgentStatus collectAgentStatus; CollectAgentStatus collectAgentStatus;
@Resource @Resource
CoreServerService coreServerService; ServerBootUpEnvironment serverBootUpEnvironment;
@Resource @Resource
BuildStatusScheduleTask buildStatusScheduleTask; BuildStatusScheduleTask buildStatusScheduleTask;
private List<String> ALL_AGENT_TOPIC_NAME_LIST;
private HashMap<String, String> AGENT_HEALTHY_INIT_MAP; private HashMap<String, String> AGENT_HEALTHY_INIT_MAP;
public void go() { public void go() {
try { try {
// 1. 获取所有注册的Agent 手动更新
// 1. 获取所有注册的Agent
// todo need to cache this
List<ServerInfoVO> allAgentInfo = coreServerService.serverGetAll();
Assert.notEmpty(
allAgentInfo,
"not agent registered ! skip the agent healthy status check !"
);
ALL_AGENT_TOPIC_NAME_LIST = allAgentInfo
.stream()
.map(ServerInfoVO::getTopicName)
.collect(Collectors.toList());
// 2023-01-16
ALL_AGENT_TOPIC_NAME_SET.clear();
ALL_AGENT_TOPIC_NAME_SET.addAll(ALL_AGENT_TOPIC_NAME_LIST);
// 1.1 检查 Agent状态保存数据结构是否正常 // 1.1 检查 Agent状态保存数据结构是否正常
checkOrCreateRedisHealthyKey(); checkOrCreateRedisHealthyKey();
@@ -108,9 +79,8 @@ public class MonitorAllAgentStatus {
private void checkOrCreateRedisHealthyKey() { private void checkOrCreateRedisHealthyKey() {
// must init the cached map && make sure the redis key existed! // 检查开始的时候 需要手动将所有Agent的状态置为0
if (null == AGENT_HEALTHY_INIT_MAP || !redisTemplate.hasKey(ALL_AGENT_STATUS_REDIS_KEY)) { // Agent如果存活,那么就可以将其自身状态修改为1
log.info("ALL_AGENT_STATUS_REDIS_KEY not existed , start to create");
// build the redis all agent healthy map struct // build the redis all agent healthy map struct
HashMap<String, String> initMap = new HashMap<>(32); HashMap<String, String> initMap = new HashMap<>(32);
@@ -140,7 +110,7 @@ public class MonitorAllAgentStatus {
ALL_AGENT_STATUS_REDIS_KEY, ALL_AGENT_STATUS_REDIS_KEY,
initMap initMap
); );
}
} }
private void buildAndSendAgentHealthMessage() { private void buildAndSendAgentHealthMessage() {
@@ -161,94 +131,14 @@ public class MonitorAllAgentStatus {
private void updateAllAgentHealthyStatus() { private void updateAllAgentHealthyStatus() {
List statusList = redisTemplate
.opsForHash()
.multiGet(
ALL_AGENT_STATUS_REDIS_KEY,
ALL_AGENT_TOPIC_NAME_LIST
);
// current log to console is ok
// 结构保存为agentStatusMap ==> agent-topic-name : STATUS(healthy, failed, unknown)
HashMap<String, String> agentStatusMap = new HashMap<>(32);
for (int i = 0; i < ALL_AGENT_TOPIC_NAME_LIST.size(); i++) {
agentStatusMap.put(
ALL_AGENT_TOPIC_NAME_LIST.get(i),
uniformHealthyStatus(String.valueOf(statusList.get(i)))
);
}
String currentTimeString = TimeUtils.currentTimeString(); String currentTimeString = TimeUtils.currentTimeString();
log.info(
"[ AGENT HEALTHY CHECK ] time is {} , result are => {}",
currentTimeString,
agentStatusMap
);
// 2023-01-16 // 更新所有的缓存状态
ALL_AGENT_HEALTHY_STATUS_MAP.clear(); serverBootUpEnvironment.updateAgentStatusMapCache();
ALL_AGENT_HEALTHY_STATUS_MAP.putAll(agentStatusMap);
// 2023-01-16 // 执行Metric上报定时任务
Map<String, List<String>> statusAgentListMap = agentStatusMap
.entrySet()
.stream()
.collect(
Collectors.groupingBy(
Map.Entry::getValue
)
)
.entrySet()
.stream()
.collect(
Collectors.toMap(
entry -> entry.getKey(),
entry -> entry
.getValue()
.stream()
.map(
Map.Entry::getKey
)
.collect(Collectors.toList())
)
);
HEALTHY_STATUS_AGENT_LIST_MAP.putAll(statusAgentListMap);
log.debug("Agent存活状态 状态-Agent名称-Map 已经更新了");
// help gc
agentStatusMap = null;
// Trigger调用Agent Metric 任务
ArrayList<String> allHealthyAgentTopicNames = new ArrayList<>(32);
for (int i = 0; i < statusList.size(); i++) {
if (statusList
.get(i)
.equals("1")) {
allHealthyAgentTopicNames.add(ALL_AGENT_TOPIC_NAME_LIST.get(i));
}
}
// 缓存相应的存活Agent
ALL_HEALTHY_AGENT_TOPIC_NAMES = allHealthyAgentTopicNames;
// 执行Metric上报任务
buildStatusScheduleTask.buildAgentMetricScheduleTask(); buildStatusScheduleTask.buildAgentMetricScheduleTask();
// init the healthy map
// 需要将所有的Agent的状态置为 "0"
ALL_AGENT_TOPIC_NAME_LIST
.stream()
.forEach(
agentTopicName -> {
AGENT_HEALTHY_INIT_MAP.put(
agentTopicName,
"0"
);
}
);
// update time // update time
AGENT_HEALTHY_INIT_MAP.put( AGENT_HEALTHY_INIT_MAP.put(
"updateTime", "updateTime",
@@ -261,17 +151,7 @@ public class MonitorAllAgentStatus {
ALL_AGENT_STATUS_REDIS_KEY, ALL_AGENT_STATUS_REDIS_KEY,
AGENT_HEALTHY_INIT_MAP AGENT_HEALTHY_INIT_MAP
); );
}
private String uniformHealthyStatus(String agentStatus) {
switch (agentStatus) {
case "0":
return AgentHealthyStatusEnum.FAILED.getStatus();
case "1":
return AgentHealthyStatusEnum.HEALTHY.getStatus();
default:
return AgentHealthyStatusEnum.UNKNOWN.getStatus();
}
} }

View File

@@ -8,8 +8,6 @@ import javax.annotation.Resource;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import static io.wdd.rpc.scheduler.service.status.MonitorAllAgentStatus.ALL_AGENT_TOPIC_NAME_SET;
@SpringBootTest @SpringBootTest
class ServerApplicationTests { class ServerApplicationTests {
@@ -57,8 +55,6 @@ class ServerApplicationTests {
completeScript.add(command4); completeScript.add(command4);
ALL_AGENT_TOPIC_NAME_SET.add("Chengdu-amd64-98-98066f");
ArrayList<String> targetMachineList = new ArrayList<>( ArrayList<String> targetMachineList = new ArrayList<>(
List.of( List.of(
"Chengdu-amd64-98-98066f" "Chengdu-amd64-98-98066f"

View File

@@ -1,12 +1,48 @@
package io.wdd.server; package io.wdd.server;
import io.wdd.common.beans.status.AgentHealthyStatusEnum;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
public class SimpleTest { public class SimpleTest {
public static void main(String[] args) { public static void main(String[] args) {
HashMap<String, String> map = new HashMap<>();
HashMap<String, List<String>> hashMap = new HashMap<>();
hashMap.put(
"HEALTHY",
new ArrayList<>(
List.of(
"Tokyo-amd64-07-f66a41",
"Tokyo-amd64-03-dc543f"
)
)
);
hashMap
.get(AgentHealthyStatusEnum.FAILED.getStatus())
.stream()
.forEach(
agentTopicName -> {
map.put(
agentTopicName,
"0"
);
}
);
}
private void CompletableFutureTest() {
//任务1洗水壶->烧开水 //任务1洗水壶->烧开水
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
try { try {