[ server ] [ execution ]- optimize proceed and add persistence log

This commit is contained in:
zeaslity
2023-01-13 14:37:57 +08:00
parent c875f9d7f7
commit 4139e835e2
10 changed files with 367 additions and 104 deletions

View File

@@ -1,5 +1,6 @@
package io.wdd.common.beans.executor; package io.wdd.common.beans.executor;
import io.wdd.common.utils.TimeUtils;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
@@ -19,4 +20,9 @@ public class ExecutionMessage {
private String resultKey; private String resultKey;
public static String GetResultKey(String topicName) {
return topicName + "Execution:" + TimeUtils.currentTimeStringFullSplit();
}
} }

View File

@@ -22,18 +22,27 @@ public class TimeUtils {
private static final Map<String, Long> times = new LinkedHashMap<>(); private static final Map<String, Long> times = new LinkedHashMap<>();
static { static {
times.put("year", TimeUnit.DAYS.toMillis(365)); times.put("year",
times.put("month", TimeUnit.DAYS.toMillis(30)); TimeUnit.DAYS.toMillis(365));
times.put("week", TimeUnit.DAYS.toMillis(7)); times.put("month",
times.put("day", TimeUnit.DAYS.toMillis(1)); TimeUnit.DAYS.toMillis(30));
times.put("hour", TimeUnit.HOURS.toMillis(1)); times.put("week",
times.put("minute", TimeUnit.MINUTES.toMillis(1)); TimeUnit.DAYS.toMillis(7));
times.put("second", TimeUnit.SECONDS.toMillis(1)); times.put("day",
TimeUnit.DAYS.toMillis(1));
times.put("hour",
TimeUnit.HOURS.toMillis(1));
times.put("minute",
TimeUnit.MINUTES.toMillis(1));
times.put("second",
TimeUnit.SECONDS.toMillis(1));
} }
public static ByteBuffer currentTimeByteBuffer() { public static ByteBuffer currentTimeByteBuffer() {
byte[] timeBytes = LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).getBytes(StandardCharsets.UTF_8); byte[] timeBytes = LocalDateTime.now(ZoneId.of("UTC+8"))
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
.getBytes(StandardCharsets.UTF_8);
return ByteBuffer.wrap(timeBytes); return ByteBuffer.wrap(timeBytes);
} }
@@ -48,9 +57,21 @@ public class TimeUtils {
*/ */
public static String currentTimeString() { public static String currentTimeString() {
return LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); return LocalDateTime.now(ZoneId.of("UTC+8"))
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
} }
/**
* @return UTC+8 [ yyyy-MM-dd HH:mm:ss ] Time String
*/
public static String currentTimeStringFullSplit() {
return LocalDateTime.now(ZoneId.of("UTC+8"))
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss"));
}
public static String localDateTimeString(LocalDateTime time) { public static String localDateTimeString(LocalDateTime time) {
return time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); return time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
} }
@@ -83,7 +104,8 @@ public class TimeUtils {
} }
public static String toRelative(long duration) { public static String toRelative(long duration) {
return toRelative(duration, times.size()); return toRelative(duration,
times.size());
} }
public static String toRelative(Date start, Date end) { public static String toRelative(Date start, Date end) {
@@ -93,6 +115,7 @@ public class TimeUtils {
public static String toRelative(Date start, Date end, int level) { public static String toRelative(Date start, Date end, int level) {
assert start.after(end); assert start.after(end);
return toRelative(end.getTime() - start.getTime(), level); return toRelative(end.getTime() - start.getTime(),
level);
} }
} }

View File

@@ -0,0 +1,34 @@
package io.wdd.rpc.execute.config;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.ArrayList;
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder(toBuilder = true)
public class CommandReaderConfig {
/**
* 消费者类型:独立消费、消费组消费
*/
private String consumerType;
/**
* 消费组
*/
private String group;
/**
* 消费组中的某个消费者
*/
private String consumerName;
/**
* 执行的结果对象,保存在此处
*/
private String ExecutionResult;
}

View File

@@ -0,0 +1,16 @@
package io.wdd.rpc.execute.config;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiModel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@Data
@AllArgsConstructor
@NoArgsConstructor
@SuperBuilder(toBuilder = true)
@ApiModel("Execution模快持久化Bean对象")
public class ExecutionLogBean {
}

View File

@@ -0,0 +1,33 @@
package io.wdd.rpc.execute.config;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
public class ExecutionResultStringDeserializer {
public static ArrayList<String> format(String executionResultString) {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY,
true);
try {
String tmp = objectMapper.readValue(executionResultString,
new TypeReference<String>() {
});
return objectMapper.readValue(tmp,
new TypeReference<ArrayList<String>>() {
});
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -1,25 +1,21 @@
package io.wdd.rpc.execute.result; package io.wdd.rpc.execute.result;
import com.fasterxml.jackson.core.JsonProcessingException; import io.wdd.rpc.execute.config.CommandReaderConfig;
import com.fasterxml.jackson.core.type.TypeReference; import io.wdd.rpc.execute.config.ExecutionResultStringDeserializer;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener; import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List;
@Getter @Getter
@Setter @Setter
@Slf4j @Slf4j
public class CommandResultReader implements StreamListener<String, MapRecord<String,String, String >> { public class CommandResultReader implements StreamListener<String, MapRecord<String, String, String>> {
// https://medium.com/nerd-for-tech/event-driven-architecture-with-redis-streams-using-spring-boot-a81a1c9a4cde // https://medium.com/nerd-for-tech/event-driven-architecture-with-redis-streams-using-spring-boot-a81a1c9a4cde
@@ -27,24 +23,32 @@ public class CommandResultReader implements StreamListener<String, MapRecord<Str
//https://docs.spring.io/spring-data/redis/docs/2.5.5/reference/html/#redis.streams.receive.containers //https://docs.spring.io/spring-data/redis/docs/2.5.5/reference/html/#redis.streams.receive.containers
/**
* 消费者类型:独立消费、消费组消费
*/
private String consumerType;
/**
* 消费组
*/
private String group;
/**
* 消费组中的某个消费者
*/
private String consumerName;
private CommandReaderConfig commandReaderConfig;
public CommandResultReader(String consumerType, String group, String consumerName) { public CommandResultReader(String consumerType, String group, String consumerName) {
this.consumerType = consumerType; new CommandResultReader(consumerType,
this.group = group; group,
this.consumerName = consumerName; consumerName,
null);
}
public CommandResultReader(String consumerType, String group, String consumerName, String executionResult) {
this.commandReaderConfig = CommandReaderConfig.builder()
.consumerName(consumerName)
.group(group)
.consumerType(consumerType)
.ExecutionResult(executionResult)
.build();
}
public CommandResultReader(CommandReaderConfig commandReaderConfig) {
this.commandReaderConfig = commandReaderConfig;
} }
@Override @Override
@@ -52,40 +56,38 @@ public class CommandResultReader implements StreamListener<String, MapRecord<Str
String streamKey = message.getStream(); String streamKey = message.getStream();
RecordId messageId = message.getId(); RecordId messageId = message.getId();
String key = (String) message.getValue().keySet().toArray()[0]; String key = (String) message.getValue()
String value = message.getValue().get(key); .keySet()
.toArray()[0];
String value = message.getValue()
.get(key);
// 赋值给外部的结果,是的执行的结果可以被拿到
String executionResult = this.commandReaderConfig.getExecutionResult();
if (StringUtils.isNotEmpty(executionResult)) {
executionResult = value;
}
log.info("Octopus Agent [ {} ] execution of [ {} ] Time is [ {} ] stream recordId is [{}]", streamKey, key, key, messageId); log.info("Octopus Agent [ {} ] execution of [ {} ] Time is [ {} ] stream recordId is [{}]",
streamKey,
key,
key,
messageId);
// print to console // print to console
printPrettyDeserializedCommandResult(value); printPrettyDeserializedCommandResult(value);
// log to somewhere
} }
private void printPrettyDeserializedCommandResult(String valueString){ private void printPrettyDeserializedCommandResult(String valueString) {
ObjectMapper objectMapper = new ObjectMapper(); ArrayList<String> executionResult = ExecutionResultStringDeserializer.format(valueString);
objectMapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
try { executionResult.stream()
.forEach(
String tmp = objectMapper.readValue(valueString, new TypeReference<String>() {
});
List<String> stringList = objectMapper.readValue(tmp, new TypeReference<List<String>>() {
});
stringList.stream().forEach(
System.out::println System.out::println
); );
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
} }

View File

@@ -2,47 +2,46 @@ package io.wdd.rpc.execute.result;
import io.wdd.server.utils.SpringUtils; import io.wdd.server.utils.SpringUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.HashMap; import java.util.HashMap;
import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER;
@Component @Component
@Slf4j @Slf4j
public class CreateStreamReader { public class CreateStreamReader {
private final HashMap<String, StreamMessageListenerContainer> REDIS_STREAM_LISTENER_CONTAINER_CACHE = new HashMap<>(16);
private RedisStreamReaderConfig redisStreamReaderConfig; private RedisStreamReaderConfig redisStreamReaderConfig;
private final HashMap<String, StreamMessageListenerContainer> REDIS_STREAM_LISTENER_CONTAINER_CACHE = new HashMap<>(16);
public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey) { public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey) {
registerStreamReader(redisStreamListenerContainerBeanName,
streamKey,
null);
}
public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey, String ExecutionResult) {
// prepare the environment // prepare the environment
prepareEnv(); prepareEnv();
// oldStreamKey equals streamKey don't need to do anything , just return // oldStreamKey equals streamKey don't need to do anything , just return
if (redisStreamReaderConfig.getStreamKey().equals(streamKey)) { if (redisStreamReaderConfig.getStreamKey()
.equals(streamKey)) {
log.debug("redis listener container not change !"); log.debug("redis listener container not change !");
return; return;
} }
// destroy the REDIS_STREAM_LISTENER_CONTAINER // destroy the old REDIS_STREAM_LISTENER_CONTAINER
destroyStreamReader(streamKey); destroyStreamReader(streamKey);
// modify the configuration ==> streamKey // modify the configuration ==> streamKey
modifyStreamReader(streamKey); modifyStreamReader(streamKey, ExecutionResult);
// re-create the REDIS_STREAM_LISTENER_CONTAINER // re-create the REDIS_STREAM_LISTENER_CONTAINER
createStreamReader(redisStreamListenerContainerBeanName, streamKey); createStreamReader(redisStreamListenerContainerBeanName,
streamKey);
} }
@@ -53,9 +52,10 @@ public class CreateStreamReader {
} }
private void getRedisStreamConfig() { private void getRedisStreamConfig() {
this.redisStreamReaderConfig = SpringUtils.getBean("redisStreamReaderConfig", RedisStreamReaderConfig.class);
}
this.redisStreamReaderConfig = SpringUtils.getBean("redisStreamReaderConfig",
RedisStreamReaderConfig.class);
}
private void createStreamReader(String redisStreamListenerContainerBeanName, String streamKey) { private void createStreamReader(String redisStreamListenerContainerBeanName, String streamKey) {
@@ -63,39 +63,47 @@ public class CreateStreamReader {
log.debug("start to create the redis stream listener container"); log.debug("start to create the redis stream listener container");
// create the lazy bean // create the lazy bean
StreamMessageListenerContainer streamMessageListenerContainer = SpringUtils.getBean(redisStreamListenerContainerBeanName, StreamMessageListenerContainer.class); StreamMessageListenerContainer streamMessageListenerContainer = SpringUtils.getBean(redisStreamListenerContainerBeanName,
StreamMessageListenerContainer.class);
REDIS_STREAM_LISTENER_CONTAINER_CACHE.put(streamKey, streamMessageListenerContainer); REDIS_STREAM_LISTENER_CONTAINER_CACHE.put(streamKey,
streamMessageListenerContainer);
// very important // very important
log.debug("start the listener container"); log.debug("start the listener container");
streamMessageListenerContainer.start(); streamMessageListenerContainer.start();
} }
private void modifyStreamReader(String streamKey) { private void modifyStreamReader(String streamKey, String executionResult) {
log.debug("start to modify the redis stream listener container stream key"); log.debug("start to modify the redis stream listener container stream key");
String oldStreamKey = redisStreamReaderConfig.getStreamKey(); String oldStreamKey = redisStreamReaderConfig.getStreamKey();
log.debug("change stream key from [{}] to [{}]", oldStreamKey, streamKey); log.debug("change stream key from [{}] to [{}]",
oldStreamKey,
streamKey);
log.debug("start to set the Redis Stream Reader key"); log.debug("start to set the Redis Stream Reader key");
redisStreamReaderConfig.setStreamKey(streamKey); redisStreamReaderConfig.setStreamKey(streamKey);
log.debug("start to set the Redis Stream Execution Result Container");
redisStreamReaderConfig.setExecutionResult(executionResult);
} }
private void destroyStreamReader(String streamKey) { private void destroyStreamReader(String streamKey) {
String oldStreamKey = redisStreamReaderConfig.getStreamKey(); String oldStreamKey = redisStreamReaderConfig.getStreamKey();
if (REDIS_STREAM_LISTENER_CONTAINER_CACHE.containsKey(oldStreamKey)) { if (REDIS_STREAM_LISTENER_CONTAINER_CACHE.containsKey(oldStreamKey)) {
StreamMessageListenerContainer streamMessageListenerContainer = REDIS_STREAM_LISTENER_CONTAINER_CACHE.get(oldStreamKey); StreamMessageListenerContainer streamMessageListenerContainer = REDIS_STREAM_LISTENER_CONTAINER_CACHE.get(oldStreamKey);
log.debug("destroyed old redis stream listener container is [ {} ]", streamMessageListenerContainer); log.debug("destroyed old redis stream listener container is [ {} ]",
streamMessageListenerContainer);
// double destroy // double destroy

View File

@@ -2,6 +2,8 @@ package io.wdd.rpc.execute.result;
import io.wdd.rpc.status.AgentStatusStreamReader; import io.wdd.rpc.status.AgentStatusStreamReader;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@@ -18,6 +20,8 @@ import java.time.Duration;
@Configuration @Configuration
@Slf4j @Slf4j
@Getter
@Setter
public class RedisStreamReaderConfig { public class RedisStreamReaderConfig {
@Resource @Resource
@@ -29,13 +33,7 @@ public class RedisStreamReaderConfig {
private String streamKey = "cccc"; private String streamKey = "cccc";
public void setStreamKey(String streamKey) { private String executionResult = null;
this.streamKey = streamKey;
}
public String getStreamKey() {
return streamKey;
}
@Bean(value = COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER) @Bean(value = COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER)
@Scope("prototype") @Scope("prototype")
@@ -49,6 +47,8 @@ public class RedisStreamReaderConfig {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
// todo 此部分可以被移出到另外的位置会更加方便就不需要对此Bean进行创建和销毁了
listenerContainer.receive( listenerContainer.receive(
StreamOffset.create(streamKey, ReadOffset.lastConsumed()), StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
@@ -56,7 +56,9 @@ public class RedisStreamReaderConfig {
new CommandResultReader( new CommandResultReader(
"OctopusServer", "OctopusServer",
streamKey, streamKey,
"OctopusServer") "OctopusServer",
executionResult
)
); );

View File

@@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.beans.executor.ExecutionMessage; import io.wdd.common.beans.executor.ExecutionMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessageType; import io.wdd.common.beans.rabbitmq.OctopusMessageType;
import io.wdd.rpc.execute.config.ExecutionLogBean;
import io.wdd.rpc.execute.result.CreateStreamReader; import io.wdd.rpc.execute.result.CreateStreamReader;
import io.wdd.rpc.message.sender.ToAgentMessageSender; import io.wdd.rpc.message.sender.ToAgentMessageSender;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -18,6 +19,7 @@ import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER; import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER;
import static io.wdd.rpc.execute.service.ExecutionResultDaemonHandler.WAIT_EXECUTION_RESULT_LIST;
@Service @Service
@Slf4j @Slf4j
@@ -48,36 +50,44 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
@Override @Override
public String SendCommandToAgent(String topicName, String type, List<String> commandList) { public String SendCommandToAgent(String topicName, String type, List<String> commandList) {
// 构造 Execution Command对应的消息体
OctopusMessage octopusMessage = this.generateOctopusMessage(topicName, type, commandList); OctopusMessage octopusMessage = this.generateOctopusMessage(topicName, type, commandList);
// 获取 ResultKey
ExecutionMessage executionMessage = (ExecutionMessage) octopusMessage.getContent(); ExecutionMessage executionMessage = (ExecutionMessage) octopusMessage.getContent();
String executionMsg; String executionMsg;
try { try {
executionMsg = objectMapper.writeValueAsString(executionMessage); executionMsg = objectMapper.writeValueAsString(executionMessage);
octopusMessage.setContent(executionMsg); octopusMessage.setContent(executionMsg);
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
String resultKey = executionMessage.getResultKey(); String resultKey = executionMessage.getResultKey();
// send the message
messageSender.send(octopusMessage);
// set up the stream read group // set up the stream read group
String group = redisTemplate.opsForStream().createGroup(resultKey, resultKey); String group = redisTemplate.opsForStream().createGroup(resultKey, resultKey);
log.debug("set consumer group for the stream key with => [ {} ]", resultKey); log.debug("set consumer group for the stream key with => [ {} ]", resultKey);
// change the redis stream listener container // change the redis stream listener container
createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey); // createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey);
// send the message // construct the persistent Bean
messageSender.send(octopusMessage); ExecutionLogBean executionLogBean = buildPersistentLogBeanFromOctopusMessage(octopusMessage);
// send resultKey to ExecutionResultDaemonHandler
WAIT_EXECUTION_RESULT_LIST.put(resultKey, executionLogBean);
return resultKey; return resultKey;
} }
private ExecutionLogBean buildPersistentLogBeanFromOctopusMessage(OctopusMessage octopusMessage) {
return null;
}
@Override @Override
public List<String> SendCommandToAgent(List<String> topicNameList, String type, List<String> command) { public List<String> SendCommandToAgent(List<String> topicNameList, String type, List<String> command) {
@@ -93,7 +103,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
ExecutionMessage executionMessage = generateExecutionMessage( ExecutionMessage executionMessage = generateExecutionMessage(
type, type,
commandList, commandList,
generateCommandResultKey(topicName) ExecutionMessage.GetResultKey(topicName)
); );
return OctopusMessage.builder() return OctopusMessage.builder()
@@ -114,11 +124,6 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
} }
private String generateCommandResultKey(String topicName) {
String TimeString = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss"));
return topicName + "-" + TimeString;
}
} }

View File

@@ -0,0 +1,134 @@
package io.wdd.rpc.execute.service;
import io.wdd.rpc.execute.config.ExecutionLogBean;
import io.wdd.rpc.execute.config.ExecutionResultStringDeserializer;
import io.wdd.rpc.execute.result.CreateStreamReader;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.*;
import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER;
/**
* 1. [waiting strategy ]
* 2. [build the redis stream listener]
* 3. [call persistence]
*/
@Service
@Slf4j
public class ExecutionResultDaemonHandler {
/**
* store all execution result key
* <p>
* which means there are execution running , waiting for their result to handle
*/
public static final ConcurrentHashMap<String, ExecutionLogBean> WAIT_EXECUTION_RESULT_LIST = new ConcurrentHashMap<>(32);
@Resource
CreateStreamReader createStreamReader;
@PostConstruct
public void startExecutionDaemonHandler() {
while (true) {
if (WAIT_EXECUTION_RESULT_LIST.size() == 0) {
try {
// no execution result need to handle
// wait for 5 seconds
TimeUnit.SECONDS.sleep(5);
// restart the forever loop
continue;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// has result to handle , just handle one result at one time
String resultKey = WAIT_EXECUTION_RESULT_LIST.keys()
.nextElement();
CompletableFuture<String> executionResultFuture = CompletableFuture.supplyAsync(
() -> {
String executionResult = "";
// 构造 resultKey对应的 Redis Stream Listener Container
createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER,
resultKey,
executionResult);
// 获得结果
String s = "no no no";
try {
s = CompletableFuture.supplyAsync(
() -> {
while (true) {
if (StringUtils.isNotEmpty(executionResult)) {
return executionResult;
}
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
)
.get(80,
TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
return s;
}
);
CompletableFuture<String> falloutTimeFuture = CompletableFuture.supplyAsync(
() -> {
try {
TimeUnit.SECONDS.sleep(70);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "[ Failed ] - execution has failed !";
}
);
// 获取结果然后销毁Stream Listener Container
CompletableFuture.anyOf(falloutTimeFuture,
executionResultFuture)
.whenCompleteAsync(
(resultString, e) -> {
log.info("execution result are => {}",
resultString);
// 持久化存储对应的结果
ExecutionResultStringDeserializer.format(String.valueOf(resultString));
}
);
}
}
}