From 4139e835e292f681f886e865d08952839a8332fe Mon Sep 17 00:00:00 2001 From: zeaslity Date: Fri, 13 Jan 2023 14:37:57 +0800 Subject: [PATCH] [ server ] [ execution ]- optimize proceed and add persistence log --- .../beans/executor/ExecutionMessage.java | 6 + .../java/io/wdd/common/utils/TimeUtils.java | 53 +++++-- .../execute/config/CommandReaderConfig.java | 34 +++++ .../rpc/execute/config/ExecutionLogBean.java | 16 +++ .../ExecutionResultStringDeserializer.java | 33 +++++ .../execute/result/CommandResultReader.java | 94 ++++++------ .../execute/result/CreateStreamReader.java | 52 ++++--- .../result/RedisStreamReaderConfig.java | 18 +-- .../service/CoreExecutionServiceImpl.java | 31 ++-- .../service/ExecutionResultDaemonHandler.java | 134 ++++++++++++++++++ 10 files changed, 367 insertions(+), 104 deletions(-) create mode 100644 server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfig.java create mode 100644 server/src/main/java/io/wdd/rpc/execute/config/ExecutionLogBean.java create mode 100644 server/src/main/java/io/wdd/rpc/execute/config/ExecutionResultStringDeserializer.java create mode 100644 server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java diff --git a/common/src/main/java/io/wdd/common/beans/executor/ExecutionMessage.java b/common/src/main/java/io/wdd/common/beans/executor/ExecutionMessage.java index 77c9e8f..3b3a277 100644 --- a/common/src/main/java/io/wdd/common/beans/executor/ExecutionMessage.java +++ b/common/src/main/java/io/wdd/common/beans/executor/ExecutionMessage.java @@ -1,5 +1,6 @@ package io.wdd.common.beans.executor; +import io.wdd.common.utils.TimeUtils; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -19,4 +20,9 @@ public class ExecutionMessage { private String resultKey; + public static String GetResultKey(String topicName) { + + return topicName + "Execution:" + TimeUtils.currentTimeStringFullSplit(); + } + } diff --git a/common/src/main/java/io/wdd/common/utils/TimeUtils.java b/common/src/main/java/io/wdd/common/utils/TimeUtils.java index 478a28e..135b448 100644 --- a/common/src/main/java/io/wdd/common/utils/TimeUtils.java +++ b/common/src/main/java/io/wdd/common/utils/TimeUtils.java @@ -22,18 +22,27 @@ public class TimeUtils { private static final Map times = new LinkedHashMap<>(); static { - times.put("year", TimeUnit.DAYS.toMillis(365)); - times.put("month", TimeUnit.DAYS.toMillis(30)); - times.put("week", TimeUnit.DAYS.toMillis(7)); - times.put("day", TimeUnit.DAYS.toMillis(1)); - times.put("hour", TimeUnit.HOURS.toMillis(1)); - times.put("minute", TimeUnit.MINUTES.toMillis(1)); - times.put("second", TimeUnit.SECONDS.toMillis(1)); + times.put("year", + TimeUnit.DAYS.toMillis(365)); + times.put("month", + TimeUnit.DAYS.toMillis(30)); + times.put("week", + TimeUnit.DAYS.toMillis(7)); + times.put("day", + TimeUnit.DAYS.toMillis(1)); + times.put("hour", + TimeUnit.HOURS.toMillis(1)); + times.put("minute", + TimeUnit.MINUTES.toMillis(1)); + times.put("second", + TimeUnit.SECONDS.toMillis(1)); } public static ByteBuffer currentTimeByteBuffer() { - byte[] timeBytes = LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).getBytes(StandardCharsets.UTF_8); + byte[] timeBytes = LocalDateTime.now(ZoneId.of("UTC+8")) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + .getBytes(StandardCharsets.UTF_8); return ByteBuffer.wrap(timeBytes); } @@ -48,9 +57,21 @@ public class TimeUtils { */ public static String currentTimeString() { - return LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + return LocalDateTime.now(ZoneId.of("UTC+8")) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); } + /** + * @return UTC+8 [ yyyy-MM-dd HH:mm:ss ] Time String + */ + public static String currentTimeStringFullSplit() { + + return LocalDateTime.now(ZoneId.of("UTC+8")) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss")); + + } + + public static String localDateTimeString(LocalDateTime time) { return time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); } @@ -62,10 +83,10 @@ public class TimeUtils { long timeDelta = duration / time.getValue(); if (timeDelta > 0) { res.append(timeDelta) - .append(" ") - .append(time.getKey()) - .append(timeDelta > 1 ? "s" : "") - .append(", "); + .append(" ") + .append(time.getKey()) + .append(timeDelta > 1 ? "s" : "") + .append(", "); duration -= time.getValue() * timeDelta; level++; } @@ -83,7 +104,8 @@ public class TimeUtils { } public static String toRelative(long duration) { - return toRelative(duration, times.size()); + return toRelative(duration, + times.size()); } public static String toRelative(Date start, Date end) { @@ -93,6 +115,7 @@ public class TimeUtils { public static String toRelative(Date start, Date end, int level) { assert start.after(end); - return toRelative(end.getTime() - start.getTime(), level); + return toRelative(end.getTime() - start.getTime(), + level); } } diff --git a/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfig.java b/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfig.java new file mode 100644 index 0000000..ba2abc6 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfig.java @@ -0,0 +1,34 @@ +package io.wdd.rpc.execute.config; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.util.ArrayList; + +@Data +@NoArgsConstructor +@AllArgsConstructor +@SuperBuilder(toBuilder = true) +public class CommandReaderConfig { + + /** + * 消费者类型:独立消费、消费组消费 + */ + private String consumerType; + /** + * 消费组 + */ + private String group; + /** + * 消费组中的某个消费者 + */ + private String consumerName; + + /** + * 执行的结果对象,保存在此处 + */ + private String ExecutionResult; + +} diff --git a/server/src/main/java/io/wdd/rpc/execute/config/ExecutionLogBean.java b/server/src/main/java/io/wdd/rpc/execute/config/ExecutionLogBean.java new file mode 100644 index 0000000..1278435 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/execute/config/ExecutionLogBean.java @@ -0,0 +1,16 @@ +package io.wdd.rpc.execute.config; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiModel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@SuperBuilder(toBuilder = true) +@ApiModel("Execution模快持久化Bean对象") +public class ExecutionLogBean { +} diff --git a/server/src/main/java/io/wdd/rpc/execute/config/ExecutionResultStringDeserializer.java b/server/src/main/java/io/wdd/rpc/execute/config/ExecutionResultStringDeserializer.java new file mode 100644 index 0000000..c47248e --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/execute/config/ExecutionResultStringDeserializer.java @@ -0,0 +1,33 @@ +package io.wdd.rpc.execute.config; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.util.ArrayList; + +public class ExecutionResultStringDeserializer { + + public static ArrayList format(String executionResultString) { + + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, + true); + + try { + + String tmp = objectMapper.readValue(executionResultString, + new TypeReference() { + }); + + return objectMapper.readValue(tmp, + new TypeReference>() { + }); + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + } +} diff --git a/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java b/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java index 21e6d0e..013968a 100644 --- a/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java +++ b/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java @@ -1,25 +1,21 @@ package io.wdd.rpc.execute.result; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; +import io.wdd.rpc.execute.config.CommandReaderConfig; +import io.wdd.rpc.execute.config.ExecutionResultStringDeserializer; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.connection.stream.MapRecord; -import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.stream.StreamListener; -import org.springframework.stereotype.Component; import java.util.ArrayList; -import java.util.List; @Getter @Setter @Slf4j -public class CommandResultReader implements StreamListener> { +public class CommandResultReader implements StreamListener> { // https://medium.com/nerd-for-tech/event-driven-architecture-with-redis-streams-using-spring-boot-a81a1c9a4cde @@ -27,24 +23,32 @@ public class CommandResultReader implements StreamListener executionResult = ExecutionResultStringDeserializer.format(valueString); - try { - - String tmp = objectMapper.readValue(valueString, new TypeReference() { - }); - - List stringList = objectMapper.readValue(tmp, new TypeReference>() { - }); - - stringList.stream().forEach( - System.out::println - ); - - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } + executionResult.stream() + .forEach( + System.out::println + ); } diff --git a/server/src/main/java/io/wdd/rpc/execute/result/CreateStreamReader.java b/server/src/main/java/io/wdd/rpc/execute/result/CreateStreamReader.java index 1e5e870..42bebb6 100644 --- a/server/src/main/java/io/wdd/rpc/execute/result/CreateStreamReader.java +++ b/server/src/main/java/io/wdd/rpc/execute/result/CreateStreamReader.java @@ -2,47 +2,46 @@ package io.wdd.rpc.execute.result; import io.wdd.server.utils.SpringUtils; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.BeansException; -import org.springframework.beans.factory.config.AutowireCapableBeanFactory; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.stereotype.Component; import java.util.HashMap; -import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER; - @Component @Slf4j public class CreateStreamReader { + private final HashMap REDIS_STREAM_LISTENER_CONTAINER_CACHE = new HashMap<>(16); private RedisStreamReaderConfig redisStreamReaderConfig; - - private final HashMap REDIS_STREAM_LISTENER_CONTAINER_CACHE = new HashMap<>(16); - - public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey) { + registerStreamReader(redisStreamListenerContainerBeanName, + streamKey, + null); + } + + public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey, String ExecutionResult) { // prepare the environment prepareEnv(); // oldStreamKey equals streamKey don't need to do anything , just return - if (redisStreamReaderConfig.getStreamKey().equals(streamKey)) { + if (redisStreamReaderConfig.getStreamKey() + .equals(streamKey)) { log.debug("redis listener container not change !"); return; } - // destroy the REDIS_STREAM_LISTENER_CONTAINER + // destroy the old REDIS_STREAM_LISTENER_CONTAINER destroyStreamReader(streamKey); // modify the configuration ==> streamKey - modifyStreamReader(streamKey); + modifyStreamReader(streamKey, ExecutionResult); // re-create the REDIS_STREAM_LISTENER_CONTAINER - createStreamReader(redisStreamListenerContainerBeanName, streamKey); + createStreamReader(redisStreamListenerContainerBeanName, + streamKey); } @@ -53,9 +52,10 @@ public class CreateStreamReader { } private void getRedisStreamConfig() { - this.redisStreamReaderConfig = SpringUtils.getBean("redisStreamReaderConfig", RedisStreamReaderConfig.class); - } + this.redisStreamReaderConfig = SpringUtils.getBean("redisStreamReaderConfig", + RedisStreamReaderConfig.class); + } private void createStreamReader(String redisStreamListenerContainerBeanName, String streamKey) { @@ -63,39 +63,47 @@ public class CreateStreamReader { log.debug("start to create the redis stream listener container"); // create the lazy bean - StreamMessageListenerContainer streamMessageListenerContainer = SpringUtils.getBean(redisStreamListenerContainerBeanName, StreamMessageListenerContainer.class); + StreamMessageListenerContainer streamMessageListenerContainer = SpringUtils.getBean(redisStreamListenerContainerBeanName, + StreamMessageListenerContainer.class); - REDIS_STREAM_LISTENER_CONTAINER_CACHE.put(streamKey, streamMessageListenerContainer); + REDIS_STREAM_LISTENER_CONTAINER_CACHE.put(streamKey, + streamMessageListenerContainer); // very important log.debug("start the listener container"); streamMessageListenerContainer.start(); + } - private void modifyStreamReader(String streamKey) { + private void modifyStreamReader(String streamKey, String executionResult) { log.debug("start to modify the redis stream listener container stream key"); String oldStreamKey = redisStreamReaderConfig.getStreamKey(); - log.debug("change stream key from [{}] to [{}]", oldStreamKey, streamKey); + log.debug("change stream key from [{}] to [{}]", + oldStreamKey, + streamKey); log.debug("start to set the Redis Stream Reader key"); redisStreamReaderConfig.setStreamKey(streamKey); + log.debug("start to set the Redis Stream Execution Result Container"); + redisStreamReaderConfig.setExecutionResult(executionResult); + } private void destroyStreamReader(String streamKey) { - String oldStreamKey = redisStreamReaderConfig.getStreamKey(); if (REDIS_STREAM_LISTENER_CONTAINER_CACHE.containsKey(oldStreamKey)) { StreamMessageListenerContainer streamMessageListenerContainer = REDIS_STREAM_LISTENER_CONTAINER_CACHE.get(oldStreamKey); - log.debug("destroyed old redis stream listener container is [ {} ]", streamMessageListenerContainer); + log.debug("destroyed old redis stream listener container is [ {} ]", + streamMessageListenerContainer); // double destroy diff --git a/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java b/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java index 1e1591c..786c5b9 100644 --- a/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java +++ b/server/src/main/java/io/wdd/rpc/execute/result/RedisStreamReaderConfig.java @@ -2,6 +2,8 @@ package io.wdd.rpc.execute.result; import io.wdd.rpc.status.AgentStatusStreamReader; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -18,6 +20,8 @@ import java.time.Duration; @Configuration @Slf4j +@Getter +@Setter public class RedisStreamReaderConfig { @Resource @@ -29,13 +33,7 @@ public class RedisStreamReaderConfig { private String streamKey = "cccc"; - public void setStreamKey(String streamKey) { - this.streamKey = streamKey; - } - - public String getStreamKey() { - return streamKey; - } + private String executionResult = null; @Bean(value = COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER) @Scope("prototype") @@ -49,6 +47,8 @@ public class RedisStreamReaderConfig { StreamMessageListenerContainer> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); + // todo 此部分可以被移出到另外的位置,会更加方便,就不需要对此Bean进行创建和销毁了 + listenerContainer.receive( StreamOffset.create(streamKey, ReadOffset.lastConsumed()), @@ -56,7 +56,9 @@ public class RedisStreamReaderConfig { new CommandResultReader( "OctopusServer", streamKey, - "OctopusServer") + "OctopusServer", + executionResult + ) ); diff --git a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java index 2121b57..3d32543 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.wdd.common.beans.executor.ExecutionMessage; import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessageType; +import io.wdd.rpc.execute.config.ExecutionLogBean; import io.wdd.rpc.execute.result.CreateStreamReader; import io.wdd.rpc.message.sender.ToAgentMessageSender; import lombok.extern.slf4j.Slf4j; @@ -18,6 +19,7 @@ import java.util.List; import java.util.stream.Collectors; import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER; +import static io.wdd.rpc.execute.service.ExecutionResultDaemonHandler.WAIT_EXECUTION_RESULT_LIST; @Service @Slf4j @@ -48,36 +50,44 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { @Override public String SendCommandToAgent(String topicName, String type, List commandList) { + // 构造 Execution Command对应的消息体 OctopusMessage octopusMessage = this.generateOctopusMessage(topicName, type, commandList); + // 获取 ResultKey ExecutionMessage executionMessage = (ExecutionMessage) octopusMessage.getContent(); - String executionMsg; - try { - executionMsg = objectMapper.writeValueAsString(executionMessage); octopusMessage.setContent(executionMsg); } catch (JsonProcessingException e) { throw new RuntimeException(e); } - String resultKey = executionMessage.getResultKey(); + // send the message + messageSender.send(octopusMessage); + // set up the stream read group String group = redisTemplate.opsForStream().createGroup(resultKey, resultKey); log.debug("set consumer group for the stream key with => [ {} ]", resultKey); // change the redis stream listener container - createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey); + // createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey); - // send the message - messageSender.send(octopusMessage); + // construct the persistent Bean + ExecutionLogBean executionLogBean = buildPersistentLogBeanFromOctopusMessage(octopusMessage); + // send resultKey to ExecutionResultDaemonHandler + WAIT_EXECUTION_RESULT_LIST.put(resultKey, executionLogBean); return resultKey; } + private ExecutionLogBean buildPersistentLogBeanFromOctopusMessage(OctopusMessage octopusMessage) { + + return null; + } + @Override public List SendCommandToAgent(List topicNameList, String type, List command) { @@ -93,7 +103,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { ExecutionMessage executionMessage = generateExecutionMessage( type, commandList, - generateCommandResultKey(topicName) + ExecutionMessage.GetResultKey(topicName) ); return OctopusMessage.builder() @@ -114,11 +124,6 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { } - private String generateCommandResultKey(String topicName) { - String TimeString = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss")); - - return topicName + "-" + TimeString; - } } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java b/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java new file mode 100644 index 0000000..6bdc0b7 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java @@ -0,0 +1,134 @@ +package io.wdd.rpc.execute.service; + + +import io.wdd.rpc.execute.config.ExecutionLogBean; +import io.wdd.rpc.execute.config.ExecutionResultStringDeserializer; +import io.wdd.rpc.execute.result.CreateStreamReader; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import java.util.concurrent.*; + +import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER; + +/** + * 1. [waiting strategy ] + * 2. [build the redis stream listener] + * 3. [call persistence] + */ +@Service +@Slf4j +public class ExecutionResultDaemonHandler { + + /** + * store all execution result key + *

+ * which means there are execution running , waiting for their result to handle + */ + public static final ConcurrentHashMap WAIT_EXECUTION_RESULT_LIST = new ConcurrentHashMap<>(32); + + @Resource + CreateStreamReader createStreamReader; + + + @PostConstruct + public void startExecutionDaemonHandler() { + + while (true) { + + if (WAIT_EXECUTION_RESULT_LIST.size() == 0) { + try { + // no execution result need to handle + // wait for 5 seconds + TimeUnit.SECONDS.sleep(5); + + // restart the forever loop + continue; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + // has result to handle , just handle one result at one time + String resultKey = WAIT_EXECUTION_RESULT_LIST.keys() + .nextElement(); + + + CompletableFuture executionResultFuture = CompletableFuture.supplyAsync( + () -> { + String executionResult = ""; + + // 构造 resultKey对应的 Redis Stream Listener Container + createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, + resultKey, + executionResult); + // 获得结果 + String s = "no no no"; + try { + s = CompletableFuture.supplyAsync( + () -> { + while (true) { + if (StringUtils.isNotEmpty(executionResult)) { + return executionResult; + } + + try { + TimeUnit.SECONDS.sleep(3); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + ) + .get(80, + TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + + return s; + } + ); + + CompletableFuture falloutTimeFuture = CompletableFuture.supplyAsync( + () -> { + try { + TimeUnit.SECONDS.sleep(70); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + return "[ Failed ] - execution has failed !"; + } + ); + + // 获取结果,然后销毁Stream Listener Container + CompletableFuture.anyOf(falloutTimeFuture, + executionResultFuture) + .whenCompleteAsync( + (resultString, e) -> { + + log.info("execution result are => {}", + resultString); + + // 持久化存储对应的结果 + ExecutionResultStringDeserializer.format(String.valueOf(resultString)); + + + } + ); + + } + + + } + + +}