From 9bc516bdbdd14971b8cf7e16d08841de2a4657f9 Mon Sep 17 00:00:00 2001 From: IceDerce Date: Tue, 27 Dec 2022 14:58:47 +0800 Subject: [PATCH] [agent]-[executor] optimize execution code --- agent/agent-application-run-on-linux.sh | 45 ++++++++++--------- agent/current-env.txt | 20 +++++++++ .../message/handler/OMHandlerExecutor.java | 35 +++++++++++---- .../config/message/handler/OMHandlerInit.java | 4 +- .../wdd/agent/executor/CommandExecutor.java | 5 ++- .../agent/executor/redis/StreamSender.java | 27 +++++++---- .../executor/thread/LogToArrayListCache.java | 12 +++-- .../bootup/CollectSystemInfo.java | 7 +-- .../bootup/OctopusAgentInitService.java | 9 +++- .../message/GenOctopusRabbitMQConnection.java | 3 +- .../io/wdd/agent/message/ToServerMessage.java | 4 +- agent/src/main/resources/bootstrap.yml | 8 ++-- .../io/wdd/source/shell/lib/wdd-lib-env.sh | 7 ++- .../src/main/java/io/wdd/source/shell/test.sh | 20 +++++---- 14 files changed, 139 insertions(+), 67 deletions(-) create mode 100644 agent/current-env.txt diff --git a/agent/agent-application-run-on-linux.sh b/agent/agent-application-run-on-linux.sh index 0267992..a79305d 100755 --- a/agent/agent-application-run-on-linux.sh +++ b/agent/agent-application-run-on-linux.sh @@ -1,4 +1,4 @@ -#!/bin/bash +#!/bin/zsh RED="31m" ## 姨妈红 GREEN="32m" ## 水鸭青 @@ -348,26 +348,29 @@ GenerateSystemInfo() { machineNumber=99 fi - export serverName="${city}-${hostArch}-${machineNumber}" - export serverIpPbV4="$public_ipv4" - export serverIpInV4="" - export serverIpPbV6="" - export serverIpInV6="" - export location="$city $region $country" - export provider="$org" - export managePort="$(netstat -ntulp | grep sshd | grep -w tcp | awk '{print$4}' | cut -d":" -f2)" - export cpuCore="$cores @ $freq MHz" - export cpuBrand="$cpuName" - export memoryTotal="$tram" - export diskTotal="$disk_total_size" - export diskUsage="$disk_used_size" - export archInfo="$arch ($lbit Bit)" - export osInfo="$opsy" - export osKernelInfo="$kern" - export tcpControl="$tcpctrl" - export virtualization="$virt" - export ioSpeed="$ioavg MB/s" - export machineId="$(cat /host/etc/machine-id)" + +cat >current-env.txt <() { + }); - } else { - // handle command - commandExecutor.execute(executionMessage); + System.out.println("executionMessage = " + executionMessage); + + String executionType = executionMessage.getType(); + + if (ALL_FUNCTION_MAP.containsKey(executionType)) { + // execute the exist function + functionExecutor.execute(executionMessage); + + } else { + // handle command + commandExecutor.execute(executionMessage); + } + + } catch (IOException e) { + throw new RuntimeException(e); } + return true; } } diff --git a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerInit.java b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerInit.java index 1895ecb..d65d1dc 100644 --- a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerInit.java +++ b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerInit.java @@ -44,10 +44,10 @@ public class OMHandlerInit extends AbstractOctopusMessageHandler { // 2. send PassThroughTopicName successful info to the server - String success = String.format("Octopus Agent [ %s ] has successfully PassThroughTopicName with server [ %s ] !", agentServerInfo, octopusMessage); + String success = String.format("[Octopus Agent] - [ %s ] has successfully PassThroughTopicName with server [ %s ] !", agentServerInfo.getServerName(), octopusMessage.getUuid()); octopusMessage.setResult(success); - log.info(success); +// log.info(success); toServerMessage.send(octopusMessage); diff --git a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java index 6dc7850..3ce6f9b 100644 --- a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java @@ -71,6 +71,7 @@ public class CommandExecutor { // cache log lines logToArrayListCache.cacheLog(streamKey, process.getInputStream()); + // start to send the result log streamSender.startToWaitLog(streamKey); // log.warn("start---------------------------------------------"); @@ -80,6 +81,8 @@ public class CommandExecutor { // a command shell don't understand how long it actually take processResult = process.waitFor(); + + // end send logs streamSender.endWaitLog(streamKey); log.info("current shell command {} result is {}", processBuilder.command(), processResult); @@ -119,7 +122,7 @@ public class CommandExecutor { TimeUnit.SECONDS.sleep(1); // clear the log Cache Thread scope - logToArrayListCache.getCommandCachedLog(streamKey).clear(); + logToArrayListCache.getExecutionCmdCachedLogArrayList(streamKey).clear(); // clear the stream sender streamSender.clearLocalCache(streamKey); diff --git a/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java b/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java index 56a17c1..4c102ce 100644 --- a/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java +++ b/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java @@ -33,11 +33,15 @@ import java.util.concurrent.TimeUnit; public class StreamSender { public static String TEST_STREAM_JAVA = "test-stream-java"; + @Resource RedisTemplate redisTemplate; + @Resource LogToArrayListCache logToArrayListCache; + private final HashMap AllNeededStreamSender = new HashMap<>(16); + private final ArrayList cacheLogList = new ArrayList<>(256); private static ByteBuffer currentTimeByteBuffer() { @@ -66,7 +70,13 @@ public class StreamSender { if (!AllNeededStreamSender.containsKey(streamKey)) { - StreamSenderEntity streamSender = StreamSenderEntity.builder().cachedCommandLog(logToArrayListCache.getCommandCachedLog(streamKey)).waitToSendLog(true).startIndex(0).streamKey(streamKey).build(); + StreamSenderEntity streamSender = StreamSenderEntity + .builder() + .cachedCommandLog(logToArrayListCache.getExecutionCmdCachedLogArrayList(streamKey)) + .waitToSendLog(true) + .startIndex(0) + .streamKey(streamKey) + .build(); AllNeededStreamSender.put(streamKey, streamSender); @@ -74,7 +84,7 @@ public class StreamSender { TimeUnit.SECONDS.sleep(2); if (AllNeededStreamSender.get(streamKey).isWaitToSendLog()) { - log.info("stream sender wait 1 s to send message"); + log.info("stream sender wait 2 s to send message"); AllNeededStreamSender.get(streamKey).setWaitToSendLog(false); batchSendLog(streamKey); } @@ -85,14 +95,14 @@ public class StreamSender { StreamSenderEntity streamSenderEntity = AllNeededStreamSender.get(streamKey); streamSenderEntity.setWaitToSendLog(false); - batchSendLog(streamKey); + batchSendLog(streamKey); } public void batchSendLog(String streamKey) { StreamSenderEntity streamSenderEntity = AllNeededStreamSender.get(streamKey); - log.info("batch send log == {}", streamSenderEntity); + //log.info("batch send log == {}", streamSenderEntity); ArrayList cachedCommandLog = streamSenderEntity.getCachedCommandLog(); @@ -103,11 +113,13 @@ public class StreamSender { List content = cachedCommandLog.subList(startIndex, endIndex); -// System.out.println("content = " + content); + // only send when cached log is not empty + if (content.size() > 0) { + this.send(streamKey, content); + } - this.send(streamKey, content); // for next time - startIndex = endIndex; + streamSenderEntity.setStartIndex(endIndex); } public boolean send(String streamKey, String content) { @@ -154,7 +166,6 @@ public class StreamSender { MapRecord mapRecord = StreamRecords.mapBacked(fakeData).withStreamKey(TEST_STREAM_JAVA); - redisTemplate.opsForStream().add(mapRecord); TimeUnit.MILLISECONDS.sleep(200); diff --git a/agent/src/main/java/io/wdd/agent/executor/thread/LogToArrayListCache.java b/agent/src/main/java/io/wdd/agent/executor/thread/LogToArrayListCache.java index f12a7dd..c28f479 100644 --- a/agent/src/main/java/io/wdd/agent/executor/thread/LogToArrayListCache.java +++ b/agent/src/main/java/io/wdd/agent/executor/thread/LogToArrayListCache.java @@ -11,6 +11,9 @@ import java.util.ArrayList; import java.util.List; +/** + * utils to cache store the command execution logs + */ @Component @Slf4j public class LogToArrayListCache { @@ -26,10 +29,11 @@ public class LogToArrayListCache { public void cacheLog(String streamKey, InputStream commandLogStream) { - ArrayList commandCachedLog = this.getCommandCachedLog(streamKey); + ArrayList commandCachedLog = this.getExecutionCmdCachedLogArrayList(streamKey); // log.info(String.valueOf(commandCachedLog)); + // read from input stream and store to the cacheArrayList new BufferedReader(new InputStreamReader(commandLogStream)) .lines() .forEach( @@ -39,14 +43,14 @@ public class LogToArrayListCache { log.info("current streamKey is {} and CacheLog is {}", streamKey, commandCachedLog); } - public ArrayList getCommandCachedLog(String streamKey) { + public ArrayList getExecutionCmdCachedLogArrayList(String streamKey) { - int keyToIndex = this.hashStreamKeyToIndex(streamKey); + int keyToIndex = this.hashStreamKeyToCachedArrayListIndex(streamKey); return CachedCommandLog.get(keyToIndex); } - private int hashStreamKeyToIndex(String streamKey) { + private int hashStreamKeyToCachedArrayListIndex(String streamKey) { int size = CachedCommandLog.size(); diff --git a/agent/src/main/java/io/wdd/agent/initialization/bootup/CollectSystemInfo.java b/agent/src/main/java/io/wdd/agent/initialization/bootup/CollectSystemInfo.java index deb392e..734800d 100644 --- a/agent/src/main/java/io/wdd/agent/initialization/bootup/CollectSystemInfo.java +++ b/agent/src/main/java/io/wdd/agent/initialization/bootup/CollectSystemInfo.java @@ -88,7 +88,7 @@ public class CollectSystemInfo implements ApplicationContextAware { @PostConstruct private void getInjectServerInfo(){ - log.info("Starting getInjectServerInfo"); + log.info("Octopus Agent -- Starting getInjectServerInfo"); agentServerInfo = (AgentServerInfo) context.getBean("agentServerInfo"); @@ -96,11 +96,12 @@ public class CollectSystemInfo implements ApplicationContextAware { throw new MyRuntimeException(" Collect server info error !"); } - log.info("host server info has been collected == {}", agentServerInfo); + //log.info("host server info has been collected == {}", agentServerInfo); // start to send message to Octopus Server octopusAgentInitService.SendInfoToServer(agentServerInfo); - log.info("PassThroughTopicName server info has been send to octopus server !"); + + //log.info("PassThroughTopicName server info has been send to octopus server !"); } diff --git a/agent/src/main/java/io/wdd/agent/initialization/bootup/OctopusAgentInitService.java b/agent/src/main/java/io/wdd/agent/initialization/bootup/OctopusAgentInitService.java index 52a8ddd..d7360c8 100644 --- a/agent/src/main/java/io/wdd/agent/initialization/bootup/OctopusAgentInitService.java +++ b/agent/src/main/java/io/wdd/agent/initialization/bootup/OctopusAgentInitService.java @@ -51,9 +51,10 @@ public class OctopusAgentInitService { * listen to the PassThroughTopicName queue from octopus server * * @param message 该方法不需要手动调用,Spring会自动运行这个监听方法 - *

