[ agent ] [ status ]- optimize the network status - 1

This commit is contained in:
IceDerce
2023-01-12 20:56:55 +08:00
parent 1b7b4f4104
commit 5c809bb000
8 changed files with 183 additions and 67 deletions

View File

@@ -50,7 +50,6 @@ public class OMHandlerStatus extends AbstractOctopusMessageHandler {
} else if (statusType.equals(METRIC_STATUS_MESSAGE_TYPE)) {
// metric status
agentStatusCollector.collect(
statusMessage.getMetricRepeatCount(),
statusMessage.getMetricRepeatPinch()

View File

@@ -111,7 +111,6 @@ public class NacosConfigurationCollector {
return null;
}
@Override
public void receiveConfigInfo(String allApplicationNeedToMonitorStatus) {
log.debug("all applications need to monitor status has changed to => {}", allApplicationNeedToMonitorStatus);

View File

@@ -2,7 +2,6 @@ package io.wdd.agent.executor.status;
import io.wdd.agent.config.utils.AgentCommonThreadPool;
import io.wdd.agent.executor.CheckSingleAppStatusCallable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@@ -51,25 +50,31 @@ public class AppStatusExecutor {
public HashMap<String, Set<String>> checkAppStatus(boolean allAppStatus) {
// check all app status
Map<String, List<String[]>> collect = ALL_APP_NEED_TO_MONITOR_STATUS.keySet().stream()
Map<String, List<String[]>> collect = ALL_APP_NEED_TO_MONITOR_STATUS
.keySet()
.stream()
.map(
appName -> {
// generate single app status callable task
CheckSingleAppStatusCallable singleAppStatusCallable = new CheckSingleAppStatusCallable(appName, APP_STATUS_CHECK_COMMAND);
SingleAppStatusCallable singleAppStatusCallable = new SingleAppStatusCallable(appName,
APP_STATUS_CHECK_COMMAND);
// use thread pool to run the command to get the singe app status result
Future<String[]> appStatusFuture = AgentCommonThreadPool.pool.submit(singleAppStatusCallable);
return appStatusFuture;
}
).map(
)
.map(
// deal with the app status future result
appStatusFuture -> {
try {
return appStatusFuture.get(15, TimeUnit.SECONDS);
return appStatusFuture.get(15,
TimeUnit.SECONDS);
} catch (InterruptedException e) {
} catch (
InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
@@ -90,15 +95,22 @@ public class AppStatusExecutor {
// Failure -> [Redis]
// NotInstall -> [Docker]
HashMap<String, Set<String>> result = new HashMap<>(16);
collect.entrySet().stream().map(
entry -> {
String status = entry.getKey();
Set<String> appNameSet = entry.getValue().stream().map(appStatus -> appStatus[1]).collect(Collectors.toSet());
collect.entrySet()
.stream()
.map(
entry -> {
String status = entry.getKey();
Set<String> appNameSet = entry.getValue()
.stream()
.map(appStatus -> appStatus[1])
.collect(Collectors.toSet());
result.put(status, appNameSet);
return 1;
}
).collect(Collectors.toSet());
result.put(status,
appNameSet);
return 1;
}
)
.collect(Collectors.toSet());
return result;

View File

@@ -0,0 +1,47 @@
package io.wdd.agent.executor.status;
import io.wdd.common.utils.FormatUtils;
import lombok.extern.slf4j.Slf4j;
import oshi.hardware.NetworkIF;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@Slf4j
public class NetworkInterfaceSpeedCallable implements Callable<String[]> {
private NetworkIF networkInterface;
public NetworkInterfaceSpeedCallable(NetworkIF networkInterface) {
this.networkInterface = networkInterface;
}
@Override
public String[] call() throws Exception {
String[] result = {
"sendSpeed",
"recvSpeed"
};
long recvPre = networkInterface.getBytesRecv();
long sentPre = networkInterface.getBytesSent();
long pre = new Date().getTime();
TimeUnit.SECONDS.sleep(1);
long recv = networkInterface.getBytesRecv();
long sent = networkInterface.getBytesSent();
long now = new Date().getTime();
long recvSpeed = (recv - recvPre) / (now - pre);
long sentSpeed = (sent - sentPre) / (now - pre);
result[0] = FormatUtils.formatData(sentSpeed) + "/s";
result[1] = FormatUtils.formatData(recvSpeed) + "/s";
return result;
}
}

View File

@@ -12,7 +12,10 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* 计算网卡接口的网速信息
@@ -31,7 +34,7 @@ public class NetworkStatusExecutor {
"-q",
"-T10",
"-O-",
"ipinfo.io/ip"
"https://ipinfo.io/ip"
));
GET_PUBLIC_IPV6_ADDRESS_COMMAND = new ArrayList<>(List.of(
@@ -49,21 +52,53 @@ public class NetworkStatusExecutor {
// 获取网卡接口的公网Ipv4和Ipv6信息
// 不需要AgentBootUp通过命令行即可获取
// 需要定时更新Agent的信息根据Boolean来决定
String[] publicIpAddress = new String[]{
"publicIpv4",
"publicIpv6"
};
if (updatePublicIpAddr) {
String[] publicIpAddress = collectAgentPublicIpAddress();
publicIpAddress = collectAgentPublicIpAddress();
}
// 计算网卡接口的网速信息
networkIFList.forEach(
networkIF -> {
// AgentCommonThreadPool.pool.submit();
}
);
ArrayList<NetworkInfo> result = new ArrayList<>(16);
return null;
// 计算网卡接口的网速信息
for (NetworkIF networkIF : networkIFList) {
// 计算网速
NetworkInterfaceSpeedCallable speedCallable = new NetworkInterfaceSpeedCallable(networkIF);
Future<String[]> netSpeedFuture = AgentCommonThreadPool.pool.submit(speedCallable);
// 构建 NetworkInfo对象
NetworkInfo singleNetworkInfo = NetworkInfo.mapFromNetworkIF(networkIF);
try {
String[] networkSpeedList = netSpeedFuture.get(5,
TimeUnit.SECONDS);
// 注入网速信息
singleNetworkInfo.setSendSpeed(networkSpeedList[0]);
singleNetworkInfo.setRecvSpeed(networkSpeedList[1]);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
if (updatePublicIpAddr) {
// 注入公网IP地址信息
singleNetworkInfo.setPublicIpv4Addr(publicIpAddress[0]);
singleNetworkInfo.setPublicIpv6Addr(publicIpAddress[1]);
}
// 加入单个网卡的信息
result.add(singleNetworkInfo);
}
return result;
}
/**
* @return 第一位公网Ipv4 第二位公网Ipv6
*/
@@ -95,9 +130,7 @@ public class NetworkStatusExecutor {
return ipAddr;
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}

View File

@@ -1,4 +1,4 @@
package io.wdd.agent.executor;
package io.wdd.agent.executor.status;
import io.wdd.agent.executor.config.CommandPipelineBuilder;
import io.wdd.common.beans.status.AppStatusEnum;
@@ -18,12 +18,12 @@ import static io.wdd.agent.executor.status.AppStatusExecutor.ALL_APP_NEED_TO_MON
@Slf4j
public class CheckSingleAppStatusCallable implements Callable<String[]> {
public class SingleAppStatusCallable implements Callable<String[]> {
private final ArrayList<ArrayList<String>> commandList;
private final String appName;
public CheckSingleAppStatusCallable(String appName, ArrayList<ArrayList<String>> commandList) {
public SingleAppStatusCallable(String appName, ArrayList<ArrayList<String>> commandList) {
this.commandList = commandList;
this.appName = appName;
}

View File

@@ -5,8 +5,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.agent.config.beans.init.AgentServerInfo;
import io.wdd.agent.config.utils.AgentCommonThreadPool;
import io.wdd.agent.executor.status.AppStatusExecutor;
import io.wdd.agent.executor.status.NetworkStatusExecutor;
import io.wdd.common.beans.status.*;
import io.wdd.common.utils.TimeUtils;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.connection.stream.StringRecord;
@@ -36,33 +38,33 @@ public class AgentStatusCollector {
private static final OperatingSystem os;
private static final List<AgentStatus> AgentStatusCache = Collections.singletonList(new AgentStatus());
private static final long ReportInitDelay = 60000;
private static final long ReportFixedRate = 15000;
private String statusRedisStreamKey;
static {
systemInfo = new SystemInfo();
hardware = systemInfo.getHardware();
os = systemInfo.getOperatingSystem();
}
@PostConstruct
private void generateStatusRedisStreamKey() {
statusRedisStreamKey = AgentStatus.getRedisStatusKey( agentServerInfo.getAgentTopicName());
}
@Resource
RedisTemplate redisTemplate;
@Resource
ObjectMapper objectMapper;
@Resource
AgentServerInfo agentServerInfo;
@Resource
AppStatusExecutor appStatusExecutor;
@Resource
NetworkStatusExecutor networkStatusExecutor;
private String statusRedisStreamKey;
@PostConstruct
private void generateStatusRedisStreamKey() {
statusRedisStreamKey = AgentStatus.getRedisStatusKey(agentServerInfo.getAgentTopicName());
}
@SneakyThrows
public AgentStatus collect() {
AgentStatus agentStatus = AgentStatusCache.get(0);
@@ -72,7 +74,8 @@ public class AgentStatusCollector {
agentStatus.setAgentTopicName(agentServerInfo.getAgentTopicName());
/* CPU */
agentStatus.setCpuInfo(new CpuInfo(hardware.getProcessor(), 1000));
agentStatus.setCpuInfo(new CpuInfo(hardware.getProcessor(),
1000));
/* Memory */
agentStatus.setMemoryInfo(MemoryInfo.build(hardware.getMemory()));
@@ -81,7 +84,11 @@ public class AgentStatusCollector {
agentStatus.setDiskStoreInfo(DiskInfo.mapFromDiskStore(hardware.getDiskStores()));
/* Network */
agentStatus.setNetworkInfo(NetworkInfo.mapFromNetworkIFS(hardware.getNetworkIFs(false)));
agentStatus.setNetworkInfo(
// NetworkInfo.mapFromNetworkIFS(hardware.getNetworkIFs(false))
networkStatusExecutor.collect(hardware.getNetworkIFs(false),
false)
);
/* operating system info */
agentStatus.setOsInfo(AgentSystemInfo.mapFromOHSISystem(os));
@@ -94,30 +101,22 @@ public class AgentStatusCollector {
parseAppStatus(appStatusExecutor.checkAppStatus(true))
);
// time sleep wait for status collect
// like network speed calculate
TimeUnit.SECONDS.sleep(3);
return agentStatus;
}
private AppStatusInfo parseAppStatus(HashMap<String, Set<String>> checkAppStatus) {
return AppStatusInfo.builder()
.Healthy(checkAppStatus.get(AppStatusEnum.HEALTHY.getName()))
.Failure(checkAppStatus.get(AppStatusEnum.FAILURE.getName()))
.NotInstall(checkAppStatus.get(AppStatusEnum.NOT_INSTALL.getName()))
.build();
.Healthy(checkAppStatus.get(AppStatusEnum.HEALTHY.getName()))
.Failure(checkAppStatus.get(AppStatusEnum.FAILURE.getName()))
.NotInstall(checkAppStatus.get(AppStatusEnum.NOT_INSTALL.getName()))
.build();
}
/**
* when server first time boot up
* the server info are not collected completely
* this will be executed to update or complete the octopus agent server info
*/
// @Scheduled(initialDelay = 180000)
// public void updateAgentServerInfo(){
//
//
//
// }
// agent boot up 120s then start to report its status
// at the fix rate of 15s
/*@Scheduled(initialDelay = ReportInitDelay, fixedRate = ReportFixedRate)*/
@@ -125,12 +124,17 @@ public class AgentStatusCollector {
try {
Map<String, String> map = Map.of(TimeUtils.currentTimeString(), objectMapper.writeValueAsString(collect()));
Map<String, String> map = Map.of(TimeUtils.currentTimeString(),
objectMapper.writeValueAsString(collect()));
StringRecord stringRecord = StreamRecords.string(map).withStreamKey(statusRedisStreamKey);
StringRecord stringRecord = StreamRecords.string(map)
.withStreamKey(statusRedisStreamKey);
log.debug("Agent Status is ==> {}", map);
redisTemplate.opsForStream().add(stringRecord);
log.debug("Agent Status is ==> {}",
map);
redisTemplate.opsForStream()
.add(stringRecord);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
@@ -139,8 +143,8 @@ public class AgentStatusCollector {
}
/**
* 接收来自 OMHandlerStatus 的调用
* 汇报服务器的状态信息
* 接收来自 OMHandlerStatus 的调用
* 汇报服务器的状态信息
*
* @param metricRepeatCount 需要重复的次数
* @param metricRepeatPinch 重复时间间隔

View File

@@ -23,9 +23,9 @@ public class NetworkInfo {
private String macAddr;
private String mtu;
private String[] innerIpv4Addr;
private String[] publicIpv4Addr;
private String publicIpv4Addr;
private String[] innerIpv6Addr;
private String[] publicIpv6Addr;
private String publicIpv6Addr;
/**
* 过去1s内的网速接收速率
*/
@@ -37,6 +37,28 @@ public class NetworkInfo {
private String trafficRecv;
private String trafficSend;
public static NetworkInfo mapFromNetworkIF(NetworkIF networkIF) {
return NetworkInfo
.builder()
.name(networkIF.getName())
.displayName(networkIF.getDisplayName())
.mtu(String.valueOf(networkIF.getMTU()))
.macAddr(networkIF.getMacaddr())
.innerIpv4Addr(generateIPDICRFromNetworkIFList(
networkIF,
4
))
.innerIpv6Addr(generateIPDICRFromNetworkIFList(
networkIF,
6
))
.trafficSend(FormatUtils.formatData(networkIF.getBytesSent()))
.trafficRecv(FormatUtils.formatData(networkIF.getBytesRecv())
)
.build();
}
public static List<NetworkInfo> mapFromNetworkIFS(List<NetworkIF> networkIFList) {
return networkIFList