[ Status ] optimize the code

This commit is contained in:
IceDerce
2023-06-15 17:03:22 +08:00
parent 4b3f7be1dd
commit 54569cc41b
17 changed files with 123 additions and 157 deletions

View File

@@ -5,7 +5,7 @@ import "testing"
func TestReadTimeOutput(t *testing.T) {
strings := []string{
"/bin/bash",
"/root/simple.sh",
"/root/IdeaProjects/ProjectOctopus/agent-go/tmp/simple.sh",
}
ReadTimeCommandExecutor(strings)

View File

@@ -9,7 +9,6 @@ require (
github.com/spf13/viper v1.15.0
github.com/streadway/amqp v1.0.0
go.uber.org/zap v1.24.0
gopkg.in/yaml.v3 v3.0.1
)
require (
@@ -58,4 +57,5 @@ require (
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

View File

@@ -118,12 +118,18 @@ func executorOMHandler(octopusMessage *OctopusMessage) {
func statusOMHandler(octopusMessage *OctopusMessage) {
v, ok := (octopusMessage.Content).(string)
if !ok {
log.ErrorF("convert to string is wrong %s", v)
}
statusMsgString := octopusMessage.Content.(string)
var statusMessage *status.StatusMessage
err := json.Unmarshal([]byte(statusMsgString), &statusMessage)
if err != nil {
log.Error(fmt.Sprintf("status message convert to json is wrong! msg is => %s", statusMsgString))
fmt.Println(err.Error())
log.Error(fmt.Sprintf("status message convert to json is wrong! msg is => %s", octopusMessage))
return
}

View File

@@ -46,4 +46,5 @@ func GetCPUStatus() (*CPUStatus, error) {
CPULoads: cpuLoads,
SystemLoads: systemLoads,
}, nil
}

7
agent-go/tmp/1.sh Normal file
View File

@@ -0,0 +1,7 @@
#!/bin/bash
export http_proxy=http://10.250.0.10:10810 && export https_proxy=http://10.250.0.10:10810

7
agent-go/tmp/simple.sh Executable file
View File

@@ -0,0 +1,7 @@
#!/bin/bash
for i in {1..30}
do
echo "yes"
sleep 0.3
done

View File

@@ -30,7 +30,15 @@ public class OMHandlerExecutor extends AbstractOctopusMessageHandler {
@Override
public boolean handle(OctopusMessage octopusMessage) {
if (!octopusMessage
.getType()
.equals(OctopusMessageType.EXECUTOR)) {
return next.handle(octopusMessage);
@@ -41,6 +49,7 @@ public class OMHandlerExecutor extends AbstractOctopusMessageHandler {
try {
// 需要首先解析成 ExecutionMessage
ExecutionMessage executionMessage = objectMapper.readValue(
(String) octopusMessage.getContent(),
new TypeReference<ExecutionMessage>() {

View File

@@ -0,0 +1,5 @@
package io.wdd.agent;
public class CommandTest {
}

View File

@@ -4,7 +4,8 @@ package io.wdd.rpc.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.wdd.common.response.R;
import io.wdd.rpc.init.AgentStatusCacheService;
import io.wdd.rpc.scheduler.service.status.AgentAliveStatusMonitorService;
import io.wdd.rpc.status.service.AsyncStatusService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -23,11 +24,14 @@ import static io.wdd.rpc.init.AgentStatusCacheService.*;
public class StatusController {
@Resource
AgentStatusCacheService agentStatusCacheService;
AsyncStatusService asyncStatusService;
@Resource
AgentAliveStatusMonitorService agentAliveStatusMonitorService;
@ApiOperation("[ Agent-状态 ] Map")
@GetMapping("/agent/status")
public R<Map<String, String>> GetAllAgentHealthyStatus() {
public R<Map<String, Boolean>> GetAllAgentHealthyStatus() {
return R.ok(ALL_AGENT_STATUS_MAP);
}
@@ -76,7 +80,9 @@ public class StatusController {
public R<Map<String, List<String>>> ManualUpdateAgentStatus() {
// 手动调用更新
agentStatusCacheService.updateAgentStatusMapCache(agentAliveStatusMap);
Map<String, Boolean> agentAliveStatusMap = asyncStatusService.AsyncCollectAgentAliveStatus(ALL_AGENT_TOPIC_NAME_LIST, 5);
agentAliveStatusMonitorService.updateAllAgentHealthyStatus(agentAliveStatusMap);
return R.ok(STATUS_AGENT_LIST_MAP);
}

View File

@@ -205,15 +205,15 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService {
}
// 构造回复信息的内容
OctopusMessageAsyncReplayContend OctopusMessageAsyncReplayContend = OctopusMessageAsyncReplayContend.build(
OctopusMessageAsyncReplayContend executionReplayContent = OctopusMessageAsyncReplayContend.build(
commandCount,
CurrentAppOctopusMessageType,
initTime
);
CountDownLatch countDownLatch = OctopusMessageAsyncReplayContend.getCountDownLatch();
CountDownLatch countDownLatch = executionReplayContent.getCountDownLatch();
// 开始等待结果
asyncWaitOctopusMessageResultService.waitFor(OctopusMessageAsyncReplayContend);
asyncWaitOctopusMessageResultService.waitFor(executionReplayContent);
// 监听结果
try {
@@ -228,10 +228,10 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService {
// 等待所有的结果返回
// 停止等待结果
asyncWaitOctopusMessageResultService.stopWaiting(OctopusMessageAsyncReplayContend);
asyncWaitOctopusMessageResultService.stopWaiting(executionReplayContent);
// 解析结果
OctopusMessageAsyncReplayContend
executionReplayContent
.getReplayOMList()
.stream()
.map(

View File

@@ -1,6 +1,6 @@
package io.wdd.rpc.scheduler.job;
import io.wdd.rpc.scheduler.service.status.AgentRuntimeMetricStatus;
import io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
@@ -8,13 +8,13 @@ import org.springframework.scheduling.quartz.QuartzJobBean;
import javax.annotation.Resource;
import static io.wdd.rpc.scheduler.service.status.AgentRuntimeMetricStatus.METRIC_REPORT_TIMES_COUNT;
import static io.wdd.rpc.scheduler.service.status.AgentRuntimeMetricStatus.METRIC_REPORT_TIME_PINCH;
import static io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService.METRIC_REPORT_TIMES_COUNT;
import static io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService.METRIC_REPORT_TIME_PINCH;
public class AgentRunMetricStatusJob extends QuartzJobBean {
@Resource
AgentRuntimeMetricStatus agentRuntimeMetricStatus;
AgentMetricStatusCollectService agentMetricStatusCollectService;
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
@@ -25,7 +25,7 @@ public class AgentRunMetricStatusJob extends QuartzJobBean {
.getJobDataMap();
// 执行Agent Metric 状态收集任务
agentRuntimeMetricStatus.collect(
agentMetricStatusCollectService.collect(
(Integer) jobDataMap.get(METRIC_REPORT_TIMES_COUNT),
(Integer) jobDataMap.get(METRIC_REPORT_TIME_PINCH)
);

View File

@@ -14,8 +14,8 @@ import java.text.ParseException;
import java.util.Date;
import java.util.HashMap;
import static io.wdd.rpc.scheduler.service.status.AgentRuntimeMetricStatus.METRIC_REPORT_TIMES_COUNT;
import static io.wdd.rpc.scheduler.service.status.AgentRuntimeMetricStatus.METRIC_REPORT_TIME_PINCH;
import static io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService.METRIC_REPORT_TIMES_COUNT;
import static io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService.METRIC_REPORT_TIME_PINCH;
@Component
@Slf4j

View File

@@ -3,7 +3,6 @@ package io.wdd.rpc.scheduler.service.status;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.init.AgentStatusCacheService;
import io.wdd.rpc.scheduler.service.BuildStatusScheduleTask;
import io.wdd.rpc.status.OctopusStatusMessage;
import io.wdd.rpc.status.service.AsyncStatusService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
@@ -13,13 +12,10 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_LIST;
import static io.wdd.rpc.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY;
import static io.wdd.rpc.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE;
/**
* 更新频率被类 BuildStatusScheduleTask.class控制
@@ -43,8 +39,6 @@ public class AgentAliveStatusMonitorService {
private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5;
@Resource
RedisTemplate redisTemplate;
@Resource
CollectAgentStatus collectAgentStatus;
@Resource
AgentStatusCacheService agentStatusCacheService;
@@ -119,23 +113,7 @@ public class AgentAliveStatusMonitorService {
}
private void buildAndSendAgentHealthMessage() {
List<OctopusStatusMessage> collect = ALL_AGENT_TOPIC_NAME_LIST
.stream()
.map(
agentTopicName -> OctopusStatusMessage
.builder()
.statusType(HEALTHY_STATUS_MESSAGE_TYPE)
.build()
)
.collect(Collectors.toList());
// 发送信息
collectAgentStatus.statusMessageToAgent(collect);
}
private void updateAllAgentHealthyStatus(Map<String, Boolean> agentAliveStatusMap) {
public void updateAllAgentHealthyStatus(Map<String, Boolean> agentAliveStatusMap) {
String currentTimeString = TimeUtils.currentTimeString();

View File

@@ -1,12 +1,17 @@
package io.wdd.rpc.scheduler.service.status;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.sender.OMessageToAgentSender;
import io.wdd.rpc.status.OctopusStatusMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
@@ -20,13 +25,20 @@ import static io.wdd.rpc.status.OctopusStatusMessage.METRIC_STATUS_MESSAGE_TYPE;
*/
@Service
@Slf4j
public class AgentRuntimeMetricStatus {
public class AgentMetricStatusCollectService {
public static final String METRIC_REPORT_TIME_PINCH = "metricRepeatPinch";
public static final String METRIC_REPORT_TIMES_COUNT = "metricRepeatCount";
@Resource
CollectAgentStatus collectAgentStatus;
OctopusStatusMessage octopusStatusMessage;
@Resource
OMessageToAgentSender oMessageToAgentSender;
@Resource
ObjectMapper objectMapper;
public void collect(int metricRepeatCount, int metricRepeatPinch) {
@@ -34,35 +46,32 @@ public class AgentRuntimeMetricStatus {
if (CollectionUtils.isEmpty(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST)) {
log.error("Metric Status Collect Failed ! no ALL_HEALTHY_AGENT_TOPIC_NAMES");
}
// 构建 OctopusMessage
// 只发送一次消息让Agent循环定时执行任务
buildMetricStatusMessageAndSend(
metricRepeatCount,
metricRepeatPinch
);
//
}
private void buildMetricStatusMessageAndSend(int metricRepeatCount, int metricRepeatPinch) {
List<OctopusStatusMessage> collect = ALL_HEALTHY_AGENT_TOPIC_NAME_LIST
LocalDateTime currentTime = TimeUtils.currentFormatTime();
List<OctopusMessage> octopusStatusMessageList = ALL_HEALTHY_AGENT_TOPIC_NAME_LIST
.stream()
.map(
agentTopicName -> {
return OctopusStatusMessage
.builder()
.statusType(METRIC_STATUS_MESSAGE_TYPE)
.metricRepeatCount(metricRepeatCount)
.metricRepeatPinch(metricRepeatPinch)
.agentTopicName(agentTopicName)
.build();
}
agentTopicName -> octopusStatusMessage
.ConstructAgentStatusMessage(
METRIC_STATUS_MESSAGE_TYPE,
agentTopicName,
currentTime
)
)
.collect(Collectors.toList());
// send to the next level
collectAgentStatus.statusMessageToAgent(collect);
// batch send all messages to RabbitMQ
oMessageToAgentSender.send(octopusStatusMessageList);
}

View File

@@ -1,72 +0,0 @@
package io.wdd.rpc.scheduler.service.status;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType;
import io.wdd.rpc.message.sender.OMessageToAgentSender;
import io.wdd.rpc.status.OctopusStatusMessage;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;
/**
* 1. 定时任务
* 2. 向RabbitMQ中发送消息STATUS类型的消息
* 3. 然后开始监听相应的Result StreamKey
*/
@Service
public class CollectAgentStatus {
@Resource
OMessageToAgentSender oMessageToAgentSender;
@Resource
ObjectMapper objectMapper;
public void collectAgentStatus(OctopusStatusMessage statusMessage) {
this.statusMessageToAgent(List.of(statusMessage));
}
public void statusMessageToAgent(List<OctopusStatusMessage> statusMessageList) {
// build all the OctopusMessage
List<OctopusMessage> octopusMessageList = statusMessageList.stream().map(
statusMessage -> {
OctopusMessage octopusMessage = buildOctopusMessageStatus(statusMessage);
return octopusMessage;
}
).collect(Collectors.toList());
// batch send all messages to RabbitMQ
oMessageToAgentSender.send(octopusMessageList);
// todo how to get result ?
}
private OctopusMessage buildOctopusMessageStatus(OctopusStatusMessage octopusStatusMessage) {
// must be like this or it will be deserialized as LinkedHashMap
String s;
try {
s = objectMapper.writeValueAsString(octopusStatusMessage);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return OctopusMessage.builder()
.uuid(octopusStatusMessage.getAgentTopicName())
.type(OctopusMessageType.STATUS)
.init_time(TimeUtils.currentTime())
.content(s)
.build();
}
}

View File

@@ -1,10 +1,17 @@
package io.wdd.rpc.status;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import javax.annotation.Resource;
import java.time.LocalDateTime;
@Data
@AllArgsConstructor
@NoArgsConstructor
@@ -19,6 +26,9 @@ public class OctopusStatusMessage {
public static final String METRIC_STATUS_MESSAGE_TYPE = "METRIC";
public static final String APP_STATUS_MESSAGE_TYPE = "APP";
@Resource
ObjectMapper objectMapper;
/**
* which kind of status should be return
* metric => short time message
@@ -31,4 +41,28 @@ public class OctopusStatusMessage {
int metricRepeatPinch;
public OctopusMessage ConstructAgentStatusMessage(String statusType, String agentTopicName, LocalDateTime currentTime) {
OctopusStatusMessage statusMessage = OctopusStatusMessage
.builder()
.statusType(statusType)
.build();
String ops;
try {
ops = objectMapper.writeValueAsString(statusMessage);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return OctopusMessage
.builder()
.type(OctopusMessageType.STATUS)
.uuid(agentTopicName)
.init_time(currentTime)
.content(ops)
.build();
}
}

View File

@@ -1,7 +1,5 @@
package io.wdd.rpc.status.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType;
@@ -33,7 +31,7 @@ public class AsyncStatusServiceImpl implements AsyncStatusService {
OMessageToAgentSender oMessageToAgentSender;
@Resource
ObjectMapper objectMapper;
OctopusStatusMessage octopusStatusMessage;
@Resource
AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService;
@@ -107,7 +105,7 @@ public class AsyncStatusServiceImpl implements AsyncStatusService {
List<OctopusMessage> octopusStatusMessageList = ALL_AGENT_TOPIC_NAME_LIST
.stream()
.map(
agentTopicName -> ConstructAgentStatusMessage(
agentTopicName -> octopusStatusMessage.ConstructAgentStatusMessage(
HEALTHY_STATUS_MESSAGE_TYPE,
agentTopicName,
currentTime
@@ -120,27 +118,5 @@ public class AsyncStatusServiceImpl implements AsyncStatusService {
}
private OctopusMessage ConstructAgentStatusMessage(String statusType, String agentTopicName, LocalDateTime currentTime) {
OctopusStatusMessage statusMessage = OctopusStatusMessage
.builder()
.statusType(statusType)
.build();
String ops;
try {
ops = objectMapper.writeValueAsString(statusMessage);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return OctopusMessage
.builder()
.type(CurrentAppOctopusMessageType)
.uuid(agentTopicName)
.init_time(currentTime)
.content(ops)
.build();
}
}