[ agent ] [ status ] - nacos dynamically listen to the configuration

This commit is contained in:
IceDerce
2023-01-05 21:53:32 +08:00
parent cdd2780189
commit 7e8478b7e3
7 changed files with 278 additions and 184 deletions

View File

@@ -13,7 +13,7 @@ import javax.annotation.Resource;
import java.io.IOException; import java.io.IOException;
import static io.wdd.agent.executor.function.CollectAllExecutorFunction.ALL_FUNCTION_MAP; import static io.wdd.agent.config.utils.NacosConfigurationCollector.ALL_FUNCTION_MAP;
@Component @Component
public class OMHandlerExecutor extends AbstractOctopusMessageHandler { public class OMHandlerExecutor extends AbstractOctopusMessageHandler {

View File

@@ -0,0 +1,217 @@
package io.wdd.agent.config.utils;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import io.wdd.agent.executor.config.FunctionReader;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.yaml.snakeyaml.Yaml;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.Executor;
import static io.wdd.agent.status.AppStatusCollector.ALL_APP_NEED_TO_MONITOR_STATUS;
@Component
@Lazy
@Slf4j
public class NacosConfigurationCollector {
/**
* store the Octopus Agent Functions and Function Command List
* key: function name
* value: function shell List<String> contend
*/
public static HashMap<String, List<List<String>>> ALL_FUNCTION_MAP = new HashMap<>(128);
/*
* listen to the nacos executor shell scripts
* */
public static ConfigService NacosConfigService;
@Value("${spring.cloud.nacos.config.server-addr}")
public String nacosAddr;
@Value("${spring.cloud.nacos.config.group}")
public String group;
@Value("${spring.cloud.nacos.config.file-extension}")
public String fileExtension;
@Value("${octopus.executor.name}")
public String executorFunctionDataId;
@Value("${octopus.status.name}")
public String appStatusDataId;
@Resource
FunctionReader functionReader;
@PostConstruct
private void CollectAllFunctionFromNacos() {
try {
// Initialize the configuration service, and the console automatically obtains the following parameters through the sample code.
String executorFunctionDataId = this.executorFunctionDataId + "." + fileExtension;
String appStatusDataId = this.appStatusDataId + "-" + group + "." + fileExtension;
Properties properties = new Properties();
properties.put("serverAddr", nacosAddr);
NacosConfigService = NacosFactory.createConfigService(properties);
String executorFunctionContent = NacosConfigService.getConfig(executorFunctionDataId, group, 5000);
String allApplicationNeedToMonitorStatus = NacosConfigService.getConfig(appStatusDataId, group, 5000);
parseNacosFunctionYamlToMap(executorFunctionContent);
parseAllApplicationNeedToMonitorStatus(allApplicationNeedToMonitorStatus);
/**
* https://nacos.io/zh-cn/docs/v2/guide/user/sdk.html
*
* dynamically listen to the nacos
*
* Actively get the executor functions configuration.
* */
NacosConfigService.addListener(executorFunctionDataId, group, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String newExecutorFunction) {
log.debug("functions get from nacos are {}", executorFunctionContent);
parseNacosFunctionYamlToMap(newExecutorFunction);
}
});
/**
* Actively get ALl applications need to monitor
* */
NacosConfigService.addListener(appStatusDataId, group, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String allApplicationNeedToMonitorStatus) {
log.debug("all applications need to monitor status has changed to => {}", allApplicationNeedToMonitorStatus);
parseAllApplicationNeedToMonitorStatus(allApplicationNeedToMonitorStatus);
}
});
} catch (NacosException e) {
throw new RuntimeException(e);
}
}
private void parseAllApplicationNeedToMonitorStatus(String allApplicationNeedToMonitorStatus) {
Yaml yaml = new Yaml();
Map<String, Object> map = yaml.load(allApplicationNeedToMonitorStatus);
Map<String, Object> octopus = (Map<String, Object>) map.get("octopus");
Map<String, Object> agent = (Map<String, Object>) octopus.get("agent");
Map<String, Object> status = (Map<String, Object>) agent.get("status");
ArrayList<String> all_app_from_nacos = (ArrayList<String>) status.get("app");
// need to keep update to nacos so need to clear the cache
ALL_APP_NEED_TO_MONITOR_STATUS.clear();
all_app_from_nacos.stream().forEach(
app -> {
String[] split = app.split("/");
ALL_APP_NEED_TO_MONITOR_STATUS.put(split[0], split[1] + ".service");
}
);
log.info("ALL_APP_NEED_TO_MONITOR_STATUS are => {}", ALL_APP_NEED_TO_MONITOR_STATUS);
// help gc
map = null;
octopus = null;
agent = null;
status = null;
all_app_from_nacos = null;
}
public void parseNacosFunctionYamlToMap(String allApplicationNeedToMonitorStatus) {
Yaml yaml = new Yaml();
yaml.loadAll(allApplicationNeedToMonitorStatus).iterator().forEachRemaining(realFunction -> {
if (!(realFunction instanceof LinkedHashMap)) {
System.out.println("realFunction = " + realFunction);
}
Map<String, String> stringMap = (Map<String, String>) realFunction;
Optional<String> functionName = stringMap.keySet().stream().findFirst();
List<List<String>> commandList = functionReader.ReadStringToCommandList(stringMap.get(functionName.get()));
/*log.info("Function {} , content is {}", functionName.get(), commandList);*/
ALL_FUNCTION_MAP.put(functionName.get(), commandList);
});
log.info("ALL_FUNCTION_MAP has been updated ! ---> {}", ALL_FUNCTION_MAP);
}
/**
* due to can't get shell from the jar file
* this is deprecated
*/
// @PostConstruct
// private void CollectAllFunctionShellScriptName() {
//
// // scan current package files name and store them to FUNCTION_REFLECTION
//
//
// Path absolutePath = Paths.get("").toAbsolutePath();
// log.info("current absolute path is {}", absolutePath);
//
// Path currentDirectory = Path.of(absolutePath + "/src/main/java/io/wdd/agent/executor/function").toAbsolutePath();
//
//
// IOFileFilter fileFilter = FileFilterUtils.suffixFileFilter(".sh");
// IOFileFilter directoryFileFilter = DirectoryFileFilter.INSTANCE;
//
// Collection<File> functionFileList = FileUtils.listFiles(currentDirectory.toFile(), fileFilter, directoryFileFilter);
//
// log.debug("all function shell script files are : {}", functionFileList);
//
// Map<String, String> collect = functionFileList.stream().collect(Collectors.toMap(
// functionFile -> functionFile.getName().split("\\.")[0],
// functionFile -> functionFile.getAbsolutePath()
// ));
//
// ALL_FUNCTION_MAP.putAll(collect);
//
// }
}

