From 0a78f9a02b28dc0301a3acdd9826643864651b38 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Fri, 11 Aug 2023 15:47:54 +0800 Subject: [PATCH] =?UTF-8?q?[=20Service=20]=20[=20Executor=20]=20=E4=BF=AE?= =?UTF-8?q?=E6=94=B9OctopusObjectMapper?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent-go/rabbitmq/OctopusMessage.go | 2 +- agent-go/utils/TimeUtils.go | 1 + .../config/OctopusObjectMapperConfig.java | 105 ++++++++++++++++++ .../utils/OctopusObjectMapperConfig.java | 54 --------- .../java/io/wdd/common/utils/TimeUtils.java | 2 + .../wdd/rpc/execute/ExecutionServiceImpl.java | 17 ++- .../io/wdd/rpc/init/AcceptAgentInitInfo.java | 2 +- .../message/handler/sync/OMessageHandler.java | 68 +++++++----- .../sync/OMessageToServerListener.java | 8 +- .../message/sender/OMessageToAgentSender.java | 7 +- .../AgentMetricStatusCollectService.java | 2 +- .../wdd/rpc/status/OctopusStatusMessage.java | 2 +- .../status/service/SyncStatusServiceImpl.java | 2 +- .../io/wdd/server/beans/vo/ServerInfoVO.java | 3 + .../io/wdd/server/ServerApplicationTests.java | 52 --------- 15 files changed, 175 insertions(+), 152 deletions(-) create mode 100644 server/src/main/java/io/wdd/common/config/OctopusObjectMapperConfig.java delete mode 100644 server/src/main/java/io/wdd/common/utils/OctopusObjectMapperConfig.java diff --git a/agent-go/rabbitmq/OctopusMessage.go b/agent-go/rabbitmq/OctopusMessage.go index 614f4b9..2f5cfd0 100644 --- a/agent-go/rabbitmq/OctopusMessage.go +++ b/agent-go/rabbitmq/OctopusMessage.go @@ -136,7 +136,7 @@ func executorOMHandler(octopusMessage *OctopusMessage) { // send back the result log octopusMessage.Result = resultLog - octopusMessage.ACTime = utils.ParseISOLocalDateTime() + octopusMessage.ACTime = utils.ParseDateTimeTime() // Send octopusMessage.SendToOctopusServer() diff --git a/agent-go/utils/TimeUtils.go b/agent-go/utils/TimeUtils.go index be6d10a..4dfb872 100644 --- a/agent-go/utils/TimeUtils.go +++ b/agent-go/utils/TimeUtils.go @@ -15,6 +15,7 @@ func ParseDateTimeTime() string { return now.Format(time.DateTime) } +// ParseISOLocalDateTime 时间格式为2023-08-11T10:48:15+08:00 func ParseISOLocalDateTime() string { now := time.Now() return now.Format(time.RFC3339) diff --git a/server/src/main/java/io/wdd/common/config/OctopusObjectMapperConfig.java b/server/src/main/java/io/wdd/common/config/OctopusObjectMapperConfig.java new file mode 100644 index 0000000..b230c59 --- /dev/null +++ b/server/src/main/java/io/wdd/common/config/OctopusObjectMapperConfig.java @@ -0,0 +1,105 @@ +package io.wdd.common.config; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer; +import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; +import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer; +import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer; +import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; +import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.text.SimpleDateFormat; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; + + +@Component +public class OctopusObjectMapperConfig { + + private static final String STANDARD_PATTERN = "yyyy-MM-dd HH:mm:ss"; + + private static final String DATE_PATTERN = "yyyy-MM-dd"; + + private static final String TIME_PATTERN = "HH:mm:ss"; + + public static ObjectMapper OctopusObjectMapper = null; + + + @PostConstruct + public void setOctopusObjectMapper() { + OctopusObjectMapper = new ObjectMapper(); + + /* + * 1. java.util.Date yyyy-MM-dd HH:mm:ss + * 2. 支持JDK8 LocalDateTime、LocalDate、 LocalTime + * 3. Jdk8Module模块支持如Stream、Optional等类 + * 4. 序列化时包含所有字段 + * 5. 在序列化一个空对象时时不抛出异常 + * 6. 忽略反序列化时在json字符串中存在, 但在java对象中不存在的属性 + * 7. 数字序列化成字符穿且调用BigDecimal.toPlainString()方法 + */ + OctopusObjectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); + + // 初始化JavaTimeModule + JavaTimeModule javaTimeModule = new JavaTimeModule(); + + //处理LocalDateTime + DateTimeFormatter dateTimeFormatter = DateTimeFormatter + .ofPattern(STANDARD_PATTERN); + javaTimeModule.addSerializer( + LocalDateTime.class, + new LocalDateTimeSerializer(dateTimeFormatter) + ); + javaTimeModule.addDeserializer( + LocalDateTime.class, + new LocalDateTimeDeserializer(dateTimeFormatter) + ); + + //处理LocalDate + DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern(DATE_PATTERN); + javaTimeModule.addSerializer( + LocalDate.class, + new LocalDateSerializer(dateFormatter) + ); + javaTimeModule.addDeserializer( + LocalDate.class, + new LocalDateDeserializer(dateFormatter) + ); + + //处理LocalTime + DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern(TIME_PATTERN); + javaTimeModule.addSerializer( + LocalTime.class, + new LocalTimeSerializer(timeFormatter) + ); + javaTimeModule.addDeserializer( + LocalTime.class, + new LocalTimeDeserializer(timeFormatter) + ); + + OctopusObjectMapper.registerModule(javaTimeModule); + OctopusObjectMapper.setDateFormat( + new SimpleDateFormat(STANDARD_PATTERN) + ); + + OctopusObjectMapper.configure( + SerializationFeature.FAIL_ON_EMPTY_BEANS, + false + ); + OctopusObjectMapper.configure( + DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, + false + ); + + } + + +} diff --git a/server/src/main/java/io/wdd/common/utils/OctopusObjectMapperConfig.java b/server/src/main/java/io/wdd/common/utils/OctopusObjectMapperConfig.java deleted file mode 100644 index 783eb37..0000000 --- a/server/src/main/java/io/wdd/common/utils/OctopusObjectMapperConfig.java +++ /dev/null @@ -1,54 +0,0 @@ -package io.wdd.common.utils; - -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; -import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; -import org.springframework.boot.autoconfigure.jackson.Jackson2ObjectMapperBuilderCustomizer; -import org.springframework.context.annotation.Configuration; - -import javax.annotation.PostConstruct; -import javax.annotation.Resource; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; - - -@Configuration -public class OctopusObjectMapperConfig { - - public static ObjectMapper OctopusObjectMapper = null; - - @Resource - ObjectMapper objectMapper; - - @PostConstruct - public void setOctopusObjectMapper() { - OctopusObjectMapper = objectMapper; - } - - public static Jackson2ObjectMapperBuilderCustomizer common() { - - return jacksonObjectMapperBuilder -> { - //若POJO对象的属性值为null,序列化时不进行显示 - jacksonObjectMapperBuilder.serializationInclusion(JsonInclude.Include.NON_NULL); - - //针对于Date类型,文本格式化 - jacksonObjectMapperBuilder.simpleDateFormat("yyyy-MM-dd"); - - // - jacksonObjectMapperBuilder.failOnEmptyBeans(false); - jacksonObjectMapperBuilder.failOnUnknownProperties(false); - jacksonObjectMapperBuilder.autoDetectFields(true); - - //针对于JDK新时间类。序列化时带有T的问题,自定义格式化字符串 - JavaTimeModule javaTimeModule = new JavaTimeModule(); - javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); - javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); - - jacksonObjectMapperBuilder.modules(javaTimeModule); - - }; - } - -} diff --git a/server/src/main/java/io/wdd/common/utils/TimeUtils.java b/server/src/main/java/io/wdd/common/utils/TimeUtils.java index 743e5ea..2a857e9 100644 --- a/server/src/main/java/io/wdd/common/utils/TimeUtils.java +++ b/server/src/main/java/io/wdd/common/utils/TimeUtils.java @@ -109,6 +109,8 @@ public class TimeUtils { } /** + * yyyy-MM-dd HH:mm:ss + * * @return 格式化 去掉时间中的毫秒数 */ public static LocalDateTime currentFormatTime() { diff --git a/server/src/main/java/io/wdd/rpc/execute/ExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/ExecutionServiceImpl.java index b80ba42..4e7bbf5 100644 --- a/server/src/main/java/io/wdd/rpc/execute/ExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/ExecutionServiceImpl.java @@ -12,6 +12,8 @@ import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; +import static io.wdd.rpc.message.handler.sync.OMessageHandler.StopWaitingResult; +import static io.wdd.rpc.message.handler.sync.OMessageHandler.WaitFromAgent; import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_SET; @Service @@ -51,18 +53,26 @@ public class ExecutionServiceImpl implements ExecutionService { // send the message oMessageToAgentSender.send(octopusMessage); + System.out.println("originOctopusMessage = " + octopusMessage.hashCode()); // 需要返回结果 if (!durationTask) { + // 等待结果 + WaitFromAgent(octopusMessage); synchronized (octopusMessage) { + try { octopusMessage.wait(10000); + + + log.debug("等待结束!"); + } catch (InterruptedException e) { throw new RuntimeException(e); } - } + } // 转换结果 commandResultLog = (ArrayList) octopusMessage.getResult(); @@ -76,6 +86,9 @@ public class ExecutionServiceImpl implements ExecutionService { } + // 释放等待队列 + StopWaitingResult(octopusMessage); + return commandResultLog; } @@ -85,7 +98,7 @@ public class ExecutionServiceImpl implements ExecutionService { return OctopusMessage .builder() .octopusMessageType(OctopusMessageType.EXECUTOR) - .init_time(TimeUtils.currentFormatTime()) + .init_time(TimeUtils.currentTime()) .uuid(agentTopicName) .content( executionMessage diff --git a/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java b/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java index 22f7cce..d503baf 100644 --- a/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java +++ b/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java @@ -27,7 +27,7 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.TimeUnit; -import static io.wdd.common.utils.OctopusObjectMapperConfig.OctopusObjectMapper; +import static io.wdd.common.config.OctopusObjectMapperConfig.OctopusObjectMapper; /** * The type Accept boot up info message. diff --git a/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageHandler.java b/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageHandler.java index 5168053..121cade 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageHandler.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageHandler.java @@ -9,6 +9,7 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.time.LocalDateTime; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.FROM_AGENT_MATCH_TO_AGENT_MAP; import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.OCTOPUS_MESSAGE_FROM_AGENT; @@ -43,6 +44,42 @@ public class OMessageHandler { } + public static void WaitFromAgent(OctopusMessage octopusMessage) { + + // 构建 MatchKey + String matchKey = GenerateOMessageMatchKey( + octopusMessage.getOctopusMessageType(), + octopusMessage.getInit_time() + ); + + // 开始等待 + FROM_AGENT_MATCH_TO_AGENT_MAP.put( + matchKey, + octopusMessage + ); + + //debug + log.info( + "wait from agent map is => {}", + FROM_AGENT_MATCH_TO_AGENT_MAP + ); + } + + public static void StopWaitingResult(OctopusMessage octopusMessage) { + + // 构建 MatchKey + String matchKey = GenerateOMessageMatchKey( + octopusMessage.getOctopusMessageType(), + octopusMessage.getInit_time() + ); + + // 开始等待 + if (FROM_AGENT_MATCH_TO_AGENT_MAP.containsKey(matchKey)) { + FROM_AGENT_MATCH_TO_AGENT_MAP.remove(matchKey); + } + + } + /** * 解析所有从Agent传回的消息,中央集中化处理 */ @@ -54,7 +91,7 @@ public class OMessageHandler { if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) { try { - OCTOPUS_MESSAGE_FROM_AGENT.wait(5000); + TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -70,6 +107,7 @@ public class OMessageHandler { replayOMessage.getOctopusMessageType(), replayOMessage.getInit_time() ); + if (!FROM_AGENT_MATCH_TO_AGENT_MAP.containsKey(matchKey)) { // 没有这个Key,说明等待结果已经超时了,直接丢弃,然后继续循环 // todo 错误的数据需要放置于某处 @@ -93,32 +131,4 @@ public class OMessageHandler { } } - - protected void waitFor(OctopusMessage octopusMessage) { - - // 构建 MatchKey - String matchKey = GenerateOMessageMatchKey( - octopusMessage.getOctopusMessageType(), - octopusMessage.getInit_time() - ); - - // 开始等待 - FROM_AGENT_MATCH_TO_AGENT_MAP.put( - matchKey, - octopusMessage - ); - } - - public void stopWaiting(OctopusMessage octopusMessage) { - - // 构建 MatchKey - String matchKey = GenerateOMessageMatchKey( - octopusMessage.getOctopusMessageType(), - octopusMessage.getInit_time() - ); - - // 开始等待 - FROM_AGENT_MATCH_TO_AGENT_MAP.remove(matchKey); - - } } diff --git a/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageToServerListener.java b/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageToServerListener.java index b681e99..c4fbb7a 100644 --- a/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageToServerListener.java +++ b/server/src/main/java/io/wdd/rpc/message/handler/sync/OMessageToServerListener.java @@ -15,7 +15,7 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.HashMap; -import static io.wdd.common.utils.OctopusObjectMapperConfig.OctopusObjectMapper; +import static io.wdd.common.config.OctopusObjectMapperConfig.OctopusObjectMapper; @Configuration @Slf4j(topic = "Octopus Message Listener") @@ -72,6 +72,8 @@ public class OMessageToServerListener { octopusMessage ); + System.out.println("receivedOctopusMessage = " + octopusMessage.hashCode()); + // 获取Agent的版本信息 if (octopusMessage .getUuid() @@ -96,9 +98,5 @@ public class OMessageToServerListener { // 将收到的消息,直接存储到 缓存队列中 log.debug("cache the octopus message to inner cache list !"); OCTOPUS_MESSAGE_FROM_AGENT.offer(octopusMessage); - oMessageHandler.waitFor(octopusMessage); - - // 唤醒等待线程 - OCTOPUS_MESSAGE_FROM_AGENT.notify(); } } diff --git a/server/src/main/java/io/wdd/rpc/message/sender/OMessageToAgentSender.java b/server/src/main/java/io/wdd/rpc/message/sender/OMessageToAgentSender.java index aac1eea..1048976 100644 --- a/server/src/main/java/io/wdd/rpc/message/sender/OMessageToAgentSender.java +++ b/server/src/main/java/io/wdd/rpc/message/sender/OMessageToAgentSender.java @@ -2,7 +2,6 @@ package io.wdd.rpc.message.sender; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import io.wdd.common.handler.MyRuntimeException; import io.wdd.rpc.init.InitRabbitMQConfig; import io.wdd.rpc.message.OctopusMessage; @@ -15,7 +14,7 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.List; -import static io.wdd.common.utils.OctopusObjectMapperConfig.OctopusObjectMapper; +import static io.wdd.common.config.OctopusObjectMapperConfig.OctopusObjectMapper; /** * adaptor @@ -31,8 +30,6 @@ public class OMessageToAgentSender { @Resource InitRabbitMQConfig initRabbitMQConfig; - @Resource - ObjectMapper objectMapper; /** * send to Queue -- InitFromServer @@ -106,7 +103,7 @@ public class OMessageToAgentSender { @SneakyThrows private byte[] writeData(Object data) { - return objectMapper.writeValueAsBytes(data); + return OctopusObjectMapper.writeValueAsBytes(data); } } diff --git a/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentMetricStatusCollectService.java b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentMetricStatusCollectService.java index 7640181..109d25c 100644 --- a/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentMetricStatusCollectService.java +++ b/server/src/main/java/io/wdd/rpc/scheduler/service/status/AgentMetricStatusCollectService.java @@ -12,7 +12,7 @@ import org.springframework.util.CollectionUtils; import javax.annotation.Resource; import java.util.Map; -import static io.wdd.common.utils.OctopusObjectMapperConfig.OctopusObjectMapper; +import static io.wdd.common.config.OctopusObjectMapperConfig.OctopusObjectMapper; import static io.wdd.rpc.status.CommonAndStatusCache.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; /** diff --git a/server/src/main/java/io/wdd/rpc/status/OctopusStatusMessage.java b/server/src/main/java/io/wdd/rpc/status/OctopusStatusMessage.java index 3d034b8..e7f621d 100644 --- a/server/src/main/java/io/wdd/rpc/status/OctopusStatusMessage.java +++ b/server/src/main/java/io/wdd/rpc/status/OctopusStatusMessage.java @@ -10,7 +10,7 @@ import lombok.experimental.SuperBuilder; import java.time.LocalDateTime; -import static io.wdd.common.utils.OctopusObjectMapperConfig.OctopusObjectMapper; +import static io.wdd.common.config.OctopusObjectMapperConfig.OctopusObjectMapper; @Data @AllArgsConstructor diff --git a/server/src/main/java/io/wdd/rpc/status/service/SyncStatusServiceImpl.java b/server/src/main/java/io/wdd/rpc/status/service/SyncStatusServiceImpl.java index e93840b..ab48e2d 100644 --- a/server/src/main/java/io/wdd/rpc/status/service/SyncStatusServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/status/service/SyncStatusServiceImpl.java @@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static io.wdd.common.utils.OctopusObjectMapperConfig.OctopusObjectMapper; +import static io.wdd.common.config.OctopusObjectMapperConfig.OctopusObjectMapper; import static io.wdd.rpc.status.OctopusStatusMessage.*; @Slf4j diff --git a/server/src/main/java/io/wdd/server/beans/vo/ServerInfoVO.java b/server/src/main/java/io/wdd/server/beans/vo/ServerInfoVO.java index bdeac38..b81e7ff 100644 --- a/server/src/main/java/io/wdd/server/beans/vo/ServerInfoVO.java +++ b/server/src/main/java/io/wdd/server/beans/vo/ServerInfoVO.java @@ -67,6 +67,8 @@ public class ServerInfoVO { @TableField(fill = FieldFill.INSERT_UPDATE) private LocalDateTime updateTime; + private Integer proxyType; + /** * server location , type City Country */ @@ -149,4 +151,5 @@ public class ServerInfoVO { private String agentVersion; + } diff --git a/server/src/test/java/io/wdd/server/ServerApplicationTests.java b/server/src/test/java/io/wdd/server/ServerApplicationTests.java index 8851cba..a9d9eaa 100644 --- a/server/src/test/java/io/wdd/server/ServerApplicationTests.java +++ b/server/src/test/java/io/wdd/server/ServerApplicationTests.java @@ -5,8 +5,6 @@ import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; -import java.util.ArrayList; -import java.util.List; @SpringBootTest class ServerApplicationTests { @@ -18,57 +16,7 @@ class ServerApplicationTests { @Test void testCoreExecutionCompleteScript() { - ArrayList command1 = new ArrayList<>( - List.of( - "echo", - "yes" - ) - ); - ArrayList command2 = new ArrayList<>( - List.of( - "apt-get", - "update" - ) - ); - - ArrayList command3 = new ArrayList<>( - List.of( - "echo", - "\"no\"" - ) - ); - - ArrayList command4 = new ArrayList<>( - List.of( - "apt-get", - "install", - "nginx", - "-y" - ) - ); - - List> completeScript = new ArrayList<>(); - completeScript.add(command1); - completeScript.add(command2); - completeScript.add(command3); - completeScript.add(command4); - - - ArrayList targetMachineList = new ArrayList<>( - List.of( - "Chengdu-amd64-98-98066f" - ) - ); - - List resultList = asyncExecutionService.SyncSendCommandToAgentComplete( - targetMachineList, - "Scheduled Script", - completeScript - ); - - - System.out.println("resultList = " + resultList); } }