[ agent ] [ status ] - accomplish agent status - 4
This commit is contained in:
@@ -8,10 +8,9 @@ import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.HashMap;
|
||||
|
||||
import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.REDIS_STREAM_LISTENER_CONTAINER;
|
||||
import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER;
|
||||
|
||||
|
||||
@Component
|
||||
@@ -25,7 +24,7 @@ public class CreateStreamReader implements ApplicationContextAware {
|
||||
private RedisStreamReaderConfig redisStreamReaderConfig;
|
||||
|
||||
|
||||
private HashMap<String, StreamMessageListenerContainer> REDIS_STREAM_LISTENER_CONTAINER_CACHE = new HashMap<>(16);
|
||||
private final HashMap<String, StreamMessageListenerContainer> REDIS_STREAM_LISTENER_CONTAINER_CACHE = new HashMap<>(16);
|
||||
|
||||
|
||||
@Override
|
||||
@@ -33,7 +32,7 @@ public class CreateStreamReader implements ApplicationContextAware {
|
||||
this.applicationContext = applicationContext;
|
||||
}
|
||||
|
||||
public void registerStreamReader(String streamKey) {
|
||||
public void registerStreamReader(String redisStreamListenerContainerBeanName, String streamKey) {
|
||||
|
||||
// prepare the environment
|
||||
prepareEnv();
|
||||
@@ -51,11 +50,11 @@ public class CreateStreamReader implements ApplicationContextAware {
|
||||
modifyStreamReader(streamKey);
|
||||
|
||||
// re-create the REDIS_STREAM_LISTENER_CONTAINER
|
||||
createStreamReader(streamKey);
|
||||
createStreamReader(redisStreamListenerContainerBeanName, streamKey);
|
||||
|
||||
}
|
||||
|
||||
private void prepareEnv(){
|
||||
private void prepareEnv() {
|
||||
|
||||
getBeanFactory();
|
||||
|
||||
@@ -68,16 +67,16 @@ public class CreateStreamReader implements ApplicationContextAware {
|
||||
}
|
||||
|
||||
|
||||
private void getBeanFactory(){
|
||||
private void getBeanFactory() {
|
||||
this.beanFactory = applicationContext.getAutowireCapableBeanFactory();
|
||||
}
|
||||
|
||||
private void createStreamReader(String streamKey) {
|
||||
private void createStreamReader(String redisStreamListenerContainerBeanName, String streamKey) {
|
||||
|
||||
log.debug("start to create the redis stream listener container");
|
||||
// create the lazy bean
|
||||
|
||||
StreamMessageListenerContainer streamMessageListenerContainer = applicationContext.getBean(REDIS_STREAM_LISTENER_CONTAINER, StreamMessageListenerContainer.class);
|
||||
StreamMessageListenerContainer streamMessageListenerContainer = applicationContext.getBean(redisStreamListenerContainerBeanName, StreamMessageListenerContainer.class);
|
||||
|
||||
REDIS_STREAM_LISTENER_CONTAINER_CACHE.put(streamKey, streamMessageListenerContainer);
|
||||
|
||||
@@ -102,7 +101,6 @@ public class CreateStreamReader implements ApplicationContextAware {
|
||||
|
||||
private void destroyStreamReader(String streamKey) {
|
||||
|
||||
log.debug("start to destroy {}", REDIS_STREAM_LISTENER_CONTAINER);
|
||||
|
||||
String oldStreamKey = redisStreamReaderConfig.getStreamKey();
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package io.wdd.rpc.execute.result;
|
||||
|
||||
|
||||
import io.wdd.rpc.status.AgentStatusStreamReader;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
@@ -22,7 +23,9 @@ public class RedisStreamReaderConfig {
|
||||
@Resource
|
||||
private RedisConnectionFactory redisConnectionFactory;
|
||||
|
||||
public static final String REDIS_STREAM_LISTENER_CONTAINER = "redisStreamListenerContainer";
|
||||
public static final String COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER = "commandResultRedisStreamListenerContainer";
|
||||
|
||||
public static final String AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER = "agentStatusRedisStreamListenerContainer";
|
||||
|
||||
private String streamKey = "cccc";
|
||||
|
||||
@@ -34,10 +37,10 @@ public class RedisStreamReaderConfig {
|
||||
return streamKey;
|
||||
}
|
||||
|
||||
@Bean(value = REDIS_STREAM_LISTENER_CONTAINER)
|
||||
@Bean(value = COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER)
|
||||
@Scope("prototype")
|
||||
@Lazy
|
||||
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> redisStreamListenerContainer(){
|
||||
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> commandResultRedisStreamListenerContainer(){
|
||||
|
||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
|
||||
.builder()
|
||||
@@ -60,6 +63,31 @@ public class RedisStreamReaderConfig {
|
||||
return listenerContainer;
|
||||
}
|
||||
|
||||
@Bean(value = AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER)
|
||||
@Scope("prototype")
|
||||
@Lazy
|
||||
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> agentStatusRedisStreamListenerContainer(){
|
||||
|
||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
|
||||
.builder()
|
||||
.pollTimeout(Duration.ofSeconds(2))
|
||||
.build();
|
||||
|
||||
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
|
||||
|
||||
listenerContainer.receive(
|
||||
|
||||
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
|
||||
|
||||
new AgentStatusStreamReader(
|
||||
"OctopusServer",
|
||||
"OctopusServer",
|
||||
"OctopusServer")
|
||||
|
||||
);
|
||||
|
||||
return listenerContainer;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -17,6 +17,8 @@ import java.time.format.DateTimeFormatter;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class CoreExecutionServiceImpl implements CoreExecutionService {
|
||||
@@ -68,7 +70,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
|
||||
log.debug("set consumer group for the stream key with => [ {} ]", resultKey);
|
||||
|
||||
// change the redis stream listener container
|
||||
createStreamReader.registerStreamReader(resultKey);
|
||||
createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER, resultKey);
|
||||
|
||||
// send the message
|
||||
messageSender.send(octopusMessage);
|
||||
|
||||
@@ -13,6 +13,9 @@ import javax.annotation.Nullable;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.List;
|
||||
|
||||
import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER;
|
||||
import static io.wdd.rpc.execute.result.RedisStreamReaderConfig.COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("octopus/server/executor")
|
||||
public class ExecutionController {
|
||||
@@ -47,7 +50,17 @@ public class ExecutionController {
|
||||
@RequestParam(value = "streamKey") String streamKey
|
||||
) {
|
||||
|
||||
createStreamReader.registerStreamReader(streamKey);
|
||||
createStreamReader.registerStreamReader(COMMAND_RESULT_REDIS_STREAM_LISTENER_CONTAINER ,streamKey);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@PostMapping("/agentStatusStream")
|
||||
public void getAgentStatus(
|
||||
@RequestParam(value = "streamKey") String streamKey
|
||||
) {
|
||||
|
||||
createStreamReader.registerStreamReader(AGENT_STATUS_REDIS_STREAM_LISTENER_CONTAINER ,streamKey);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,84 @@
|
||||
package io.wdd.rpc.status;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.wdd.common.beans.status.AgentStatus;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.redis.connection.stream.MapRecord;
|
||||
import org.springframework.data.redis.connection.stream.RecordId;
|
||||
import org.springframework.data.redis.stream.StreamListener;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@Slf4j
|
||||
public class AgentStatusStreamReader implements StreamListener<String, MapRecord<String,String, String >> {
|
||||
|
||||
// https://medium.com/nerd-for-tech/event-driven-architecture-with-redis-streams-using-spring-boot-a81a1c9a4cde
|
||||
|
||||
//https://segmentfault.com/a/1190000040946712
|
||||
|
||||
//https://docs.spring.io/spring-data/redis/docs/2.5.5/reference/html/#redis.streams.receive.containers
|
||||
|
||||
/**
|
||||
* 消费者类型:独立消费、消费组消费
|
||||
*/
|
||||
private String consumerType;
|
||||
/**
|
||||
* 消费组
|
||||
*/
|
||||
private String group;
|
||||
/**
|
||||
* 消费组中的某个消费者
|
||||
*/
|
||||
private String consumerName;
|
||||
|
||||
|
||||
public AgentStatusStreamReader(String consumerType, String group, String consumerName) {
|
||||
this.consumerType = consumerType;
|
||||
this.group = group;
|
||||
this.consumerName = consumerName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(MapRecord<String, String, String> message) {
|
||||
|
||||
String streamKey = message.getStream();
|
||||
RecordId messageId = message.getId();
|
||||
String key = (String) message.getValue().keySet().toArray()[0];
|
||||
String value = message.getValue().get(key);
|
||||
|
||||
|
||||
log.info("Octopus Agent [ {} ] status of [ {} ] Time is [ {} ] stream recordId is [{}]", streamKey, key, key, messageId);
|
||||
|
||||
// print to console
|
||||
printPrettyAgentStatus(value);
|
||||
|
||||
}
|
||||
|
||||
private void printPrettyAgentStatus(String valueString){
|
||||
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
objectMapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
|
||||
|
||||
try {
|
||||
|
||||
String tmp = objectMapper.readValue(valueString, new TypeReference<String>() {
|
||||
});
|
||||
|
||||
AgentStatus agentStatus = objectMapper.readValue(tmp, new TypeReference<AgentStatus>() {
|
||||
});
|
||||
|
||||
System.out.println(objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(agentStatus));
|
||||
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -11,6 +11,7 @@ import io.wdd.server.coreService.CoreServerService;
|
||||
import io.wdd.server.service.*;
|
||||
import io.wdd.server.utils.EntityUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
|
||||
@@ -20,6 +21,7 @@ import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@@ -85,8 +87,22 @@ public class CoreServerServiceImpl implements CoreServerService {
|
||||
@Override
|
||||
public boolean serverCreateOrUpdate(ServerInfoVO serverInfoVO) {
|
||||
|
||||
ServerInfoPO serverInfoPO = EntityUtils.cvToTarget(serverInfoVO, ServerInfoPO.class);
|
||||
return serverInfoService.saveOrUpdate(serverInfoPO);
|
||||
ServerInfoPO po = new LambdaQueryChainWrapper<ServerInfoPO>(serverInfoService.getBaseMapper())
|
||||
.eq(ServerInfoPO::getServerName, serverInfoVO.getServerName()).one();
|
||||
|
||||
if (ObjectUtils.isNotEmpty(po)) {
|
||||
try {
|
||||
org.apache.commons.beanutils.BeanUtils.copyProperties(po, serverInfoVO);
|
||||
} catch (IllegalAccessException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (InvocationTargetException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
} else {
|
||||
po = EntityUtils.cvToTarget(serverInfoVO, ServerInfoPO.class);
|
||||
}
|
||||
|
||||
return serverInfoService.saveOrUpdate(po);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user