[ agent ] [executor] Function File Executor -2

This commit is contained in:
zeaslity
2022-12-12 20:09:32 +08:00
parent 32096ef828
commit b49b210569
11 changed files with 125 additions and 51 deletions

View File

@@ -7,15 +7,15 @@ ENV TZ=Asia/Shanghai
ENV JAVA_OPTS="-Xms2028m -Xmx2048m" ENV JAVA_OPTS="-Xms2028m -Xmx2048m"
# Set time zone # Set time zone
RUN set -eux; \ #RUN set -eux; \
ln -snf /usr/share/zoneinfo/$TZ /etc/localtime; \ # ln -snf /usr/share/zoneinfo/$TZ /etc/localtime; \
echo $TZ > /etc/timezone # echo $TZ > /etc/timezone
# Create Folder # Create Folder
RUN mkdir -p /wdd RUN mkdir -p /wdd
# Define the work dir # Define the work dir
WORKDIR /wdd #WORKDIR /wdd
# Copy the jar and rename it # Copy the jar and rename it
COPY ./target/agent-*.jar /wdd/agent.jar COPY ./target/agent-*.jar /wdd/agent.jar

View File

@@ -387,12 +387,12 @@ main() {
PrintEnv PrintEnv
FunctionEnd
} }
main main
scp -r /wdd /host/wdd # copy jar to /host
# change the root working dir and use the host jvm to run the jar-file
chroot /host scp -r /wdd /host/wdd && chroot /host java ${JAVA_OPTS} -jar /wdd/agent.jar
java ${JAVA_OPTS} -jar /wdd/agent.jar

View File

@@ -37,17 +37,22 @@ public class CollectAllExecutorFunction {
*/ */
public static HashMap<String, List<List<String>>> ALL_FUNCTION_MAP = new HashMap<>(128); public static HashMap<String, List<List<String>>> ALL_FUNCTION_MAP = new HashMap<>(128);
/*
* listen to the nacos executor shell scripts
* */
public static ConfigService NacosConfigService;
@Value("${spring.cloud.nacos.config.server-addr}") @Value("${spring.cloud.nacos.config.server-addr}")
String nacosAddr; public String nacosAddr;
@Value("${spring.cloud.nacos.config.group}") @Value("${spring.cloud.nacos.config.group}")
String group; public String group;
@Value("${spring.cloud.nacos.config.file-extension}") @Value("${spring.cloud.nacos.config.file-extension}")
String fileExtension; public String fileExtension;
@Value("${octopus.executor.name}") @Value("${octopus.executor.name}")
String dataId; public String dataId;
@Resource @Resource
FunctionReader functionReader; FunctionReader functionReader;
@@ -65,13 +70,12 @@ public class CollectAllExecutorFunction {
Properties properties = new Properties(); Properties properties = new Properties();
properties.put("serverAddr", nacosAddr); properties.put("serverAddr", nacosAddr);
ConfigService configService = NacosFactory.createConfigService(properties); NacosConfigService = NacosFactory.createConfigService(properties);
// Actively get the configuration. // Actively get the configuration.
String content = configService.getConfig(completeDataId, group, 5000); String content = NacosConfigService.getConfig(completeDataId, group, 5000);
log.info("functions get from nacos are {}", content); log.info("functions get from nacos are {}", content);
parseNacosFunctionYamlToMap(content); parseNacosFunctionYamlToMap(content);
@@ -80,7 +84,7 @@ public class CollectAllExecutorFunction {
} }
} }
private void parseNacosFunctionYamlToMap(String content) { public void parseNacosFunctionYamlToMap(String content) {
Yaml yaml = new Yaml(); Yaml yaml = new Yaml();
@@ -107,6 +111,7 @@ public class CollectAllExecutorFunction {
} }
); );
log.info("ALL_FUNCTION_MAP has been updated ! ---> {}", ALL_FUNCTION_MAP);
} }

View File

