[ server ] [ executor ] - polish code -1

This commit is contained in:
IceDerce
2022-12-14 20:17:37 +08:00
parent d0bdca5ced
commit 3934bf17f8
12 changed files with 223 additions and 83 deletions

View File

@@ -13,6 +13,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords; import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.connection.stream.StringRecord;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import javax.annotation.Resource; import javax.annotation.Resource;
@@ -31,27 +32,41 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
public class StreamSender { public class StreamSender {
public static String TEST_STREAM_JAVA = "test-stream-java";
@Resource @Resource
RedisTemplate redisTemplate; RedisTemplate redisTemplate;
@Resource @Resource
LogToArrayListCache logToArrayListCache; LogToArrayListCache logToArrayListCache;
private final HashMap<String, StreamSenderEntity> AllNeededStreamSender = new HashMap<>(16);
private final ArrayList<String> cacheLogList = new ArrayList<>(256);
private static ByteBuffer currentTimeByteBuffer() {
private HashMap<String, StreamSenderEntity> AllNeededStreamSender = new HashMap<>(16); byte[] timeBytes = LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).getBytes(StandardCharsets.UTF_8);
return ByteBuffer.wrap(timeBytes);
}
private ArrayList<String> cacheLogList = new ArrayList<>(256); private static String currentTimeString() {
return LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}
@SneakyThrows
private static Map generateFakeData() {
String random = RandomStringUtils.random(16);
CommandLog commandLog = new CommandLog();
Map<String, String> map = BeanUtils.describe(commandLog);
return map;
}
public void startToWaitLog(String streamKey) throws InterruptedException { public void startToWaitLog(String streamKey) throws InterruptedException {
if (!AllNeededStreamSender.containsKey(streamKey)) { if (!AllNeededStreamSender.containsKey(streamKey)) {
StreamSenderEntity streamSender = StreamSenderEntity.builder() StreamSenderEntity streamSender = StreamSenderEntity.builder().cachedCommandLog(logToArrayListCache.getCommandCachedLog(streamKey)).waitToSendLog(true).startIndex(0).streamKey(streamKey).build();
.cachedCommandLog(logToArrayListCache.getCommandCachedLog(streamKey))
.waitToSendLog(true)
.startIndex(0)
.streamKey(streamKey).build();
AllNeededStreamSender.put(streamKey, streamSender); AllNeededStreamSender.put(streamKey, streamSender);
@@ -65,7 +80,7 @@ public class StreamSender {
} }
} }
public void endWaitLog(String streamKey){ public void endWaitLog(String streamKey) {
StreamSenderEntity streamSenderEntity = AllNeededStreamSender.get(streamKey); StreamSenderEntity streamSenderEntity = AllNeededStreamSender.get(streamKey);
streamSenderEntity.setWaitToSendLog(false); streamSenderEntity.setWaitToSendLog(false);
@@ -95,52 +110,37 @@ public class StreamSender {
startIndex = endIndex; startIndex = endIndex;
} }
public boolean send(String streamKey, String content){ public boolean send(String streamKey, String content) {
return this.send(streamKey, List.of(content)); HashMap<String, String> map = new HashMap<>(16);
map.put(currentTimeString(), content);
return doSendLogToStream(streamKey, map);
} }
private boolean send(String streamKey, List<String> content) { private boolean send(String streamKey, List<String> content) {
HashMap<String, List<String>> map = new HashMap<>(16); return this.send(streamKey, content.toString());
map.put(currentTimeString(), content);
return doSendLogToStream(streamKey, map);
}
private static ByteBuffer currentTimeByteBuffer(){
byte[] timeBytes = LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).getBytes(StandardCharsets.UTF_8);
return ByteBuffer.wrap(timeBytes);
}
private static String currentTimeString(){
return LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
} }
private boolean doSendLogToStream(String streamKey, HashMap map) { private boolean doSendLogToStream(String streamKey, HashMap map) {
log.info("redis stream sender message is {}", map); log.info("redis stream sender message is {}", map);
MapRecord<String, String, List<String>> stringRecord = StreamRecords.mapBacked(map).withStreamKey(streamKey); StringRecord stringRecord = StreamRecords.string(map).withStreamKey(streamKey);
RecordId recordId = redisTemplate.opsForStream().add(stringRecord); RecordId recordId = redisTemplate.opsForStream().add(stringRecord);
// log.info("redis send recordId is {}",recordId); // log.info("redis send recordId is {}",recordId);
return ObjectUtils.isNotEmpty(recordId); return ObjectUtils.isNotEmpty(recordId);
} }
public static String TEST_STREAM_JAVA = "test-stream-java";
@SneakyThrows @SneakyThrows
public void test(){ public void test() {
RecordId recordId = null; RecordId recordId = null;
if (!redisTemplate.hasKey(TEST_STREAM_JAVA)) { if (!redisTemplate.hasKey(TEST_STREAM_JAVA)) {
@@ -163,16 +163,6 @@ public class StreamSender {
} }
@SneakyThrows
private static Map generateFakeData() {
String random = RandomStringUtils.random(16);
CommandLog commandLog = new CommandLog();
Map<String, String> map = BeanUtils.describe(commandLog);
return map;
}
public void clearLocalCache(String streamKey) { public void clearLocalCache(String streamKey) {
AllNeededStreamSender.remove(streamKey); AllNeededStreamSender.remove(streamKey);
} }

View File

@@ -11,6 +11,7 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Collections;
import java.util.List; import java.util.List;
@RestController @RestController
@@ -44,7 +45,7 @@ public class TestCommandExecutorController {
ExecutionMessage executionMessage = ExecutionMessage.builder() ExecutionMessage executionMessage = ExecutionMessage.builder()
.resultKey(streamKey) .resultKey(streamKey)
.type(messageType) .type(messageType)
.command(messageType) .commandList(Collections.singletonList(messageType))
.build(); .build();
System.out.println("executionMessage = " + executionMessage); System.out.println("executionMessage = " + executionMessage);

View File

@@ -1,7 +0,0 @@
spring:
redis:
master

View File

@@ -0,0 +1,18 @@
upstream redis-cluster {
server 146.56.147.12:21370;
server 146.56.147.12:21371;
server 146.56.147.12:21372;
server 146.56.147.12:21373;
server 146.56.147.12:21374;
server 146.56.147.12:21375;
}
server {
listen 26379;
proxy_pass redis-cluster;
}
server {
listen 6379;
proxy_pass 10.74.68.146:6379;
}

View File

@@ -19,7 +19,7 @@ import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@SpringBootTest //@SpringBootTest
@Slf4j @Slf4j
class AgentApplicationTests { class AgentApplicationTests {
@@ -30,7 +30,7 @@ class AgentApplicationTests {
CollectAllExecutorFunction collectAllExecutorFunction; CollectAllExecutorFunction collectAllExecutorFunction;
@Test // @Test
void testFileExecute(){ void testFileExecute(){
// ExecutionMessage executionMessage = ExecutionMessage.builder().type("TestFunction").resultKey("simpleFor-test").contend("123456").build(); // ExecutionMessage executionMessage = ExecutionMessage.builder().type("TestFunction").resultKey("simpleFor-test").contend("123456").build();
@@ -42,7 +42,7 @@ class AgentApplicationTests {
} }
@Test // @Test
void contextLoads() { void contextLoads() {
// https://zhuanlan.zhihu.com/p/449416472 // https://zhuanlan.zhihu.com/p/449416472

View File

@@ -145,8 +145,7 @@ public class GlobalExceptionHandler {
} }
if (StringUtils.isNotEmpty(invalidMap.get(error.getField()))) { if (StringUtils.isNotEmpty(invalidMap.get(error.getField()))) {
invalidMap.put(error.getField(), invalidMap.put(error.getField(), invalidMap.get(error.getField()) + "," + finalMessage);
invalidMap.get(error.getField()) + "," + finalMessage);
} else { } else {
invalidMap.put(error.getField(), finalMessage); invalidMap.put(error.getField(), finalMessage);
} }

View File

@@ -0,0 +1,52 @@
package io.wdd.rpc.execute.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.Subscription;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import javax.annotation.Resource;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
@Configuration
public class RedisStreamReaderConfig {
public String streamKey;
@Resource
private StreamListener<String, MapRecord<String, String, String>> streamListener;
@Bean
public org.springframework.data.redis.stream.Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
streamKey = "manual-command";
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(1)).build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
org.springframework.data.redis.stream.Subscription subscription = listenerContainer.receive(
Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
streamListener);
listenerContainer.start();
return subscription;
}
}

View File

@@ -0,0 +1,40 @@
package io.wdd.rpc.execute.result;
import io.wdd.rpc.execute.config.RedisStreamReaderConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
@Slf4j
public class CommandResultReader implements StreamListener<String, MapRecord<String,String, String >> {
@Resource
RedisStreamReaderConfig redisStreamReaderConfig;
@Override
public void onMessage(MapRecord<String, String, String> message) {
String commandLog = message.getValue().values().iterator().next();
System.out.println("commandLog = " + commandLog);
log.info("intend to be handled already !");
}
public void readFromStreamKey(String streamKey) {
String formerKey = redisStreamReaderConfig.streamKey;
log.info("start to change StreamReader streamKey from {} to ==> {}",formerKey, streamKey);
redisStreamReaderConfig.streamKey = streamKey;
}
}

View File

@@ -1,23 +1,20 @@
package io.wdd.rpc.execute.service; package io.wdd.rpc.execute.service;
import org.springframework.stereotype.Service;
import java.util.List; import java.util.List;
public interface CoreExecutionService { public interface CoreExecutionService {
void SendCommandToAgent(String topicName, String command); String SendCommandToAgent(String topicName, String command);
void SendCommandToAgent(String topicName, List<String> commandList); String SendCommandToAgent(String topicName, List<String> commandList);
void SendCommandToAgent(String topicName, String type, List<String> command); String SendCommandToAgent(String topicName, String type, List<String> command);
void SendCommandToAgent(List<String> topicNameList, String type, String command);
void SendCommandToAgent(List<String> topicNameList, String type, List<String> command); List<String> SendCommandToAgent(List<String> topicNameList, String type, List<String> command);
} }

View File

@@ -10,6 +10,7 @@ import javax.annotation.Resource;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
@Service @Service
public class CoreExecutionServiceImpl implements CoreExecutionService { public class CoreExecutionServiceImpl implements CoreExecutionService {
@@ -19,29 +20,35 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
@Override @Override
public void SendCommandToAgent(String topicName, String command) { public String SendCommandToAgent(String topicName, String command) {
this.SendCommandToAgent(topicName, List.of(command)); return this.SendCommandToAgent(topicName, List.of(command));
} }
@Override @Override
public void SendCommandToAgent(String topicName, List<String> commandList) { public String SendCommandToAgent(String topicName, List<String> commandList) {
this.SendCommandToAgent(topicName,"manual-command", commandList); return this.SendCommandToAgent(topicName,"manual-command", commandList);
} }
@Override @Override
public void SendCommandToAgent(String topicName, String type, List<String> commandList) { public String SendCommandToAgent(String topicName, String type, List<String> commandList) {
OctopusMessage octopusMessage = this.generateOctopusMessage(topicName, type, commandList);
messageSender.send(octopusMessage);
ExecutionMessage content = (ExecutionMessage) octopusMessage.getContent();
return content.getResultKey();
} }
@Override
public void SendCommandToAgent(List<String> topicNameList, String type, String command) {
}
@Override @Override
public void SendCommandToAgent(List<String> topicNameList, String type, List<String> command) { public List<String> SendCommandToAgent(List<String> topicNameList, String type, List<String> command) {
return topicNameList.stream().map(
topicName -> {
return this.SendCommandToAgent(topicName, type, command);
}
).collect(Collectors.toList());
} }
private OctopusMessage generateOctopusMessage(String topicName, String type, List<String> commandList){ private OctopusMessage generateOctopusMessage(String topicName, String type, List<String> commandList){
@@ -73,7 +80,6 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
String TimeString = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); String TimeString = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
return topicName + TimeString; return topicName + TimeString;
} }

View File

@@ -0,0 +1,44 @@
package io.wdd.rpc.execute.web;
import io.wdd.common.beans.response.R;
import io.wdd.rpc.execute.service.CoreExecutionService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Nullable;
import javax.annotation.Resource;
import java.util.List;
@RestController
@RequestMapping("octopus/server/executor")
public class ExecutionController {
@Resource
CoreExecutionService coreExecutionService;
@PostMapping("command")
public R<String> patchCommandToAgent(
@RequestParam(value = "topicName") String topicName,
@RequestParam(value = "commandList", required = false) @Nullable List<String> commandList,
@RequestParam(value = "type", required = false) @Nullable String type
) {
String streamKey = "";
if (StringUtils.isEmpty(type)) {
streamKey = coreExecutionService.SendCommandToAgent(topicName, commandList);
} else {
streamKey = coreExecutionService.SendCommandToAgent(topicName, type,commandList);
}
return R.ok(streamKey);
}
}

View File

@@ -2,11 +2,11 @@ spring:
application: application:
name: octopus-server name: octopus-server
profiles: profiles:
active: local active: k3s
cloud: cloud:
nacos: nacos:
config: config:
group: local group: k3s
config-retry-time: 3000 config-retry-time: 3000
file-extension: yaml file-extension: yaml
max-retry: 3 max-retry: 3
@@ -16,5 +16,5 @@ spring:
timeout: 5000 timeout: 5000
config-long-poll-timeout: 5000 config-long-poll-timeout: 5000
extension-configs: extension-configs:
- group: local - group: k3s
data-id: common-local.yaml data-id: common-k3s.yaml