[ server ] [ execution ]- optimize some controller - 2
This commit is contained in:
@@ -5,8 +5,8 @@ import io.swagger.annotations.Api;
|
|||||||
import io.swagger.annotations.ApiOperation;
|
import io.swagger.annotations.ApiOperation;
|
||||||
import io.swagger.annotations.ApiParam;
|
import io.swagger.annotations.ApiParam;
|
||||||
import io.wdd.common.beans.response.R;
|
import io.wdd.common.beans.response.R;
|
||||||
import io.wdd.rpc.scheduler.config.UpdateJobBean;
|
|
||||||
import io.wdd.rpc.scheduler.service.QuartzSchedulerService;
|
import io.wdd.rpc.scheduler.service.QuartzSchedulerService;
|
||||||
|
import org.quartz.Trigger;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.web.bind.annotation.*;
|
import org.springframework.web.bind.annotation.*;
|
||||||
|
|
||||||
@@ -38,7 +38,7 @@ public class SchedulerController {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@ApiOperation(value = "删除一个job")
|
@ApiOperation(value = "删除一个job")
|
||||||
@PostMapping(value = "/deleteJob/{jobName}")
|
@PostMapping(value = "/deleteJob/")
|
||||||
public R<String> deleteJob(
|
public R<String> deleteJob(
|
||||||
@ApiParam(name = "jobName") @RequestParam("jobName") String jobName
|
@ApiParam(name = "jobName") @RequestParam("jobName") String jobName
|
||||||
) {
|
) {
|
||||||
@@ -64,13 +64,25 @@ public class SchedulerController {
|
|||||||
@ApiOperation(value = "修改job的cron时间")
|
@ApiOperation(value = "修改job的cron时间")
|
||||||
@PostMapping(value = "/updateJob/{jobName}")
|
@PostMapping(value = "/updateJob/{jobName}")
|
||||||
public void deleteJob(
|
public void deleteJob(
|
||||||
@ApiParam(name = "jobName") @PathVariable("jobName") String jobName, @ApiParam(name = "jobCronTime") @RequestBody UpdateJobBean updateJobBean
|
@ApiParam(name = "jobName") @RequestParam("jobName") String jobName,
|
||||||
|
@ApiParam(name = "jobCronTime") @RequestParam("jobCronTime") String jobCronTime
|
||||||
) {
|
) {
|
||||||
|
|
||||||
octopusQuartzService.updateJob(
|
octopusQuartzService.updateJob(
|
||||||
jobName,
|
jobName,
|
||||||
jobName,
|
jobName,
|
||||||
updateJobBean.getJobCronTime()
|
jobCronTime
|
||||||
|
);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ApiOperation(value = "查询所有的触发器Trigger")
|
||||||
|
@GetMapping(value = "/allTriggers")
|
||||||
|
public R<List<Trigger>> queryAllTriggers() {
|
||||||
|
|
||||||
|
return R.ok(
|
||||||
|
octopusQuartzService.queryAllTrigger()
|
||||||
);
|
);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ import java.util.List;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static io.wdd.rpc.execute.service.ExecutionResultDaemonHandler.WAIT_EXECUTION_RESULT_LIST;
|
import static io.wdd.rpc.execute.service.ExecutionResultDaemonHandler.WAIT_EXECUTION_RESULT_LIST;
|
||||||
import static io.wdd.rpc.status.MonitorAllAgentStatus.ALL_AGENT_TOPIC_NAME_LIST;
|
import static io.wdd.rpc.status.MonitorAllAgentStatus.ALL_AGENT_TOPIC_NAME_SET;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@@ -54,7 +54,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
|
|||||||
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList) {
|
public String SendCommandToAgent(String agentTopicName, String type, List<String> commandList) {
|
||||||
|
|
||||||
// 检查agentTopicName是否存在
|
// 检查agentTopicName是否存在
|
||||||
if (!ALL_AGENT_TOPIC_NAME_LIST.contains(agentTopicName)) {
|
if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) {
|
||||||
log.error("agentTopicName异常!");
|
log.error("agentTopicName异常!");
|
||||||
throw new MyRuntimeException("agentTopicName异常!");
|
throw new MyRuntimeException("agentTopicName异常!");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,24 +0,0 @@
|
|||||||
package io.wdd.rpc.scheduler.config;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
|
||||||
import io.swagger.annotations.ApiModel;
|
|
||||||
import io.swagger.annotations.ApiModelProperty;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author Andya
|
|
||||||
* @create 2021/04/01
|
|
||||||
*/
|
|
||||||
@ApiModel(value = "更新job cron时间参数")
|
|
||||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
|
||||||
public class UpdateJobBean {
|
|
||||||
@ApiModelProperty(value = "jobTime的cron表达式", example = "0 0 1 * * ?")
|
|
||||||
String jobCronTime;
|
|
||||||
|
|
||||||
public String getJobCronTime() {
|
|
||||||
return jobCronTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setJobCronTime(String jobCronTime) {
|
|
||||||
this.jobCronTime = jobCronTime;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -16,15 +16,19 @@ public class AgentRunMetricStatusJob extends QuartzJobBean {
|
|||||||
@Resource
|
@Resource
|
||||||
AgentRuntimeMetricStatus agentRuntimeMetricStatus;
|
AgentRuntimeMetricStatus agentRuntimeMetricStatus;
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
|
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
|
||||||
|
|
||||||
// 从JobDetailContext中获取相应的信息
|
// 从JobDetailContext中获取相应的信息
|
||||||
JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
|
JobDataMap jobDataMap = jobExecutionContext
|
||||||
|
.getJobDetail()
|
||||||
|
.getJobDataMap();
|
||||||
|
|
||||||
// 执行Agent Metric 状态收集任务
|
// 执行Agent Metric 状态收集任务
|
||||||
agentRuntimeMetricStatus.collect((Integer) jobDataMap.get(METRIC_REPORT_TIMES_COUNT), (Integer) jobDataMap.get(METRIC_REPORT_TIME_PINCH));
|
agentRuntimeMetricStatus.collect(
|
||||||
|
(Integer) jobDataMap.get(METRIC_REPORT_TIMES_COUNT),
|
||||||
|
(Integer) jobDataMap.get(METRIC_REPORT_TIME_PINCH)
|
||||||
|
);
|
||||||
|
|
||||||
// todo 机构设计状态会被存储至 Redis Stream Key 中
|
// todo 机构设计状态会被存储至 Redis Stream Key 中
|
||||||
// AgentTopicName-Metric
|
// AgentTopicName-Metric
|
||||||
|
|||||||
@@ -30,6 +30,9 @@ public class BuildStatusScheduleTask {
|
|||||||
@Value(value = "${octopus.status.healthy.start-delay}")
|
@Value(value = "${octopus.status.healthy.start-delay}")
|
||||||
int healthyCheckStartDelaySeconds;
|
int healthyCheckStartDelaySeconds;
|
||||||
|
|
||||||
|
@Value(value = "${octopus.status.metric.cron}")
|
||||||
|
int metricReportCronExpress;
|
||||||
|
|
||||||
@Value(value = "${octopus.status.metric.pinch}")
|
@Value(value = "${octopus.status.metric.pinch}")
|
||||||
int metricReportTimePinch;
|
int metricReportTimePinch;
|
||||||
|
|
||||||
@@ -64,8 +67,8 @@ public class BuildStatusScheduleTask {
|
|||||||
long totalSeconds = (nextValidTime.getTime() - now.getTime()) / 1000;
|
long totalSeconds = (nextValidTime.getTime() - now.getTime()) / 1000;
|
||||||
metricReportTimesCount = (int) (totalSeconds / metricReportTimePinch) - 1;
|
metricReportTimesCount = (int) (totalSeconds / metricReportTimePinch) - 1;
|
||||||
|
|
||||||
System.out.println("totalSeconds = " + totalSeconds);
|
/*System.out.println("totalSeconds = " + totalSeconds);
|
||||||
System.out.println("metricReportTimesCount = " + metricReportTimesCount);
|
System.out.println("metricReportTimesCount = " + metricReportTimesCount);*/
|
||||||
|
|
||||||
} catch (ParseException e) {
|
} catch (ParseException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
|
|||||||
@@ -22,8 +22,8 @@ import static io.wdd.rpc.status.AgentRuntimeMetricStatus.ALL_HEALTHY_AGENT_TOPIC
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 更新频率被类 BuildStatusScheduleTask.class控制
|
* 更新频率被类 BuildStatusScheduleTask.class控制
|
||||||
*
|
* <p>
|
||||||
*
|
* <p>
|
||||||
* 获取所有注册的Agent
|
* 获取所有注册的Agent
|
||||||
* <p>
|
* <p>
|
||||||
* 发送状态检查信息, agent需要update相应的HashMap的值
|
* 发送状态检查信息, agent需要update相应的HashMap的值
|
||||||
@@ -38,7 +38,6 @@ import static io.wdd.rpc.status.AgentRuntimeMetricStatus.ALL_HEALTHY_AGENT_TOPIC
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public class MonitorAllAgentStatus {
|
public class MonitorAllAgentStatus {
|
||||||
|
|
||||||
private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5;
|
|
||||||
/**
|
/**
|
||||||
* 存储 状态对应Agent列表的Map
|
* 存储 状态对应Agent列表的Map
|
||||||
* Agent的状态描述为 AgentHealthyStatusEnum
|
* Agent的状态描述为 AgentHealthyStatusEnum
|
||||||
@@ -46,18 +45,17 @@ public class MonitorAllAgentStatus {
|
|||||||
* FAILED -> ["agentTopicName-1", "agentTopicName-2"]
|
* FAILED -> ["agentTopicName-1", "agentTopicName-2"]
|
||||||
*/
|
*/
|
||||||
public static final Map<String, List<String>> HEALTHY_STATUS_AGENT_LIST_MAP = new HashMap<>();
|
public static final Map<String, List<String>> HEALTHY_STATUS_AGENT_LIST_MAP = new HashMap<>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 存储所有Agent状态的Map
|
* 存储所有Agent状态的Map
|
||||||
*
|
* <p>
|
||||||
* 内容为 agentTopicName-健康状态
|
* 内容为 agentTopicName-健康状态
|
||||||
*/
|
*/
|
||||||
public static final Map<String, String> ALL_AGENT_HEALTHY_STATUS_MAP = new HashMap<>();
|
public static final Map<String, String> ALL_AGENT_HEALTHY_STATUS_MAP = new HashMap<>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 存储所有的AgentTopicName的缓存
|
* 存储所有的AgentTopicName的缓存
|
||||||
*/
|
*/
|
||||||
public static final Set<String> ALL_AGENT_TOPIC_NAME_LIST = new HashSet<>();
|
public static final Set<String> ALL_AGENT_TOPIC_NAME_SET = new HashSet<>();
|
||||||
|
private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5;
|
||||||
@Resource
|
@Resource
|
||||||
RedisTemplate redisTemplate;
|
RedisTemplate redisTemplate;
|
||||||
@Resource
|
@Resource
|
||||||
@@ -66,6 +64,7 @@ public class MonitorAllAgentStatus {
|
|||||||
CoreServerService coreServerService;
|
CoreServerService coreServerService;
|
||||||
@Resource
|
@Resource
|
||||||
BuildStatusScheduleTask buildStatusScheduleTask;
|
BuildStatusScheduleTask buildStatusScheduleTask;
|
||||||
|
private List<String> ALL_AGENT_TOPIC_NAME_LIST;
|
||||||
private HashMap<String, String> AGENT_HEALTHY_INIT_MAP;
|
private HashMap<String, String> AGENT_HEALTHY_INIT_MAP;
|
||||||
|
|
||||||
public void go() {
|
public void go() {
|
||||||
@@ -75,14 +74,19 @@ public class MonitorAllAgentStatus {
|
|||||||
// 1. 获取所有注册的Agent
|
// 1. 获取所有注册的Agent
|
||||||
// todo need to cache this
|
// todo need to cache this
|
||||||
List<ServerInfoVO> allAgentInfo = coreServerService.serverGetAll();
|
List<ServerInfoVO> allAgentInfo = coreServerService.serverGetAll();
|
||||||
Assert.notEmpty(allAgentInfo,
|
Assert.notEmpty(
|
||||||
"not agent registered ! skip the agent healthy status check !");
|
allAgentInfo,
|
||||||
|
"not agent registered ! skip the agent healthy status check !"
|
||||||
|
);
|
||||||
|
|
||||||
Set<String> collect = allAgentInfo.stream()
|
ALL_AGENT_TOPIC_NAME_LIST = allAgentInfo
|
||||||
|
.stream()
|
||||||
.map(ServerInfoVO::getTopicName)
|
.map(ServerInfoVO::getTopicName)
|
||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toList());
|
||||||
ALL_AGENT_TOPIC_NAME_LIST.clear();
|
|
||||||
ALL_AGENT_TOPIC_NAME_LIST.addAll(collect);
|
// 2023-01-16
|
||||||
|
ALL_AGENT_TOPIC_NAME_SET.clear();
|
||||||
|
ALL_AGENT_TOPIC_NAME_SET.addAll(ALL_AGENT_TOPIC_NAME_LIST);
|
||||||
|
|
||||||
// 1.1 检查 Agent状态保存数据结构是否正常
|
// 1.1 检查 Agent状态保存数据结构是否正常
|
||||||
checkOrCreateRedisHealthyKey();
|
checkOrCreateRedisHealthyKey();
|
||||||
@@ -110,28 +114,38 @@ public class MonitorAllAgentStatus {
|
|||||||
|
|
||||||
// build the redis all agent healthy map struct
|
// build the redis all agent healthy map struct
|
||||||
HashMap<String, String> initMap = new HashMap<>(32);
|
HashMap<String, String> initMap = new HashMap<>(32);
|
||||||
ALL_AGENT_TOPICNAME_LIST.stream()
|
ALL_AGENT_TOPIC_NAME_LIST
|
||||||
|
.stream()
|
||||||
.forEach(
|
.forEach(
|
||||||
agentTopicName -> {
|
agentTopicName -> {
|
||||||
initMap.put(agentTopicName,
|
initMap.put(
|
||||||
"0");
|
agentTopicName,
|
||||||
|
"0"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
initMap.put("updateTime",
|
|
||||||
TimeUtils.currentTimeString());
|
initMap.put(
|
||||||
|
"updateTime",
|
||||||
|
TimeUtils.currentTimeString()
|
||||||
|
);
|
||||||
|
|
||||||
// cache this map struct
|
// cache this map struct
|
||||||
AGENT_HEALTHY_INIT_MAP = initMap;
|
AGENT_HEALTHY_INIT_MAP = initMap;
|
||||||
|
|
||||||
redisTemplate.opsForHash()
|
// create the healthy redis structure
|
||||||
.putAll(ALL_AGENT_STATUS_REDIS_KEY,
|
redisTemplate
|
||||||
initMap);
|
.opsForHash()
|
||||||
|
.putAll(
|
||||||
|
ALL_AGENT_STATUS_REDIS_KEY,
|
||||||
|
initMap
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void buildAndSendAgentHealthMessage() {
|
private void buildAndSendAgentHealthMessage() {
|
||||||
|
|
||||||
List<OctopusStatusMessage> collect = ALL_AGENT_TOPICNAME_LIST
|
List<OctopusStatusMessage> collect = ALL_AGENT_TOPIC_NAME_LIST
|
||||||
.stream()
|
.stream()
|
||||||
.map(
|
.map(
|
||||||
agentTopicName -> OctopusStatusMessage
|
agentTopicName -> OctopusStatusMessage
|
||||||
@@ -146,26 +160,29 @@ public class MonitorAllAgentStatus {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void updateAllAgentHealthyStatus() {
|
private void updateAllAgentHealthyStatus() {
|
||||||
|
|
||||||
List statusList = redisTemplate
|
List statusList = redisTemplate
|
||||||
.opsForHash()
|
.opsForHash()
|
||||||
.multiGet(
|
.multiGet(
|
||||||
ALL_AGENT_STATUS_REDIS_KEY,
|
ALL_AGENT_STATUS_REDIS_KEY,
|
||||||
ALL_AGENT_TOPICNAME_LIST);
|
ALL_AGENT_TOPIC_NAME_LIST
|
||||||
|
);
|
||||||
|
|
||||||
// current log to console is ok
|
// current log to console is ok
|
||||||
// agent-topic-name : STATUS(healthy, failed, unknown)
|
// agent-topic-name : STATUS(healthy, failed, unknown)
|
||||||
HashMap<String, String> agentStatusMap = new HashMap<>(32);
|
HashMap<String, String> agentStatusMap = new HashMap<>(32);
|
||||||
for (int i = 0; i < ALL_AGENT_TOPICNAME_LIST.size(); i++) {
|
for (int i = 0; i < ALL_AGENT_TOPIC_NAME_LIST.size(); i++) {
|
||||||
agentStatusMap.put(
|
agentStatusMap.put(
|
||||||
ALL_AGENT_TOPICNAME_LIST.get(i),
|
ALL_AGENT_TOPIC_NAME_LIST.get(i),
|
||||||
uniformHealthyStatus(String.valueOf(statusList.get(i)))
|
uniformHealthyStatus(String.valueOf(statusList.get(i)))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
String currentTimeString = TimeUtils.currentTimeString();
|
String currentTimeString = TimeUtils.currentTimeString();
|
||||||
log.info("[ AGENT HEALTHY CHECK ] time is {} , result are => {}",
|
log.info(
|
||||||
|
"[ AGENT HEALTHY CHECK ] time is {} , result are => {}",
|
||||||
currentTimeString,
|
currentTimeString,
|
||||||
agentStatusMap);
|
agentStatusMap
|
||||||
|
);
|
||||||
|
|
||||||
// 2023-01-16
|
// 2023-01-16
|
||||||
ALL_AGENT_HEALTHY_STATUS_MAP.clear();
|
ALL_AGENT_HEALTHY_STATUS_MAP.clear();
|
||||||
@@ -204,9 +221,10 @@ public class MonitorAllAgentStatus {
|
|||||||
// Trigger调用Agent Metric 任务
|
// Trigger调用Agent Metric 任务
|
||||||
ArrayList<String> allHealthyAgentTopicNames = new ArrayList<>(32);
|
ArrayList<String> allHealthyAgentTopicNames = new ArrayList<>(32);
|
||||||
for (int i = 0; i < statusList.size(); i++) {
|
for (int i = 0; i < statusList.size(); i++) {
|
||||||
if (statusList.get(i)
|
if (statusList
|
||||||
|
.get(i)
|
||||||
.equals("1")) {
|
.equals("1")) {
|
||||||
allHealthyAgentTopicNames.add(ALL_AGENT_TOPICNAME_LIST.get(i));
|
allHealthyAgentTopicNames.add(ALL_AGENT_TOPIC_NAME_LIST.get(i));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ALL_HEALTHY_AGENT_TOPIC_NAMES = allHealthyAgentTopicNames;
|
ALL_HEALTHY_AGENT_TOPIC_NAMES = allHealthyAgentTopicNames;
|
||||||
@@ -214,12 +232,18 @@ public class MonitorAllAgentStatus {
|
|||||||
buildStatusScheduleTask.buildAgentMetricScheduleTask();
|
buildStatusScheduleTask.buildAgentMetricScheduleTask();
|
||||||
|
|
||||||
// update time
|
// update time
|
||||||
AGENT_HEALTHY_INIT_MAP.put("updateTime",
|
AGENT_HEALTHY_INIT_MAP.put(
|
||||||
currentTimeString);
|
"updateTime",
|
||||||
|
currentTimeString
|
||||||
|
);
|
||||||
|
|
||||||
// init the healthy map
|
// init the healthy map
|
||||||
redisTemplate.opsForHash()
|
redisTemplate
|
||||||
.putAll(ALL_AGENT_STATUS_REDIS_KEY,
|
.opsForHash()
|
||||||
AGENT_HEALTHY_INIT_MAP);
|
.putAll(
|
||||||
|
ALL_AGENT_STATUS_REDIS_KEY,
|
||||||
|
AGENT_HEALTHY_INIT_MAP
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String uniformHealthyStatus(String agentStatus) {
|
private String uniformHealthyStatus(String agentStatus) {
|
||||||
|
|||||||
Reference in New Issue
Block a user