[ agent ] [ status ]- fix some bug
This commit is contained in:
@@ -3,6 +3,7 @@ package io.wdd.agent.config.message.handler;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.wdd.agent.config.utils.AgentCommonThreadPool;
|
||||
import io.wdd.agent.status.AgentStatusCollector;
|
||||
import io.wdd.agent.status.HealthyReporter;
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||
@@ -12,6 +13,8 @@ import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import static io.wdd.common.beans.status.OctopusStatusMessage.*;
|
||||
|
||||
@Component
|
||||
@@ -26,6 +29,9 @@ public class OMHandlerStatus extends AbstractOctopusMessageHandler {
|
||||
@Resource
|
||||
AgentStatusCollector agentStatusCollector;
|
||||
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public boolean handle(OctopusMessage octopusMessage) {
|
||||
|
||||
@@ -46,13 +52,30 @@ public class OMHandlerStatus extends AbstractOctopusMessageHandler {
|
||||
healthyReporter.report();
|
||||
} else if (statusType.equals(ALL_STATUS_MESSAGE_TYPE)) {
|
||||
// all status report
|
||||
agentStatusCollector.sendAgentStatusToRedis();
|
||||
CompletableFuture.runAsync(
|
||||
() -> {
|
||||
agentStatusCollector.collect(
|
||||
1,
|
||||
1
|
||||
);
|
||||
}
|
||||
,
|
||||
AgentCommonThreadPool.pool
|
||||
);
|
||||
|
||||
} else if (statusType.equals(METRIC_STATUS_MESSAGE_TYPE)) {
|
||||
// metric status
|
||||
|
||||
agentStatusCollector.collect(
|
||||
statusMessage.getMetricRepeatCount(),
|
||||
statusMessage.getMetricRepeatPinch()
|
||||
// 必须要交给异步任务完成,否则会阻塞主线程!
|
||||
CompletableFuture.runAsync(
|
||||
() -> {
|
||||
agentStatusCollector.collect(
|
||||
statusMessage.getMetricRepeatCount(),
|
||||
statusMessage.getMetricRepeatPinch()
|
||||
);
|
||||
}
|
||||
,
|
||||
AgentCommonThreadPool.pool
|
||||
);
|
||||
} else if (statusType.equals(APP_STATUS_MESSAGE_TYPE)) {
|
||||
// app status report
|
||||
|
||||
@@ -18,7 +18,6 @@ import oshi.SystemInfo;
|
||||
import oshi.hardware.HardwareAbstractionLayer;
|
||||
import oshi.software.os.OperatingSystem;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@@ -56,12 +55,6 @@ public class AgentStatusCollector {
|
||||
AppStatusExecutor appStatusExecutor;
|
||||
@Resource
|
||||
NetworkStatusExecutor networkStatusExecutor;
|
||||
private String statusRedisStreamKey;
|
||||
|
||||
@PostConstruct
|
||||
private void generateStatusRedisStreamKey() {
|
||||
statusRedisStreamKey = AgentStatus.getRedisStatusKey(agentServerInfo.getAgentTopicName());
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
public AgentStatus collect() {
|
||||
@@ -73,8 +66,10 @@ public class AgentStatusCollector {
|
||||
agentStatus.setAgentTopicName(agentServerInfo.getAgentTopicName());
|
||||
|
||||
/* CPU */
|
||||
agentStatus.setCpuInfo(new CpuInfo(hardware.getProcessor(),
|
||||
1000));
|
||||
agentStatus.setCpuInfo(new CpuInfo(
|
||||
hardware.getProcessor(),
|
||||
1000
|
||||
));
|
||||
|
||||
/* Memory */
|
||||
agentStatus.setMemoryInfo(MemoryInfo.build(hardware.getMemory()));
|
||||
@@ -85,8 +80,10 @@ public class AgentStatusCollector {
|
||||
/* Network */
|
||||
agentStatus.setNetworkInfo(
|
||||
// NetworkInfo.mapFromNetworkIFS(hardware.getNetworkIFs(false))
|
||||
networkStatusExecutor.collect(hardware.getNetworkIFs(false),
|
||||
false)
|
||||
networkStatusExecutor.collect(
|
||||
hardware.getNetworkIFs(false),
|
||||
false
|
||||
)
|
||||
);
|
||||
|
||||
/* operating system info */
|
||||
@@ -109,31 +106,40 @@ public class AgentStatusCollector {
|
||||
|
||||
private AppStatusInfo parseAppStatus(HashMap<String, Set<String>> checkAppStatus) {
|
||||
|
||||
return AppStatusInfo.builder()
|
||||
.Healthy(checkAppStatus.get(AppStatusEnum.HEALTHY.getName()))
|
||||
.Failure(checkAppStatus.get(AppStatusEnum.FAILURE.getName()))
|
||||
.NotInstall(checkAppStatus.get(AppStatusEnum.NOT_INSTALL.getName()))
|
||||
.build();
|
||||
return AppStatusInfo
|
||||
.builder()
|
||||
.Healthy(checkAppStatus.get(AppStatusEnum.HEALTHY.getName()))
|
||||
.Failure(checkAppStatus.get(AppStatusEnum.FAILURE.getName()))
|
||||
.NotInstall(checkAppStatus.get(AppStatusEnum.NOT_INSTALL.getName()))
|
||||
.build();
|
||||
}
|
||||
|
||||
// agent boot up 120s then start to report its status
|
||||
// at the fix rate of 15s
|
||||
/*@Scheduled(initialDelay = ReportInitDelay, fixedRate = ReportFixedRate)*/
|
||||
public void sendAgentStatusToRedis() {
|
||||
public void sendAgentStatusToRedis(String statusRedisStreamKey) {
|
||||
|
||||
try {
|
||||
|
||||
Map<String, String> map = Map.of(TimeUtils.currentTimeString(),
|
||||
objectMapper.writeValueAsString(collect()));
|
||||
// 装配 Agent状态 信息
|
||||
Map<String, String> map = Map.of(
|
||||
TimeUtils.currentTimeString(),
|
||||
objectMapper.writeValueAsString(collect())
|
||||
);
|
||||
|
||||
StringRecord stringRecord = StreamRecords.string(map)
|
||||
.withStreamKey(statusRedisStreamKey);
|
||||
StringRecord stringRecord = StreamRecords
|
||||
.string(map)
|
||||
.withStreamKey(statusRedisStreamKey);
|
||||
|
||||
log.debug("Agent Status is ==> {}",
|
||||
map);
|
||||
log.debug(
|
||||
"Agent Status is ==> {}",
|
||||
map
|
||||
);
|
||||
|
||||
redisTemplate.opsForStream()
|
||||
.add(stringRecord);
|
||||
// 发送到Redis
|
||||
redisTemplate
|
||||
.opsForStream()
|
||||
.add(stringRecord);
|
||||
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
@@ -150,13 +156,18 @@ public class AgentStatusCollector {
|
||||
*/
|
||||
public void collect(int metricRepeatCount, int metricRepeatPinch) {
|
||||
|
||||
String statusRedisStreamKey = AgentStatus
|
||||
.getRedisStatusKey(agentServerInfo.getAgentTopicName());
|
||||
|
||||
log.debug("statusRedisStreamKey is => {}", statusRedisStreamKey);
|
||||
|
||||
for (int count = 0; count < metricRepeatCount; count++) {
|
||||
|
||||
try {
|
||||
|
||||
// use async thread pool to call the status collect method
|
||||
AgentCommonThreadPool.pool.submit(
|
||||
() -> this.sendAgentStatusToRedis()
|
||||
() -> this.sendAgentStatusToRedis(statusRedisStreamKey)
|
||||
);
|
||||
|
||||
// main thread sleep for metricRepeatPinch
|
||||
|
||||
@@ -25,7 +25,7 @@ public class OSHITest {
|
||||
|
||||
@Test
|
||||
void testCollect(){
|
||||
agentStatusCollector.sendAgentStatusToRedis();
|
||||
agentStatusCollector.sendAgentStatusToRedis("123");
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -72,9 +72,11 @@ mybatis-plus:
|
||||
logicDeleteField: isDelete
|
||||
logic-not-delete-value: 0
|
||||
logic-delete-value: 1
|
||||
banner: false
|
||||
configuration:
|
||||
# 希望知道所有的sql是怎么执行的, 配置输出日志
|
||||
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
|
||||
|
||||
# 数据库下划线--实体类也是下划线 需要为false
|
||||
map-underscore-to-camel-case: true
|
||||
# 一级缓存的 缓存级别默认为 session,如果要关闭一级缓存可以设置为 statement
|
||||
|
||||
@@ -154,5 +154,13 @@ apt-cache madison openjdk-11-jdk | head -n 1 | awk '{print$3}'
|
||||
|
||||
java -jar /octopus-agent/agent.jar -Xms128m -Xmx512m -Dfile.encoding=utf-8 --spring.profiles.active=k3s --spring.cloud.nacos.config.group=k3s --spring.cloud.nacos.config.extension-configs[0].dataId=common-k3s.yaml --spring.cloud.nacos.config.extension-configs[0].group=k3s
|
||||
|
||||
|
||||
export OctopusServerContainerName="octopus-server"
|
||||
|
||||
docker container stop ${OctopusServerContainerName}
|
||||
sleep 2
|
||||
docker container rm ${OctopusServerContainerName}
|
||||
docker image rmi icederce/wdd-octopus-server:latest
|
||||
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user