From d9a70aa0599c8ea19bdd7e2c974df7b6053f7615 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Fri, 30 Dec 2022 14:40:21 +0800 Subject: [PATCH] [ server ] [ executor ] - redis stream listener container all procedure --- .../wdd/agent/executor/CommandExecutor.java | 2 +- .../executor/thread/LogToArrayListCache.java | 15 ++++++++ .../execute/result/CommandResultReader.java | 34 ++++++++++++------- .../service/CoreExecutionServiceImpl.java | 8 ++++- 4 files changed, 45 insertions(+), 14 deletions(-) diff --git a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java index cb3375a..72b6759 100644 --- a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java @@ -81,7 +81,7 @@ public class CommandExecutor { )); // cache log lines - logToArrayListCache.cacheLog(streamKey, process.getInputStream()); + logToArrayListCache.cacheLog(streamKey, process); // start to send the result log streamSender.startToWaitLog(streamKey); diff --git a/agent/src/main/java/io/wdd/agent/executor/thread/LogToArrayListCache.java b/agent/src/main/java/io/wdd/agent/executor/thread/LogToArrayListCache.java index 1e4e9a0..ce340b0 100644 --- a/agent/src/main/java/io/wdd/agent/executor/thread/LogToArrayListCache.java +++ b/agent/src/main/java/io/wdd/agent/executor/thread/LogToArrayListCache.java @@ -27,6 +27,21 @@ public class LogToArrayListCache { new ArrayList<>(256) ); + public void cacheLog(String streamKey, Process process) { + + ArrayList commandCachedLog = this.getExecutionCmdCachedLogArrayList(streamKey); + + String format = String.format("execution command are => [ %s ]", process.info().commandLine().get()); + // add the command + commandCachedLog.add(format); + commandCachedLog.add("--------------- command result are as below --------------------"); + commandCachedLog.add(""); + + // cache the real command logs + cacheLog(streamKey, process.getInputStream()); + + } + public void cacheLog(String streamKey, InputStream commandLogStream) { ArrayList commandCachedLog = this.getExecutionCmdCachedLogArrayList(streamKey); diff --git a/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java b/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java index fc0e696..21e6d0e 100644 --- a/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java +++ b/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java @@ -1,6 +1,8 @@ package io.wdd.rpc.execute.result; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.Getter; import lombok.Setter; @@ -12,6 +14,7 @@ import org.springframework.data.redis.stream.StreamListener; import org.springframework.stereotype.Component; import java.util.ArrayList; +import java.util.List; @Getter @Setter @@ -48,24 +51,34 @@ public class CommandResultReader implements StreamListener message) { String streamKey = message.getStream(); - RecordId messageId = message.getId(); - String key = (String) message.getValue().keySet().toArray()[0]; + String value = message.getValue().get(key); - String value = (String) message.getValue().values().toArray()[0]; + log.info("Octopus Agent [ {} ] execution of [ {} ] Time is [ {} ] stream recordId is [{}]", streamKey, key, key, messageId); + + // print to console + printPrettyDeserializedCommandResult(value); + + // log to somewhere + + } + + private void printPrettyDeserializedCommandResult(String valueString){ ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true); + try { - System.out.println("streamKey = " + streamKey); - System.out.println("messageId = " + messageId); - System.out.println("key = " + key); - System.out.println("value = " + value); + String tmp = objectMapper.readValue(valueString, new TypeReference() { + }); - ArrayListcommandResultList = objectMapper.readValue(value, ArrayList.class); - commandResultList.stream().forEach( + List stringList = objectMapper.readValue(tmp, new TypeReference>() { + }); + + stringList.stream().forEach( System.out::println ); @@ -73,9 +86,6 @@ public class CommandResultReader implements StreamListener [ {} ]", resultKey); + // change the redis stream listener container + createStreamReader.registerStreamReader(resultKey); // send the message messageSender.send(octopusMessage);