diff --git a/agent/src/main/java/io/wdd/agent/executor/status/AppStatusExecutor.java b/agent/src/main/java/io/wdd/agent/executor/status/AppStatusExecutor.java index a83f90f..b146983 100644 --- a/agent/src/main/java/io/wdd/agent/executor/status/AppStatusExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/status/AppStatusExecutor.java @@ -2,7 +2,6 @@ package io.wdd.agent.executor.status; import io.wdd.agent.config.utils.AgentCommonThreadPool; -import io.wdd.agent.executor.CheckSingleAppStatusCallable; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; diff --git a/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfig.java b/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfig.java index 5cce874..28da813 100644 --- a/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfig.java +++ b/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfig.java @@ -28,6 +28,8 @@ public class CommandReaderConfig { private String streamKey; + private String recordId; + /** * 执行的结果对象,保存在此处 */ diff --git a/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfigBean.java b/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfigBean.java index 7b36c6a..f4da32e 100644 --- a/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfigBean.java +++ b/server/src/main/java/io/wdd/rpc/execute/config/CommandReaderConfigBean.java @@ -8,6 +8,8 @@ import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.REDIS_STREAM_LIS @Configuration public class CommandReaderConfigBean { + // todo must support for multi thread + // its not thread safe now @Bean public CommandReaderConfig commandReaderConfig() { @@ -16,6 +18,7 @@ public class CommandReaderConfigBean { .consumerName(REDIS_STREAM_LISTENER_CONSUMER_NAME) .streamKey("ccc") .consumerType(REDIS_STREAM_LISTENER_CONSUMER_NAME) + .group("ccc") .ExecutionResult(null) .build(); } diff --git a/server/src/main/java/io/wdd/rpc/execute/config/ExecutionLogBean.java b/server/src/main/java/io/wdd/rpc/execute/config/ExecutionLog.java similarity index 53% rename from server/src/main/java/io/wdd/rpc/execute/config/ExecutionLogBean.java rename to server/src/main/java/io/wdd/rpc/execute/config/ExecutionLog.java index 2a88515..ccbfad4 100644 --- a/server/src/main/java/io/wdd/rpc/execute/config/ExecutionLogBean.java +++ b/server/src/main/java/io/wdd/rpc/execute/config/ExecutionLog.java @@ -1,18 +1,16 @@ package io.wdd.rpc.execute.config; import io.swagger.annotations.ApiModel; +import io.wdd.server.beans.po.ExecutionLogPO; import lombok.AllArgsConstructor; import lombok.Data; -import lombok.NoArgsConstructor; -import lombok.experimental.SuperBuilder; + +import java.time.LocalDateTime; @Data @AllArgsConstructor -@NoArgsConstructor -@SuperBuilder(toBuilder = true) @ApiModel("Execution模快持久化Bean对象") -public class ExecutionLogBean { +public class ExecutionLog extends ExecutionLogPO { - private String name; } diff --git a/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java b/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java index e6a9569..56cc27c 100644 --- a/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java +++ b/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java @@ -66,6 +66,7 @@ public class CommandResultReader implements StreamListener commandList) { - return this.SendCommandToAgent(topicName,"manual-command", commandList); + return this.SendCommandToAgent(topicName, + "manual-command", + commandList); } @Override public String SendCommandToAgent(String topicName, String type, List commandList) { - // 构造 Execution Command对应的消息体 - OctopusMessage octopusMessage = this.generateOctopusMessage(topicName, type, commandList); + // 归一化type类型 不行 - // 获取 ResultKey - ExecutionMessage executionMessage = (ExecutionMessage) octopusMessage.getContent(); - String executionMsg; + // 构造 Execution Command对应的消息体 + ExecutionMessage executionMessage = + ExecutionMessage.builder() + .type(type) + .commandList(commandList) + .resultKey(ExecutionMessage.GetResultKey(topicName)) + .build(); + + + String executionMessageString; try { - executionMsg = objectMapper.writeValueAsString(executionMessage); - octopusMessage.setContent(executionMsg); + executionMessageString = objectMapper.writeValueAsString(executionMessage); } catch (JsonProcessingException e) { throw new RuntimeException(e); } + + OctopusMessage octopusMessage = OctopusMessage + .builder() + .type(OctopusMessageType.EXECUTOR) + .init_time(LocalDateTime.now()) + .content(executionMessageString) + .uuid(topicName) + .build(); + String resultKey = executionMessage.getResultKey(); // send the message messageSender.send(octopusMessage); // set up the stream read group - String group = redisTemplate.opsForStream().createGroup(resultKey, resultKey); - log.info("set consumer group [{}] for the stream key with => [ {} ]", group, resultKey); + String group = redisTemplate + .opsForStream() + .createGroup(resultKey, + resultKey); + log.info("set consumer group [{}] for the stream key with => [ {} ]", + group, + resultKey); // change the redis stream listener container // createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey); // construct the persistent Bean - ExecutionLogBean executionLogBean = buildPersistentLogBeanFromOctopusMessage(octopusMessage); + ExecutionLog executionLog = buildPersistentLogBeanFromOctopusMessage(octopusMessage, executionMessage); // send resultKey to ExecutionResultDaemonHandler - WAIT_EXECUTION_RESULT_LIST.put(resultKey, executionLogBean); + WAIT_EXECUTION_RESULT_LIST.put(resultKey, + executionLog); + + // help gc + executionMessage = null; + octopusMessage =null; return resultKey; } - private ExecutionLogBean buildPersistentLogBeanFromOctopusMessage(OctopusMessage octopusMessage) { - - return new ExecutionLogBean(); + private ExecutionLog buildPersistentLogBeanFromOctopusMessage(OctopusMessage octopusMessage, ExecutionMessage executionMessage) { + ExecutionLog executionLog = new ExecutionLog(); + executionLog.setAgentTopicName(octopusMessage.getUuid()); + executionLog.setResultKey((String) octopusMessage.getContent()); + executionLog.setCommandList(String.valueOf(executionMessage.getCommandList())); + executionLog.setType(executionMessage.getType()); + executionLog.setResultKey(executionMessage.getResultKey()); + return executionLog; } @Override public List SendCommandToAgent(List topicNameList, String type, List command) { - return topicNameList.stream().map( - topicName -> { - return this.SendCommandToAgent(topicName, type, command); - } - ).collect(Collectors.toList()); + return topicNameList.stream() + .map( + topicName -> { + return this.SendCommandToAgent(topicName, + type, + command); + } + ) + .collect(Collectors.toList()); } - private OctopusMessage generateOctopusMessage(String topicName, String type, List commandList){ + @Deprecated + private OctopusMessage generateOctopusMessage(String topicName, String type, List commandList) { - ExecutionMessage executionMessage = generateExecutionMessage( - type, - commandList, - ExecutionMessage.GetResultKey(topicName) - ); - - return OctopusMessage.builder() - .type(OctopusMessageType.EXECUTOR) - .init_time(LocalDateTime.now()) - .content(executionMessage) - .uuid(topicName) - .build(); + return null; } + @Deprecated private ExecutionMessage generateExecutionMessage(String type, List commandList, String resultKey) { - return ExecutionMessage.builder() - .type(type) - .commandList(commandList) - .resultKey(resultKey) - .build(); + return null; } - } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java b/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java index 59f4c43..a12eaf3 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/ExecutionResultDaemonHandler.java @@ -1,9 +1,11 @@ package io.wdd.rpc.execute.service; +import io.wdd.common.utils.TimeUtils; import io.wdd.rpc.execute.config.CommandReaderConfig; -import io.wdd.rpc.execute.config.ExecutionLogBean; +import io.wdd.rpc.execute.config.ExecutionLog; import io.wdd.rpc.execute.result.BuildStreamReader; +import io.wdd.server.service.ExecutionLogService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.stereotype.Service; @@ -11,6 +13,7 @@ import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.concurrent.*; @@ -28,7 +31,7 @@ public class ExecutionResultDaemonHandler { *

* which means there are execution running , waiting for their result to handle */ - public static final ConcurrentHashMap WAIT_EXECUTION_RESULT_LIST = new ConcurrentHashMap<>(32); + public static final ConcurrentHashMap WAIT_EXECUTION_RESULT_LIST = new ConcurrentHashMap<>(32); private final int MAX_TIMEOUT_WAITING_FOR_EXECUTION_RESULT = 70; @Resource @@ -37,6 +40,9 @@ public class ExecutionResultDaemonHandler { @Resource CommandReaderConfig commandReaderConfig; + @Resource + ExecutionLogService executionLogService; + @PostConstruct public void startExecutionDaemonHandler() { @@ -133,9 +139,7 @@ public class ExecutionResultDaemonHandler { throw new RuntimeException(e); } - return new ArrayList<>( - List.of("[ Failed ] - execution has failed !") - ); + return null; } ); @@ -144,22 +148,34 @@ public class ExecutionResultDaemonHandler { .anyOf(falloutTimeFuture, executionResultFuture); - complete.whenComplete( - (resultString, e) -> { + complete + .whenComplete( + (result, e) -> { - log.info("execution result are => {}", - resultString); + log.info("execution result are => {}", + result); - // 持久化存储对应的结果 + // 持久化存储对应的结果 + ExecutionLog executionLog = WAIT_EXECUTION_RESULT_LIST.get(resultKey); + executionLog.setAcTime(TimeUtils.currentTime()); + executionLog.setResultContent(String.valueOf(commandReaderConfig.getExecutionResult())); + executionLog.setResultCode( + CollectionUtils.isEmpty((Collection) result) ? 1 : 0 + ); + executionLog.setRecordId(commandReaderConfig.getRecordId()); + executionLogService.save(executionLog); - // 清除此次任务的内容 - WAIT_EXECUTION_RESULT_LIST.remove(resultKey); + // 清除此次任务的内容 + WAIT_EXECUTION_RESULT_LIST.remove(resultKey); - log.info("[Execution] - whole process are complete !"); - } - ); + log.info("[Execution] - whole process are complete !"); + } + ); + // very important + // stuck the main thread , otherwise it will create a dead loop , really bad + complete.join(); } diff --git a/server/src/main/java/io/wdd/server/beans/po/ExecutionLogPO.java b/server/src/main/java/io/wdd/server/beans/po/ExecutionLogPO.java new file mode 100644 index 0000000..804e59e --- /dev/null +++ b/server/src/main/java/io/wdd/server/beans/po/ExecutionLogPO.java @@ -0,0 +1,126 @@ +package io.wdd.server.beans.po; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import java.io.Serializable; +import java.time.LocalDateTime; +import java.util.Date; +import lombok.Data; + +/** + * + * @TableName execution_log + */ +@TableName(value ="execution_log") +@Data +public class ExecutionLogPO implements Serializable { + /** + * + */ + @TableId + private Long id; + + /** + * + */ + private String agentTopicName; + + /** + * + */ + private String resultKey; + + /** + * + */ + private LocalDateTime acTime; + + /** + * + */ + private Integer resultCode; + + /** + * 执行类型,命令行或者Function + */ + private String type; + + /** + * + */ + private String commandList; + + /** + * 命令执行结果 + */ + private String resultContent; + + /** + * redis stream key中任务结果对应的RecordId + */ + private String recordId; + + @TableField(exist = false) + private static final long serialVersionUID = 1L; + + @Override + public boolean equals(Object that) { + if (this == that) { + return true; + } + if (that == null) { + return false; + } + if (getClass() != that.getClass()) { + return false; + } + ExecutionLogPO other = (ExecutionLogPO) that; + return (this.getId() == null ? other.getId() == null : this.getId().equals(other.getId())) + && (this.getAgentTopicName() == null ? other.getAgentTopicName() == null : this.getAgentTopicName().equals(other.getAgentTopicName())) + && (this.getResultKey() == null ? other.getResultKey() == null : this.getResultKey().equals(other.getResultKey())) + && (this.getAcTime() == null ? other.getAcTime() == null : this.getAcTime().equals(other.getAcTime())) + && (this.getResultCode() == null ? other.getResultCode() == null : this.getResultCode().equals(other.getResultCode())) + && (this.getType() == null ? other.getType() == null : this.getType().equals(other.getType())) + && (this.getCommandList() == null ? other.getCommandList() == null : this.getCommandList().equals(other.getCommandList())) + && (this.getResultContent() == null ? other.getResultContent() == null : this.getResultContent().equals(other.getResultContent())) + && (this.getRecordId() == null ? other.getRecordId() == null : this.getRecordId().equals(other.getRecordId())); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((getId() == null) ? 0 : getId().hashCode()); + result = prime * result + ((getAgentTopicName() == null) ? 0 : getAgentTopicName().hashCode()); + result = prime * result + ((getResultKey() == null) ? 0 : getResultKey().hashCode()); + result = prime * result + ((getAcTime() == null) ? 0 : getAcTime().hashCode()); + result = prime * result + ((getResultCode() == null) ? 0 : getResultCode().hashCode()); + result = prime * result + ((getType() == null) ? 0 : getType().hashCode()); + result = prime * result + ((getCommandList() == null) ? 0 : getCommandList().hashCode()); + result = prime * result + ((getResultContent() == null) ? 0 : getResultContent().hashCode()); + result = prime * result + ((getRecordId() == null) ? 0 : getRecordId().hashCode()); + return result; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()); + sb.append(" ["); + sb.append("Hash = ").append(hashCode()); + sb.append(", id=").append(id); + sb.append(", agentTopicName=").append(agentTopicName); + sb.append(", resultKey=").append(resultKey); + sb.append(", acTime=").append(acTime); + sb.append(", resultCode=").append(resultCode); + sb.append(", type=").append(type); + sb.append(", commandList=").append(commandList); + sb.append(", resultContent=").append(resultContent); + sb.append(", recordId=").append(recordId); + sb.append(", serialVersionUID=").append(serialVersionUID); + sb.append("]"); + return sb.toString(); + } +} \ No newline at end of file diff --git a/server/src/main/java/io/wdd/server/mapper/ExecutionLogMapper.java b/server/src/main/java/io/wdd/server/mapper/ExecutionLogMapper.java new file mode 100644 index 0000000..7c596ec --- /dev/null +++ b/server/src/main/java/io/wdd/server/mapper/ExecutionLogMapper.java @@ -0,0 +1,18 @@ +package io.wdd.server.mapper; + +import io.wdd.server.beans.po.ExecutionLogPO; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +/** +* @author wdd +* @description 针对表【execution_log】的数据库操作Mapper +* @createDate 2023-01-13 17:58:33 +* @Entity io.wdd.server.beans.po.ExecutionLogPO +*/ +public interface ExecutionLogMapper extends BaseMapper { + +} + + + + diff --git a/server/src/main/java/io/wdd/server/service/ExecutionLogService.java b/server/src/main/java/io/wdd/server/service/ExecutionLogService.java new file mode 100644 index 0000000..594a729 --- /dev/null +++ b/server/src/main/java/io/wdd/server/service/ExecutionLogService.java @@ -0,0 +1,13 @@ +package io.wdd.server.service; + +import io.wdd.server.beans.po.ExecutionLogPO; +import com.baomidou.mybatisplus.extension.service.IService; + +/** +* @author wdd +* @description 针对表【execution_log】的数据库操作Service +* @createDate 2023-01-13 17:58:33 +*/ +public interface ExecutionLogService extends IService { + +} diff --git a/server/src/main/java/io/wdd/server/service/impl/ExecutionLogServiceImpl.java b/server/src/main/java/io/wdd/server/service/impl/ExecutionLogServiceImpl.java new file mode 100644 index 0000000..4c173a5 --- /dev/null +++ b/server/src/main/java/io/wdd/server/service/impl/ExecutionLogServiceImpl.java @@ -0,0 +1,22 @@ +package io.wdd.server.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import io.wdd.server.beans.po.ExecutionLogPO; +import io.wdd.server.service.ExecutionLogService; +import io.wdd.server.mapper.ExecutionLogMapper; +import org.springframework.stereotype.Service; + +/** +* @author wdd +* @description 针对表【execution_log】的数据库操作Service实现 +* @createDate 2023-01-13 17:58:33 +*/ +@Service +public class ExecutionLogServiceImpl extends ServiceImpl + implements ExecutionLogService{ + +} + + + + diff --git a/server/src/main/resources/mapper/ExecutionLogMapper.xml b/server/src/main/resources/mapper/ExecutionLogMapper.xml new file mode 100644 index 0000000..66a57ea --- /dev/null +++ b/server/src/main/resources/mapper/ExecutionLogMapper.xml @@ -0,0 +1,24 @@ + + + + + + + + + + + + + + + + + + id,agent_topic_name,result_key, + ac_time,result_code,type, + command_list,result_content,record_id + +