View File

@@ -2,7 +2,7 @@ package io.wdd.agent.executor;
import com.alibaba.nacos.api.config.listener.Listener; import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.exception.NacosException;
import io.wdd.agent.executor.function.CollectAllExecutorFunction; import io.wdd.agent.config.utils.NacosConfigurationCollector;
import io.wdd.common.beans.executor.ExecutionMessage; import io.wdd.common.beans.executor.ExecutionMessage;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
@@ -13,8 +13,8 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import static io.wdd.agent.executor.function.CollectAllExecutorFunction.ALL_FUNCTION_MAP; import static io.wdd.agent.config.utils.NacosConfigurationCollector.ALL_FUNCTION_MAP;
import static io.wdd.agent.executor.function.CollectAllExecutorFunction.NacosConfigService; import static io.wdd.agent.config.utils.NacosConfigurationCollector.NacosConfigService;
@Service @Service
@Slf4j @Slf4j
@@ -25,7 +25,7 @@ public class FunctionExecutor {
// todo called by timer // todo called by timer
@Resource @Resource
CollectAllExecutorFunction collectAllExecutorFunction; NacosConfigurationCollector nacosConfigurationCollector;
public void execute(ExecutionMessage executionMessage) { public void execute(ExecutionMessage executionMessage) {
@@ -74,7 +74,7 @@ public class FunctionExecutor {
// add listener to listen to the real-time change of the Function Shell Scripts // add listener to listen to the real-time change of the Function Shell Scripts
try { try {
NacosConfigService.addListener(collectAllExecutorFunction.dataId + "." + collectAllExecutorFunction.fileExtension, collectAllExecutorFunction.group, new Listener() { NacosConfigService.addListener(nacosConfigurationCollector.executorFunctionDataId + "." + nacosConfigurationCollector.fileExtension, nacosConfigurationCollector.group, new Listener() {
@Override @Override
public Executor getExecutor() { public Executor getExecutor() {
return null; return null;
@@ -85,7 +85,7 @@ public class FunctionExecutor {
log.info("detected nacos function shell update ! {}", s); log.info("detected nacos function shell update ! {}", s);
collectAllExecutorFunction.parseNacosFunctionYamlToMap(s); nacosConfigurationCollector.parseNacosFunctionYamlToMap(s);
} }
}); });

View File

@@ -1,152 +0,0 @@
package io.wdd.agent.executor.function;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.agent.executor.config.FunctionReader;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.DirectoryFileFilter;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.yaml.snakeyaml.Yaml;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;
@Component
@Lazy
@Slf4j
public class CollectAllExecutorFunction {
/**
* store the Octopus Agent Functions and Function Command List
* key: function name
* value: function shell List<String> contend
*/
public static HashMap<String, List<List<String>>> ALL_FUNCTION_MAP = new HashMap<>(128);
/*
* listen to the nacos executor shell scripts
* */
public static ConfigService NacosConfigService;
@Value("${spring.cloud.nacos.config.server-addr}")
public String nacosAddr;
@Value("${spring.cloud.nacos.config.group}")
public String group;
@Value("${spring.cloud.nacos.config.file-extension}")
public String fileExtension;
@Value("${octopus.executor.name}")
public String dataId;
@Resource
FunctionReader functionReader;
@Resource
ObjectMapper objectMapper;
@PostConstruct
private void CollectAllFunctionFromNacos() {
try {
// Initialize the configuration service, and the console automatically obtains the following parameters through the sample code.
String completeDataId = dataId + "." + fileExtension;
Properties properties = new Properties();
properties.put("serverAddr", nacosAddr);
NacosConfigService = NacosFactory.createConfigService(properties);
// Actively get the configuration.
String content = NacosConfigService.getConfig(completeDataId, group, 5000);
log.info("functions get from nacos are {}", content);
parseNacosFunctionYamlToMap(content);
} catch (NacosException e) {
throw new RuntimeException(e);
}
}
public void parseNacosFunctionYamlToMap(String content) {
Yaml yaml = new Yaml();
yaml.loadAll(content).iterator().forEachRemaining(
realFunction -> {
if (!(realFunction instanceof LinkedHashMap)) {
System.out.println("realFunction = " + realFunction);
}
Map<String, String> stringMap = (Map<String, String>) realFunction;
Optional<String> functionName = stringMap.keySet().stream().findFirst();
List<List<String>> commandList = functionReader.ReadStringToCommandList(stringMap.get(functionName.get()));
/*log.info("Function {} , content is {}", functionName.get(), commandList);*/
ALL_FUNCTION_MAP.put(
functionName.get(),
commandList
);
}
);
log.info("ALL_FUNCTION_MAP has been updated ! ---> {}", ALL_FUNCTION_MAP);
}
/**
* due to can't get shell from the jar file
* this is deprecated
*/
// @PostConstruct
private void CollectAllFunctionShellScriptName() {
// scan current package files name and store them to FUNCTION_REFLECTION
// Path absolutePath = FileSystems.getDefault().getPath("" ).toAbsolutePath();
Path absolutePath = Paths.get("").toAbsolutePath();
log.info("current absolute path is {}", absolutePath);
Path currentDirectory = Path.of(absolutePath + "/src/main/java/io/wdd/agent/executor/function").toAbsolutePath();
IOFileFilter fileFilter = FileFilterUtils.suffixFileFilter(".sh");
IOFileFilter directoryFileFilter = DirectoryFileFilter.INSTANCE;
Collection<File> functionFileList = FileUtils.listFiles(currentDirectory.toFile(), fileFilter, directoryFileFilter);
log.debug("all function shell script files are : {}", functionFileList);
Map<String, String> collect = functionFileList.stream().collect(Collectors.toMap(
functionFile -> functionFile.getName().split("\\.")[0],
functionFile -> functionFile.getAbsolutePath()
));
// ALL_FUNCTION_MAP.putAll(collect);
}
}

View File

@@ -6,7 +6,6 @@ import io.wdd.agent.config.beans.init.AgentServerInfo;
import io.wdd.common.beans.status.*; import io.wdd.common.beans.status.*;
import io.wdd.common.utils.TimeUtils; import io.wdd.common.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.StreamRecords; import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.connection.stream.StringRecord; import org.springframework.data.redis.connection.stream.StringRecord;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
@@ -16,7 +15,6 @@ import oshi.SystemInfo;
import oshi.hardware.HardwareAbstractionLayer; import oshi.hardware.HardwareAbstractionLayer;
import oshi.software.os.OperatingSystem; import oshi.software.os.OperatingSystem;
import javax.annotation.PostConstruct;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@@ -36,6 +34,8 @@ public class AgentStatusCollector {
*/ */
private static final OperatingSystem os; private static final OperatingSystem os;
private static final List<AgentStatus> AgentStatusCache = Collections.singletonList(new AgentStatus()); private static final List<AgentStatus> AgentStatusCache = Collections.singletonList(new AgentStatus());
private static final long ReportInitDelay = 60000;
private static final long ReportFixedRate = 15000;
static { static {
systemInfo = new SystemInfo(); systemInfo = new SystemInfo();
@@ -50,11 +50,6 @@ public class AgentStatusCollector {
@Resource @Resource
AgentServerInfo agentServerInfo; AgentServerInfo agentServerInfo;
private static final long ReportInitDelay = 60000;
private static final long ReportFixedRate = 15000;
public AgentStatus collect() { public AgentStatus collect() {
AgentStatus agentStatus = AgentStatusCache.get(0); AgentStatus agentStatus = AgentStatusCache.get(0);
@@ -67,24 +62,16 @@ public class AgentStatusCollector {
agentStatus.setCpuInfo(new CpuInfo(hardware.getProcessor(), 1000)); agentStatus.setCpuInfo(new CpuInfo(hardware.getProcessor(), 1000));
/* Memory */ /* Memory */
agentStatus.setMemoryInfo( agentStatus.setMemoryInfo(MemoryInfo.build(hardware.getMemory()));
MemoryInfo.build(hardware.getMemory())
);
/* Storage */ /* Storage */
agentStatus.setDiskStoreInfo( agentStatus.setDiskStoreInfo(DiskInfo.mapFromDiskStore(hardware.getDiskStores()));
DiskInfo.mapFromDiskStore(hardware.getDiskStores())
);
/* Network */ /* Network */
agentStatus.setNetworkInfo( agentStatus.setNetworkInfo(NetworkInfo.mapFromNetworkIFS(hardware.getNetworkIFs(false)));
NetworkInfo.mapFromNetworkIFS(hardware.getNetworkIFs(false))
);
/* operating system info */ /* operating system info */
agentStatus.setOsInfo( agentStatus.setOsInfo(AgentSystemInfo.mapFromOHSISystem(os));
AgentSystemInfo.mapFromOHSISystem(os)
);
/* Time */ /* Time */
agentStatus.setTime(TimeUtils.currentTimeString()); agentStatus.setTime(TimeUtils.currentTimeString());
@@ -117,7 +104,7 @@ public class AgentStatusCollector {
StringRecord stringRecord = StreamRecords.string(map).withStreamKey(statusStreamKey); StringRecord stringRecord = StreamRecords.string(map).withStreamKey(statusStreamKey);
log.debug("Agent Status is ==> {}",map); log.debug("Agent Status is ==> {}", map);
redisTemplate.opsForStream().add(stringRecord); redisTemplate.opsForStream().add(stringRecord);
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {

View File

@@ -0,0 +1,44 @@
package io.wdd.agent.status;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.HashMap;
@Service
@Slf4j
public class AppStatusCollector {
// storage all the applications agent status should report
public static final HashMap<String, String> ALL_APP_NEED_TO_MONITOR_STATUS = new HashMap<>(16);
/**
* not very good
* but also a kind of method to dynamically listen to nacos configuration change
*/
/*@NacosValue(value = "${octopus.agent.status.enable}" , autoRefreshed = true)
private String all_app_from_nacos;*
/*@NacosConfigListener(
groupId = "k3s",
dataId = "octopus-agent-k3s.yaml",
type = ConfigType.YAML,
properties =
)
public void onMessage(String content){
log.debug("update octopus-agent nacos config are ==> {} ", content);
Yaml yaml = new Yaml();
Object load = yaml.load(content);
System.out.println("load = " + load);
}*/
}

View File

@@ -1,14 +1,12 @@
package io.wdd.agent; package io.wdd.agent;
import io.wdd.agent.executor.function.CollectAllExecutorFunction; import io.wdd.agent.config.utils.NacosConfigurationCollector;
import io.wdd.agent.executor.FunctionExecutor; import io.wdd.agent.executor.FunctionExecutor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.DirectoryFileFilter; import org.apache.commons.io.filefilter.DirectoryFileFilter;
import org.apache.commons.io.filefilter.FileFilterUtils; import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.io.filefilter.IOFileFilter; import org.apache.commons.io.filefilter.IOFileFilter;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.File; import java.io.File;
@@ -27,7 +25,7 @@ class AgentApplicationTests {
FunctionExecutor functionExecutor; FunctionExecutor functionExecutor;
@Resource @Resource
CollectAllExecutorFunction collectAllExecutorFunction; NacosConfigurationCollector nacosConfigurationCollector;
// @Test // @Test