@@ -3,6 +3,7 @@ package io.wdd.agent.executor.redis;
import io.wdd.agent.config.beans.executor.CommandLog; import io.wdd.agent.config.beans.executor.CommandLog;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
@@ -24,12 +25,12 @@ import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@Configuration @Configuration
@Slf4j
public class StreamSender { public class StreamSender {
@Resource @Resource
RedisTemplate redisTemplate; RedisTemplate redisTemplate;
private static ByteBuffer currentTimeByteBuffer(){ private static ByteBuffer currentTimeByteBuffer(){
byte[] timeBytes = LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).getBytes(StandardCharsets.UTF_8); byte[] timeBytes = LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).getBytes(StandardCharsets.UTF_8);
@@ -56,10 +57,14 @@ public class StreamSender {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
log.info("redis stream sender message is {}", map);
StringRecord stringRecord = StreamRecords.string(map).withStreamKey(streamKey); StringRecord stringRecord = StreamRecords.string(map).withStreamKey(streamKey);
RecordId recordId = redisTemplate.opsForStream().add(stringRecord); RecordId recordId = redisTemplate.opsForStream().add(stringRecord);
log.info("redis send recordId is {}",recordId);
return ObjectUtils.isNotEmpty(recordId); return ObjectUtils.isNotEmpty(recordId);
} }

View File

@@ -9,9 +9,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.File; import java.io.*;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
@@ -34,45 +32,57 @@ public class CommandExecutor {
} }
public void execute(String streamKey, List<String> command) { public int execute(String streamKey, List<String> command) {
ProcessBuilder processBuilder = new ProcessBuilder(command); ProcessBuilder processBuilder = new ProcessBuilder(command);
this.processExecute(streamKey, processBuilder); return this.processExecute(streamKey, processBuilder);
} }
public void execute(String streamKey, String... command) { public int execute(String streamKey, String... command) {
ProcessBuilder processBuilder = new ProcessBuilder(command); ProcessBuilder processBuilder = new ProcessBuilder(command);
this.processExecute(streamKey, processBuilder); return this.processExecute(streamKey, processBuilder);
} }
public void processExecute(String streamKey, ProcessBuilder processBuilder) { public int processExecute(String streamKey, ProcessBuilder processBuilder) {
processBuilder.redirectErrorStream(true); processBuilder.redirectErrorStream(true);
processBuilder.inheritIO(); // processBuilder.inheritIO();
processBuilder.directory(new File(System.getProperty("user.home"))); // processBuilder.directory(new File(System.getProperty("user.home")));
int processResult = 233;
Process process = null;
try { try {
process = processBuilder.start();
Process process = processBuilder.start();
LogToStreamSender toStreamSender = new LogToStreamSender(streamKey, process.getInputStream(), streamSender::send); LogToStreamSender toStreamSender = new LogToStreamSender(streamKey, process.getInputStream(), streamSender::send);
DaemonLogThread.start(toStreamSender);
log.warn("---------------------------------------------");
new BufferedReader(new InputStreamReader(process.getInputStream())).lines()
.map(
String::valueOf
).forEach(System.out::println);
log.warn("---------------------------------------------");
// a command shell don't understand how long it actually take // a command shell don't understand how long it actually take
int processResult = process.waitFor(); processResult = process.waitFor();
log.info("current shell command [{}] result is [{}]", processBuilder.command(), processResult); log.info("current shell command {} result is {}", processBuilder.command(), processResult);
DaemonLogThread.start(toStreamSender);
} catch (IOException | InterruptedException e) { } catch (IOException | InterruptedException e) {
log.error("Shell command error ! {} + {}", e.getCause(), e.getMessage()); log.error("Shell command error ! {} + {}", e.getCause(), e.getMessage());
} }
return processResult;
} }
private ByteBuffer cvToByteBuffer(InputStream inputStream) throws IOException { private ByteBuffer cvToByteBuffer(InputStream inputStream) throws IOException {

View File

@@ -1,24 +1,27 @@
package io.wdd.agent.executor.shell; package io.wdd.agent.executor.shell;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import io.wdd.agent.executor.config.FunctionReader; import io.wdd.agent.executor.config.FunctionReader;
import io.wdd.agent.executor.function.CollectAllExecutorFunction; import io.wdd.agent.executor.function.CollectAllExecutorFunction;
import io.wdd.common.beans.executor.ExecutionMessage; import io.wdd.common.beans.executor.ExecutionMessage;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.Executor;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static io.wdd.agent.executor.function.CollectAllExecutorFunction.ALL_FUNCTION_MAP; import static io.wdd.agent.executor.function.CollectAllExecutorFunction.ALL_FUNCTION_MAP;
import static io.wdd.agent.executor.function.CollectAllExecutorFunction.NacosConfigService;
@Service @Service
@Slf4j @Slf4j
public class FunctionExecutor { public class FunctionExecutor {
@Resource
FunctionReader functionReader;
@Resource @Resource
CommandExecutor commandExecutor; CommandExecutor commandExecutor;
@@ -53,14 +56,45 @@ public class FunctionExecutor {
log.info("all commands are {}", commandList); log.info("all commands are {}", commandList);
// todo modify this Iterator<List<String>> iterator = commandList.iterator();
commandList.stream().map(
command -> {
commandExecutor.execute(streamKey, command);
return 1;
}
).collect(Collectors.toList());
while (iterator.hasNext()) {
int execute = commandExecutor.execute(streamKey, iterator.next());
if (execute != 0) {
log.error("command list execute failed !");
break;
}
}
} }
@Bean
private void daemonListenToNacosFunctions(){
// add listener to listen to the real-time change of the Function Shell Scripts
try {
NacosConfigService.addListener(collectAllExecutorFunction.dataId + "." + collectAllExecutorFunction.fileExtension, collectAllExecutorFunction.group, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String s) {
log.info("detected nacos function shell update ! {}", s);
collectAllExecutorFunction.parseNacosFunctionYamlToMap(s);
}
});
} catch (NacosException e) {
throw new RuntimeException(e);
}
}
} }

