[ server ] [ execution ]- optimize execution accomplish

This commit is contained in:
zeaslity
2023-01-13 17:11:04 +08:00
parent 4139e835e2
commit 8c8c445c74
11 changed files with 269 additions and 117 deletions

View File

@@ -22,7 +22,7 @@ public class ExecutionMessage {
public static String GetResultKey(String topicName) {
return topicName + "Execution:" + TimeUtils.currentTimeStringFullSplit();
return topicName + "-Execution:" + TimeUtils.currentTimeStringFullSplit();
}
}

View File

@@ -1,7 +1,7 @@
package io.wdd.rpc.controller;
import io.wdd.common.beans.response.R;
import io.wdd.rpc.execute.result.CreateStreamReader;
import io.wdd.rpc.execute.result.BuildStreamReader;
import io.wdd.rpc.execute.service.CoreExecutionService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
@@ -24,7 +24,7 @@ public class ExecutionController {
CoreExecutionService coreExecutionService;
@Resource
CreateStreamReader createStreamReader;
BuildStreamReader buildStreamReader;
@PostMapping("command")
@@ -50,7 +50,7 @@ public class ExecutionController {
@RequestParam(value = "streamKey") String streamKey
) {
createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER ,streamKey);
buildStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER , streamKey);
}
@@ -59,7 +59,7 @@ public class ExecutionController {
@RequestParam(value = "streamKey") String streamKey
) {
createStreamReader.registerStreamReader(AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER ,streamKey);
buildStreamReader.registerStreamReader(AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER , streamKey);
}

View File

@@ -26,9 +26,11 @@ public class CommandReaderConfig {
*/
private String consumerName;
private String streamKey;
/**
* 执行的结果对象,保存在此处
*/
private String ExecutionResult;
private ArrayList<String> ExecutionResult;
}

View File

@@ -0,0 +1,24 @@
package io.wdd.rpc.execute.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.REDIS_STREAM_LISTENER_CONSUMER_NAME;
@Configuration
public class CommandReaderConfigBean {
@Bean
public CommandReaderConfig commandReaderConfig() {
return CommandReaderConfig
.builder()
.consumerName(REDIS_STREAM_LISTENER_CONSUMER_NAME)
.streamKey("ccc")
.consumerType(REDIS_STREAM_LISTENER_CONSUMER_NAME)
.ExecutionResult(null)
.build();
}
}

View File

@@ -1,6 +1,5 @@
package io.wdd.rpc.execute.config;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiModel;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -13,4 +12,7 @@ import lombok.experimental.SuperBuilder;
@SuperBuilder(toBuilder = true)
@ApiModel("Execution模快持久化Bean对象")
public class ExecutionLogBean {
private String name;
}

View File

