[ Service ] [ Executor ] 重构Executor部分

This commit is contained in:
zeaslity
2023-08-11 17:06:53 +08:00
parent 0a78f9a02b
commit a2b6b01fd3
11 changed files with 812 additions and 1061 deletions

View File

@@ -1,6 +1,7 @@
package io.wdd.common.config;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
@@ -32,6 +33,39 @@ public class OctopusObjectMapperConfig {
public static ObjectMapper OctopusObjectMapper = null;
public static String WriteToString(Object data) {
try {
return OctopusObjectMapper.writeValueAsString(data);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public static String WriteToStringPretty(Object data) {
try {
return OctopusObjectMapper
.writerWithDefaultPrettyPrinter()
.writeValueAsString(data);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public static byte[] WriteToBytes(Object data) {
try {
return OctopusObjectMapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
@PostConstruct
public void setOctopusObjectMapper() {

View File

@@ -1,420 +1,419 @@
package io.wdd.rpc.agent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType;
import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService;
import io.wdd.rpc.message.handler.async.OctopusMessageSyncReplayContend;
import io.wdd.rpc.message.sender.OMessageToAgentSender;
import io.wdd.server.beans.vo.ServerInfoVO;
import io.wdd.server.config.ServerCommonPool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.LATEST_VERSION;
import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.OCTOPUS_MESSAGE_FROM_AGENT;
import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_SET;
import static io.wdd.rpc.status.CommonAndStatusCache.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
//@Service
@Slf4j
public class OctopusAgentServiceImpl implements OctopusAgentService {
private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.AGENT;
@Resource
OMessageToAgentSender oMessageToAgentSender;
@Resource
ObjectMapper objectMapper;
@Resource
RedisTemplate redisTemplate;
@Resource
AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService;
@Override
public Map<String, String> getAllAgentVersion() {
HashMap<String, String> result = new HashMap<>();
// 查询获取到最新的Agent版本信息
result.put(
"LATEST_VERSION",
getRealAgentLatestVersion()
);
// 获取Agent的版本信息 -- 超时时间
// 从OctopusToServer中收集到所有Agent的版本信息
// 组装信息至集合中
LocalDateTime currentTime = TimeUtils.currentFormatTime();
// 发送OctopusMessage-Agent
buildOMessageAndSendToAllHealthyAgent(
AgentOperationType.VERSION,
currentTime
);
// 构造 异步结果监听内容
OctopusMessageSyncReplayContend agentReplayContend = OctopusMessageSyncReplayContend.build(
ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(),
CurrentAppOctopusMessageType,
currentTime
);
CountDownLatch countDownLatch = agentReplayContend.getCountDownLatch();
// 调用后台接收处理所有的Replay信息
asyncWaitOctopusMessageResultService.waitFor(agentReplayContend);
boolean isAllHealthyAgentReport = false;
try {
// 超时等待5秒钟, 或者所有的Agent均已经完成上报
isAllHealthyAgentReport = countDownLatch.await(
5,
TimeUnit.SECONDS
);
} catch (InterruptedException e) {
} finally {
// 超时,或者 全部信息已经收集
if (!isAllHealthyAgentReport) {
log.warn("存在部分Agent没有上报 版本信息!");
}
// 此处调用,即可中断 异步任务的收集工作
asyncWaitOctopusMessageResultService.stopWaiting(agentReplayContend);
// 处理结果
agentReplayContend
.getReplayOMList()
.stream()
.forEach(
mMessage -> {
result.put(
mMessage.getUuid(),
(String) mMessage.getResult()
);
}
);
// help gc
agentReplayContend = null;
}
return result;
}
private String getRealAgentLatestVersion() {
String latestVersion = (String) redisTemplate
.opsForValue()
.get(
LATEST_VERSION
);
return latestVersion;
}
@Override
public Map<String, ServerInfoVO> getAllAgentCoreInfo() {
HashMap<String, ServerInfoVO> result = new HashMap<>();
// 获取Agent的版本信息 -- 超时时间
// 从OctopusToServer中收集到所有Agent的版本信息
// 组装信息至集合中
LocalDateTime currentTime = TimeUtils.currentFormatTime();
// 发送OctopusMessage-Agent
buildOMessageAndSendToAllHealthyAgent(
AgentOperationType.INFO,
currentTime
);
// 构造结果
OctopusMessageSyncReplayContend octopusMessageSyncReplayContend = OctopusMessageSyncReplayContend.build(
ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(),
CurrentAppOctopusMessageType,
currentTime
);
CountDownLatch countDownLatch = octopusMessageSyncReplayContend.getCountDownLatch();
// 调用后台接收处理所有的Replay信息
asyncWaitOctopusMessageResultService.waitFor(octopusMessageSyncReplayContend);
/* CompletableFuture<Void> getAllAgentCoreInfoFuture = waitCollectAllAgentCoreInfo(
result,
currentTime,
countDownLatch
);
*/
try {
// 超时等待5秒钟, 或者所有的Agent均已经完成上报ddo
countDownLatch.await(
5,
TimeUnit.SECONDS
);
} catch (InterruptedException e) {
log.warn("存在部分Agent没有上报 核心信息!");
} finally {
// 超时,或者 全部信息已经收集
// 此处调用,即可中断 异步任务的收集工作
asyncWaitOctopusMessageResultService.stopWaiting(octopusMessageSyncReplayContend);
// 处理结果
octopusMessageSyncReplayContend
.getReplayOMList()
.stream()
.forEach(
mMessage -> {
try {
// 解析当前信息
ServerInfoVO serverInfoVO = objectMapper.readValue(
(String) mMessage.getResult(),
new TypeReference<ServerInfoVO>() {
}
);
result.put(
mMessage.getUuid(),
serverInfoVO
);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
);
// help gc
octopusMessageSyncReplayContend = null;
}
return result;
}
@Override
public String shutdownAgentDanger(String agentTopicName) {
if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) {
log.error("agentTopicName Error !");
return "agentTopicName Error!";
}
LocalDateTime formatTime = TimeUtils.currentFormatTime();
buildOMessageAndSendToAllHealthyAgent(
AgentOperationType.SHUTDOWN,
formatTime
);
// 是否需要检查相应的状态
return null;
}
private CompletableFuture<Void> waitCollectAllAgentVersionInfo(HashMap<String, String> result, LocalDateTime startTime, CountDownLatch countDownLatch) {
ExecutorService pool = ServerCommonPool.pool;
// 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息
return CompletableFuture.runAsync(
() -> {
while (true) {
if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) {
// 开始收集等待 200ms
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 返回,继续死循环
continue;
}
OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll();
// 获取到OM
// 判断信息是否是需要的类型
// 根据 init_time + OctopusMessageType + AgentOperationMessage
if (!judgyIsCurrentServerReplyMessage(
message,
startTime
)) {
// 不是当前应用需要的的OM将信息放置与Cache队列的末尾
OCTOPUS_MESSAGE_FROM_AGENT.add(message);
// 返回,继续死循环
continue;
}
// 是当前需要的消息信息
result.put(
message.getUuid(),
(String) message.getResult()
);
countDownLatch.countDown();
}
},
pool
);
}
private CompletableFuture<Void> waitCollectAllAgentCoreInfo(HashMap<String, ServerInfoVO> result, LocalDateTime startTime, CountDownLatch countDownLatch) {
ExecutorService pool = ServerCommonPool.pool;
// 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息
return CompletableFuture.runAsync(
() -> {
while (true) {
if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) {
// 开始收集等待 200ms
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 返回,继续死循环
continue;
}
OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll();
// 获取到OM
// 判断信息是否是需要的类型
// 根据 init_time + OctopusMessageType + AgentOperationMessage
if (!judgyIsCurrentServerReplyMessage(
message,
startTime
)) {
// 不是当前应用需要的的OM将信息放置与Cache队列的末尾
OCTOPUS_MESSAGE_FROM_AGENT.offer(message);
// 返回,继续死循环
continue;
}
// 是当前需要的消息信息
try {
// 解析当前信息
ServerInfoVO serverInfoVO = objectMapper.readValue(
(String) message.getResult(),
new TypeReference<ServerInfoVO>() {
}
);
result.put(
message.getUuid(),
serverInfoVO
);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
countDownLatch.countDown();
}
},
pool
);
}
/**
* 判断信息是否是需要的类型
* 根据 init_time + OctopusMessageType + AgentOperationMessage
*
* @return
*/
private boolean judgyIsCurrentServerReplyMessage(OctopusMessage message, LocalDateTime startTime) {
// init_time 时间判断
boolean startTimeEqual = startTime.isEqual(message.getInit_time());
// OctopusMessageType判断
boolean OMTypeEqual = message
.getOctopusMessageType()
.equals(CurrentAppOctopusMessageType);
return startTimeEqual && OMTypeEqual;
}
private void buildOMessageAndSendToAllHealthyAgent(AgentOperationType operationType, LocalDateTime currentTime) {
List<OctopusMessage> octopusMessageList = ALL_HEALTHY_AGENT_TOPIC_NAME_LIST
.stream()
.map(
agentTopicName ->
ConstructAgentOperationMessage(
agentTopicName,
operationType,
currentTime
)
)
.collect(Collectors.toList());
// 发送相应的消息
oMessageToAgentSender.send(octopusMessageList);
}
/**
* 专门构造 Agent 类型的 OctopusMessage
* 通常只能在此类中使用
*
* @param agentTopicName
* @param operationType
* @param currentTime
* @return
*/
private OctopusMessage ConstructAgentOperationMessage(String agentTopicName, AgentOperationType operationType, LocalDateTime currentTime) {
AgentOperationMessage operationMessage = AgentOperationMessage
.builder()
.opType(operationType)
.build();
String ops;
try {
ops = objectMapper.writeValueAsString(operationMessage);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return OctopusMessage
.builder()
.octopusMessageType(CurrentAppOctopusMessageType)
.uuid(agentTopicName)
.init_time(currentTime)
.content(ops)
.build();
}
}
//package io.wdd.rpc.agent;
//
//import com.fasterxml.jackson.core.JsonProcessingException;
//import com.fasterxml.jackson.core.type.TypeReference;
//import com.fasterxml.jackson.databind.ObjectMapper;
//import io.wdd.common.utils.TimeUtils;
//import io.wdd.rpc.message.OctopusMessage;
//import io.wdd.rpc.message.OctopusMessageType;
//import io.wdd.rpc.message.handler.OMessageReplayContent;
//import io.wdd.rpc.message.sender.OMessageToAgentSender;
//import io.wdd.server.beans.vo.ServerInfoVO;
//import io.wdd.server.config.ServerCommonPool;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.data.redis.core.RedisTemplate;
//
//import javax.annotation.Resource;
//import java.time.LocalDateTime;
//import java.util.HashMap;
//import java.util.List;
//import java.util.Map;
//import java.util.concurrent.CompletableFuture;
//import java.util.concurrent.CountDownLatch;
//import java.util.concurrent.ExecutorService;
//import java.util.concurrent.TimeUnit;
//import java.util.stream.Collectors;
//
//import static io.wdd.rpc.message.handler.OMessageToServerListener.LATEST_VERSION;
//import static io.wdd.rpc.message.handler.OMessageToServerListener.OCTOPUS_MESSAGE_FROM_AGENT;
//import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_SET;
//import static io.wdd.rpc.status.CommonAndStatusCache.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
//
////@Service
//@Slf4j
//public class OctopusAgentServiceImpl implements OctopusAgentService {
//
// private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.AGENT;
// @Resource
// OMessageToAgentSender oMessageToAgentSender;
// @Resource
// ObjectMapper objectMapper;
//
// @Resource
// RedisTemplate redisTemplate;
//
// @Resource
// AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService;
//
// @Override
// public Map<String, String> getAllAgentVersion() {
// HashMap<String, String> result = new HashMap<>();
//
// // 查询获取到最新的Agent版本信息
// result.put(
// "LATEST_VERSION",
// getRealAgentLatestVersion()
// );
//
// // 获取Agent的版本信息 -- 超时时间
//
// // 从OctopusToServer中收集到所有Agent的版本信息
// // 组装信息至集合中
// LocalDateTime currentTime = TimeUtils.currentFormatTime();
//
// // 发送OctopusMessage-Agent
// buildOMessageAndSendToAllHealthyAgent(
// AgentOperationType.VERSION,
// currentTime
// );
//
// // 构造 异步结果监听内容
// OMessageReplayContent agentReplayContend = OMessageReplayContent.build(
// ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(),
// CurrentAppOctopusMessageType,
// currentTime
// );
//
// CountDownLatch countDownLatch = agentReplayContend.getCountDownLatch();
//
// // 调用后台接收处理所有的Replay信息
// asyncWaitOctopusMessageResultService.waitFor(agentReplayContend);
//
// boolean isAllHealthyAgentReport = false;
// try {
// // 超时等待5秒钟, 或者所有的Agent均已经完成上报
// isAllHealthyAgentReport = countDownLatch.await(
// 5,
// TimeUnit.SECONDS
// );
// } catch (InterruptedException e) {
//
// } finally {
// // 超时,或者 全部信息已经收集
// if (!isAllHealthyAgentReport) {
// log.warn("存在部分Agent没有上报 版本信息!");
// }
//
// // 此处调用,即可中断 异步任务的收集工作
// asyncWaitOctopusMessageResultService.stopWaiting(agentReplayContend);
//
// // 处理结果
// agentReplayContend
// .getReplayOMList()
// .stream()
// .forEach(
// mMessage -> {
// result.put(
// mMessage.getUuid(),
// (String) mMessage.getResult()
// );
// }
// );
//
// // help gc
// agentReplayContend = null;
// }
//
// return result;
// }
//
// private String getRealAgentLatestVersion() {
//
// String latestVersion = (String) redisTemplate
// .opsForValue()
// .get(
// LATEST_VERSION
// );
//
// return latestVersion;
// }
//
// @Override
// public Map<String, ServerInfoVO> getAllAgentCoreInfo() {
//
// HashMap<String, ServerInfoVO> result = new HashMap<>();
//
// // 获取Agent的版本信息 -- 超时时间
// // 从OctopusToServer中收集到所有Agent的版本信息
// // 组装信息至集合中
// LocalDateTime currentTime = TimeUtils.currentFormatTime();
//
// // 发送OctopusMessage-Agent
// buildOMessageAndSendToAllHealthyAgent(
// AgentOperationType.INFO,
// currentTime
// );
//
// // 构造结果
// OMessageReplayContent oMessageReplayContent = OMessageReplayContent.build(
// ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size(),
// CurrentAppOctopusMessageType,
// currentTime
// );
//
// CountDownLatch countDownLatch = oMessageReplayContent.getCountDownLatch();
//
// // 调用后台接收处理所有的Replay信息
// asyncWaitOctopusMessageResultService.waitFor(oMessageReplayContent);
//
// /* CompletableFuture<Void> getAllAgentCoreInfoFuture = waitCollectAllAgentCoreInfo(
// result,
// currentTime,
// countDownLatch
// );
//*/
// try {
// // 超时等待5秒钟, 或者所有的Agent均已经完成上报ddo
// countDownLatch.await(
// 5,
// TimeUnit.SECONDS
// );
// } catch (InterruptedException e) {
// log.warn("存在部分Agent没有上报 核心信息!");
// } finally {
// // 超时,或者 全部信息已经收集
//
// // 此处调用,即可中断 异步任务的收集工作
// asyncWaitOctopusMessageResultService.stopWaiting(oMessageReplayContent);
//
// // 处理结果
// oMessageReplayContent
// .getReplayOMList()
// .stream()
// .forEach(
// mMessage -> {
//
// try {
//
// // 解析当前信息
// ServerInfoVO serverInfoVO = objectMapper.readValue(
// (String) mMessage.getResult(),
// new TypeReference<ServerInfoVO>() {
// }
// );
//
// result.put(
// mMessage.getUuid(),
// serverInfoVO
// );
//
// } catch (JsonProcessingException e) {
// throw new RuntimeException(e);
// }
//
// }
// );
//
// // help gc
// oMessageReplayContent = null;
// }
//
// return result;
// }
//
//
// @Override
// public String shutdownAgentDanger(String agentTopicName) {
//
// if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) {
// log.error("agentTopicName Error !");
// return "agentTopicName Error!";
// }
//
// LocalDateTime formatTime = TimeUtils.currentFormatTime();
// buildOMessageAndSendToAllHealthyAgent(
// AgentOperationType.SHUTDOWN,
// formatTime
// );
//
// // 是否需要检查相应的状态
//
//
// return null;
// }
//
// private CompletableFuture<Void> waitCollectAllAgentVersionInfo(HashMap<String, String> result, LocalDateTime startTime, CountDownLatch countDownLatch) {
//
// ExecutorService pool = ServerCommonPool.pool;
//
// // 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息
// return CompletableFuture.runAsync(
// () -> {
// while (true) {
// if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) {
// // 开始收集等待 200ms
// try {
// TimeUnit.MILLISECONDS.sleep(50);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// // 返回,继续死循环
// continue;
// }
//
// OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll();
//
// // 获取到OM
// // 判断信息是否是需要的类型
// // 根据 init_time + OctopusMessageType + AgentOperationMessage
// if (!judgyIsCurrentServerReplyMessage(
// message,
// startTime
// )) {
//
// // 不是当前应用需要的的OM将信息放置与Cache队列的末尾
//
// OCTOPUS_MESSAGE_FROM_AGENT.add(message);
//
// // 返回,继续死循环
// continue;
// }
//
// // 是当前需要的消息信息
// result.put(
// message.getUuid(),
// (String) message.getResult()
// );
// countDownLatch.countDown();
// }
// },
// pool
// );
//
//
// }
//
// private CompletableFuture<Void> waitCollectAllAgentCoreInfo(HashMap<String, ServerInfoVO> result, LocalDateTime startTime, CountDownLatch countDownLatch) {
//
// ExecutorService pool = ServerCommonPool.pool;
//
// // 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息
// return CompletableFuture.runAsync(
// () -> {
// while (true) {
// if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) {
// // 开始收集等待 200ms
// try {
// TimeUnit.MILLISECONDS.sleep(50);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// // 返回,继续死循环
// continue;
// }
//
// OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll();
//
// // 获取到OM
// // 判断信息是否是需要的类型
// // 根据 init_time + OctopusMessageType + AgentOperationMessage
// if (!judgyIsCurrentServerReplyMessage(
// message,
// startTime
// )) {
//
// // 不是当前应用需要的的OM将信息放置与Cache队列的末尾
//
// OCTOPUS_MESSAGE_FROM_AGENT.offer(message);
//
// // 返回,继续死循环
// continue;
// }
//
// // 是当前需要的消息信息
// try {
//
// // 解析当前信息
// ServerInfoVO serverInfoVO = objectMapper.readValue(
// (String) message.getResult(),
// new TypeReference<ServerInfoVO>() {
// }
// );
//
// result.put(
// message.getUuid(),
// serverInfoVO
// );
//
// } catch (JsonProcessingException e) {
// throw new RuntimeException(e);
// }
//
// countDownLatch.countDown();
// }
// },
// pool
// );
//
// }
//
// /**
// * 判断信息是否是需要的类型
// * 根据 init_time + OctopusMessageType + AgentOperationMessage
// *
// * @return
// */
// private boolean judgyIsCurrentServerReplyMessage(OctopusMessage message, LocalDateTime startTime) {
//
// // init_time 时间判断
// boolean startTimeEqual = startTime.isEqual(message.getInit_time());
//
// // OctopusMessageType判断
// boolean OMTypeEqual = message
// .getOctopusMessageType()
// .equals(CurrentAppOctopusMessageType);
//
// return startTimeEqual && OMTypeEqual;
// }
//
// private void buildOMessageAndSendToAllHealthyAgent(AgentOperationType operationType, LocalDateTime currentTime) {
//
// List<OctopusMessage> octopusMessageList = ALL_HEALTHY_AGENT_TOPIC_NAME_LIST
// .stream()
// .map(
// agentTopicName ->
// ConstructAgentOperationMessage(
// agentTopicName,
// operationType,
// currentTime
// )
// )
// .collect(Collectors.toList());
//
// // 发送相应的消息
// oMessageToAgentSender.send(octopusMessageList);
// }
//
// /**
// * 专门构造 Agent 类型的 OctopusMessage
// * 通常只能在此类中使用
// *
// * @param agentTopicName
// * @param operationType
// * @param currentTime
// * @return
// */
// private OctopusMessage ConstructAgentOperationMessage(String agentTopicName, AgentOperationType operationType, LocalDateTime currentTime) {
//
// AgentOperationMessage operationMessage = AgentOperationMessage
// .builder()
// .opType(operationType)
// .build();
//
// String ops;
// try {
// ops = objectMapper.writeValueAsString(operationMessage);
// } catch (JsonProcessingException e) {
// throw new RuntimeException(e);
// }
//
// return OctopusMessage
// .builder()
// .octopusMessageType(CurrentAppOctopusMessageType)
// .uuid(agentTopicName)
// .init_time(currentTime)
// .content(ops)
// .build();
//
// }
//}

View File

@@ -3,6 +3,7 @@ package io.wdd.rpc.execute;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType;
import io.wdd.rpc.message.handler.OMessageReplayContent;
import io.wdd.rpc.message.sender.OMessageToAgentSender;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -11,9 +12,10 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static io.wdd.rpc.message.handler.sync.OMessageHandler.StopWaitingResult;
import static io.wdd.rpc.message.handler.sync.OMessageHandler.WaitFromAgent;
import static io.wdd.rpc.message.handler.OMessageHandler.*;
import static io.wdd.rpc.status.CommonAndStatusCache.ALL_AGENT_TOPIC_NAME_SET;
@Service
@@ -53,41 +55,53 @@ public class ExecutionServiceImpl implements ExecutionService {
// send the message
oMessageToAgentSender.send(octopusMessage);
System.out.println("originOctopusMessage = " + octopusMessage.hashCode());
log.debug(
"发送的 matchKey为 {}, 内容为 {}",
GenerateOMessageMatchKey(
octopusMessage.getOctopusMessageType(),
octopusMessage.getInit_time()
),
octopusMessage
);
// 需要返回结果
if (!durationTask) {
// 等待结果
WaitFromAgent(octopusMessage);
OMessageReplayContent replayContent = WaitFromAgent(
octopusMessage,
1
);
synchronized (octopusMessage) {
CountDownLatch replayLatch = replayContent.getCountDownLatch();
boolean waitOK = false;
try {
octopusMessage.wait(10000);
log.debug("等待结束!");
waitOK = replayLatch.await(
10,
TimeUnit.SECONDS
);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} finally {
// 释放等待队列
StopWaitingResult(octopusMessage);
}
// 转换结果
commandResultLog = (ArrayList<String>) octopusMessage.getResult();
// debug
log.debug(
"执行命令 {} 结果为 {} 内容为 {}",
"执行命令 {} {} 在规定时间内结束, 结果为 {} 返回内容为 {}",
executionMessage.getSingleLineCommand() == null ? executionMessage.getMultiLineCommand() : executionMessage.getSingleLineCommand(),
waitOK ? "已经" : "",
octopusMessage.getResultCode(),
octopusMessage.getResult()
);
}
// 释放等待队列
StopWaitingResult(octopusMessage);
}
return commandResultLog;
}
@@ -98,7 +112,7 @@ public class ExecutionServiceImpl implements ExecutionService {
return OctopusMessage
.builder()
.octopusMessageType(OctopusMessageType.EXECUTOR)
.init_time(TimeUtils.currentTime())
.init_time(TimeUtils.currentFormatTime())
.uuid(agentTopicName)
.content(
executionMessage

View File

@@ -1,47 +0,0 @@
package io.wdd.rpc.execute.service;
import java.util.ArrayList;
import java.util.List;
/**
* 同步命令执行的核心类
* 需要等待命令执行完毕,完后返回相应的结果
*/
public interface SyncExecutionService {
List<ArrayList<String>> SyncSendCommandToAgentComplete(
List<String> agentTopicNameList,
String type,
List<String> funcContent,
List<String> commandList,
List<List<String>> commandListComplete,
boolean needResultReplay,
String futureKey,
boolean durationTask
);
/**
* 调用 完整脚本的 最底层函数
*
* @param agentTopicName
* @param type
* @param funcContent
* @param commandList
* @param commandListComplete
* @param futureKey
* @param durationTask
* @return resultKey 本次操作在Redis中记录的结果Key
*/
ArrayList<String> SyncSendCommandToAgentComplete(
String agentTopicName,
String type,
List<String> funcContent,
List<String> commandList,
List<List<String>> commandListComplete,
boolean needResultReplay,
String futureKey,
boolean durationTask
);
}

View File

@@ -1,135 +0,0 @@
package io.wdd.rpc.execute.service;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType;
import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService;
import io.wdd.rpc.message.handler.async.OctopusMessageSyncReplayContend;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
//@Service
@Slf4j
public class SyncExecutionServiceImpl implements SyncExecutionService {
private static final boolean COMMAND_EXEC_NEED_REPLAY = true;
private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.EXECUTOR;
@Resource
AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService;
@Resource
AsyncExecutionService asyncExecutionService;
/**
* 一个命令执行的最长等待时间
*/
int processMaxWaitSeconds = 10;
@Override
public List<ArrayList<String>> SyncSendCommandToAgentComplete(List<String> agentTopicNameList, String type, List<String> funcContent, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
return agentTopicNameList
.stream()
.map(
agentTopicName -> {
return this.SyncSendCommandToAgentComplete(
agentTopicName,
type,
null,
commandList,
commandListComplete,
needResultReplay,
futureKey,
durationTask
);
}
)
.collect(Collectors.toList());
}
@Override
public ArrayList<String> SyncSendCommandToAgentComplete(String agentTopicName, String type, List<String> funcContent, List<String> commandList, List<List<String>> commandListComplete, boolean needResultReplay, String futureKey, boolean durationTask) {
// 异步访问
OctopusMessage octopusMessage = asyncExecutionService.AsyncCallSendCommandToAgent(
agentTopicName,
type,
null,
commandList,
commandListComplete,
needResultReplay,
futureKey,
durationTask
);
LocalDateTime initTime = octopusMessage.getInit_time();
// OM 中的result保存
ArrayList<String> result = new ArrayList<>();
// 构造消息等待对象
int commandCount = 1;
if (null != commandListComplete) {
commandCount = Math.max(
commandListComplete.size(),
1
);
}
// 构造回复信息的内容
OctopusMessageSyncReplayContend executionReplayContent = OctopusMessageSyncReplayContend.build(
commandCount,
CurrentAppOctopusMessageType,
initTime
);
CountDownLatch countDownLatch = executionReplayContent.getCountDownLatch();
// 开始等待结果
asyncWaitOctopusMessageResultService.waitFor(executionReplayContent);
// 监听结果
try {
boolean await = countDownLatch.await(
processMaxWaitSeconds,
TimeUnit.SECONDS
);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 等待所有的结果返回
// 停止等待结果
asyncWaitOctopusMessageResultService.stopWaiting(executionReplayContent);
// 解析结果
executionReplayContent
.getReplayOMList()
.stream()
.map(
om -> {
log.debug(
"replay message is => {}",
om
);
return (ArrayList<String>) om.getResult();
}
)
.forEachOrdered(
singleResult -> result.addAll(singleResult)
);
}
// 返回 执行的结果
return result;
}
}

View File

@@ -1,4 +1,4 @@
package io.wdd.rpc.message.handler.sync;
package io.wdd.rpc.message.handler;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType;
@@ -11,8 +11,8 @@ import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.FROM_AGENT_MATCH_TO_AGENT_MAP;
import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.OCTOPUS_MESSAGE_FROM_AGENT;
import static io.wdd.rpc.message.handler.OMessageToServerListener.FROM_AGENT_MATCH_TO_AGENT_MAP;
import static io.wdd.rpc.message.handler.OMessageToServerListener.OCTOPUS_MESSAGE_FROM_AGENT;
@Service
@Slf4j(topic = "Octopus Message Handler")
@@ -44,18 +44,24 @@ public class OMessageHandler {
}
public static void WaitFromAgent(OctopusMessage octopusMessage) {
public static OMessageReplayContent WaitFromAgent(OctopusMessage originOMessage, int waitFroReplayNum) {
// 构建 MatchKey
String matchKey = GenerateOMessageMatchKey(
octopusMessage.getOctopusMessageType(),
octopusMessage.getInit_time()
originOMessage.getOctopusMessageType(),
originOMessage.getInit_time()
);
// 构造等待对象
OMessageReplayContent replayContent = OMessageReplayContent.build(
originOMessage,
waitFroReplayNum
);
// 开始等待
FROM_AGENT_MATCH_TO_AGENT_MAP.put(
matchKey,
octopusMessage
replayContent
);
//debug
@@ -63,6 +69,9 @@ public class OMessageHandler {
"wait from agent map is => {}",
FROM_AGENT_MATCH_TO_AGENT_MAP
);
return replayContent;
}
public static void StopWaitingResult(OctopusMessage octopusMessage) {
@@ -108,10 +117,16 @@ public class OMessageHandler {
replayOMessage.getInit_time()
);
log.info(
"接收到的 matchKey为 {}, 内容为 {}",
matchKey,
replayOMessage
);
if (!FROM_AGENT_MATCH_TO_AGENT_MAP.containsKey(matchKey)) {
// 没有这个Key,说明等待结果已经超时了,直接丢弃,然后继续循环
// todo 错误的数据需要放置于某处
log.debug(
log.warn(
"等待队列里面没有该回复的结果key =>",
matchKey
);
@@ -121,13 +136,16 @@ public class OMessageHandler {
// 归还信息
// 拿到原始信息
OctopusMessage originOMessage = FROM_AGENT_MATCH_TO_AGENT_MAP.get(matchKey);
OMessageReplayContent oMessageReplayContent = FROM_AGENT_MATCH_TO_AGENT_MAP.get(matchKey);
OctopusMessage originOMessage = oMessageReplayContent.getOriginOMessage();
originOMessage.setResultCode(replayOMessage.getResultCode());
originOMessage.setResult(replayOMessage.getResult());
// 通知等待线程
originOMessage.notify();
oMessageReplayContent
.getCountDownLatch()
.countDown();
}
}

View File

@@ -1,4 +1,4 @@
package io.wdd.rpc.message.handler.async;
package io.wdd.rpc.message.handler;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
@@ -11,20 +11,19 @@ import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import static io.wdd.rpc.message.handler.sync.OMessageHandler.GenerateOMessageMatchKey;
import static io.wdd.rpc.message.handler.OMessageHandler.GenerateOMessageMatchKey;
@Data
@AllArgsConstructor
@NoArgsConstructor
@SuperBuilder(toBuilder = true)
@ApiModel("众多业务调用RPC,异步等待需要确定返回消息是谁的")
public class OctopusMessageSyncReplayContend {
public class OMessageReplayContent {
@ApiModelProperty("rpc消息的类型")
OctopusMessageType type;
OctopusMessageType octopusMessageType;
@ApiModelProperty("rpc消息发送的时间, 精确匹配,去掉毫秒")
@JsonFormat(pattern = "yyyy-MM-dd hh:MM:ss")
@@ -36,31 +35,26 @@ public class OctopusMessageSyncReplayContend {
@ApiModelProperty("需要等待的消息个数")
CountDownLatch countDownLatch;
@ApiModelProperty("回复的结果列表, 临时保存")
ArrayList<OctopusMessage> replayOMList;
@ApiModelProperty("发送的OctopusMessage")
OctopusMessage originOMessage;
/**
* Execution模块使用的模板
*
* @return
*/
public static OctopusMessageSyncReplayContend build(int waitForReplayNum, OctopusMessageType currentOMType, LocalDateTime currentTime) {
public static OMessageReplayContent build(OctopusMessage originOMessage, int waitForReplayNum) {
CountDownLatch latch = null;
if (waitForReplayNum != 0) {
latch = new CountDownLatch(waitForReplayNum);
}
return new OctopusMessageSyncReplayContend(
currentOMType,
currentTime,
return new OMessageReplayContent(
originOMessage.getOctopusMessageType(),
originOMessage.getInit_time(),
GenerateOMessageMatchKey(
currentOMType,
currentTime
originOMessage.getOctopusMessageType(),
originOMessage.getInit_time()
),
latch,
new ArrayList<OctopusMessage>(16)
originOMessage
);
}

View File

@@ -1,4 +1,4 @@
package io.wdd.rpc.message.handler.sync;
package io.wdd.rpc.message.handler;
import io.wdd.common.handler.MyRuntimeException;
@@ -16,6 +16,7 @@ import java.util.ArrayDeque;
import java.util.HashMap;
import static io.wdd.common.config.OctopusObjectMapperConfig.OctopusObjectMapper;
import static io.wdd.common.config.OctopusObjectMapperConfig.WriteToString;
@Configuration
@Slf4j(topic = "Octopus Message Listener")
@@ -39,10 +40,11 @@ public class OMessageToServerListener {
/**
* 发送出去的OctopusMessage需要和返回回来的内容对比
* 返回来的OM反序列化之后就不是原对象需要进行 通过MatchKey比较
* 2023年8月11日 修改为OMessageReplayContent
* <p>
* omMatchKey -- OctopusMessage
* omMatchKey -- OMessageReplayContent
*/
public static HashMap<String, OctopusMessage> FROM_AGENT_MATCH_TO_AGENT_MAP = new HashMap<>();
public static HashMap<String, OMessageReplayContent> FROM_AGENT_MATCH_TO_AGENT_MAP = new HashMap<>();
@Resource
RedisTemplate redisTemplate;
@@ -67,12 +69,11 @@ public class OMessageToServerListener {
}
// Octopus Message Handler
log.debug(
log.info(
"received from agent : {} ",
octopusMessage
WriteToString(octopusMessage)
);
System.out.println("receivedOctopusMessage = " + octopusMessage.hashCode());
// 获取Agent的版本信息
if (octopusMessage

View File

@@ -1,126 +0,0 @@
package io.wdd.rpc.message.handler.async;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.server.config.ServerCommonPool;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static io.wdd.rpc.message.handler.sync.OMessageHandler.GenerateOMessageMatchKey;
import static io.wdd.rpc.message.handler.sync.OMessageToServerListener.OCTOPUS_MESSAGE_FROM_AGENT;
/**
* 从Agent收集返回信息的统一处理地点
* 使用方法: 业务类构造 OMReplayContend对象调用AsyncWaitOMResult.waitFor()方法
* <p>
* 调用结束之后,需要从 REPLAY_WAITING_TARGET 中移除此部分内容
*/
//@Service
@Slf4j
public class AsyncWaitOctopusMessageResultService {
@PostConstruct
public void daemonHandleReplayOMFromAgent() {
// 异步任务启动
CompletableFuture.runAsync(
() -> doHandleReplayOMFromAgent(),
ServerCommonPool.pool
);
}
/**
* 为了避免线程不安全的问题,增加一层缓存,仅仅由当前类操作此部分
* KEY -> replayMatchKey
* VALUE -> OctopusMessageSyncReplayContend - 包含countDownLatch 和 result
*/
private static final HashMap<String, OctopusMessageSyncReplayContend> OM_REPLAY_WAITING_TARGET_MAP = new HashMap<>();
/**
* 操作 OCTOPUS_MESSAGE_FROM_AGENT 获取相应的Message放入内容中
*/
private void doHandleReplayOMFromAgent() {
// 死循环,不断的轮询 OCTOPUS_MESSAGE_FROM_AGENT
while (true) {
if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) {
// 开始收集等待 50 ms
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 返回,继续死循环
continue;
}
// 拿到消息
OctopusMessage replayOMessage = OCTOPUS_MESSAGE_FROM_AGENT.poll();
// 构造 replayMatchKey
String matchKey = GenerateOMessageMatchKey(
replayOMessage.getOctopusMessageType(),
replayOMessage.getInit_time()
);
if (!OM_REPLAY_WAITING_TARGET_MAP.containsKey(matchKey)) {
// 没有这个Key,说明等待结果已经超时了,直接丢弃,然后继续循环
// todo 错误的数据需要放置于某处
log.debug(
"等待队列里面没有该回复的结果key =>",
matchKey
);
continue;
}
// Map中包含有Key,那么放置进去
OctopusMessageSyncReplayContend replayContend = OM_REPLAY_WAITING_TARGET_MAP.get(matchKey);
replayContend
.getReplayOMList()
.add(replayOMessage);
// 需要操作countDown
replayContend
.getCountDownLatch()
.countDown();
// 结束操作,继续循环
}
}
public void waitFor(OctopusMessageSyncReplayContend OctopusMessageSyncReplayContend) {
// 向 REPLAY_CACHE_MAP中写入 Key
OM_REPLAY_WAITING_TARGET_MAP.put(
OctopusMessageSyncReplayContend.getReplayMatchKey(),
OctopusMessageSyncReplayContend
);
// 在调用线程的countDownLunch结束之后,关闭
// 清除 REPLAY_CACHE_MAP 中的队列
}
public void stopWaiting(OctopusMessageSyncReplayContend OctopusMessageSyncReplayContend) {
// 在调用线程的countDownLunch结束之后,关闭 清除 REPLAY_CACHE_MAP 中的队列
OctopusMessageSyncReplayContend contend = OM_REPLAY_WAITING_TARGET_MAP.get(OctopusMessageSyncReplayContend.getReplayMatchKey());
// 移除该内容
OM_REPLAY_WAITING_TARGET_MAP.remove(OctopusMessageSyncReplayContend.getReplayMatchKey());
// help gc
contend = null;
}
}

View File

@@ -1,265 +1,265 @@
package io.wdd.rpc.status.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.beans.request.MetricQueryEntity;
import io.wdd.rpc.message.OctopusMessage;
import io.wdd.rpc.message.OctopusMessageType;
import io.wdd.rpc.message.handler.async.AsyncWaitOctopusMessageResultService;
import io.wdd.rpc.message.handler.async.OctopusMessageSyncReplayContend;
import io.wdd.rpc.message.sender.OMessageToAgentSender;
import io.wdd.rpc.status.beans.AgentStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static io.wdd.common.config.OctopusObjectMapperConfig.OctopusObjectMapper;
import static io.wdd.rpc.status.OctopusStatusMessage.*;
@Slf4j
//@Service
public class SyncStatusServiceImpl implements SyncStatusService {
private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.STATUS;
@Resource
OMessageToAgentSender oMessageToAgentSender;
@Resource
AsyncWaitOctopusMessageResultService asyncWaitOctopusMessageResultService;
@Resource
RedisTemplate redisTemplate;
@Override
public Map<String, Boolean> SyncCollectAgentAliveStatus(List<String> agentTopicNameList, int aliveStatusWaitMaxTime) {
// 构造最后的结果Map
Map<String, Boolean> agentAliveStatusMap = agentTopicNameList
.stream()
.collect(
Collectors.toMap(
agentTopicName -> agentTopicName,
agentTopicName -> Boolean.FALSE
));
// 当前的时间
LocalDateTime currentTime = TimeUtils.currentFormatTime();
// 构造OctopusMessage - StatusMessage结构体, 下发所有的消息
buildAndSendAgentStatusOctopusMessage(
agentTopicNameList,
HEALTHY_STATUS_MESSAGE_TYPE,
currentTime
);
// 同步收集消息
OctopusMessageSyncReplayContend statusSyncReplayContend = OctopusMessageSyncReplayContend.build(
agentTopicNameList.size(),
CurrentAppOctopusMessageType,
currentTime
);
asyncWaitOctopusMessageResultService.waitFor(statusSyncReplayContend);
// 解析结果
CountDownLatch countDownLatch = statusSyncReplayContend.getCountDownLatch();
// 等待状态返回的结果
boolean agentAliveStatusCollectResult = false;
try {
agentAliveStatusCollectResult = countDownLatch.await(
aliveStatusWaitMaxTime,
TimeUnit.SECONDS
);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
if (!agentAliveStatusCollectResult) {
log.debug("Agent存活状态检查没有检查到全部的Agent");
}
// 移除等待队列
asyncWaitOctopusMessageResultService.stopWaiting(statusSyncReplayContend);
// 处理结果
statusSyncReplayContend
.getReplayOMList()
.stream()
.forEach(
statusOMessage -> {
if (statusOMessage.getResult() != null) {
agentAliveStatusMap.put(
statusOMessage.getUuid(),
Boolean.TRUE
);
}
}
);
}
// 返回Agent的存活状态内容
return agentAliveStatusMap;
}
@Override
public Map<String, AgentStatus> SyncCollectAgentMetricStatus(List<String> agentTopicNameList, int collectMetricWaitMaxTime) {
// 状态的结果Map
HashMap<String, AgentStatus> metricMap = new HashMap<>();
// 当前的时间
LocalDateTime currentTime = TimeUtils.currentFormatTime();
// 构造所有的Metric的OM并且下发
buildAndSendAgentStatusOctopusMessage(
agentTopicNameList,
METRIC_STATUS_MESSAGE_TYPE,
currentTime
);
// 同步等待结果, 并且解析结果
OctopusMessageSyncReplayContend metricSyncReplayContend = OctopusMessageSyncReplayContend.build(
agentTopicNameList.size(),
CurrentAppOctopusMessageType,
currentTime
);
asyncWaitOctopusMessageResultService.waitFor(metricSyncReplayContend);
// 解析结果
CountDownLatch countDownLatch = metricSyncReplayContend.getCountDownLatch();
// 等待状态返回的结果
boolean agentAliveStatusCollectResult = false;
try {
agentAliveStatusCollectResult = countDownLatch.await(
collectMetricWaitMaxTime,
TimeUnit.SECONDS
);
} catch (InterruptedException e) {
log.error("[Agent Metric] - 收集Agent的运行状态失败!");
throw new RuntimeException(e);
} finally {
if (!agentAliveStatusCollectResult) {
log.debug("Agent存活状态检查没有检查到全部的Agent");
}
// 移除等待队列
asyncWaitOctopusMessageResultService.stopWaiting(metricSyncReplayContend);
// 处理结果
metricSyncReplayContend
.getReplayOMList()
.stream()
.forEach(
statusOMessage -> {
if (statusOMessage.getResult() != null) {
// 解析Result对象为 AgentStatus
try {
AgentStatus agentStatus = OctopusObjectMapper.readValue(
(String) statusOMessage.getResult(),
AgentStatus.class
);
// 保存结果
metricMap.put(
statusOMessage.getUuid(),
agentStatus
);
} catch (JsonProcessingException e) {
log.error("[Agent Metric] - 解析AgentStatus失败!");
throw new RuntimeException(e);
}
}
}
);
}
return metricMap;
}
@Override
public ArrayList<AgentStatus> QueryMetricStatus(MetricQueryEntity metricQueryEntity) {
String queryRedisKey = metricQueryEntity.getAgentTopicName() + "-Metric";
if (!redisTemplate.hasKey(queryRedisKey)) {
log.error(
"[Metric] - 查询到没有此Agent {} 的Metric信息直接返回",
metricQueryEntity.getAgentTopicName()
);
}
double start = metricQueryEntity.getMetricStartTimeStamp();
double end = metricQueryEntity.getMetricEndTimeStamp();
ArrayList<AgentStatus> statusArrayList = new ArrayList<>();
redisTemplate
.opsForZSet()
.rangeByScore(
queryRedisKey,
start,
end
)
.stream()
.forEachOrdered(
metricString -> {
try {
AgentStatus agentStatus = OctopusObjectMapper.readValue(
(String) metricString,
AgentStatus.class
);
statusArrayList.add(agentStatus);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
);
return statusArrayList;
}
/**
* 2023年7月10日 通用的底层构造方法 Status类型的Octopus Message
*
* @param agentTopicNameList
* @param statusType
* @param currentTime
*/
private void buildAndSendAgentStatusOctopusMessage(List<String> agentTopicNameList, String statusType, LocalDateTime currentTime) {
List<OctopusMessage> octopusStatusMessageList = agentTopicNameList
.stream()
.map(
agentTopicName -> ConstructAgentStatusMessage(
statusType,
agentTopicName,
currentTime
)
)
.collect(Collectors.toList());
// 发送信息
oMessageToAgentSender.send(octopusStatusMessageList);
}
}
//package io.wdd.rpc.status.service;
//
//import com.fasterxml.jackson.core.JsonProcessingException;
//import io.wdd.common.utils.TimeUtils;
//import io.wdd.rpc.beans.request.MetricQueryEntity;
//import io.wdd.rpc.message.OctopusMessage;
//import io.wdd.rpc.message.OctopusMessageType;
//import io.wdd.rpc.message.handler.OMessageReplayContent;
//import io.wdd.rpc.message.sender.OMessageToAgentSender;
//import io.wdd.rpc.status.beans.AgentStatus;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.data.redis.core.RedisTemplate;
//
//import javax.annotation.Resource;
//import java.time.LocalDateTime;
//import java.util.ArrayList;
//import java.util.HashMap;
//import java.util.List;
//import java.util.Map;
//import java.util.concurrent.CountDownLatch;
//import java.util.concurrent.TimeUnit;
//import java.util.stream.Collectors;
//
//import static io.wdd.common.config.OctopusObjectMapperConfig.OctopusObjectMapper;
//import static io.wdd.rpc.message.handler.OMessageHandler.WaitFromAgent;
//import static io.wdd.rpc.status.OctopusStatusMessage.*;
//
//@Slf4j
////@Service
//public class SyncStatusServiceImpl implements SyncStatusService {
//
// private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.STATUS;
//
// @Resource
// OMessageToAgentSender oMessageToAgentSender;
//
// @Resource
// RedisTemplate redisTemplate;
//
// @Override
// public Map<String, Boolean> SyncCollectAgentAliveStatus(List<String> agentTopicNameList, int aliveStatusWaitMaxTime) {
//
// // 构造最后的结果Map
// Map<String, Boolean> agentAliveStatusMap = agentTopicNameList
// .stream()
// .collect(
// Collectors.toMap(
// agentTopicName -> agentTopicName,
// agentTopicName -> Boolean.FALSE
// ));
//
// // 当前的时间
// LocalDateTime currentTime = TimeUtils.currentFormatTime();
//
// // 构造OctopusMessage - StatusMessage结构体, 下发所有的消息
// buildAndSendAgentStatusOctopusMessage(
// agentTopicNameList,
// HEALTHY_STATUS_MESSAGE_TYPE,
// currentTime
// );
//
// // 同步收集消息
// OMessageReplayContent statusSyncReplayContend = OMessageReplayContent.build(
// agentTopicNameList.size(),
// CurrentAppOctopusMessageType,
// currentTime
// );
// WaitFromAgent();
//
// // 解析结果
// CountDownLatch countDownLatch = statusSyncReplayContend.getCountDownLatch();
//
// // 等待状态返回的结果
// boolean agentAliveStatusCollectResult = false;
// try {
// agentAliveStatusCollectResult = countDownLatch.await(
// aliveStatusWaitMaxTime,
// TimeUnit.SECONDS
// );
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// } finally {
// if (!agentAliveStatusCollectResult) {
// log.debug("Agent存活状态检查没有检查到全部的Agent");
// }
//
// // 移除等待队列
// asyncWaitOctopusMessageResultService.stopWaiting(statusSyncReplayContend);
//
// // 处理结果
// statusSyncReplayContend
// .getReplayOMList()
// .stream()
// .forEach(
// statusOMessage -> {
// if (statusOMessage.getResult() != null) {
// agentAliveStatusMap.put(
// statusOMessage.getUuid(),
// Boolean.TRUE
// );
// }
// }
// );
// }
//
// // 返回Agent的存活状态内容
// return agentAliveStatusMap;
// }
//
// @Override
// public Map<String, AgentStatus> SyncCollectAgentMetricStatus(List<String> agentTopicNameList, int collectMetricWaitMaxTime) {
//
// // 状态的结果Map
// HashMap<String, AgentStatus> metricMap = new HashMap<>();
//
// // 当前的时间
// LocalDateTime currentTime = TimeUtils.currentFormatTime();
//
// // 构造所有的Metric的OM并且下发
// buildAndSendAgentStatusOctopusMessage(
// agentTopicNameList,
// METRIC_STATUS_MESSAGE_TYPE,
// currentTime
// );
//
// // 同步等待结果, 并且解析结果
// OMessageReplayContent metricSyncReplayContend = OMessageReplayContent.build(
// agentTopicNameList.size(),
// CurrentAppOctopusMessageType,
// currentTime
// );
// asyncWaitOctopusMessageResultService.waitFor(metricSyncReplayContend);
//
// // 解析结果
// CountDownLatch countDownLatch = metricSyncReplayContend.getCountDownLatch();
//
// // 等待状态返回的结果
// boolean agentAliveStatusCollectResult = false;
// try {
// agentAliveStatusCollectResult = countDownLatch.await(
// collectMetricWaitMaxTime,
// TimeUnit.SECONDS
// );
// } catch (InterruptedException e) {
// log.error("[Agent Metric] - 收集Agent的运行状态失败!");
// throw new RuntimeException(e);
// } finally {
// if (!agentAliveStatusCollectResult) {
// log.debug("Agent存活状态检查没有检查到全部的Agent");
// }
//
// // 移除等待队列
// asyncWaitOctopusMessageResultService.stopWaiting(metricSyncReplayContend);
//
// // 处理结果
// metricSyncReplayContend
// .getReplayOMList()
// .stream()
// .forEach(
// statusOMessage -> {
// if (statusOMessage.getResult() != null) {
//
// // 解析Result对象为 AgentStatus
// try {
//
// AgentStatus agentStatus = OctopusObjectMapper.readValue(
// (String) statusOMessage.getResult(),
// AgentStatus.class
// );
//
// // 保存结果
// metricMap.put(
// statusOMessage.getUuid(),
// agentStatus
// );
//
// } catch (JsonProcessingException e) {
// log.error("[Agent Metric] - 解析AgentStatus失败!");
// throw new RuntimeException(e);
// }
//
// }
// }
// );
// }
//
// return metricMap;
// }
//
// @Override
// public ArrayList<AgentStatus> QueryMetricStatus(MetricQueryEntity metricQueryEntity) {
//
// String queryRedisKey = metricQueryEntity.getAgentTopicName() + "-Metric";
//
// if (!redisTemplate.hasKey(queryRedisKey)) {
// log.error(
// "[Metric] - 查询到没有此Agent {} 的Metric信息直接返回",
// metricQueryEntity.getAgentTopicName()
// );
// }
//
// double start = metricQueryEntity.getMetricStartTimeStamp();
// double end = metricQueryEntity.getMetricEndTimeStamp();
//
// ArrayList<AgentStatus> statusArrayList = new ArrayList<>();
//
// redisTemplate
// .opsForZSet()
// .rangeByScore(
// queryRedisKey,
// start,
// end
// )
// .stream()
// .forEachOrdered(
// metricString -> {
// try {
//
// AgentStatus agentStatus = OctopusObjectMapper.readValue(
// (String) metricString,
// AgentStatus.class
// );
//
// statusArrayList.add(agentStatus);
//
// } catch (JsonProcessingException e) {
// throw new RuntimeException(e);
// }
// }
// );
//
//
// return statusArrayList;
// }
//
//
// /**
// * 2023年7月10日 通用的底层构造方法 Status类型的Octopus Message
// *
// * @param agentTopicNameList
// * @param statusType
// * @param currentTime
// * @return
// */
// private List<OctopusMessage> buildAndSendAgentStatusOctopusMessage(List<String> agentTopicNameList, String statusType, LocalDateTime currentTime) {
//
// List<OctopusMessage> octopusStatusMessageList = agentTopicNameList
// .stream()
// .map(
// agentTopicName -> ConstructAgentStatusMessage(
// statusType,
// agentTopicName,
// currentTime
// )
// )
// .collect(Collectors.toList());
//
// // 发送信息
// oMessageToAgentSender.send(octopusStatusMessageList);
//
// return octopusStatusMessageList;
// }
//
//
//}

View File

@@ -164,6 +164,5 @@ oss:
# 开启debug模式
logging:
level:
io.wdd.rpc.execute: debug
debug: true
io.wdd.rpc: debug