+ * * 注意:如果该监听方法正常结束,那么Spring会自动确认消息 * 如果出现异常,则Spring不会确认消息,该消息一直存在于消息队列中 + * * @RabbitListener : 用于标记当前方法是一个RabbitMQ的消息监听方法,可以持续性的自动接收消息 */ @SneakyThrows @@ -86,6 +87,7 @@ public class OctopusAgentInitService { throw new MyRuntimeException(" Handle Octopus Message Error !"); } + } catch (Exception e) { // reject the message @@ -93,12 +95,15 @@ public class OctopusAgentInitService { // long deliveryTag, boolean requeue // channel.basicReject(deliveryTag,true); + log.error("Octopus Agent Initialization Error, please check !"); + log.info("waiting for 5 seconds"); + // 这里只是便于出现死循环时查看 TimeUnit.SECONDS.sleep(5); - throw new MyRuntimeException("Octopus Agent Initialization Error, please check !"); } + // handle init message ok // ack the info channel.basicAck(deliveryTag, false); } diff --git a/agent/src/main/java/io/wdd/agent/initialization/message/GenOctopusRabbitMQConnection.java b/agent/src/main/java/io/wdd/agent/initialization/message/GenOctopusRabbitMQConnection.java index 71501ad..b5a74a5 100644 --- a/agent/src/main/java/io/wdd/agent/initialization/message/GenOctopusRabbitMQConnection.java +++ b/agent/src/main/java/io/wdd/agent/initialization/message/GenOctopusRabbitMQConnection.java @@ -46,7 +46,7 @@ public class GenOctopusRabbitMQConnection { // generate the ne topic queue for unique agent String agentTopicName = octopusMessage.getUuid(); - // reboot judgyment of existing exchange + // reboot judgement of existing exchange QueueInformation queueInfo = rabbitAdmin.getQueueInfo(agentTopicName); if (ObjectUtils.isNotEmpty(queueInfo) && queueInfo.getConsumerCount() > 0 ) { @@ -89,7 +89,6 @@ public class GenOctopusRabbitMQConnection { OctopusMessage octopusMessage; - try { octopusMessage = objectMapper.readValue(message.getBody(), OctopusMessage.class); diff --git a/agent/src/main/java/io/wdd/agent/message/ToServerMessage.java b/agent/src/main/java/io/wdd/agent/message/ToServerMessage.java index 99f49a1..4d5ab69 100644 --- a/agent/src/main/java/io/wdd/agent/message/ToServerMessage.java +++ b/agent/src/main/java/io/wdd/agent/message/ToServerMessage.java @@ -66,11 +66,13 @@ public class ToServerMessage { // set PassThroughTopicName agent register ttl InitMessagePostProcessor initMessagePostProcessor = new InitMessagePostProcessor(defaultInitRegisterTimeOut); - log.info("send INIT AgentServerInfo to Server = {}", agentServerInfo); + log.info("[Octopus Agent] - send INIT AgentServerInfo to Server = {}", agentServerInfo); // send the register server info to EXCHANGE:INIT_EXCHANGE QUEUE: init_to_server try { + rabbitTemplate.convertAndSend(initRabbitMqConnector.INIT_EXCHANGE, initRabbitMqConnector.INIT_TO_SERVER_KEY, objectMapper.writeValueAsBytes(agentServerInfo), initMessagePostProcessor); + } catch (JsonProcessingException e) { log.error("Failed to send INIT message to Server ! = {}", agentServerInfo); throw new RuntimeException(e); diff --git a/agent/src/main/resources/bootstrap.yml b/agent/src/main/resources/bootstrap.yml index 0c92b06..013d9e3 100644 --- a/agent/src/main/resources/bootstrap.yml +++ b/agent/src/main/resources/bootstrap.yml @@ -2,11 +2,11 @@ spring: application: name: octopus-agent profiles: - active: k3s + active: local cloud: nacos: config: - group: k3s + group: local config-retry-time: 3000 file-extension: yaml max-retry: 3 @@ -16,7 +16,7 @@ spring: timeout: 5000 config-long-poll-timeout: 5000 extension-configs: - - group: k3s - data-id: common-k3s.yaml + - group: local + data-id: common-local.yaml diff --git a/source/src/main/java/io/wdd/source/shell/lib/wdd-lib-env.sh b/source/src/main/java/io/wdd/source/shell/lib/wdd-lib-env.sh index 015819b..d1ea79b 100644 --- a/source/src/main/java/io/wdd/source/shell/lib/wdd-lib-env.sh +++ b/source/src/main/java/io/wdd/source/shell/lib/wdd-lib-env.sh @@ -1,8 +1,11 @@ #!/bin/bash -. /octopus-agent/shell/lib/wdd-lib-log.sh -. /octopus-agent/shell/lib/wdd-lib-sys.sh +#. /octopus-agent/shell/lib/wdd-lib-log.sh +#. /octopus-agent/shell/lib/wdd-lib-sys.sh + +. ./wdd-lib-log.sh +. ./wdd-lib-sys.sh hostArchVersion="" diff --git a/source/src/main/java/io/wdd/source/shell/test.sh b/source/src/main/java/io/wdd/source/shell/test.sh index 5abe283..cc61868 100644 --- a/source/src/main/java/io/wdd/source/shell/test.sh +++ b/source/src/main/java/io/wdd/source/shell/test.sh @@ -1,26 +1,28 @@ #!/bin/bash -echo "start to update !" +echo start to update ! apt-get update -echo "" +echo " -echo "start to install nginx" +echo start to install nginx apt-get install nginx -y -echo "" -echo "start to uninstall nginx" +echo +echo start to uninstall nginx apt remove nginx -y -echo "" -echo "start to get ip info" +echo +echo start to get ip info curl https://ipinfo.io -echo "" -echo "--- end ---" +echo +echo --- end --- + +