@@ -1,27 +1,90 @@
package io.wdd.rpc.execute.result;
import io.wdd.rpc.execute.config.CommandReaderConfig;
import io.wdd.server.utils.SpringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashMap;
import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER;
@Component
@Slf4j
public class CreateStreamReader {
public class BuildStreamReader {
private final HashMap<String, StreamMessageListenerContainer> REDIS_STREAM_LISTENER_CONTAINER_CACHE = new HashMap<>(16);
private RedisStreamReaderConfig redisStreamReaderConfig;
private StreamMessageListenerContainer streamMessageListenerContainer;
private CommandReaderConfig commandReaderConfig;
public void buildStreamReader(CommandReaderConfig commandReaderConfig) {
// prepare the environment
prepareExecutionEnv();
// just modify the redis listener container and it's ok
modifyExecutionStreamReader(commandReaderConfig);
}
private void modifyExecutionStreamReader(CommandReaderConfig commandReaderConfig) {
// stop the old stream listener container
this.streamMessageListenerContainer.stop();
// modify container
this.streamMessageListenerContainer.receive(
StreamOffset.create(
commandReaderConfig.getStreamKey(),
ReadOffset.lastConsumed()),
new CommandResultReader(
commandReaderConfig
)
);
// very important
this.streamMessageListenerContainer.start();
}
private void prepareExecutionEnv() {
getRedisStreamListenerContainer();
getRedisStreamReaderConfig();
}
private void getRedisStreamReaderConfig() {
this.commandReaderConfig = SpringUtils.getBean("commandReaderConfig",
CommandReaderConfig.class);
}
private void getRedisStreamListenerContainer() {
this.streamMessageListenerContainer = SpringUtils.getBean(
EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER,
StreamMessageListenerContainer.class
);
}
public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey) {
registerStreamReader(redisStreamListenerContainerBeanName,
streamKey,
null);
}
public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey, String ExecutionResult) {
public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey, ArrayList<String> ExecutionResult) {
// prepare the environment
prepareEnv();
@@ -37,7 +100,8 @@ public class CreateStreamReader {
destroyStreamReader(streamKey);
// modify the configuration ==> streamKey
modifyStreamReader(streamKey, ExecutionResult);
modifyStreamReader(streamKey,
ExecutionResult);
// re-create the REDIS_STREAM_LISTENER_CONTAINER
createStreamReader(redisStreamListenerContainerBeanName,
@@ -76,7 +140,7 @@ public class CreateStreamReader {
}
private void modifyStreamReader(String streamKey, String executionResult) {
private void modifyStreamReader(String streamKey, ArrayList<String> executionResult) {
log.debug("start to modify the redis stream listener container stream key");
String oldStreamKey = redisStreamReaderConfig.getStreamKey();

View File

@@ -5,7 +5,6 @@ import io.wdd.rpc.execute.config.ExecutionResultStringDeserializer;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
@@ -33,9 +32,9 @@ public class CommandResultReader implements StreamListener<String, MapRecord<Str
null);
}
public CommandResultReader(String consumerType, String group, String consumerName, String executionResult) {
this.commandReaderConfig = CommandReaderConfig.builder()
public CommandResultReader(String consumerType, String group, String consumerName, ArrayList<String> executionResult) {
this.commandReaderConfig = CommandReaderConfig
.builder()
.consumerName(consumerName)
.group(group)
.consumerType(consumerType)
@@ -62,32 +61,25 @@ public class CommandResultReader implements StreamListener<String, MapRecord<Str
String value = message.getValue()
.get(key);
ArrayList<String> executionResultFormat = ExecutionResultStringDeserializer.format(value);
// 赋值给外部的结果,是的执行的结果可以被拿到
String executionResult = this.commandReaderConfig.getExecutionResult();
if (StringUtils.isNotEmpty(executionResult)) {
executionResult = value;
}
this.commandReaderConfig.setExecutionResult(executionResultFormat);
log.info("Octopus Agent [ {} ] execution of [ {} ] Time is [ {} ] stream recordId is [{}]",
streamKey,
key,
key,
messageId);
// print to console
printPrettyDeserializedCommandResult(value);
}
private void printPrettyDeserializedCommandResult(String valueString) {
ArrayList<String> executionResult = ExecutionResultStringDeserializer.format(valueString);
executionResult.stream()
executionResultFormat
.stream()
.forEach(
System.out::println
);
}

View File

@@ -1,6 +1,7 @@
package io.wdd.rpc.execute.result;
import io.wdd.rpc.execute.config.CommandReaderConfig;
import io.wdd.rpc.status.AgentStatusStreamReader;
import lombok.Getter;
import lombok.Setter;
@@ -17,6 +18,7 @@ import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.ArrayList;
@Configuration
@Slf4j
@@ -29,11 +31,37 @@ public class RedisStreamReaderConfig {
public static final String COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER = "commandResultRedisStreamListenerContainer";
public static final String EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER = "executionResultRedisStreamListenerContainer";
public static final String AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER = "agentStatusRedisStreamListenerContainer";
public static final String REDIS_STREAM_LISTENER_CONSUMER_NAME = "OctopusServer";
/**
* used in old model
*/
private String streamKey = "cccc";
private String executionResult = null;
/**
* no use
*/
private ArrayList<String> executionResult = null;
@Bean(value = EXECUTION_RESULT_REDIS_STREAM_LISTENER_CONTAINER)
@Lazy
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> executionResultRedisStreamListenerContainer(){
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(2))
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
return listenerContainer;
}
@Bean(value = COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER)
@Scope("prototype")
@@ -48,15 +76,14 @@ public class RedisStreamReaderConfig {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
// todo 此部分可以被移出到另外的位置会更加方便就不需要对此Bean进行创建和销毁了
listenerContainer.receive(
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
new CommandResultReader(
"OctopusServer",
REDIS_STREAM_LISTENER_CONSUMER_NAME,
streamKey,
"OctopusServer",
REDIS_STREAM_LISTENER_CONSUMER_NAME,
executionResult
)
@@ -82,9 +109,9 @@ public class RedisStreamReaderConfig {
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
new AgentStatusStreamReader(
"OctopusServer",
"OctopusServer",
"OctopusServer")
REDIS_STREAM_LISTENER_CONSUMER_NAME,
REDIS_STREAM_LISTENER_CONSUMER_NAME,
REDIS_STREAM_LISTENER_CONSUMER_NAME)
);

View File

@@ -6,7 +6,7 @@ import io.wdd.common.beans.executor.ExecutionMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
import io.wdd.rpc.execute.config.ExecutionLogBean;
import io.wdd.rpc.execute.result.CreateStreamReader;
import io.wdd.rpc.execute.result.BuildStreamReader;
import io.wdd.rpc.message.sender.ToAgentMessageSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
@@ -14,11 +14,9 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.stream.Collectors;
import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER;
import static io.wdd.rpc.execute.service.ExecutionResultDaemonHandler.WAIT_EXECUTION_RESULT_LIST;
@Service
@@ -35,7 +33,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
RedisTemplate redisTemplate;
@Resource
CreateStreamReader createStreamReader;
BuildStreamReader buildStreamReader;
@Override
public String SendCommandToAgent(String topicName, String command) {
@@ -70,7 +68,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
// set up the stream read group
String group = redisTemplate.opsForStream().createGroup(resultKey, resultKey);
log.debug("set consumer group for the stream key with => [ {} ]", resultKey);
log.info("set consumer group [{}] for the stream key with => [ {} ]", group, resultKey);
// change the redis stream listener container
// createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey);
@@ -85,7 +83,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
private ExecutionLogBean buildPersistentLogBeanFromOctopusMessage(OctopusMessage octopusMessage) {
return null;
return new ExecutionLogBean();
}

View File

@@ -1,19 +1,19 @@
package io.wdd.rpc.execute.service;
import io.wdd.rpc.execute.config.CommandReaderConfig;
import io.wdd.rpc.execute.config.ExecutionLogBean;
import io.wdd.rpc.execute.config.ExecutionResultStringDeserializer;
import io.wdd.rpc.execute.result.CreateStreamReader;
import io.wdd.rpc.execute.result.BuildStreamReader;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER;
/**
* 1. [waiting strategy ]
* 2. [build the redis stream listener]
@@ -29,50 +29,77 @@ public class ExecutionResultDaemonHandler {
* which means there are execution running , waiting for their result to handle
*/
public static final ConcurrentHashMap<String, ExecutionLogBean> WAIT_EXECUTION_RESULT_LIST = new ConcurrentHashMap<>(32);
private final int MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT = 70;
@Resource
CreateStreamReader createStreamReader;
BuildStreamReader buildStreamReader;
@Resource
CommandReaderConfig commandReaderConfig;
@PostConstruct
public void startExecutionDaemonHandler() {
// 启动一个异步线程,运行 Execution结果处理守护进程
CompletableFuture.runAsync(
() -> realStartExecutionDaemonHandler()
);
}
private void realStartExecutionDaemonHandler() {
while (true) {
if (WAIT_EXECUTION_RESULT_LIST.size() == 0) {
while (WAIT_EXECUTION_RESULT_LIST.size() == 0) {
try {
// no execution result need to handle
// wait for 5 seconds
log.debug("realStartExecutionDaemonHandler start to sleep waiting for result !");
TimeUnit.SECONDS.sleep(5);
// restart the forever loop
continue;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// has result to handle , just handle one result at one time
String resultKey = WAIT_EXECUTION_RESULT_LIST.keys()
String resultKey = WAIT_EXECUTION_RESULT_LIST
.keys()
.nextElement();
log.info("current result key is [{}]",
resultKey);
CompletableFuture<String> executionResultFuture = CompletableFuture.supplyAsync(
CompletableFuture<ArrayList<String>> executionResultFuture =
CompletableFuture
.supplyAsync(
() -> {
String executionResult = "";
// 修改相应的参数
commandReaderConfig.setStreamKey(resultKey);
// listener container 实际上是根据这个绑定的
commandReaderConfig.setGroup(resultKey);
// 必须归零
commandReaderConfig.setExecutionResult(null);
// 构造 resultKey对应的 Redis Stream Listener Container
createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER,
resultKey,
executionResult);
buildStreamReader
.buildStreamReader(commandReaderConfig);
// 获得结果
String s = "no no no";
ArrayList<String> s = new ArrayList<>(
List.of("no no no")
);
try {
s = CompletableFuture.supplyAsync(
s = CompletableFuture
.supplyAsync(
() -> {
while (true) {
if (StringUtils.isNotEmpty(executionResult)) {
return executionResult;
if (CollectionUtils.isNotEmpty(commandReaderConfig.getExecutionResult())) {
return commandReaderConfig.getExecutionResult();
}
try {
@@ -83,7 +110,7 @@ public class ExecutionResultDaemonHandler {
}
}
)
.get(80,
.get(MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT,
TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
@@ -93,41 +120,49 @@ public class ExecutionResultDaemonHandler {
throw new RuntimeException(e);
}
return s;
}
);
CompletableFuture<String> falloutTimeFuture = CompletableFuture.supplyAsync(
CompletableFuture<ArrayList<String>> falloutTimeFuture = CompletableFuture.supplyAsync(
() -> {
try {
TimeUnit.SECONDS.sleep(70);
TimeUnit.SECONDS.sleep(MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "[ Failed ] - execution has failed !";
return new ArrayList<>(
List.of("[ Failed ] - execution has failed !")
);
}
);
// 获取结果然后销毁Stream Listener Container
CompletableFuture.anyOf(falloutTimeFuture,
executionResultFuture)
.whenCompleteAsync(
CompletableFuture<Object> complete = CompletableFuture
.anyOf(falloutTimeFuture,
executionResultFuture);
complete.whenComplete(
(resultString, e) -> {
log.info("execution result are => {}",
resultString);
// 持久化存储对应的结果
ExecutionResultStringDeserializer.format(String.valueOf(resultString));
// 清除此次任务的内容
WAIT_EXECUTION_RESULT_LIST.remove(resultKey);
log.info("[Execution] - whole process are complete !");
}
);
}
}
}

View File

@@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
import io.wdd.common.beans.status.AgentStatus;
import io.wdd.common.handler.MyRuntimeException;
import io.wdd.rpc.message.sender.ToAgentMessageSender;
import io.wdd.server.beans.vo.ServerInfoVO;
@@ -115,7 +116,7 @@ public class AcceptAgentInitInfo {
}
// 4. generate the Octopus Agent Status Redis Stream Key & Consumer-Group
generateAgentStatusRedisStreamConsumerGroup(serverInfoVO.getServerName());
generateAgentStatusRedisStreamConsumerGroup(serverInfoVO.getTopicName());
// 5. send InitMessage to agent
sendInitMessageToAgent(serverInfoVO);
@@ -162,23 +163,30 @@ public class AcceptAgentInitInfo {
channel.basicAck(deliveryTag, false);
}
private void generateAgentStatusRedisStreamConsumerGroup(String serverName) {
private void generateAgentStatusRedisStreamConsumerGroup(String agentTopicName) {
String statusStreamKey = serverName + "-status";
String statusStreamKey = AgentStatus.getRedisStatusKey(agentTopicName);
if (!redisTemplate.hasKey(statusStreamKey)) {
log.debug(" not find the group, recreate");
// not find the group, recreate
redisTemplate.opsForStream().createGroup(statusStreamKey, "OctopusServer");
}
// check for octopus-server consumer group
if (redisTemplate.opsForStream().groups(statusStreamKey)
/*if (redisTemplate.opsForStream().groups(statusStreamKey)
.stream()
.filter(
group -> group.groupName().startsWith("Octopus")
).collect(Collectors.toSet()).contains(Boolean.FALSE)) {
log.debug(" not find the group, recreate");
// not find the group, recreate
redisTemplate.opsForStream().createGroup(statusStreamKey, "OctopusServer");
}
log.debug("octopus agent [ {} ] status report stream key [ {} ] has been created !", serverName, statusStreamKey);
redisTemplate.opsForStream().createGroup(statusStreamKey, "OctopusServer");
}*/
log.debug("octopus agent [ {} ] status report stream key [ {} ] has been created !", agentTopicName, statusStreamKey);
}
private boolean checkAgentAlreadyRegister(String agentQueueTopic) {