diff --git a/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java b/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java index 8f02696..56a17c1 100644 --- a/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java +++ b/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java @@ -13,6 +13,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.connection.stream.StreamRecords; +import org.springframework.data.redis.connection.stream.StringRecord; import org.springframework.data.redis.core.RedisTemplate; import javax.annotation.Resource; @@ -31,27 +32,41 @@ import java.util.concurrent.TimeUnit; @Slf4j public class StreamSender { + public static String TEST_STREAM_JAVA = "test-stream-java"; @Resource RedisTemplate redisTemplate; - @Resource LogToArrayListCache logToArrayListCache; + private final HashMap AllNeededStreamSender = new HashMap<>(16); + private final ArrayList cacheLogList = new ArrayList<>(256); + private static ByteBuffer currentTimeByteBuffer() { - private HashMap AllNeededStreamSender = new HashMap<>(16); + byte[] timeBytes = LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).getBytes(StandardCharsets.UTF_8); + return ByteBuffer.wrap(timeBytes); + } - private ArrayList cacheLogList = new ArrayList<>(256); + private static String currentTimeString() { + + return LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + } + + @SneakyThrows + private static Map generateFakeData() { + String random = RandomStringUtils.random(16); + CommandLog commandLog = new CommandLog(); + + Map map = BeanUtils.describe(commandLog); + + return map; + } public void startToWaitLog(String streamKey) throws InterruptedException { if (!AllNeededStreamSender.containsKey(streamKey)) { - StreamSenderEntity streamSender = StreamSenderEntity.builder() - .cachedCommandLog(logToArrayListCache.getCommandCachedLog(streamKey)) - .waitToSendLog(true) - .startIndex(0) - .streamKey(streamKey).build(); + StreamSenderEntity streamSender = StreamSenderEntity.builder().cachedCommandLog(logToArrayListCache.getCommandCachedLog(streamKey)).waitToSendLog(true).startIndex(0).streamKey(streamKey).build(); AllNeededStreamSender.put(streamKey, streamSender); @@ -65,7 +80,7 @@ public class StreamSender { } } - public void endWaitLog(String streamKey){ + public void endWaitLog(String streamKey) { StreamSenderEntity streamSenderEntity = AllNeededStreamSender.get(streamKey); streamSenderEntity.setWaitToSendLog(false); @@ -95,52 +110,37 @@ public class StreamSender { startIndex = endIndex; } - public boolean send(String streamKey, String content){ + public boolean send(String streamKey, String content) { - return this.send(streamKey, List.of(content)); + HashMap map = new HashMap<>(16); + + map.put(currentTimeString(), content); + + return doSendLogToStream(streamKey, map); } private boolean send(String streamKey, List content) { - HashMap> map = new HashMap<>(16); + return this.send(streamKey, content.toString()); - map.put(currentTimeString(), content); - - return doSendLogToStream(streamKey, map); - } - - private static ByteBuffer currentTimeByteBuffer(){ - - byte[] timeBytes = LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).getBytes(StandardCharsets.UTF_8); - - return ByteBuffer.wrap(timeBytes); - } - - private static String currentTimeString(){ - - return LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); } private boolean doSendLogToStream(String streamKey, HashMap map) { - log.info("redis stream sender message is {}", map); + log.info("redis stream sender message is {}", map); - MapRecord> stringRecord = StreamRecords.mapBacked(map).withStreamKey(streamKey); + StringRecord stringRecord = StreamRecords.string(map).withStreamKey(streamKey); - RecordId recordId = redisTemplate.opsForStream().add(stringRecord); + RecordId recordId = redisTemplate.opsForStream().add(stringRecord); // log.info("redis send recordId is {}",recordId); - return ObjectUtils.isNotEmpty(recordId); - + return ObjectUtils.isNotEmpty(recordId); } - - public static String TEST_STREAM_JAVA = "test-stream-java"; - @SneakyThrows - public void test(){ + public void test() { RecordId recordId = null; if (!redisTemplate.hasKey(TEST_STREAM_JAVA)) { @@ -163,16 +163,6 @@ public class StreamSender { } - @SneakyThrows - private static Map generateFakeData() { - String random = RandomStringUtils.random(16); - CommandLog commandLog = new CommandLog(); - - Map map = BeanUtils.describe(commandLog); - - return map; - } - public void clearLocalCache(String streamKey) { AllNeededStreamSender.remove(streamKey); } diff --git a/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java b/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java index fa2a63a..dfdac7a 100644 --- a/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java +++ b/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java @@ -11,6 +11,7 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; +import java.util.Collections; import java.util.List; @RestController @@ -44,7 +45,7 @@ public class TestCommandExecutorController { ExecutionMessage executionMessage = ExecutionMessage.builder() .resultKey(streamKey) .type(messageType) - .command(messageType) + .commandList(Collections.singletonList(messageType)) .build(); System.out.println("executionMessage = " + executionMessage); diff --git a/agent/src/main/resources/application-k3s.yaml b/agent/src/main/resources/application-k3s.yaml deleted file mode 100644 index 504f170..0000000 --- a/agent/src/main/resources/application-k3s.yaml +++ /dev/null @@ -1,7 +0,0 @@ -spring: - redis: - master - - - - diff --git a/agent/src/main/resources/nginx-loadbalancer.conf b/agent/src/main/resources/nginx-loadbalancer.conf new file mode 100644 index 0000000..6195103 --- /dev/null +++ b/agent/src/main/resources/nginx-loadbalancer.conf @@ -0,0 +1,18 @@ +upstream redis-cluster { + server 146.56.147.12:21370; + server 146.56.147.12:21371; + server 146.56.147.12:21372; + server 146.56.147.12:21373; + server 146.56.147.12:21374; + server 146.56.147.12:21375; +} + +server { + listen 26379; + proxy_pass redis-cluster; +} + +server { + listen 6379; + proxy_pass 10.74.68.146:6379; +} diff --git a/agent/src/test/java/io/wdd/agent/AgentApplicationTests.java b/agent/src/test/java/io/wdd/agent/AgentApplicationTests.java index f697c2d..74842bb 100644 --- a/agent/src/test/java/io/wdd/agent/AgentApplicationTests.java +++ b/agent/src/test/java/io/wdd/agent/AgentApplicationTests.java @@ -19,7 +19,7 @@ import java.util.Collection; import java.util.Map; import java.util.stream.Collectors; -@SpringBootTest +//@SpringBootTest @Slf4j class AgentApplicationTests { @@ -30,7 +30,7 @@ class AgentApplicationTests { CollectAllExecutorFunction collectAllExecutorFunction; - @Test +// @Test void testFileExecute(){ // ExecutionMessage executionMessage = ExecutionMessage.builder().type("TestFunction").resultKey("simpleFor-test").contend("123456").build(); @@ -42,7 +42,7 @@ class AgentApplicationTests { } - @Test +// @Test void contextLoads() { // https://zhuanlan.zhihu.com/p/449416472 diff --git a/common/src/main/java/io/wdd/common/handler/GlobalExceptionHandler.java b/common/src/main/java/io/wdd/common/handler/GlobalExceptionHandler.java index 09c6894..052ace4 100644 --- a/common/src/main/java/io/wdd/common/handler/GlobalExceptionHandler.java +++ b/common/src/main/java/io/wdd/common/handler/GlobalExceptionHandler.java @@ -145,8 +145,7 @@ public class GlobalExceptionHandler { } if (StringUtils.isNotEmpty(invalidMap.get(error.getField()))) { - invalidMap.put(error.getField(), - invalidMap.get(error.getField()) + "," + finalMessage); + invalidMap.put(error.getField(), invalidMap.get(error.getField()) + "," + finalMessage); } else { invalidMap.put(error.getField(), finalMessage); } diff --git a/server/src/main/java/io/wdd/rpc/execute/config/RedisStreamReaderConfig.java b/server/src/main/java/io/wdd/rpc/execute/config/RedisStreamReaderConfig.java new file mode 100644 index 0000000..9d550d8 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/execute/config/RedisStreamReaderConfig.java @@ -0,0 +1,52 @@ +package io.wdd.rpc.execute.config; + + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.Subscription; +import org.springframework.data.redis.connection.stream.Consumer; +import org.springframework.data.redis.connection.stream.MapRecord; +import org.springframework.data.redis.connection.stream.ReadOffset; +import org.springframework.data.redis.connection.stream.StreamOffset; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.data.redis.stream.StreamMessageListenerContainer; + +import javax.annotation.Resource; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.time.Duration; + +@Configuration +public class RedisStreamReaderConfig { + + public String streamKey; + + @Resource + private StreamListener> streamListener; + + + @Bean + public org.springframework.data.redis.stream.Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException { + + streamKey = "manual-command"; + + StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(1)).build(); + + StreamMessageListenerContainer> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); + + + org.springframework.data.redis.stream.Subscription subscription = listenerContainer.receive( + + Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()), + + StreamOffset.create(streamKey, ReadOffset.lastConsumed()), + + streamListener); + + listenerContainer.start(); + + return subscription; + } + +} diff --git a/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java b/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java new file mode 100644 index 0000000..b043a26 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/execute/result/CommandResultReader.java @@ -0,0 +1,40 @@ +package io.wdd.rpc.execute.result; + +import io.wdd.rpc.execute.config.RedisStreamReaderConfig; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.connection.stream.MapRecord; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +@Service +@Slf4j +public class CommandResultReader implements StreamListener> { + + @Resource + RedisStreamReaderConfig redisStreamReaderConfig; + + + @Override + public void onMessage(MapRecord message) { + + String commandLog = message.getValue().values().iterator().next(); + + System.out.println("commandLog = " + commandLog); + + log.info("intend to be handled already !"); + + } + + + public void readFromStreamKey(String streamKey) { + + String formerKey = redisStreamReaderConfig.streamKey; + log.info("start to change StreamReader streamKey from {} to ==> {}",formerKey, streamKey); + + redisStreamReaderConfig.streamKey = streamKey; + } + +} diff --git a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java index f0b58b6..4ff4e16 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionService.java @@ -1,23 +1,20 @@ package io.wdd.rpc.execute.service; -import org.springframework.stereotype.Service; - import java.util.List; public interface CoreExecutionService { - void SendCommandToAgent(String topicName, String command); + String SendCommandToAgent(String topicName, String command); - void SendCommandToAgent(String topicName, List commandList); + String SendCommandToAgent(String topicName, List commandList); - void SendCommandToAgent(String topicName, String type, List command); + String SendCommandToAgent(String topicName, String type, List command); - void SendCommandToAgent(List topicNameList, String type, String command); - void SendCommandToAgent(List topicNameList, String type, List command); + List SendCommandToAgent(List topicNameList, String type, List command); } diff --git a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java index 3bbca9f..df49c6c 100644 --- a/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java +++ b/server/src/main/java/io/wdd/rpc/execute/service/CoreExecutionServiceImpl.java @@ -10,6 +10,7 @@ import javax.annotation.Resource; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.List; +import java.util.stream.Collectors; @Service public class CoreExecutionServiceImpl implements CoreExecutionService { @@ -19,29 +20,35 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { @Override - public void SendCommandToAgent(String topicName, String command) { - this.SendCommandToAgent(topicName, List.of(command)); + public String SendCommandToAgent(String topicName, String command) { + return this.SendCommandToAgent(topicName, List.of(command)); } @Override - public void SendCommandToAgent(String topicName, List commandList) { - this.SendCommandToAgent(topicName,"manual-command", commandList); - + public String SendCommandToAgent(String topicName, List commandList) { + return this.SendCommandToAgent(topicName,"manual-command", commandList); } @Override - public void SendCommandToAgent(String topicName, String type, List commandList) { + public String SendCommandToAgent(String topicName, String type, List commandList) { + OctopusMessage octopusMessage = this.generateOctopusMessage(topicName, type, commandList); + + messageSender.send(octopusMessage); + + ExecutionMessage content = (ExecutionMessage) octopusMessage.getContent(); + + return content.getResultKey(); } - @Override - public void SendCommandToAgent(List topicNameList, String type, String command) { - - } @Override - public void SendCommandToAgent(List topicNameList, String type, List command) { - + public List SendCommandToAgent(List topicNameList, String type, List command) { + return topicNameList.stream().map( + topicName -> { + return this.SendCommandToAgent(topicName, type, command); + } + ).collect(Collectors.toList()); } private OctopusMessage generateOctopusMessage(String topicName, String type, List commandList){ @@ -73,7 +80,6 @@ public class CoreExecutionServiceImpl implements CoreExecutionService { String TimeString = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); - return topicName + TimeString; } diff --git a/server/src/main/java/io/wdd/rpc/execute/web/ExecutionController.java b/server/src/main/java/io/wdd/rpc/execute/web/ExecutionController.java new file mode 100644 index 0000000..1b8bf75 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/execute/web/ExecutionController.java @@ -0,0 +1,44 @@ +package io.wdd.rpc.execute.web; + +import io.wdd.common.beans.response.R; +import io.wdd.rpc.execute.service.CoreExecutionService; +import org.apache.commons.lang3.StringUtils; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Nullable; +import javax.annotation.Resource; +import java.util.List; + +@RestController +@RequestMapping("octopus/server/executor") +public class ExecutionController { + + @Resource + CoreExecutionService coreExecutionService; + + + @PostMapping("command") + public R patchCommandToAgent( + @RequestParam(value = "topicName") String topicName, + @RequestParam(value = "commandList", required = false) @Nullable List commandList, + @RequestParam(value = "type", required = false) @Nullable String type + ) { + + String streamKey = ""; + + if (StringUtils.isEmpty(type)) { + streamKey = coreExecutionService.SendCommandToAgent(topicName, commandList); + } else { + streamKey = coreExecutionService.SendCommandToAgent(topicName, type,commandList); + } + + return R.ok(streamKey); + } + + + + +} diff --git a/server/src/main/resources/bootstrap.yml b/server/src/main/resources/bootstrap.yml index ef43407..605cba7 100644 --- a/server/src/main/resources/bootstrap.yml +++ b/server/src/main/resources/bootstrap.yml @@ -2,11 +2,11 @@ spring: application: name: octopus-server profiles: - active: local + active: k3s cloud: nacos: config: - group: local + group: k3s config-retry-time: 3000 file-extension: yaml max-retry: 3 @@ -16,5 +16,5 @@ spring: timeout: 5000 config-long-poll-timeout: 5000 extension-configs: - - group: local - data-id: common-local.yaml \ No newline at end of file + - group: k3s + data-id: common-k3s.yaml \ No newline at end of file