[ Execution ] change the excuetion log access way

This commit is contained in:
IceDerce
2023-06-16 10:55:24 +08:00
parent 1aa4c23dfc
commit 93692eb0af
5 changed files with 539 additions and 549 deletions

View File

@@ -4,7 +4,6 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.wdd.common.response.R;
import io.wdd.rpc.execute.result.BuildStreamReader;
import io.wdd.rpc.execute.service.AsyncExecutionService;
import io.wdd.rpc.execute.service.SyncExecutionService;
import org.springframework.web.bind.annotation.PostMapping;
@@ -18,7 +17,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER;
import static io.wdd.rpc.init.AgentStatusCacheService.ALL_AGENT_TOPIC_NAME_LIST;
import static io.wdd.rpc.init.AgentStatusCacheService.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
@@ -30,8 +28,6 @@ public class ExecutionController {
@Resource
SyncExecutionService syncExecutionService;
@Resource
BuildStreamReader buildStreamReader;
@Resource
AsyncExecutionService asyncExecutionService;
@PostMapping("/command/one")
@@ -198,11 +194,6 @@ public class ExecutionController {
@RequestParam(value = "streamKey") @ApiParam(value = "status的Stream Key") String streamKey
) {
buildStreamReader.registerStreamReader(
AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER,
streamKey
);
return R.ok("请到控制台查看,已经切换至 => " + streamKey);
}

View File

@@ -1,27 +1,26 @@
package io.wdd.rpc.execute.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.REDIS_STREAM_LISTENER_CONSUMER_NAME;
@Configuration
public class CommandReaderConfigBean {
// todo must support for multi thread
// its not thread safe now
@Bean
public CommandReaderConfig commandReaderConfig() {
return CommandReaderConfig
.builder()
.consumerName(REDIS_STREAM_LISTENER_CONSUMER_NAME)
.streamKey("ccc")
.consumerType(REDIS_STREAM_LISTENER_CONSUMER_NAME)
.group("ccc")
.ExecutionResult(null)
.build();
}
}
//package io.wdd.rpc.execute.config;
//
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//
//
//@Configuration
//public class CommandReaderConfigBean {
//
// // todo must support for multi thread
// // its not thread safe now
// @Bean
// public CommandReaderConfig commandReaderConfig() {
//
// return CommandReaderConfig
// .builder()
// .consumerName(REDIS_STREAM_LISTENER_CONSUMER_NAME)
// .streamKey("ccc")
// .consumerType(REDIS_STREAM_LISTENER_CONSUMER_NAME)
// .group("ccc")
// .ExecutionResult(null)
// .build();
// }
//
//
//}

View File

@@ -1,189 +1,189 @@
package io.wdd.rpc.execute.result;
import io.wdd.rpc.execute.config.CommandReaderConfig;
import io.wdd.server.utils.SpringUtils;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER;
@Component
@Slf4j
public class BuildStreamReader {
private final HashMap<String, StreamMessageListenerContainer> REDIS_STREAM_LISTENER_CONTAINER_CACHE = new HashMap<>(16);
private RedisStreamReaderConfig redisStreamReaderConfig;
private StreamMessageListenerContainer streamMessageListenerContainer;
private CommandReaderConfig commandReaderConfig;
public void buildStreamReader(CommandReaderConfig commandReaderConfig) {
// prepare the environment
prepareExecutionEnv();
// just modify the redis listener container and it's ok
modifyExecutionStreamReader(commandReaderConfig);
}
@SneakyThrows
private void modifyExecutionStreamReader(CommandReaderConfig commandReaderConfig) {
// stop the old stream listener container
if (this.streamMessageListenerContainer.isRunning()) {
this.streamMessageListenerContainer.stop();
}
// modify container
this.streamMessageListenerContainer.receive(
StreamOffset.create(
commandReaderConfig.getStreamKey(),
ReadOffset.lastConsumed()),
new CommandResultReader(
commandReaderConfig
)
);
// very important
TimeUnit.MILLISECONDS.sleep(500);
this.streamMessageListenerContainer.start();
}
private void prepareExecutionEnv() {
getRedisStreamListenerContainer();
getRedisStreamReaderConfig();
}
private void getRedisStreamReaderConfig() {
this.commandReaderConfig = SpringUtils.getBean("commandReaderConfig",
CommandReaderConfig.class);
}
private void getRedisStreamListenerContainer() {
this.streamMessageListenerContainer = SpringUtils.getBean(
EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER,
StreamMessageListenerContainer.class
);
}
public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey) {
registerStreamReader(redisStreamListenerContainerBeanName,
streamKey,
null);
}
public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey, ArrayList<String> ExecutionResult) {
// prepare the environment
prepareEnv();
// oldStreamKey equals streamKey don't need to do anything , just return
if (redisStreamReaderConfig.getStreamKey()
.equals(streamKey)) {
log.debug("redis listener container not change !");
return;
}
// destroy the old REDIS_STREAM_LISTENER_CONTAINER
destroyStreamReader(streamKey);
// modify the configuration ==> streamKey
modifyStreamReader(streamKey,
ExecutionResult);
// re-create the REDIS_STREAM_LISTENER_CONTAINER
createStreamReader(redisStreamListenerContainerBeanName,
streamKey);
}
private void prepareEnv() {
getRedisStreamConfig();
}
private void getRedisStreamConfig() {
this.redisStreamReaderConfig = SpringUtils.getBean("redisStreamReaderConfig",
RedisStreamReaderConfig.class);
}
private void createStreamReader(String redisStreamListenerContainerBeanName, String streamKey) {
log.debug("start to create the redis stream listener container");
// create the lazy bean
StreamMessageListenerContainer streamMessageListenerContainer = SpringUtils.getBean(redisStreamListenerContainerBeanName,
StreamMessageListenerContainer.class);
REDIS_STREAM_LISTENER_CONTAINER_CACHE.put(streamKey,
streamMessageListenerContainer);
// very important
log.debug("start the listener container");
streamMessageListenerContainer.start();
}
private void modifyStreamReader(String streamKey, ArrayList<String> executionResult) {
log.debug("start to modify the redis stream listener container stream key");
String oldStreamKey = redisStreamReaderConfig.getStreamKey();
log.debug("change stream key from [{}] to [{}]",
oldStreamKey,
streamKey);
log.debug("start to set the Redis Stream Reader key");
redisStreamReaderConfig.setStreamKey(streamKey);
log.debug("start to set the Redis Stream Execution Result Container");
redisStreamReaderConfig.setExecutionResult(executionResult);
}
private void destroyStreamReader(String streamKey) {
String oldStreamKey = redisStreamReaderConfig.getStreamKey();
if (REDIS_STREAM_LISTENER_CONTAINER_CACHE.containsKey(oldStreamKey)) {
StreamMessageListenerContainer streamMessageListenerContainer = REDIS_STREAM_LISTENER_CONTAINER_CACHE.get(oldStreamKey);
log.debug("destroyed old redis stream listener container is [ {} ]",
streamMessageListenerContainer);
// double destroy
SpringUtils.destroyBean(streamMessageListenerContainer);
streamMessageListenerContainer.stop();
// help gc
streamMessageListenerContainer = null;
}
}
}
//package io.wdd.rpc.execute.result;
//
//import io.wdd.rpc.execute.config.CommandReaderConfig;
//import io.wdd.server.utils.SpringUtils;
//import lombok.SneakyThrows;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.data.redis.connection.stream.ReadOffset;
//import org.springframework.data.redis.connection.stream.StreamOffset;
//import org.springframework.data.redis.stream.StreamMessageListenerContainer;
//import org.springframework.stereotype.Component;
//
//import java.util.ArrayList;
//import java.util.HashMap;
//import java.util.concurrent.TimeUnit;
//
//import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER;
//
//
//@Component
//@Slf4j
//public class BuildStreamReader {
//
// private final HashMap<String, StreamMessageListenerContainer> REDIS_STREAM_LISTENER_CONTAINER_CACHE = new HashMap<>(16);
// private RedisStreamReaderConfig redisStreamReaderConfig;
//
// private StreamMessageListenerContainer streamMessageListenerContainer;
//
// private CommandReaderConfig commandReaderConfig;
//
// public void buildStreamReader(CommandReaderConfig commandReaderConfig) {
//
// // prepare the environment
// prepareExecutionEnv();
//
//
// // just modify the redis listener container and it's ok
// modifyExecutionStreamReader(commandReaderConfig);
//
// }
//
// @SneakyThrows
// private void modifyExecutionStreamReader(CommandReaderConfig commandReaderConfig) {
//
// // stop the old stream listener container
// if (this.streamMessageListenerContainer.isRunning()) {
// this.streamMessageListenerContainer.stop();
// }
//
// // modify container
// this.streamMessageListenerContainer.receive(
// StreamOffset.create(
// commandReaderConfig.getStreamKey(),
// ReadOffset.lastConsumed()),
//
// new CommandResultReader(
// commandReaderConfig
// )
// );
//
//
// // very important
// TimeUnit.MILLISECONDS.sleep(500);
// this.streamMessageListenerContainer.start();
// }
//
// private void prepareExecutionEnv() {
//
// getRedisStreamListenerContainer();
//
// getRedisStreamReaderConfig();
//
// }
//
// private void getRedisStreamReaderConfig() {
//
// this.commandReaderConfig = SpringUtils.getBean("commandReaderConfig",
// CommandReaderConfig.class);
// }
//
// private void getRedisStreamListenerContainer() {
//
// this.streamMessageListenerContainer = SpringUtils.getBean(
// EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER,
// StreamMessageListenerContainer.class
// );
// }
//
// public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey) {
// registerStreamReader(redisStreamListenerContainerBeanName,
// streamKey,
// null);
// }
//
// public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey, ArrayList<String> ExecutionResult) {
//
// // prepare the environment
// prepareEnv();
//
// // oldStreamKey equals streamKey don't need to do anything , just return
// if (redisStreamReaderConfig.getStreamKey()
// .equals(streamKey)) {
// log.debug("redis listener container not change !");
// return;
// }
//
// // destroy the old REDIS_STREAM_LISTENER_CONTAINER
// destroyStreamReader(streamKey);
//
// // modify the configuration ==> streamKey
// modifyStreamReader(streamKey,
// ExecutionResult);
//
// // re-create the REDIS_STREAM_LISTENER_CONTAINER
// createStreamReader(redisStreamListenerContainerBeanName,
// streamKey);
//
// }
//
// private void prepareEnv() {
//
// getRedisStreamConfig();
//
// }
//
// private void getRedisStreamConfig() {
//
// this.redisStreamReaderConfig = SpringUtils.getBean("redisStreamReaderConfig",
// RedisStreamReaderConfig.class);
// }
//
//
// private void createStreamReader(String redisStreamListenerContainerBeanName, String streamKey) {
//
// log.debug("start to create the redis stream listener container");
// // create the lazy bean
//
// StreamMessageListenerContainer streamMessageListenerContainer = SpringUtils.getBean(redisStreamListenerContainerBeanName,
// StreamMessageListenerContainer.class);
//
// REDIS_STREAM_LISTENER_CONTAINER_CACHE.put(streamKey,
// streamMessageListenerContainer);
//
// // very important
// log.debug("start the listener container");
// streamMessageListenerContainer.start();
//
//
// }
//
// private void modifyStreamReader(String streamKey, ArrayList<String> executionResult) {
//
// log.debug("start to modify the redis stream listener container stream key");
// String oldStreamKey = redisStreamReaderConfig.getStreamKey();
//
// log.debug("change stream key from [{}] to [{}]",
// oldStreamKey,
// streamKey);
//
// log.debug("start to set the Redis Stream Reader key");
// redisStreamReaderConfig.setStreamKey(streamKey);
//
// log.debug("start to set the Redis Stream Execution Result Container");
// redisStreamReaderConfig.setExecutionResult(executionResult);
//
// }
//
//
// private void destroyStreamReader(String streamKey) {
//
// String oldStreamKey = redisStreamReaderConfig.getStreamKey();
//
// if (REDIS_STREAM_LISTENER_CONTAINER_CACHE.containsKey(oldStreamKey)) {
//
// StreamMessageListenerContainer streamMessageListenerContainer = REDIS_STREAM_LISTENER_CONTAINER_CACHE.get(oldStreamKey);
//
// log.debug("destroyed old redis stream listener container is [ {} ]",
// streamMessageListenerContainer);
//
//
// // double destroy
// SpringUtils.destroyBean(streamMessageListenerContainer);
// streamMessageListenerContainer.stop();
// // help gc
// streamMessageListenerContainer = null;
// }
//
//
// }
//}