View File

@@ -49,8 +49,7 @@ public class TestCommandExecutorController {
.contend(messageType) .contend(messageType)
.build(); .build();
System.out.println("executionMessage = " + executionMessage);
System.out.println("FUNCTION_REFLECTION = " + ALL_FUNCTION_MAP);
functionExecutor.execute(executionMessage); functionExecutor.execute(executionMessage);

View File

@@ -1,5 +1,11 @@
#!/bin/bash #!/bin/bash
rm -rf /wdd
rm -rf /root/logs
rm -rf /root/nacos
rm -rf /root/test.sh*
docker container stop octopus-agent-ubuntu && docker container rm octopus-agent-ubuntu docker container stop octopus-agent-ubuntu && docker container rm octopus-agent-ubuntu
docker run \ docker run \
@@ -8,11 +14,17 @@ docker run \
--net=host \ --net=host \
--pid=host \ --pid=host \
--ipc=host \ --ipc=host \
--uts=host \
--volume /:/host \ --volume /:/host \
--name=octopus-agent-ubuntu \ --name=octopus-agent-ubuntu \
octopus-agent-ubuntu:latest \ octopus-agent-ubuntu:latest \
docker logs --tail -f octopus-agent-ubuntu docker logs --tail 500 -f octopus-agent-ubuntu
docker run \ docker run \
-d \ -d \
@@ -38,6 +50,7 @@ docker run \
--privileged \ --privileged \
--net=host \ --net=host \
--pid=host \ --pid=host \
--uts=host \
--ipc=host \ --ipc=host \
--volume /:/host \ --volume /:/host \
--name=octopus-agent-ubuntu \ --name=octopus-agent-ubuntu \

View File

View File

@@ -1,4 +1,6 @@
spring:
redis:
timeout: 5000
lettuce:
pool:
time-between-eviction-runs: 5000

View File

@@ -107,6 +107,12 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId> <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency> </dependency>
<!-- lettuce pool 缓存连接池-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency> <dependency>
<groupId>commons-beanutils</groupId> <groupId>commons-beanutils</groupId>