[ server ] [ executor ] - redis stream listener container dynamic generate

This commit is contained in:
zeaslity
2022-12-29 10:30:01 +08:00
parent 3806420b2e
commit b4517b8875
6 changed files with 170 additions and 84 deletions

View File

@@ -1,55 +0,0 @@
package io.wdd.rpc.execute.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.Subscription;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import javax.annotation.Resource;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
@Configuration
public class RedisStreamReaderConfig {
public String streamKey;
@Resource
private StreamListener<String, MapRecord<String, String, String>> streamListener;
@Bean
public org.springframework.data.redis.stream.Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
streamKey = "streamKey_lbzb7";
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(2))
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
org.springframework.data.redis.stream.Subscription subscription = listenerContainer.receive(
Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
streamListener);
listenerContainer.start();
return subscription;
}
}

View File

@@ -1,15 +1,13 @@
package io.wdd.rpc.execute.result; package io.wdd.rpc.execute.result;
import io.wdd.rpc.execute.config.RedisStreamReaderConfig; import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener; import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import javax.annotation.Resource; @Getter
@Setter
@Service
@Slf4j @Slf4j
public class CommandResultReader implements StreamListener<String, MapRecord<String,String, String >> { public class CommandResultReader implements StreamListener<String, MapRecord<String,String, String >> {
@@ -19,9 +17,24 @@ public class CommandResultReader implements StreamListener<String, MapRecord<Str
//https://docs.spring.io/spring-data/redis/docs/2.5.5/reference/html/#redis.streams.receive.containers //https://docs.spring.io/spring-data/redis/docs/2.5.5/reference/html/#redis.streams.receive.containers
@Resource /**
RedisStreamReaderConfig redisStreamReaderConfig; * 消费者类型:独立消费、消费组消费
*/
private String consumerType;
/**
* 消费组
*/
private String group;
/**
* 消费组中的某个消费者
*/
private String consumerName;
public CommandResultReader(String consumerType, String group, String consumerName) {
this.consumerType = consumerType;
this.group = group;
this.consumerName = consumerName;
}
@Override @Override
public void onMessage(MapRecord<String, String, String> message) { public void onMessage(MapRecord<String, String, String> message) {
@@ -35,12 +48,4 @@ public class CommandResultReader implements StreamListener<String, MapRecord<Str
} }
public void readFromStreamKey(String streamKey) {
String formerKey = redisStreamReaderConfig.streamKey;
log.info("start to change StreamReader streamKey from {} to ==> {}",formerKey, streamKey);
redisStreamReaderConfig.streamKey = streamKey;
}
} }

View File

@@ -0,0 +1,63 @@
package io.wdd.rpc.execute.result;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.stereotype.Component;
import java.lang.reflect.Field;
@Component
@Slf4j
public class CreateStreamReader implements ApplicationContextAware {
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public void registerStreamReader(String streamKey) {
Field declaredField = null;
try {
log.debug("start to create the redis stream listener container");
// create the lazy bean
RedisStreamReaderConfig.MyStreamMessageListenerContainer redisStreamListenerContainer = applicationContext.getBean("redisStreamListenerContainer", RedisStreamReaderConfig.MyStreamMessageListenerContainer.class);
declaredField = redisStreamListenerContainer.getClass().getDeclaredField("streamKey");
log.debug("Change Redis Stream Reader from [ {} ] to [ {} ]",declaredField.get(redisStreamListenerContainer), streamKey);
log.debug("start to set the Redis Stream Reader key");
declaredField.set(redisStreamListenerContainer, streamKey);
log.debug("current stream key is [ {} ]",declaredField.get(redisStreamListenerContainer));
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
private void createStreamReader(String streamKey) {
}
private void destroyStreamReader(String streamKey) {
}
}

View File

@@ -0,0 +1,76 @@
package io.wdd.rpc.execute.result;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import javax.annotation.Resource;
import java.time.Duration;
@Configuration
@Slf4j
@Lazy
public class RedisStreamReaderConfig {
@Resource
private RedisConnectionFactory redisConnectionFactory;
@Bean(initMethod = "start", destroyMethod = "stop")
// @Scope(value = "prototype")
@Lazy
public MyStreamMessageListenerContainer redisStreamListenerContainer() {
return new MyStreamMessageListenerContainer();
}
class MyStreamMessageListenerContainer {
public String streamKey = "cccc";
public void start() {
log.debug("Redis Stream Reader stream key is [ {} ]", this.streamKey);
}
public void stop() {
log.debug("Redis Stream Reader destroyed ! stream key is [ {} ]", this.streamKey);
}
public String get() {
return this.streamKey;
}
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> MyStreamMessageListenerContainer() {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(2))
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
listenerContainer.receive(
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
new CommandResultReader(
"Octopus-Server",
"Octopus-Group",
"OctopusServer")
);
return listenerContainer;
}
}
}

View File

@@ -1,8 +1,7 @@
package io.wdd.rpc.execute.web; package io.wdd.rpc.execute.web;
import io.wdd.common.beans.response.R; import io.wdd.common.beans.response.R;
import io.wdd.rpc.execute.config.RedisStreamReaderConfig; import io.wdd.rpc.execute.result.CreateStreamReader;
import io.wdd.rpc.execute.result.CommandResultReader;
import io.wdd.rpc.execute.service.CoreExecutionService; import io.wdd.rpc.execute.service.CoreExecutionService;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
@@ -22,7 +21,7 @@ public class ExecutionController {
CoreExecutionService coreExecutionService; CoreExecutionService coreExecutionService;
@Resource @Resource
CommandResultReader commandResultReader; CreateStreamReader createStreamReader;
@PostMapping("command") @PostMapping("command")
@@ -48,11 +47,9 @@ public class ExecutionController {
@RequestParam(value = "streamKey") String streamKey @RequestParam(value = "streamKey") String streamKey
) { ) {
commandResultReader.readFromStreamKey(streamKey); createStreamReader.registerStreamReader(streamKey);
} }
} }

View File

@@ -2,11 +2,11 @@ spring:
application: application:
name: octopus-server name: octopus-server
profiles: profiles:
active: k3s active: local
cloud: cloud:
nacos: nacos:
config: config:
group: k3s group: local
config-retry-time: 3000 config-retry-time: 3000
file-extension: yaml file-extension: yaml
max-retry: 3 max-retry: 3
@@ -16,5 +16,5 @@ spring:
timeout: 5000 timeout: 5000
config-long-poll-timeout: 5000 config-long-poll-timeout: 5000
extension-configs: extension-configs:
- group: k3s - group: local
data-id: common-k3s.yaml data-id: common-local.yaml