View File

@@ -1,121 +1,121 @@
package io.wdd.rpc.execute.result;
import io.wdd.rpc.scheduler.service.status.AgentStatusStreamReader;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Scope;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.ArrayList;
@Configuration
@Slf4j
@Getter
@Setter
public class RedisStreamReaderConfig {
@Resource
private RedisConnectionFactory redisConnectionFactory;
public static final String COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER = "commandResultRedisStreamListenerContainer";
public static final String EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER = "executionResultRedisStreamListenerContainer";
public static final String AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER = "agentStatusRedisStreamListenerContainer";
public static final String REDIS_STREAM_LISTENER_CONSUMER_NAME = "OctopusServer";
/**
* used in old model
*/
private String streamKey = "cccc";
/**
* no use
*/
private ArrayList<String> executionResult = null;
@Bean(value = EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER)
@Lazy
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> executionResultRedisStreamListenerContainer(){
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(2))
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
return listenerContainer;
}
@Bean(value = COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER)
@Scope("prototype")
@Lazy
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> commandResultRedisStreamListenerContainer(){
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(2))
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
// todo 此部分可以被移出到另外的位置会更加方便就不需要对此Bean进行创建和销毁了
listenerContainer.receive(
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
new CommandResultReader(
REDIS_STREAM_LISTENER_CONSUMER_NAME,
streamKey,
REDIS_STREAM_LISTENER_CONSUMER_NAME,
executionResult
)
);
return listenerContainer;
}
@Bean(value = AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER)
@Scope("prototype")
@Lazy
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> agentStatusRedisStreamListenerContainer(){
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(2))
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
listenerContainer.receive(
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
new AgentStatusStreamReader(
REDIS_STREAM_LISTENER_CONSUMER_NAME,
REDIS_STREAM_LISTENER_CONSUMER_NAME,
REDIS_STREAM_LISTENER_CONSUMER_NAME)
);
return listenerContainer;
}
}
//package io.wdd.rpc.execute.result;
//
//
//import io.wdd.rpc.scheduler.service.status.AgentStatusStreamReader;
//import lombok.Getter;
//import lombok.Setter;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.context.annotation.Lazy;
//import org.springframework.context.annotation.Scope;
//import org.springframework.data.redis.connection.RedisConnectionFactory;
//import org.springframework.data.redis.connection.stream.MapRecord;
//import org.springframework.data.redis.connection.stream.ReadOffset;
//import org.springframework.data.redis.connection.stream.StreamOffset;
//import org.springframework.data.redis.stream.StreamMessageListenerContainer;
//
//import javax.annotation.Resource;
//import java.time.Duration;
//import java.util.ArrayList;
//
//@Configuration
//@Slf4j
//@Getter
//@Setter
//public class RedisStreamReaderConfig {
//
// @Resource
// private RedisConnectionFactory redisConnectionFactory;
//
// public static final String COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER = "commandResultRedisStreamListenerContainer";
//
// public static final String EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER = "executionResultRedisStreamListenerContainer";
//
// public static final String AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER = "agentStatusRedisStreamListenerContainer";
//
// public static final String REDIS_STREAM_LISTENER_CONSUMER_NAME = "OctopusServer";
//
// /**
// * used in old model
// */
// private String streamKey = "cccc";
//
// /**
// * no use
// */
// private ArrayList<String> executionResult = null;
//
//
// @Bean(value = EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER)
// @Lazy
// public StreamMessageListenerContainer<String, MapRecord<String, String, String>> executionResultRedisStreamListenerContainer(){
//
// StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
// .builder()
// .pollTimeout(Duration.ofSeconds(2))
// .build();
//
// StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
//
// return listenerContainer;
// }
//
//
// @Bean(value = COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER)
// @Scope("prototype")
// @Lazy
// public StreamMessageListenerContainer<String, MapRecord<String, String, String>> commandResultRedisStreamListenerContainer(){
//
// StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
// .builder()
// .pollTimeout(Duration.ofSeconds(2))
// .build();
//
// StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
//
// // todo 此部分可以被移出到另外的位置会更加方便就不需要对此Bean进行创建和销毁了
// listenerContainer.receive(
//
// StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
//
// new CommandResultReader(
// REDIS_STREAM_LISTENER_CONSUMER_NAME,
// streamKey,
// REDIS_STREAM_LISTENER_CONSUMER_NAME,
// executionResult
// )
//
// );
//
// return listenerContainer;
// }
//
// @Bean(value = AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER)
// @Scope("prototype")
// @Lazy
// public StreamMessageListenerContainer<String, MapRecord<String, String, String>> agentStatusRedisStreamListenerContainer(){
//
// StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
// .builder()
// .pollTimeout(Duration.ofSeconds(2))
// .build();
//
// StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
//
// listenerContainer.receive(
//
// StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
//
// new AgentStatusStreamReader(
// REDIS_STREAM_LISTENER_CONSUMER_NAME,
// REDIS_STREAM_LISTENER_CONSUMER_NAME,
// REDIS_STREAM_LISTENER_CONSUMER_NAME)
//
// );
//
// return listenerContainer;
// }
//
//
//}

