[ server ] [ executor ] - redis stream listener container dynamic generate accomplish

This commit is contained in:
zeaslity
2022-12-30 13:50:00 +08:00
parent b4517b8875
commit 0c2549eb34
8 changed files with 219 additions and 74 deletions

View File

@@ -1,6 +1,8 @@
package io.wdd.agent.executor.redis; package io.wdd.agent.executor.redis;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.agent.config.beans.executor.CommandLog; import io.wdd.agent.config.beans.executor.CommandLog;
import io.wdd.agent.config.beans.executor.StreamSenderEntity; import io.wdd.agent.config.beans.executor.StreamSenderEntity;
import io.wdd.agent.executor.thread.LogToArrayListCache; import io.wdd.agent.executor.thread.LogToArrayListCache;
@@ -37,6 +39,9 @@ public class StreamSender {
@Resource @Resource
RedisTemplate redisTemplate; RedisTemplate redisTemplate;
@Resource
ObjectMapper objectMapper;
@Resource @Resource
LogToArrayListCache logToArrayListCache; LogToArrayListCache logToArrayListCache;
@@ -134,7 +139,16 @@ public class StreamSender {
private boolean send(String streamKey, List<String> content) { private boolean send(String streamKey, List<String> content) {
return this.send(streamKey, content.toString());
try {
String resultContent = objectMapper.writeValueAsString(content);
return this.send(streamKey, resultContent);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
} }

View File

@@ -0,0 +1,29 @@
package io.wdd.rpc.execute.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
@Configuration
public class RedisConfiguration {
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
redisTemplate.setKeySerializer(RedisSerializer.string());
redisTemplate.setHashKeySerializer(RedisSerializer.string());
redisTemplate.setValueSerializer(jsonRedisSerializer);
redisTemplate.setHashValueSerializer(jsonRedisSerializer);
return redisTemplate;
}
}

View File

@@ -1,10 +1,17 @@
package io.wdd.rpc.execute.result; package io.wdd.rpc.execute.result;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener; import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
@Getter @Getter
@Setter @Setter
@@ -30,6 +37,7 @@ public class CommandResultReader implements StreamListener<String, MapRecord<Str
*/ */
private String consumerName; private String consumerName;
public CommandResultReader(String consumerType, String group, String consumerName) { public CommandResultReader(String consumerType, String group, String consumerName) {
this.consumerType = consumerType; this.consumerType = consumerType;
this.group = group; this.group = group;
@@ -39,9 +47,32 @@ public class CommandResultReader implements StreamListener<String, MapRecord<Str
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
String commandLog = message.getValue().values().iterator().next(); String streamKey = message.getStream();
RecordId messageId = message.getId();
String key = (String) message.getValue().keySet().toArray()[0];
String value = (String) message.getValue().values().toArray()[0];
ObjectMapper objectMapper = new ObjectMapper();
try {
System.out.println("streamKey = " + streamKey);
System.out.println("messageId = " + messageId);
System.out.println("key = " + key);
System.out.println("value = " + value);
ArrayList<String>commandResultList = objectMapper.readValue(value, ArrayList.class);
commandResultList.stream().forEach(
System.out::println
);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
System.out.println("commandLog = " + commandLog);
log.info("intend to be handled already !"); log.info("intend to be handled already !");

View File

@@ -2,12 +2,16 @@ package io.wdd.rpc.execute.result;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException; import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationContextAware;
import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.HashMap;
import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.REDIS_STREAM_LISTENER_CONTAINER;
@Component @Component
@@ -16,48 +20,100 @@ public class CreateStreamReader implements ApplicationContextAware {
private ApplicationContext applicationContext; private ApplicationContext applicationContext;
private AutowireCapableBeanFactory beanFactory;
private RedisStreamReaderConfig redisStreamReaderConfig;
private HashMap<String, StreamMessageListenerContainer> REDIS_STREAM_LISTENER_CONTAINER_CACHE = new HashMap<>(16);
@Override @Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext; this.applicationContext = applicationContext;
} }
public void registerStreamReader(String streamKey) { public void registerStreamReader(String streamKey) {
// prepare the environment
prepareEnv();
// destroy the REDIS_STREAM_LISTENER_CONTAINER
destroyStreamReader(streamKey);
Field declaredField = null; // modify the configuration ==> streamKey
try { modifyStreamReader(streamKey);
log.debug("start to create the redis stream listener container"); // re-create the REDIS_STREAM_LISTENER_CONTAINER
// create the lazy bean createStreamReader(streamKey);
RedisStreamReaderConfig.MyStreamMessageListenerContainer redisStreamListenerContainer = applicationContext.getBean("redisStreamListenerContainer", RedisStreamReaderConfig.MyStreamMessageListenerContainer.class);
declaredField = redisStreamListenerContainer.getClass().getDeclaredField("streamKey");
log.debug("Change Redis Stream Reader from [ {} ] to [ {} ]",declaredField.get(redisStreamListenerContainer), streamKey);
log.debug("start to set the Redis Stream Reader key");
declaredField.set(redisStreamListenerContainer, streamKey);
log.debug("current stream key is [ {} ]",declaredField.get(redisStreamListenerContainer));
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
} }
private void prepareEnv(){
getBeanFactory();
getRedisStreamConfig();
}
private void getRedisStreamConfig() {
this.redisStreamReaderConfig = applicationContext.getBean("redisStreamReaderConfig", RedisStreamReaderConfig.class);
}
private void getBeanFactory(){
this.beanFactory = applicationContext.getAutowireCapableBeanFactory();
}
private void createStreamReader(String streamKey) { private void createStreamReader(String streamKey) {
log.debug("start to create the redis stream listener container");
// create the lazy bean
StreamMessageListenerContainer streamMessageListenerContainer = applicationContext.getBean(REDIS_STREAM_LISTENER_CONTAINER, StreamMessageListenerContainer.class);
REDIS_STREAM_LISTENER_CONTAINER_CACHE.put(streamKey, streamMessageListenerContainer);
// very important
log.debug("start the listener container");
streamMessageListenerContainer.start();
}
private void modifyStreamReader(String streamKey) {
log.debug("start to modify the redis stream listener container stream key");
String oldStreamKey = redisStreamReaderConfig.getStreamKey();
log.debug("change stream key from [{}] to [{}]", oldStreamKey, streamKey);
log.debug("start to set the Redis Stream Reader key");
redisStreamReaderConfig.setStreamKey(streamKey);
} }
private void destroyStreamReader(String streamKey) { private void destroyStreamReader(String streamKey) {
log.debug("start to destroy {}", REDIS_STREAM_LISTENER_CONTAINER);
String oldStreamKey = redisStreamReaderConfig.getStreamKey();
if (REDIS_STREAM_LISTENER_CONTAINER_CACHE.containsKey(oldStreamKey)) {
StreamMessageListenerContainer streamMessageListenerContainer = REDIS_STREAM_LISTENER_CONTAINER_CACHE.get(oldStreamKey);
log.debug("destroyed old redis stream listener container is [ {} ]", streamMessageListenerContainer);
// double destroy
beanFactory.destroyBean(streamMessageListenerContainer);
streamMessageListenerContainer.stop();
// help gc
streamMessageListenerContainer = null;
}
} }
} }

View File

@@ -5,6 +5,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Scope;
import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.ReadOffset;
@@ -16,61 +17,49 @@ import java.time.Duration;
@Configuration @Configuration
@Slf4j @Slf4j
@Lazy
public class RedisStreamReaderConfig { public class RedisStreamReaderConfig {
@Resource @Resource
private RedisConnectionFactory redisConnectionFactory; private RedisConnectionFactory redisConnectionFactory;
public static final String REDIS_STREAM_LISTENER_CONTAINER = "redisStreamListenerContainer";
@Bean(initMethod = "start", destroyMethod = "stop") private String streamKey = "cccc";
// @Scope(value = "prototype")
public void setStreamKey(String streamKey) {
this.streamKey = streamKey;
}
public String getStreamKey() {
return streamKey;
}
@Bean(value = REDIS_STREAM_LISTENER_CONTAINER)
@Scope("prototype")
@Lazy @Lazy
public MyStreamMessageListenerContainer redisStreamListenerContainer() { public StreamMessageListenerContainer<String, MapRecord<String, String, String>> redisStreamListenerContainer(){
return new MyStreamMessageListenerContainer(); StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(2))
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
listenerContainer.receive(
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
new CommandResultReader(
"OctopusServer",
streamKey,
"OctopusServer")
);
return listenerContainer;
} }
class MyStreamMessageListenerContainer {
public String streamKey = "cccc";
public void start() {
log.debug("Redis Stream Reader stream key is [ {} ]", this.streamKey);
}
public void stop() {
log.debug("Redis Stream Reader destroyed ! stream key is [ {} ]", this.streamKey);
}
public String get() {
return this.streamKey;
}
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> MyStreamMessageListenerContainer() {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(2))
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
listenerContainer.receive(
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
new CommandResultReader(
"Octopus-Server",
"Octopus-Group",
"OctopusServer")
);
return listenerContainer;
}
}
} }

View File

@@ -6,6 +6,8 @@ import io.wdd.common.beans.executor.ExecutionMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessageType; import io.wdd.common.beans.rabbitmq.OctopusMessageType;
import io.wdd.rpc.message.sender.ToAgentMessageSender; import io.wdd.rpc.message.sender.ToAgentMessageSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
@@ -15,6 +17,7 @@ import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Service @Service
@Slf4j
public class CoreExecutionServiceImpl implements CoreExecutionService { public class CoreExecutionServiceImpl implements CoreExecutionService {
@Resource @Resource
@@ -23,6 +26,9 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
@Resource @Resource
ObjectMapper objectMapper; ObjectMapper objectMapper;
@Resource
RedisTemplate redisTemplate;
@Override @Override
public String SendCommandToAgent(String topicName, String command) { public String SendCommandToAgent(String topicName, String command) {
return this.SendCommandToAgent(topicName, List.of(command)); return this.SendCommandToAgent(topicName, List.of(command));
@@ -51,9 +57,17 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
String resultKey = executionMessage.getResultKey();
// set up the stream read group
String group = redisTemplate.opsForStream().createGroup(resultKey, resultKey);
System.out.println("group = " + group);
log.debug("set consumer group for the stream key with => [ {} ]", resultKey);
// send the message
messageSender.send(octopusMessage); messageSender.send(octopusMessage);
return executionMessage.getResultKey(); return resultKey;
} }

View File

@@ -18,3 +18,9 @@ spring:
extension-configs: extension-configs:
- group: local - group: local
data-id: common-local.yaml data-id: common-local.yaml
debug: true
logging:
level:
io.wdd.server:
debug

View File

@@ -18,3 +18,9 @@ spring:
extension-configs: extension-configs:
- group: local - group: local
data-id: common-local.yaml data-id: common-local.yaml
debug: true
logging:
level:
io.wdd.server:
debug