From b49b210569242d3dcbc9966ada910faf6606be71 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Mon, 12 Dec 2022 20:09:32 +0800 Subject: [PATCH] [ agent ] [executor] Function File Executor -2 --- agent/Dockerfile-wsl2 | 8 +-- agent/agent-entrypoint.sh | 10 ++-- .../function/CollectAllExecutorFunction.java | 21 +++++--- .../agent/executor/redis/StreamSender.java | 7 ++- .../agent/executor/shell/CommandExecutor.java | 42 +++++++++------ .../executor/shell/FunctionExecutor.java | 54 +++++++++++++++---- .../web/TestCommandExecutorController.java | 3 +- .../initialization/bootup/MaualRunAgent.sh | 15 +++++- agent/src/main/java/io/wdd/agent/todo.md | 0 agent/src/main/resources/application-k3s.yaml | 10 ++-- pom.xml | 6 +++ 11 files changed, 125 insertions(+), 51 deletions(-) create mode 100644 agent/src/main/java/io/wdd/agent/todo.md diff --git a/agent/Dockerfile-wsl2 b/agent/Dockerfile-wsl2 index 8d50b6d..8ac91f1 100644 --- a/agent/Dockerfile-wsl2 +++ b/agent/Dockerfile-wsl2 @@ -7,15 +7,15 @@ ENV TZ=Asia/Shanghai ENV JAVA_OPTS="-Xms2028m -Xmx2048m" # Set time zone -RUN set -eux; \ - ln -snf /usr/share/zoneinfo/$TZ /etc/localtime; \ - echo $TZ > /etc/timezone +#RUN set -eux; \ +# ln -snf /usr/share/zoneinfo/$TZ /etc/localtime; \ +# echo $TZ > /etc/timezone # Create Folder RUN mkdir -p /wdd # Define the work dir -WORKDIR /wdd +#WORKDIR /wdd # Copy the jar and rename it COPY ./target/agent-*.jar /wdd/agent.jar diff --git a/agent/agent-entrypoint.sh b/agent/agent-entrypoint.sh index 686832e..fe5706e 100644 --- a/agent/agent-entrypoint.sh +++ b/agent/agent-entrypoint.sh @@ -387,12 +387,12 @@ main() { PrintEnv + FunctionEnd + } main -scp -r /wdd /host/wdd - -chroot /host - -java ${JAVA_OPTS} -jar /wdd/agent.jar +# copy jar to /host +# change the root working dir and use the host jvm to run the jar-file +scp -r /wdd /host/wdd && chroot /host java ${JAVA_OPTS} -jar /wdd/agent.jar diff --git a/agent/src/main/java/io/wdd/agent/executor/function/CollectAllExecutorFunction.java b/agent/src/main/java/io/wdd/agent/executor/function/CollectAllExecutorFunction.java index 588016f..c833623 100644 --- a/agent/src/main/java/io/wdd/agent/executor/function/CollectAllExecutorFunction.java +++ b/agent/src/main/java/io/wdd/agent/executor/function/CollectAllExecutorFunction.java @@ -37,17 +37,22 @@ public class CollectAllExecutorFunction { */ public static HashMap>> ALL_FUNCTION_MAP = new HashMap<>(128); + /* + * listen to the nacos executor shell scripts + * */ + public static ConfigService NacosConfigService; + @Value("${spring.cloud.nacos.config.server-addr}") - String nacosAddr; + public String nacosAddr; @Value("${spring.cloud.nacos.config.group}") - String group; + public String group; @Value("${spring.cloud.nacos.config.file-extension}") - String fileExtension; + public String fileExtension; @Value("${octopus.executor.name}") - String dataId; + public String dataId; @Resource FunctionReader functionReader; @@ -65,13 +70,12 @@ public class CollectAllExecutorFunction { Properties properties = new Properties(); properties.put("serverAddr", nacosAddr); - ConfigService configService = NacosFactory.createConfigService(properties); + NacosConfigService = NacosFactory.createConfigService(properties); // Actively get the configuration. - String content = configService.getConfig(completeDataId, group, 5000); + String content = NacosConfigService.getConfig(completeDataId, group, 5000); log.info("functions get from nacos are {}", content); - parseNacosFunctionYamlToMap(content); @@ -80,7 +84,7 @@ public class CollectAllExecutorFunction { } } - private void parseNacosFunctionYamlToMap(String content) { + public void parseNacosFunctionYamlToMap(String content) { Yaml yaml = new Yaml(); @@ -107,6 +111,7 @@ public class CollectAllExecutorFunction { } ); + log.info("ALL_FUNCTION_MAP has been updated ! ---> {}", ALL_FUNCTION_MAP); } diff --git a/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java b/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java index 64eea92..0a487c2 100644 --- a/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java +++ b/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java @@ -3,6 +3,7 @@ package io.wdd.agent.executor.redis; import io.wdd.agent.config.beans.executor.CommandLog; import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.RandomStringUtils; @@ -24,12 +25,12 @@ import java.util.Map; import java.util.concurrent.TimeUnit; @Configuration +@Slf4j public class StreamSender { @Resource RedisTemplate redisTemplate; - private static ByteBuffer currentTimeByteBuffer(){ byte[] timeBytes = LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).getBytes(StandardCharsets.UTF_8); @@ -56,10 +57,14 @@ public class StreamSender { throw new RuntimeException(e); } + log.info("redis stream sender message is {}", map); + StringRecord stringRecord = StreamRecords.string(map).withStreamKey(streamKey); RecordId recordId = redisTemplate.opsForStream().add(stringRecord); + log.info("redis send recordId is {}",recordId); + return ObjectUtils.isNotEmpty(recordId); } diff --git a/agent/src/main/java/io/wdd/agent/executor/shell/CommandExecutor.java b/agent/src/main/java/io/wdd/agent/executor/shell/CommandExecutor.java index 81f2bea..9c8b8d1 100644 --- a/agent/src/main/java/io/wdd/agent/executor/shell/CommandExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/shell/CommandExecutor.java @@ -9,9 +9,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; +import java.io.*; import java.nio.ByteBuffer; import java.util.List; @@ -34,45 +32,57 @@ public class CommandExecutor { } - public void execute(String streamKey, List command) { + public int execute(String streamKey, List command) { ProcessBuilder processBuilder = new ProcessBuilder(command); - this.processExecute(streamKey, processBuilder); + return this.processExecute(streamKey, processBuilder); } - public void execute(String streamKey, String... command) { + public int execute(String streamKey, String... command) { ProcessBuilder processBuilder = new ProcessBuilder(command); - this.processExecute(streamKey, processBuilder); + return this.processExecute(streamKey, processBuilder); } - public void processExecute(String streamKey, ProcessBuilder processBuilder) { + public int processExecute(String streamKey, ProcessBuilder processBuilder) { processBuilder.redirectErrorStream(true); - processBuilder.inheritIO(); - processBuilder.directory(new File(System.getProperty("user.home"))); +// processBuilder.inheritIO(); +// processBuilder.directory(new File(System.getProperty("user.home"))); + + int processResult = 233; - Process process = null; try { - process = processBuilder.start(); + + Process process = processBuilder.start(); + + LogToStreamSender toStreamSender = new LogToStreamSender(streamKey, process.getInputStream(), streamSender::send); + DaemonLogThread.start(toStreamSender); + + log.warn("---------------------------------------------"); + new BufferedReader(new InputStreamReader(process.getInputStream())).lines() + .map( + String::valueOf + ).forEach(System.out::println); + log.warn("---------------------------------------------"); // a command shell don't understand how long it actually take - int processResult = process.waitFor(); + processResult = process.waitFor(); - log.info("current shell command [{}] result is [{}]", processBuilder.command(), processResult); - - DaemonLogThread.start(toStreamSender); + log.info("current shell command {} result is {}", processBuilder.command(), processResult); } catch (IOException | InterruptedException e) { log.error("Shell command error ! {} + {}", e.getCause(), e.getMessage()); } + + return processResult; } private ByteBuffer cvToByteBuffer(InputStream inputStream) throws IOException { diff --git a/agent/src/main/java/io/wdd/agent/executor/shell/FunctionExecutor.java b/agent/src/main/java/io/wdd/agent/executor/shell/FunctionExecutor.java index d732914..c2a06a1 100644 --- a/agent/src/main/java/io/wdd/agent/executor/shell/FunctionExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/shell/FunctionExecutor.java @@ -1,24 +1,27 @@ package io.wdd.agent.executor.shell; +import com.alibaba.nacos.api.config.listener.Listener; +import com.alibaba.nacos.api.exception.NacosException; import io.wdd.agent.executor.config.FunctionReader; import io.wdd.agent.executor.function.CollectAllExecutorFunction; import io.wdd.common.beans.executor.ExecutionMessage; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.util.Iterator; import java.util.List; +import java.util.concurrent.Executor; import java.util.stream.Collectors; import static io.wdd.agent.executor.function.CollectAllExecutorFunction.ALL_FUNCTION_MAP; +import static io.wdd.agent.executor.function.CollectAllExecutorFunction.NacosConfigService; @Service @Slf4j public class FunctionExecutor { - @Resource - FunctionReader functionReader; - @Resource CommandExecutor commandExecutor; @@ -53,14 +56,45 @@ public class FunctionExecutor { log.info("all commands are {}", commandList); - // todo modify this - commandList.stream().map( - command -> { - commandExecutor.execute(streamKey, command); - return 1; - } - ).collect(Collectors.toList()); + Iterator> iterator = commandList.iterator(); + while (iterator.hasNext()) { + int execute = commandExecutor.execute(streamKey, iterator.next()); + + if (execute != 0) { + log.error("command list execute failed !"); + break; + } + + } } + + + @Bean + private void daemonListenToNacosFunctions(){ + + // add listener to listen to the real-time change of the Function Shell Scripts + try { + NacosConfigService.addListener(collectAllExecutorFunction.dataId + "." + collectAllExecutorFunction.fileExtension, collectAllExecutorFunction.group, new Listener() { + @Override + public Executor getExecutor() { + return null; + } + + @Override + public void receiveConfigInfo(String s) { + + log.info("detected nacos function shell update ! {}", s); + + collectAllExecutorFunction.parseNacosFunctionYamlToMap(s); + + } + }); + + } catch (NacosException e) { + throw new RuntimeException(e); + } + } + } diff --git a/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java b/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java index b3f2f6a..cb8f4f2 100644 --- a/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java +++ b/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java @@ -49,8 +49,7 @@ public class TestCommandExecutorController { .contend(messageType) .build(); - - System.out.println("FUNCTION_REFLECTION = " + ALL_FUNCTION_MAP); + System.out.println("executionMessage = " + executionMessage); functionExecutor.execute(executionMessage); diff --git a/agent/src/main/java/io/wdd/agent/initialization/bootup/MaualRunAgent.sh b/agent/src/main/java/io/wdd/agent/initialization/bootup/MaualRunAgent.sh index afebcb4..6f5acfd 100644 --- a/agent/src/main/java/io/wdd/agent/initialization/bootup/MaualRunAgent.sh +++ b/agent/src/main/java/io/wdd/agent/initialization/bootup/MaualRunAgent.sh @@ -1,5 +1,11 @@ #!/bin/bash + +rm -rf /wdd +rm -rf /root/logs +rm -rf /root/nacos +rm -rf /root/test.sh* + docker container stop octopus-agent-ubuntu && docker container rm octopus-agent-ubuntu docker run \ @@ -8,11 +14,17 @@ docker run \ --net=host \ --pid=host \ --ipc=host \ + --uts=host \ --volume /:/host \ --name=octopus-agent-ubuntu \ octopus-agent-ubuntu:latest \ -docker logs --tail -f octopus-agent-ubuntu +docker logs --tail 500 -f octopus-agent-ubuntu + + + + + docker run \ -d \ @@ -38,6 +50,7 @@ docker run \ --privileged \ --net=host \ --pid=host \ + --uts=host \ --ipc=host \ --volume /:/host \ --name=octopus-agent-ubuntu \ diff --git a/agent/src/main/java/io/wdd/agent/todo.md b/agent/src/main/java/io/wdd/agent/todo.md new file mode 100644 index 0000000..e69de29 diff --git a/agent/src/main/resources/application-k3s.yaml b/agent/src/main/resources/application-k3s.yaml index fd40910..877b39a 100644 --- a/agent/src/main/resources/application-k3s.yaml +++ b/agent/src/main/resources/application-k3s.yaml @@ -1,4 +1,6 @@ - - - - +spring: + redis: + timeout: 5000 + lettuce: + pool: + time-between-eviction-runs: 5000 diff --git a/pom.xml b/pom.xml index 4492b9f..3bdea62 100644 --- a/pom.xml +++ b/pom.xml @@ -107,6 +107,12 @@ org.springframework.boot spring-boot-starter-data-redis + + + org.apache.commons + commons-pool2 + + commons-beanutils