[ server ] [ execution ]- execution log persistence accomplish

This commit is contained in:
zeaslity
2023-01-13 18:10:45 +08:00
parent 8c8c445c74
commit 73b2bf0078
12 changed files with 307 additions and 63 deletions

View File

@@ -28,6 +28,8 @@ public class CommandReaderConfig {
private String streamKey;
private String recordId;
/**
* 执行的结果对象,保存在此处
*/

View File

@@ -8,6 +8,8 @@ import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.REDIS_STREAM_LIS
@Configuration
public class CommandReaderConfigBean {
// todo must support for multi thread
// its not thread safe now
@Bean
public CommandReaderConfig commandReaderConfig() {
@@ -16,6 +18,7 @@ public class CommandReaderConfigBean {
.consumerName(REDIS_STREAM_LISTENER_CONSUMER_NAME)
.streamKey("ccc")
.consumerType(REDIS_STREAM_LISTENER_CONSUMER_NAME)
.group("ccc")
.ExecutionResult(null)
.build();
}

View File

@@ -1,18 +1,16 @@
package io.wdd.rpc.execute.config;
import io.swagger.annotations.ApiModel;
import io.wdd.server.beans.po.ExecutionLogPO;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.time.LocalDateTime;
@Data
@AllArgsConstructor
@NoArgsConstructor
@SuperBuilder(toBuilder = true)
@ApiModel("Execution模快持久化Bean对象")
public class ExecutionLogBean {
public class ExecutionLog extends ExecutionLogPO {
private String name;
}

View File

@@ -66,6 +66,7 @@ public class CommandResultReader implements StreamListener<String, MapRecord<Str
// 赋值给外部的结果,是的执行的结果可以被拿到
this.commandReaderConfig.setExecutionResult(executionResultFormat);
this.commandReaderConfig.setRecordId(String.valueOf(messageId));
log.info("Octopus Agent [ {} ] execution of [ {} ] Time is [ {} ] stream recordId is [{}]",
streamKey,

View File

@@ -5,7 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.beans.executor.ExecutionMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
import io.wdd.rpc.execute.config.ExecutionLogBean;
import io.wdd.rpc.execute.config.ExecutionLog;
import io.wdd.rpc.execute.result.BuildStreamReader;
import io.wdd.rpc.message.sender.ToAgentMessageSender;
import lombok.extern.slf4j.Slf4j;
@@ -37,91 +37,113 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
@Override
public String SendCommandToAgent(String topicName, String command) {
return this.SendCommandToAgent(topicName, List.of(command));
return this.SendCommandToAgent(topicName,
List.of(command));
}
@Override
public String SendCommandToAgent(String topicName, List<String> commandList) {
return this.SendCommandToAgent(topicName,"manual-command", commandList);
return this.SendCommandToAgent(topicName,
"manual-command",
commandList);
}
@Override
public String SendCommandToAgent(String topicName, String type, List<String> commandList) {
// 构造 Execution Command对应的消息体
OctopusMessage octopusMessage = this.generateOctopusMessage(topicName, type, commandList);
// 归一化type类型 不行
// 获取 ResultKey
ExecutionMessage executionMessage = (ExecutionMessage) octopusMessage.getContent();
String executionMsg;
// 构造 Execution Command对应的消息体
ExecutionMessage executionMessage =
ExecutionMessage.builder()
.type(type)
.commandList(commandList)
.resultKey(ExecutionMessage.GetResultKey(topicName))
.build();
String executionMessageString;
try {
executionMsg = objectMapper.writeValueAsString(executionMessage);
octopusMessage.setContent(executionMsg);
executionMessageString = objectMapper.writeValueAsString(executionMessage);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
OctopusMessage octopusMessage = OctopusMessage
.builder()
.type(OctopusMessageType.EXECUTOR)
.init_time(LocalDateTime.now())
.content(executionMessageString)
.uuid(topicName)
.build();
String resultKey = executionMessage.getResultKey();
// send the message
messageSender.send(octopusMessage);
// set up the stream read group
String group = redisTemplate.opsForStream().createGroup(resultKey, resultKey);
log.info("set consumer group [{}] for the stream key with => [ {} ]", group, resultKey);
String group = redisTemplate
.opsForStream()
.createGroup(resultKey,
resultKey);
log.info("set consumer group [{}] for the stream key with => [ {} ]",
group,
resultKey);
// change the redis stream listener container
// createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey);
// construct the persistent Bean
ExecutionLogBean executionLogBean = buildPersistentLogBeanFromOctopusMessage(octopusMessage);
ExecutionLog executionLog = buildPersistentLogBeanFromOctopusMessage(octopusMessage, executionMessage);
// send resultKey to ExecutionResultDaemonHandler
WAIT_EXECUTION_RESULT_LIST.put(resultKey, executionLogBean);
WAIT_EXECUTION_RESULT_LIST.put(resultKey,
executionLog);
// help gc
executionMessage = null;
octopusMessage =null;
return resultKey;
}
private ExecutionLogBean buildPersistentLogBeanFromOctopusMessage(OctopusMessage octopusMessage) {
return new ExecutionLogBean();
private ExecutionLog buildPersistentLogBeanFromOctopusMessage(OctopusMessage octopusMessage, ExecutionMessage executionMessage) {
ExecutionLog executionLog = new ExecutionLog();
executionLog.setAgentTopicName(octopusMessage.getUuid());
executionLog.setResultKey((String) octopusMessage.getContent());
executionLog.setCommandList(String.valueOf(executionMessage.getCommandList()));
executionLog.setType(executionMessage.getType());
executionLog.setResultKey(executionMessage.getResultKey());
return executionLog;
}
@Override
public List<String> SendCommandToAgent(List<String> topicNameList, String type, List<String> command) {
return topicNameList.stream().map(
topicName -> {
return this.SendCommandToAgent(topicName, type, command);
}
).collect(Collectors.toList());
return topicNameList.stream()
.map(
topicName -> {
return this.SendCommandToAgent(topicName,
type,
command);
}
)
.collect(Collectors.toList());
}
private OctopusMessage generateOctopusMessage(String topicName, String type, List<String> commandList){
@Deprecated
private OctopusMessage generateOctopusMessage(String topicName, String type, List<String> commandList) {
ExecutionMessage executionMessage = generateExecutionMessage(
type,
commandList,
ExecutionMessage.GetResultKey(topicName)
);
return OctopusMessage.builder()
.type(OctopusMessageType.EXECUTOR)
.init_time(LocalDateTime.now())
.content(executionMessage)
.uuid(topicName)
.build();
return null;
}
@Deprecated
private ExecutionMessage generateExecutionMessage(String type, List<String> commandList, String resultKey) {
return ExecutionMessage.builder()
.type(type)
.commandList(commandList)
.resultKey(resultKey)
.build();
return null;
}
}

View File

@@ -1,9 +1,11 @@
package io.wdd.rpc.execute.service;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.execute.config.CommandReaderConfig;
import io.wdd.rpc.execute.config.ExecutionLogBean;
import io.wdd.rpc.execute.config.ExecutionLog;
import io.wdd.rpc.execute.result.BuildStreamReader;
import io.wdd.server.service.ExecutionLogService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Service;
@@ -11,6 +13,7 @@ import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
@@ -28,7 +31,7 @@ public class ExecutionResultDaemonHandler {
* <p>
* which means there are execution running , waiting for their result to handle
*/
public static final ConcurrentHashMap<String, ExecutionLogBean> WAIT_EXECUTION_RESULT_LIST = new ConcurrentHashMap<>(32);
public static final ConcurrentHashMap<String, ExecutionLog> WAIT_EXECUTION_RESULT_LIST = new ConcurrentHashMap<>(32);
private final int MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT = 70;
@Resource
@@ -37,6 +40,9 @@ public class ExecutionResultDaemonHandler {
@Resource
CommandReaderConfig commandReaderConfig;
@Resource
ExecutionLogService executionLogService;
@PostConstruct
public void startExecutionDaemonHandler() {
@@ -133,9 +139,7 @@ public class ExecutionResultDaemonHandler {
throw new RuntimeException(e);
}
return new ArrayList<>(
List.of("[ Failed ] - execution has failed !")
);
return null;
}
);
@@ -144,22 +148,34 @@ public class ExecutionResultDaemonHandler {
.anyOf(falloutTimeFuture,
executionResultFuture);
complete.whenComplete(
(resultString, e) -> {
complete
.whenComplete(
(result, e) -> {
log.info("execution result are => {}",
resultString);
log.info("execution result are => {}",
result);
// 持久化存储对应的结果
// 持久化存储对应的结果
ExecutionLog executionLog = WAIT_EXECUTION_RESULT_LIST.get(resultKey);
executionLog.setAcTime(TimeUtils.currentTime());
executionLog.setResultContent(String.valueOf(commandReaderConfig.getExecutionResult()));
executionLog.setResultCode(
CollectionUtils.isEmpty((Collection) result) ? 1 : 0
);
executionLog.setRecordId(commandReaderConfig.getRecordId());
executionLogService.save(executionLog);
// 清除此次任务的内容
WAIT_EXECUTION_RESULT_LIST.remove(resultKey);
// 清除此次任务的内容
WAIT_EXECUTION_RESULT_LIST.remove(resultKey);
log.info("[Execution] - whole process are complete !");
}
);
log.info("[Execution] - whole process are complete !");
}
);
// very important
// stuck the main thread , otherwise it will create a dead loop , really bad
complete.join();
}