From 54569cc41b341ab40e6ebfdb332fc88acac3f0b1 Mon Sep 17 00:00:00 2001 From: IceDerce Date: Thu, 15 Jun 2023 17:03:22 +0800 Subject: [PATCH] [ Status ] optimize the code --- agent-go/executor/RealTimeExecutor_test.go | 2 +- agent-go/go.mod | 2 +- agent-go/rabbitmq/OctopusMessage.go | 8 ++- agent-go/status/CPU.go | 1 + agent-go/tmp/1.sh | 7 ++ agent-go/tmp/simple.sh | 7 ++ .../message/handler/OMHandlerExecutor.java | 9 +++ .../test/java/io/wdd/agent/CommandTest.java | 5 ++ .../wdd/rpc/controller/StatusController.java | 14 ++-- .../service/AsyncExecutionServiceImpl.java | 10 +-- .../job/AgentRunMetricStatusJob.java | 10 +-- .../service/BuildStatusScheduleTask.java | 4 +- .../AgentAliveStatusMonitorService.java | 24 +------ ...a => AgentMetricStatusCollectService.java} | 43 ++++++----- .../service/status/CollectAgentStatus.java | 72 ------------------- .../wdd/rpc/status/OctopusStatusMessage.java | 34 +++++++++ .../service/AsyncStatusServiceImpl.java | 28 +------- 17 files changed, 123 insertions(+), 157 deletions(-) create mode 100644 agent-go/tmp/1.sh create mode 100755 agent-go/tmp/simple.sh create mode 100644 agent/src/test/java/io/wdd/agent/CommandTest.java rename server/src/main/java/io/wdd/rpc/scheduler/service/status/{AgentRuntimeMetricStatus.java => AgentMetricStatusCollectService.java} (57%) delete mode 100644 server/src/main/java/io/wdd/rpc/scheduler/service/status/CollectAgentStatus.java diff --git a/agent-go/executor/RealTimeExecutor_test.go b/agent-go/executor/RealTimeExecutor_test.go index 715fbf6..43f44f9 100644 --- a/agent-go/executor/RealTimeExecutor_test.go +++ b/agent-go/executor/RealTimeExecutor_test.go @@ -5,7 +5,7 @@ import "testing" func TestReadTimeOutput(t *testing.T) { strings := []string{ "/bin/bash", - "/root/simple.sh", + "/root/IdeaProjects/ProjectOctopus/agent-go/tmp/simple.sh", } ReadTimeCommandExecutor(strings) diff --git a/agent-go/go.mod b/agent-go/go.mod index 8c19d58..7662bdc 100644 --- a/agent-go/go.mod +++ b/agent-go/go.mod @@ -9,7 +9,6 @@ require ( github.com/spf13/viper v1.15.0 github.com/streadway/amqp v1.0.0 go.uber.org/zap v1.24.0 - gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -58,4 +57,5 @@ require ( google.golang.org/protobuf v1.28.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/agent-go/rabbitmq/OctopusMessage.go b/agent-go/rabbitmq/OctopusMessage.go index 1d74ab2..0b7eda7 100644 --- a/agent-go/rabbitmq/OctopusMessage.go +++ b/agent-go/rabbitmq/OctopusMessage.go @@ -118,12 +118,18 @@ func executorOMHandler(octopusMessage *OctopusMessage) { func statusOMHandler(octopusMessage *OctopusMessage) { + v, ok := (octopusMessage.Content).(string) + if !ok { + log.ErrorF("convert to string is wrong %s", v) + } + statusMsgString := octopusMessage.Content.(string) var statusMessage *status.StatusMessage err := json.Unmarshal([]byte(statusMsgString), &statusMessage) if err != nil { - log.Error(fmt.Sprintf("status message convert to json is wrong! msg is => %s", statusMsgString)) + fmt.Println(err.Error()) + log.Error(fmt.Sprintf("status message convert to json is wrong! msg is => %s", octopusMessage)) return } diff --git a/agent-go/status/CPU.go b/agent-go/status/CPU.go index 7cc60ec..710b5e7 100644 --- a/agent-go/status/CPU.go +++ b/agent-go/status/CPU.go @@ -46,4 +46,5 @@ func GetCPUStatus() (*CPUStatus, error) { CPULoads: cpuLoads, SystemLoads: systemLoads, }, nil + } diff --git a/agent-go/tmp/1.sh b/agent-go/tmp/1.sh new file mode 100644 index 0000000..4c394b3 --- /dev/null +++ b/agent-go/tmp/1.sh @@ -0,0 +1,7 @@ +#!/bin/bash + + + +export http_proxy=http://10.250.0.10:10810 && export https_proxy=http://10.250.0.10:10810 + + diff --git a/agent-go/tmp/simple.sh b/agent-go/tmp/simple.sh new file mode 100755 index 0000000..600331d --- /dev/null +++ b/agent-go/tmp/simple.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +for i in {1..30} +do + echo "yes" + sleep 0.3 +done \ No newline at end of file diff --git a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerExecutor.java b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerExecutor.java index b5e3702..7cead64 100644 --- a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerExecutor.java +++ b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerExecutor.java @@ -30,7 +30,15 @@ public class OMHandlerExecutor extends AbstractOctopusMessageHandler { @Override public boolean handle(OctopusMessage octopusMessage) { + + if (!octopusMessage + + + + + + .getType() .equals(OctopusMessageType.EXECUTOR)) { return next.handle(octopusMessage); @@ -41,6 +49,7 @@ public class OMHandlerExecutor extends AbstractOctopusMessageHandler { try { // 需要首先解析成 ExecutionMessage + ExecutionMessage executionMessage = objectMapper.readValue( (String) octopusMessage.getContent(), new TypeReference() { diff --git a/agent/src/test/java/io/wdd/agent/CommandTest.java b/agent/src/test/java/io/wdd/agent/CommandTest.java new file mode 100644 index 0000000..3d797d4 --- /dev/null +++ b/agent/src/test/java/io/wdd/agent/CommandTest.java @@ -0,0 +1,5 @@ +package io.wdd.agent; + + +public class CommandTest { +} diff --git a/server/src/main/java/io/wdd/rpc/controller/StatusController.java b/server/src/main/java/io/wdd/rpc/controller/StatusController.java index 410e8bd..17f3c89 100644 --- a/server/src/main/java/io/wdd/rpc/controller/StatusController.java +++ b/server/src/main/java/io/wdd/rpc/controller/StatusController.java @@ -4,7 +4,8 @@ package io.wdd.rpc.controller; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.wdd.common.response.R; -import io.wdd.rpc.init.AgentStatusCacheService; +import io.wdd.rpc.scheduler.service.status.AgentAliveStatusMonitorService; +import io.wdd.rpc.status.service.AsyncStatusService; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; @@ -23,11 +24,14 @@ import static io.wdd.rpc.init.AgentStatusCacheService.*; public class StatusController { @Resource - AgentStatusCacheService agentStatusCacheService; + AsyncStatusService asyncStatusService; + + @Resource + AgentAliveStatusMonitorService agentAliveStatusMonitorService; @ApiOperation("[ Agent-状态 ] Map") @GetMapping("/agent/status") - public R> GetAllAgentHealthyStatus() { + public R> GetAllAgentHealthyStatus() { return R.ok(ALL_AGENT_STATUS_MAP); } @@ -76,7 +80,9 @@ public class StatusController { public R>> ManualUpdateAgentStatus() { // 手动调用更新 - agentStatusCacheService.updateAgentStatusMapCache(agentAliveStatusMap); + Map agentAliveStatusMap = asyncStatusService.AsyncCollectAgentAliveStatus(ALL_AGENT_TOPIC_NAME_LIST, 5); + + agentAliveStatusMonitorService.updateAllAgentHealthyStatus(agentAliveStatusMap); return R.ok(STATUS_AGENT_LIST_MAP); } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java index 17c8018..d7b55cb 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/AsyncExecutionServiceImpl.java @@ -205,15 +205,15 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService { } // 构造回复信息的内容 - OctopusMessageAsyncReplayContend OctopusMessageAsyncReplayContend = OctopusMessageAsyncReplayContend.build( + OctopusMessageAsyncReplayContend executionReplayContent = OctopusMessageAsyncReplayContend.build( commandCount, CurrentAppOctopusMessageType, initTime ); - CountDownLatch countDownLatch = OctopusMessageAsyncReplayContend.getCountDownLatch(); + CountDownLatch countDownLatch = executionReplayContent.getCountDownLatch(); // 开始等待结果 - asyncWaitOctopusMessageResultService.waitFor(OctopusMessageAsyncReplayContend); + asyncWaitOctopusMessageResultService.waitFor(executionReplayContent); // 监听结果 try { @@ -228,10 +228,10 @@ public class AsyncExecutionServiceImpl implements AsyncExecutionService { // 等待所有的结果返回 // 停止等待结果 - asyncWaitOctopusMessageResultService.stopWaiting(OctopusMessageAsyncReplayContend); + asyncWaitOctopusMessageResultService.stopWaiting(executionReplayContent); // 解析结果 - OctopusMessageAsyncReplayContend + executionReplayContent .getReplayOMList() .stream() .map( diff --git a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentRunMetricStatusJob.java b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentRunMetricStatusJob.java index 87445a9..e34ad2b 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/job/AgentRunMetricStatusJob.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/job/AgentRunMetricStatusJob.java @@ -1,6 +1,6 @@ package io.wdd.rpc.scheduler.job; -import io.wdd.rpc.scheduler.service.status.AgentRuntimeMetricStatus; +import io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService; import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; @@ -8,13 +8,13 @@ import org.springframework.scheduling.quartz.QuartzJobBean; import javax.annotation.Resource; -import static io.wdd.rpc.scheduler.service.status.AgentRuntimeMetricStatus.METRIC_REPORT_TIMES_COUNT; -import static io.wdd.rpc.scheduler.service.status.AgentRuntimeMetricStatus.METRIC_REPORT_TIME_PINCH; +import static io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService.METRIC_REPORT_TIMES_COUNT; +import static io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService.METRIC_REPORT_TIME_PINCH; public class AgentRunMetricStatusJob extends QuartzJobBean { @Resource - AgentRuntimeMetricStatus agentRuntimeMetricStatus; + AgentMetricStatusCollectService agentMetricStatusCollectService; @Override protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException { @@ -25,7 +25,7 @@ public class AgentRunMetricStatusJob extends QuartzJobBean { .getJobDataMap(); // 执行Agent Metric 状态收集任务 - agentRuntimeMetricStatus.collect( + agentMetricStatusCollectService.collect( (Integer) jobDataMap.get(METRIC_REPORT_TIMES_COUNT), (Integer) jobDataMap.get(METRIC_REPORT_TIME_PINCH) ); diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java b/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java index 755e25d..009238d 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/BuildStatusScheduleTask.java @@ -14,8 +14,8 @@ import java.text.ParseException; import java.util.Date; import java.util.HashMap; -import static io.wdd.rpc.scheduler.service.status.AgentRuntimeMetricStatus.METRIC_REPORT_TIMES_COUNT; -import static io.wdd.rpc.scheduler.service.status.AgentRuntimeMetricStatus.METRIC_REPORT_TIME_PINCH; +import static io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService.METRIC_REPORT_TIMES_COUNT; +import static io.wdd.rpc.scheduler.service.status.AgentMetricStatusCollectService.METRIC_REPORT_TIME_PINCH; @Component @Slf4j diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentAliveStatusMonitorService.java b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentAliveStatusMonitorService.java index d63721f..22854f4 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentAliveStatusMonitorService.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentAliveStatusMonitorService.java @@ -3,7 +3,6 @@ package io.wdd.rpc.scheduler.service.status; import io.wdd.common.utils.TimeUtils; import io.wdd.rpc.init.AgentStatusCacheService; import io.wdd.rpc.scheduler.service.BuildStatusScheduleTask; -import io.wdd.rpc.status.OctopusStatusMessage; import io.wdd.rpc.status.service.AsyncStatusService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; @@ -13,13 +12,10 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY; -import static io.wdd.rpc.status.OctopusStatusMessage.HEALTHY_STATUS_MESSAGE_TYPE; /** * 更新频率被类 BuildStatusScheduleTask.class控制 @@ -43,8 +39,6 @@ public class AgentAliveStatusMonitorService { private static final int MAX_WAIT_AGENT_REPORT_STATUS_TIME = 5; @Resource RedisTemplate redisTemplate; - @Resource - CollectAgentStatus collectAgentStatus; @Resource AgentStatusCacheService agentStatusCacheService; @@ -119,23 +113,7 @@ public class AgentAliveStatusMonitorService { } - private void buildAndSendAgentHealthMessage() { - - List collect = ALL_AGENT_TOPIC_NAME_LIST - .stream() - .map( - agentTopicName -> OctopusStatusMessage - .builder() - .statusType(HEALTHY_STATUS_MESSAGE_TYPE) - .build() - ) - .collect(Collectors.toList()); - - // 发送信息 - collectAgentStatus.statusMessageToAgent(collect); - } - - private void updateAllAgentHealthyStatus(Map agentAliveStatusMap) { + public void updateAllAgentHealthyStatus(Map agentAliveStatusMap) { String currentTimeString = TimeUtils.currentTimeString(); diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentRuntimeMetricStatus.java b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentMetricStatusCollectService.java similarity index 57% rename from server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentRuntimeMetricStatus.java rename to server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentMetricStatusCollectService.java index 26b5a75..17e5980 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentRuntimeMetricStatus.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentMetricStatusCollectService.java @@ -1,12 +1,17 @@ package io.wdd.rpc.scheduler.service.status; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.wdd.common.utils.TimeUtils; +import io.wdd.rpc.message.OctopusMessage; +import io.wdd.rpc.message.sender.OMessageToAgentSender; import io.wdd.rpc.status.OctopusStatusMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; +import java.time.LocalDateTime; import java.util.List; import java.util.stream.Collectors; @@ -20,13 +25,20 @@ import static io.wdd.rpc.status.OctopusStatusMessage.METRIC_STATUS_MESSAGE_TYPE; */ @Service @Slf4j -public class AgentRuntimeMetricStatus { +public class AgentMetricStatusCollectService { public static final String METRIC_REPORT_TIME_PINCH = "metricRepeatPinch"; public static final String METRIC_REPORT_TIMES_COUNT = "metricRepeatCount"; @Resource - CollectAgentStatus collectAgentStatus; + OctopusStatusMessage octopusStatusMessage; + + @Resource + OMessageToAgentSender oMessageToAgentSender; + + @Resource + ObjectMapper objectMapper; + public void collect(int metricRepeatCount, int metricRepeatPinch) { @@ -34,35 +46,32 @@ public class AgentRuntimeMetricStatus { if (CollectionUtils.isEmpty(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST)) { log.error("Metric Status Collect Failed ! no ALL_HEALTHY_AGENT_TOPIC_NAMES"); } - // 构建 OctopusMessage - // 只发送一次消息,让Agent循环定时执行任务 + buildMetricStatusMessageAndSend( metricRepeatCount, metricRepeatPinch ); - // } private void buildMetricStatusMessageAndSend(int metricRepeatCount, int metricRepeatPinch) { - List collect = ALL_HEALTHY_AGENT_TOPIC_NAME_LIST + LocalDateTime currentTime = TimeUtils.currentFormatTime(); + + List octopusStatusMessageList = ALL_HEALTHY_AGENT_TOPIC_NAME_LIST .stream() .map( - agentTopicName -> { - return OctopusStatusMessage - .builder() - .statusType(METRIC_STATUS_MESSAGE_TYPE) - .metricRepeatCount(metricRepeatCount) - .metricRepeatPinch(metricRepeatPinch) - .agentTopicName(agentTopicName) - .build(); - } + agentTopicName -> octopusStatusMessage + .ConstructAgentStatusMessage( + METRIC_STATUS_MESSAGE_TYPE, + agentTopicName, + currentTime + ) ) .collect(Collectors.toList()); - // send to the next level - collectAgentStatus.statusMessageToAgent(collect); + // batch send all messages to RabbitMQ + oMessageToAgentSender.send(octopusStatusMessageList); } diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/status/CollectAgentStatus.java b/server/src/main/java/io/wdd/rpc/scheduler/service/status/CollectAgentStatus.java deleted file mode 100644 index b0dd670..0000000 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/status/CollectAgentStatus.java +++ /dev/null @@ -1,72 +0,0 @@ -package io.wdd.rpc.scheduler.service.status; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.wdd.common.utils.TimeUtils; -import io.wdd.rpc.message.OctopusMessage; -import io.wdd.rpc.message.OctopusMessageType; -import io.wdd.rpc.message.sender.OMessageToAgentSender; -import io.wdd.rpc.status.OctopusStatusMessage; -import org.springframework.stereotype.Service; - -import javax.annotation.Resource; -import java.util.List; -import java.util.stream.Collectors; - -/** - * 1. 定时任务 - * 2. 向RabbitMQ中发送消息,STATUS类型的消息 - * 3. 然后开始监听相应的Result StreamKey - */ -@Service -public class CollectAgentStatus { - - @Resource - OMessageToAgentSender oMessageToAgentSender; - - @Resource - ObjectMapper objectMapper; - - - public void collectAgentStatus(OctopusStatusMessage statusMessage) { - - this.statusMessageToAgent(List.of(statusMessage)); - } - - - public void statusMessageToAgent(List statusMessageList) { - - // build all the OctopusMessage - List octopusMessageList = statusMessageList.stream().map( - statusMessage -> { - OctopusMessage octopusMessage = buildOctopusMessageStatus(statusMessage); - return octopusMessage; - } - ).collect(Collectors.toList()); - - // batch send all messages to RabbitMQ - oMessageToAgentSender.send(octopusMessageList); - - // todo how to get result ? - } - - private OctopusMessage buildOctopusMessageStatus(OctopusStatusMessage octopusStatusMessage) { - - // must be like this or it will be deserialized as LinkedHashMap - String s; - try { - s = objectMapper.writeValueAsString(octopusStatusMessage); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - - return OctopusMessage.builder() - .uuid(octopusStatusMessage.getAgentTopicName()) - .type(OctopusMessageType.STATUS) - .init_time(TimeUtils.currentTime()) - .content(s) - .build(); - } - - -} diff --git a/server/src/main/java/io/wdd/rpc/status/OctopusStatusMessage.java b/server/src/main/java/io/wdd/rpc/status/OctopusStatusMessage.java index 49c5098..7454bd1 100644 --- a/server/src/main/java/io/wdd/rpc/status/OctopusStatusMessage.java +++ b/server/src/main/java/io/wdd/rpc/status/OctopusStatusMessage.java @@ -1,10 +1,17 @@ package io.wdd.rpc.status; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.wdd.rpc.message.OctopusMessage; +import io.wdd.rpc.message.OctopusMessageType; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.SuperBuilder; +import javax.annotation.Resource; +import java.time.LocalDateTime; + @Data @AllArgsConstructor @NoArgsConstructor @@ -19,6 +26,9 @@ public class OctopusStatusMessage { public static final String METRIC_STATUS_MESSAGE_TYPE = "METRIC"; public static final String APP_STATUS_MESSAGE_TYPE = "APP"; + @Resource + ObjectMapper objectMapper; + /** * which kind of status should be return * metric => short time message @@ -31,4 +41,28 @@ public class OctopusStatusMessage { int metricRepeatPinch; + public OctopusMessage ConstructAgentStatusMessage(String statusType, String agentTopicName, LocalDateTime currentTime) { + + OctopusStatusMessage statusMessage = OctopusStatusMessage + .builder() + .statusType(statusType) + .build(); + + String ops; + try { + ops = objectMapper.writeValueAsString(statusMessage); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + return OctopusMessage + .builder() + .type(OctopusMessageType.STATUS) + .uuid(agentTopicName) + .init_time(currentTime) + .content(ops) + .build(); + + } + } diff --git a/server/src/main/java/io/wdd/rpc/status/service/AsyncStatusServiceImpl.java b/server/src/main/java/io/wdd/rpc/status/service/AsyncStatusServiceImpl.java index 7286c6d..8945332 100644 --- a/server/src/main/java/io/wdd/rpc/status/service/AsyncStatusServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/status/service/AsyncStatusServiceImpl.java @@ -1,7 +1,5 @@ package io.wdd.rpc.status.service; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import io.wdd.common.utils.TimeUtils; import io.wdd.rpc.message.OctopusMessage; import io.wdd.rpc.message.OctopusMessageType; @@ -33,7 +31,7 @@ public class AsyncStatusServiceImpl implements AsyncStatusService { OMessageToAgentSender oMessageToAgentSender; @Resource - ObjectMapper objectMapper; + OctopusStatusMessage octopusStatusMessage; @Resource AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService; @@ -107,7 +105,7 @@ public class AsyncStatusServiceImpl implements AsyncStatusService { List octopusStatusMessageList = ALL_AGENT_TOPIC_NAME_LIST .stream() .map( - agentTopicName -> ConstructAgentStatusMessage( + agentTopicName -> octopusStatusMessage.ConstructAgentStatusMessage( HEALTHY_STATUS_MESSAGE_TYPE, agentTopicName, currentTime @@ -120,27 +118,5 @@ public class AsyncStatusServiceImpl implements AsyncStatusService { } - private OctopusMessage ConstructAgentStatusMessage(String statusType, String agentTopicName, LocalDateTime currentTime) { - OctopusStatusMessage statusMessage = OctopusStatusMessage - .builder() - .statusType(statusType) - .build(); - - String ops; - try { - ops = objectMapper.writeValueAsString(statusMessage); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - - return OctopusMessage - .builder() - .type(CurrentAppOctopusMessageType) - .uuid(agentTopicName) - .init_time(currentTime) - .content(ops) - .build(); - - } }