[ server ] [ executor ] - redis stream listener container all procedure

This commit is contained in:
zeaslity
2022-12-30 14:40:21 +08:00
parent 0c2549eb34
commit d9a70aa059
4 changed files with 45 additions and 14 deletions

View File

@@ -81,7 +81,7 @@ public class CommandExecutor {
));
// cache log lines
logToArrayListCache.cacheLog(streamKey, process.getInputStream());
logToArrayListCache.cacheLog(streamKey, process);
// start to send the result log
streamSender.startToWaitLog(streamKey);

View File

@@ -27,6 +27,21 @@ public class LogToArrayListCache {
new ArrayList<>(256)
);
public void cacheLog(String streamKey, Process process) {
ArrayList<String> commandCachedLog = this.getExecutionCmdCachedLogArrayList(streamKey);
String format = String.format("execution command are => [ %s ]", process.info().commandLine().get());
// add the command
commandCachedLog.add(format);
commandCachedLog.add("--------------- command result are as below --------------------");
commandCachedLog.add("");
// cache the real command logs
cacheLog(streamKey, process.getInputStream());
}
public void cacheLog(String streamKey, InputStream commandLogStream) {
ArrayList<String> commandCachedLog = this.getExecutionCmdCachedLogArrayList(streamKey);

View File

@@ -1,6 +1,8 @@
package io.wdd.rpc.execute.result;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
@@ -12,6 +14,7 @@ import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Getter
@Setter
@@ -48,24 +51,34 @@ public class CommandResultReader implements StreamListener<String, MapRecord<Str
public void onMessage(MapRecord<String, String, String> message) {
String streamKey = message.getStream();
RecordId messageId = message.getId();
String key = (String) message.getValue().keySet().toArray()[0];
String value = message.getValue().get(key);
String value = (String) message.getValue().values().toArray()[0];
log.info("Octopus Agent [ {} ] execution of [ {} ] Time is [ {} ] stream recordId is [{}]", streamKey, key, key, messageId);
// print to console
printPrettyDeserializedCommandResult(value);
// log to somewhere
}
private void printPrettyDeserializedCommandResult(String valueString){
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
try {
System.out.println("streamKey = " + streamKey);
System.out.println("messageId = " + messageId);
System.out.println("key = " + key);
System.out.println("value = " + value);
String tmp = objectMapper.readValue(valueString, new TypeReference<String>() {
});
ArrayList<String>commandResultList = objectMapper.readValue(value, ArrayList.class);
commandResultList.stream().forEach(
List<String> stringList = objectMapper.readValue(tmp, new TypeReference<List<String>>() {
});
stringList.stream().forEach(
System.out::println
);
@@ -73,9 +86,6 @@ public class CommandResultReader implements StreamListener<String, MapRecord<Str
throw new RuntimeException(e);
}
log.info("intend to be handled already !");
}

View File

@@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.beans.executor.ExecutionMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
import io.wdd.rpc.execute.result.CreateStreamReader;
import io.wdd.rpc.message.sender.ToAgentMessageSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
@@ -29,6 +30,9 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
@Resource
RedisTemplate redisTemplate;
@Resource
CreateStreamReader createStreamReader;
@Override
public String SendCommandToAgent(String topicName, String command) {
return this.SendCommandToAgent(topicName, List.of(command));
@@ -58,11 +62,13 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
}
String resultKey = executionMessage.getResultKey();
// set up the stream read group
String group = redisTemplate.opsForStream().createGroup(resultKey, resultKey);
System.out.println("group = " + group);
log.debug("set consumer group for the stream key with => [ {} ]", resultKey);
// change the redis stream listener container
createStreamReader.registerStreamReader(resultKey);
// send the message
messageSender.send(octopusMessage);