From 5c809bb0006db97370feeefc141946971d53843a Mon Sep 17 00:00:00 2001 From: IceDerce Date: Thu, 12 Jan 2023 20:56:55 +0800 Subject: [PATCH] [ agent ] [ status ]- optimize the network status - 1 --- .../message/handler/OMHandlerStatus.java | 1 - .../utils/NacosConfigurationCollector.java | 1 - .../executor/status/AppStatusExecutor.java | 40 +++++++---- .../status/NetworkInterfaceSpeedCallable.java | 47 ++++++++++++ .../status/NetworkStatusExecutor.java | 57 +++++++++++---- .../SingleAppStatusCallable.java} | 6 +- .../agent/status/AgentStatusCollector.java | 72 ++++++++++--------- .../wdd/common/beans/status/NetworkInfo.java | 26 ++++++- 8 files changed, 183 insertions(+), 67 deletions(-) create mode 100644 agent/src/main/java/io/wdd/agent/executor/status/NetworkInterfaceSpeedCallable.java rename agent/src/main/java/io/wdd/agent/executor/{CheckSingleAppStatusCallable.java => status/SingleAppStatusCallable.java} (93%) diff --git a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerStatus.java b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerStatus.java index 5f5dbd8..bda1ce3 100644 --- a/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerStatus.java +++ b/agent/src/main/java/io/wdd/agent/config/message/handler/OMHandlerStatus.java @@ -50,7 +50,6 @@ public class OMHandlerStatus extends AbstractOctopusMessageHandler { } else if (statusType.equals(METRIC_STATUS_MESSAGE_TYPE)) { // metric status - agentStatusCollector.collect( statusMessage.getMetricRepeatCount(), statusMessage.getMetricRepeatPinch() diff --git a/agent/src/main/java/io/wdd/agent/config/utils/NacosConfigurationCollector.java b/agent/src/main/java/io/wdd/agent/config/utils/NacosConfigurationCollector.java index 0a79cec..823f735 100644 --- a/agent/src/main/java/io/wdd/agent/config/utils/NacosConfigurationCollector.java +++ b/agent/src/main/java/io/wdd/agent/config/utils/NacosConfigurationCollector.java @@ -111,7 +111,6 @@ public class NacosConfigurationCollector { return null; } - @Override public void receiveConfigInfo(String allApplicationNeedToMonitorStatus) { log.debug("all applications need to monitor status has changed to => {}", allApplicationNeedToMonitorStatus); diff --git a/agent/src/main/java/io/wdd/agent/executor/status/AppStatusExecutor.java b/agent/src/main/java/io/wdd/agent/executor/status/AppStatusExecutor.java index b80b4c0..b146983 100644 --- a/agent/src/main/java/io/wdd/agent/executor/status/AppStatusExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/status/AppStatusExecutor.java @@ -2,7 +2,6 @@ package io.wdd.agent.executor.status; import io.wdd.agent.config.utils.AgentCommonThreadPool; -import io.wdd.agent.executor.CheckSingleAppStatusCallable; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; @@ -51,25 +50,31 @@ public class AppStatusExecutor { public HashMap> checkAppStatus(boolean allAppStatus) { // check all app status - Map> collect = ALL_APP_NEED_TO_MONITOR_STATUS.keySet().stream() + Map> collect = ALL_APP_NEED_TO_MONITOR_STATUS + .keySet() + .stream() .map( appName -> { // generate single app status callable task - CheckSingleAppStatusCallable singleAppStatusCallable = new CheckSingleAppStatusCallable(appName, APP_STATUS_CHECK_COMMAND); + SingleAppStatusCallable singleAppStatusCallable = new SingleAppStatusCallable(appName, + APP_STATUS_CHECK_COMMAND); // use thread pool to run the command to get the singe app status result Future appStatusFuture = AgentCommonThreadPool.pool.submit(singleAppStatusCallable); return appStatusFuture; } - ).map( + ) + .map( // deal with the app status future result appStatusFuture -> { try { - return appStatusFuture.get(15, TimeUnit.SECONDS); + return appStatusFuture.get(15, + TimeUnit.SECONDS); - } catch (InterruptedException e) { + } catch ( + InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); @@ -90,15 +95,22 @@ public class AppStatusExecutor { // Failure -> [Redis] // NotInstall -> [Docker] HashMap> result = new HashMap<>(16); - collect.entrySet().stream().map( - entry -> { - String status = entry.getKey(); - Set appNameSet = entry.getValue().stream().map(appStatus -> appStatus[1]).collect(Collectors.toSet()); + collect.entrySet() + .stream() + .map( + entry -> { + String status = entry.getKey(); + Set appNameSet = entry.getValue() + .stream() + .map(appStatus -> appStatus[1]) + .collect(Collectors.toSet()); - result.put(status, appNameSet); - return 1; - } - ).collect(Collectors.toSet()); + result.put(status, + appNameSet); + return 1; + } + ) + .collect(Collectors.toSet()); return result; diff --git a/agent/src/main/java/io/wdd/agent/executor/status/NetworkInterfaceSpeedCallable.java b/agent/src/main/java/io/wdd/agent/executor/status/NetworkInterfaceSpeedCallable.java new file mode 100644 index 0000000..92bad3c --- /dev/null +++ b/agent/src/main/java/io/wdd/agent/executor/status/NetworkInterfaceSpeedCallable.java @@ -0,0 +1,47 @@ +package io.wdd.agent.executor.status; + +import io.wdd.common.utils.FormatUtils; +import lombok.extern.slf4j.Slf4j; +import oshi.hardware.NetworkIF; + +import java.time.LocalDateTime; +import java.util.Date; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + + +@Slf4j +public class NetworkInterfaceSpeedCallable implements Callable { + + private NetworkIF networkInterface; + + public NetworkInterfaceSpeedCallable(NetworkIF networkInterface) { + this.networkInterface = networkInterface; + } + + @Override + public String[] call() throws Exception { + String[] result = { + "sendSpeed", + "recvSpeed" + }; + + long recvPre = networkInterface.getBytesRecv(); + long sentPre = networkInterface.getBytesSent(); + long pre = new Date().getTime(); + + TimeUnit.SECONDS.sleep(1); + + long recv = networkInterface.getBytesRecv(); + long sent = networkInterface.getBytesSent(); + long now = new Date().getTime(); + + long recvSpeed = (recv - recvPre) / (now - pre); + long sentSpeed = (sent - sentPre) / (now - pre); + + result[0] = FormatUtils.formatData(sentSpeed) + "/s"; + result[1] = FormatUtils.formatData(recvSpeed) + "/s"; + + return result; + } +} diff --git a/agent/src/main/java/io/wdd/agent/executor/status/NetworkStatusExecutor.java b/agent/src/main/java/io/wdd/agent/executor/status/NetworkStatusExecutor.java index b394825..a5f93c7 100644 --- a/agent/src/main/java/io/wdd/agent/executor/status/NetworkStatusExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/status/NetworkStatusExecutor.java @@ -12,7 +12,10 @@ import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * 计算网卡接口的网速信息 @@ -31,7 +34,7 @@ public class NetworkStatusExecutor { "-q", "-T10", "-O-", - "ipinfo.io/ip" + "https://ipinfo.io/ip" )); GET_PUBLIC_IPV6_ADDRESS_COMMAND = new ArrayList<>(List.of( @@ -49,21 +52,53 @@ public class NetworkStatusExecutor { // 获取网卡接口的公网Ipv4和Ipv6信息 // 不需要,AgentBootUp通过命令行即可获取 // 需要定时更新Agent的信息,根据Boolean来决定 + String[] publicIpAddress = new String[]{ + "publicIpv4", + "publicIpv6" + }; if (updatePublicIpAddr) { - String[] publicIpAddress = collectAgentPublicIpAddress(); + publicIpAddress = collectAgentPublicIpAddress(); } - // 计算网卡接口的网速信息 - networkIFList.forEach( - networkIF -> { -// AgentCommonThreadPool.pool.submit(); - } - ); + ArrayList result = new ArrayList<>(16); - return null; + + // 计算网卡接口的网速信息 + for (NetworkIF networkIF : networkIFList) { + // 计算网速 + NetworkInterfaceSpeedCallable speedCallable = new NetworkInterfaceSpeedCallable(networkIF); + Future netSpeedFuture = AgentCommonThreadPool.pool.submit(speedCallable); + + // 构建 NetworkInfo对象 + NetworkInfo singleNetworkInfo = NetworkInfo.mapFromNetworkIF(networkIF); + + try { + String[] networkSpeedList = netSpeedFuture.get(5, + TimeUnit.SECONDS); + // 注入网速信息 + singleNetworkInfo.setSendSpeed(networkSpeedList[0]); + singleNetworkInfo.setRecvSpeed(networkSpeedList[1]); + + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + + if (updatePublicIpAddr) { + // 注入公网IP地址信息 + singleNetworkInfo.setPublicIpv4Addr(publicIpAddress[0]); + singleNetworkInfo.setPublicIpv6Addr(publicIpAddress[1]); + } + + // 加入单个网卡的信息 + result.add(singleNetworkInfo); + } + + + return result; } + /** * @return 第一位:公网Ipv4 第二位:公网Ipv6 */ @@ -95,9 +130,7 @@ public class NetworkStatusExecutor { return ipAddr; - } catch (IOException e) { - throw new RuntimeException(e); - } catch (InterruptedException e) { + } catch (IOException | InterruptedException e) { throw new RuntimeException(e); } } diff --git a/agent/src/main/java/io/wdd/agent/executor/CheckSingleAppStatusCallable.java b/agent/src/main/java/io/wdd/agent/executor/status/SingleAppStatusCallable.java similarity index 93% rename from agent/src/main/java/io/wdd/agent/executor/CheckSingleAppStatusCallable.java rename to agent/src/main/java/io/wdd/agent/executor/status/SingleAppStatusCallable.java index 9d6b9e4..85b6be5 100644 --- a/agent/src/main/java/io/wdd/agent/executor/CheckSingleAppStatusCallable.java +++ b/agent/src/main/java/io/wdd/agent/executor/status/SingleAppStatusCallable.java @@ -1,4 +1,4 @@ -package io.wdd.agent.executor; +package io.wdd.agent.executor.status; import io.wdd.agent.executor.config.CommandPipelineBuilder; import io.wdd.common.beans.status.AppStatusEnum; @@ -18,12 +18,12 @@ import static io.wdd.agent.executor.status.AppStatusExecutor.ALL_APP_NEED_TO_MON @Slf4j -public class CheckSingleAppStatusCallable implements Callable { +public class SingleAppStatusCallable implements Callable { private final ArrayList> commandList; private final String appName; - public CheckSingleAppStatusCallable(String appName, ArrayList> commandList) { + public SingleAppStatusCallable(String appName, ArrayList> commandList) { this.commandList = commandList; this.appName = appName; } diff --git a/agent/src/main/java/io/wdd/agent/status/AgentStatusCollector.java b/agent/src/main/java/io/wdd/agent/status/AgentStatusCollector.java index d429780..569853b 100644 --- a/agent/src/main/java/io/wdd/agent/status/AgentStatusCollector.java +++ b/agent/src/main/java/io/wdd/agent/status/AgentStatusCollector.java @@ -5,8 +5,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.wdd.agent.config.beans.init.AgentServerInfo; import io.wdd.agent.config.utils.AgentCommonThreadPool; import io.wdd.agent.executor.status.AppStatusExecutor; +import io.wdd.agent.executor.status.NetworkStatusExecutor; import io.wdd.common.beans.status.*; import io.wdd.common.utils.TimeUtils; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.stream.StreamRecords; import org.springframework.data.redis.connection.stream.StringRecord; @@ -36,33 +38,33 @@ public class AgentStatusCollector { private static final OperatingSystem os; private static final List AgentStatusCache = Collections.singletonList(new AgentStatus()); private static final long ReportInitDelay = 60000; + private static final long ReportFixedRate = 15000; - private String statusRedisStreamKey; - - static { systemInfo = new SystemInfo(); hardware = systemInfo.getHardware(); os = systemInfo.getOperatingSystem(); } - - @PostConstruct - private void generateStatusRedisStreamKey() { - statusRedisStreamKey = AgentStatus.getRedisStatusKey( agentServerInfo.getAgentTopicName()); - } - @Resource RedisTemplate redisTemplate; @Resource ObjectMapper objectMapper; @Resource AgentServerInfo agentServerInfo; - @Resource AppStatusExecutor appStatusExecutor; + @Resource + NetworkStatusExecutor networkStatusExecutor; + private String statusRedisStreamKey; + @PostConstruct + private void generateStatusRedisStreamKey() { + statusRedisStreamKey = AgentStatus.getRedisStatusKey(agentServerInfo.getAgentTopicName()); + } + + @SneakyThrows public AgentStatus collect() { AgentStatus agentStatus = AgentStatusCache.get(0); @@ -72,7 +74,8 @@ public class AgentStatusCollector { agentStatus.setAgentTopicName(agentServerInfo.getAgentTopicName()); /* CPU */ - agentStatus.setCpuInfo(new CpuInfo(hardware.getProcessor(), 1000)); + agentStatus.setCpuInfo(new CpuInfo(hardware.getProcessor(), + 1000)); /* Memory */ agentStatus.setMemoryInfo(MemoryInfo.build(hardware.getMemory())); @@ -81,7 +84,11 @@ public class AgentStatusCollector { agentStatus.setDiskStoreInfo(DiskInfo.mapFromDiskStore(hardware.getDiskStores())); /* Network */ - agentStatus.setNetworkInfo(NetworkInfo.mapFromNetworkIFS(hardware.getNetworkIFs(false))); + agentStatus.setNetworkInfo( +// NetworkInfo.mapFromNetworkIFS(hardware.getNetworkIFs(false)) + networkStatusExecutor.collect(hardware.getNetworkIFs(false), + false) + ); /* operating system info */ agentStatus.setOsInfo(AgentSystemInfo.mapFromOHSISystem(os)); @@ -94,30 +101,22 @@ public class AgentStatusCollector { parseAppStatus(appStatusExecutor.checkAppStatus(true)) ); + // time sleep wait for status collect + // like network speed calculate + TimeUnit.SECONDS.sleep(3); + return agentStatus; } private AppStatusInfo parseAppStatus(HashMap> checkAppStatus) { return AppStatusInfo.builder() - .Healthy(checkAppStatus.get(AppStatusEnum.HEALTHY.getName())) - .Failure(checkAppStatus.get(AppStatusEnum.FAILURE.getName())) - .NotInstall(checkAppStatus.get(AppStatusEnum.NOT_INSTALL.getName())) - .build(); + .Healthy(checkAppStatus.get(AppStatusEnum.HEALTHY.getName())) + .Failure(checkAppStatus.get(AppStatusEnum.FAILURE.getName())) + .NotInstall(checkAppStatus.get(AppStatusEnum.NOT_INSTALL.getName())) + .build(); } - /** - * when server first time boot up - * the server info are not collected completely - * this will be executed to update or complete the octopus agent server info - */ -// @Scheduled(initialDelay = 180000) -// public void updateAgentServerInfo(){ -// -// -// -// } - // agent boot up 120s then start to report its status // at the fix rate of 15s /*@Scheduled(initialDelay = ReportInitDelay, fixedRate = ReportFixedRate)*/ @@ -125,12 +124,17 @@ public class AgentStatusCollector { try { - Map map = Map.of(TimeUtils.currentTimeString(), objectMapper.writeValueAsString(collect())); + Map map = Map.of(TimeUtils.currentTimeString(), + objectMapper.writeValueAsString(collect())); - StringRecord stringRecord = StreamRecords.string(map).withStreamKey(statusRedisStreamKey); + StringRecord stringRecord = StreamRecords.string(map) + .withStreamKey(statusRedisStreamKey); - log.debug("Agent Status is ==> {}", map); - redisTemplate.opsForStream().add(stringRecord); + log.debug("Agent Status is ==> {}", + map); + + redisTemplate.opsForStream() + .add(stringRecord); } catch (JsonProcessingException e) { throw new RuntimeException(e); @@ -139,8 +143,8 @@ public class AgentStatusCollector { } /** - * 接收来自 OMHandlerStatus 的调用 - * 汇报服务器的状态信息 + * 接收来自 OMHandlerStatus 的调用 + * 汇报服务器的状态信息 * * @param metricRepeatCount 需要重复的次数 * @param metricRepeatPinch 重复时间间隔 diff --git a/common/src/main/java/io/wdd/common/beans/status/NetworkInfo.java b/common/src/main/java/io/wdd/common/beans/status/NetworkInfo.java index 4b6ffd8..71a93ff 100644 --- a/common/src/main/java/io/wdd/common/beans/status/NetworkInfo.java +++ b/common/src/main/java/io/wdd/common/beans/status/NetworkInfo.java @@ -23,9 +23,9 @@ public class NetworkInfo { private String macAddr; private String mtu; private String[] innerIpv4Addr; - private String[] publicIpv4Addr; + private String publicIpv4Addr; private String[] innerIpv6Addr; - private String[] publicIpv6Addr; + private String publicIpv6Addr; /** * 过去1s内的网速,接收速率 */ @@ -37,6 +37,28 @@ public class NetworkInfo { private String trafficRecv; private String trafficSend; + public static NetworkInfo mapFromNetworkIF(NetworkIF networkIF) { + + return NetworkInfo + .builder() + .name(networkIF.getName()) + .displayName(networkIF.getDisplayName()) + .mtu(String.valueOf(networkIF.getMTU())) + .macAddr(networkIF.getMacaddr()) + .innerIpv4Addr(generateIPDICRFromNetworkIFList( + networkIF, + 4 + )) + .innerIpv6Addr(generateIPDICRFromNetworkIFList( + networkIF, + 6 + )) + .trafficSend(FormatUtils.formatData(networkIF.getBytesSent())) + .trafficRecv(FormatUtils.formatData(networkIF.getBytesRecv()) + ) + .build(); + + } public static List mapFromNetworkIFS(List networkIFList) { return networkIFList