View File

@@ -1,203 +1,203 @@
package io.wdd.rpc.execute.service;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.execute.config.CommandReaderConfig;
import io.wdd.rpc.execute.config.ExecutionLog;
import io.wdd.rpc.execute.result.BuildStreamReader;
import io.wdd.server.service.ExecutionLogService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.context.annotation.Lazy;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
/**
* 1. [waiting strategy ]
* 2. [build the redis stream listener]
* 3. [call persistence]
*/
//@Service
@Slf4j
@Lazy
@Deprecated
public class ExecutionResultDaemonHandler {
/**
* store all execution result key
* <p>
* which means there are execution running , waiting for their result to handle
*/
public static final ConcurrentHashMap<String, ExecutionLog> WAIT_EXECUTION_RESULT_LIST = new ConcurrentHashMap<>(32);
private final int MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT = 70;
@Resource
BuildStreamReader buildStreamReader;
@Resource
CommandReaderConfig commandReaderConfig;
@Resource
ExecutionLogService executionLogService;
@PostConstruct
public void startExecutionDaemonHandler() {
// 启动一个异步线程,运行 Execution结果处理守护进程
CompletableFuture.runAsync(
() -> realStartExecutionDaemonHandler()
);
}
private void realStartExecutionDaemonHandler() {
while (true) {
while (WAIT_EXECUTION_RESULT_LIST.size() == 0) {
try {
// no execution result need to handle
// wait for 5 seconds
log.debug("realStartExecutionDaemonHandler start to sleep waiting for result !");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// has result to handle , just handle one result at one time
String resultKey = WAIT_EXECUTION_RESULT_LIST
.keys()
.nextElement();
log.debug(
"current result key is [{}]",
resultKey
);
CompletableFuture<ArrayList<String>> executionResultFuture =
CompletableFuture
.supplyAsync(
() -> {
// 修改相应的参数
commandReaderConfig.setStreamKey(resultKey);
// listener container 实际上是根据这个绑定的
commandReaderConfig.setGroup(resultKey);
// 必须归零
commandReaderConfig.setExecutionResult(null);
// 构造 resultKey对应的 Redis Stream Listener Container
buildStreamReader
.buildStreamReader(commandReaderConfig);
// 获得结果
ArrayList<String> s = new ArrayList<>(
List.of("no no no")
);
try {
s = CompletableFuture
.supplyAsync(
() -> {
while (true) {
// todo 多条命令时,这里只能获取到一个结果
if (CollectionUtils.isNotEmpty(commandReaderConfig.getExecutionResult())) {
return commandReaderConfig.getExecutionResult();
}
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
)
// 获取相应的结果
.get(
MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT,
TimeUnit.SECONDS
);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
return s;
}
);
CompletableFuture<ArrayList<String>> falloutTimeFuture = CompletableFuture.supplyAsync(
() -> {
try {
TimeUnit.SECONDS.sleep(MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return null;
}
);
// 获取结果然后销毁Stream Listener Container
CompletableFuture<Object> complete = CompletableFuture
.anyOf(
falloutTimeFuture,
executionResultFuture
);
complete
.whenComplete(
(result, e) -> {
log.debug(
"execution result are => {}",
result
);
// 持久化存储对应的结果
ExecutionLog executionLog = WAIT_EXECUTION_RESULT_LIST.get(resultKey);
executionLog.setAcTime(TimeUtils.currentTime());
executionLog.setResultContent(String.valueOf(commandReaderConfig.getExecutionResult()));
executionLog.setResultCode(
CollectionUtils.isEmpty((Collection) result) ? 1 : 0
);
executionLog.setRecordId(commandReaderConfig.getRecordId());
// 保存操作
executionLogService.save(executionLog);
// 清除此次任务的内容
WAIT_EXECUTION_RESULT_LIST.remove(resultKey);
log.info(
"[Execution] - command {} result are {} result code is {} ,whole process are complete !",
executionLog.getCommandList(),
executionLog.getResultContent(),
executionLog.getResultCode()
);
}
);
// very important
// stuck the main thread , otherwise it will create a dead loop
complete.join();
}
}
}
//package io.wdd.rpc.execute.service;
//
//
//import io.wdd.common.utils.TimeUtils;
//import io.wdd.rpc.execute.config.CommandReaderConfig;
//import io.wdd.rpc.execute.config.ExecutionLog;
//import io.wdd.rpc.execute.result.BuildStreamReader;
//import io.wdd.server.service.ExecutionLogService;
//import lombok.extern.slf4j.Slf4j;
//import org.apache.commons.collections.CollectionUtils;
//import org.springframework.context.annotation.Lazy;
//
//import javax.annotation.PostConstruct;
//import javax.annotation.Resource;
//import java.util.ArrayList;
//import java.util.Collection;
//import java.util.List;
//import java.util.concurrent.*;
//
///**
// * 1. [waiting strategy ]
// * 2. [build the redis stream listener]
// * 3. [call persistence]
// */
////@Service
//@Slf4j
//@Lazy
//@Deprecated
//public class ExecutionResultDaemonHandler {
//
// /**
// * store all execution result key
// * <p>
// * which means there are execution running , waiting for their result to handle
// */
// public static final ConcurrentHashMap<String, ExecutionLog> WAIT_EXECUTION_RESULT_LIST = new ConcurrentHashMap<>(32);
// private final int MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT = 70;
//
// @Resource
// BuildStreamReader buildStreamReader;
//
// @Resource
// CommandReaderConfig commandReaderConfig;
//
// @Resource
// ExecutionLogService executionLogService;
//
// @PostConstruct
// public void startExecutionDaemonHandler() {
//
// // 启动一个异步线程,运行 Execution结果处理守护进程
// CompletableFuture.runAsync(
// () -> realStartExecutionDaemonHandler()
// );
//
// }
//
// private void realStartExecutionDaemonHandler() {
//
// while (true) {
//
// while (WAIT_EXECUTION_RESULT_LIST.size() == 0) {
// try {
// // no execution result need to handle
//
// // wait for 5 seconds
// log.debug("realStartExecutionDaemonHandler start to sleep waiting for result !");
// TimeUnit.SECONDS.sleep(5);
//
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
//
// // has result to handle , just handle one result at one time
// String resultKey = WAIT_EXECUTION_RESULT_LIST
// .keys()
// .nextElement();
//
// log.debug(
// "current result key is [{}]",
// resultKey
// );
//
//
// CompletableFuture<ArrayList<String>> executionResultFuture =
// CompletableFuture
// .supplyAsync(
// () -> {
// // 修改相应的参数
// commandReaderConfig.setStreamKey(resultKey);
// // listener container 实际上是根据这个绑定的
// commandReaderConfig.setGroup(resultKey);
// // 必须归零
// commandReaderConfig.setExecutionResult(null);
//
// // 构造 resultKey对应的 Redis Stream Listener Container
// buildStreamReader
// .buildStreamReader(commandReaderConfig);
//
// // 获得结果
// ArrayList<String> s = new ArrayList<>(
// List.of("no no no")
// );
//
// try {
// s = CompletableFuture
// .supplyAsync(
// () -> {
// while (true) {
// // todo 多条命令时,这里只能获取到一个结果
// if (CollectionUtils.isNotEmpty(commandReaderConfig.getExecutionResult())) {
// return commandReaderConfig.getExecutionResult();
// }
//
// try {
// TimeUnit.SECONDS.sleep(3);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
// }
// )
// // 获取相应的结果
// .get(
// MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT,
// TimeUnit.SECONDS
// );
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// } catch (ExecutionException e) {
// throw new RuntimeException(e);
// } catch (TimeoutException e) {
// throw new RuntimeException(e);
// }
//
//
// return s;
// }
// );
//
// CompletableFuture<ArrayList<String>> falloutTimeFuture = CompletableFuture.supplyAsync(
// () -> {
// try {
// TimeUnit.SECONDS.sleep(MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
//
// return null;
// }
// );
//
// // 获取结果然后销毁Stream Listener Container
// CompletableFuture<Object> complete = CompletableFuture
// .anyOf(
// falloutTimeFuture,
// executionResultFuture
// );
//
// complete
// .whenComplete(
// (result, e) -> {
//
// log.debug(
// "execution result are => {}",
// result
// );
//
// // 持久化存储对应的结果
// ExecutionLog executionLog = WAIT_EXECUTION_RESULT_LIST.get(resultKey);
// executionLog.setAcTime(TimeUtils.currentTime());
// executionLog.setResultContent(String.valueOf(commandReaderConfig.getExecutionResult()));
// executionLog.setResultCode(
// CollectionUtils.isEmpty((Collection) result) ? 1 : 0
// );
// executionLog.setRecordId(commandReaderConfig.getRecordId());
//
//
// // 保存操作
// executionLogService.save(executionLog);
//
// // 清除此次任务的内容
// WAIT_EXECUTION_RESULT_LIST.remove(resultKey);
// log.info(
// "[Execution] - command {} result are {} result code is {} ,whole process are complete !",
// executionLog.getCommandList(),
// executionLog.getResultContent(),
// executionLog.getResultCode()
// );
// }
// );
//
// // very important
// // stuck the main thread , otherwise it will create a dead loop
// complete.join();
//
// }
//
// }
//
//
//}