[ server ] [ agent ]- 收集Agent的版本信息 初步完成
This commit is contained in:
@@ -8,24 +8,31 @@ import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
|
||||
import io.wdd.common.utils.TimeUtils;
|
||||
import io.wdd.rpc.message.sender.OMessageToAgentSender;
|
||||
import io.wdd.server.config.ServerCommonPool;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_TOPIC_NAME_LIST;
|
||||
import static io.wdd.rpc.message.handler.OctopusMessageHandler.OCTOPUS_MESSAGE_FROM_AGENT;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class OctopusAgentServiceImpl implements OctopusAgentService{
|
||||
public class OctopusAgentServiceImpl implements OctopusAgentService {
|
||||
|
||||
private static final OctopusMessageType CurrentAppOctopusMessageType = OctopusMessageType.AGENT;
|
||||
@Resource
|
||||
OMessageToAgentSender oMessageToAgentSender;
|
||||
|
||||
@Resource
|
||||
ObjectMapper objectMapper;
|
||||
|
||||
@@ -34,24 +41,115 @@ public class OctopusAgentServiceImpl implements OctopusAgentService{
|
||||
HashMap<String, String> result = new HashMap<>();
|
||||
|
||||
// 查询获取到最新的Agent版本信息
|
||||
result.put("LATEST_VERSION","2023-02-06-09-23-00");
|
||||
result.put(
|
||||
"LATEST_VERSION",
|
||||
"2023-02-06-09-23-00"
|
||||
);
|
||||
|
||||
// 获取Agent的版本信息 -- 超时时间
|
||||
// 发送OctopusMessage-Agent
|
||||
// 从OctopusToServer中收集到所有Agent的版本信息
|
||||
// 组装信息至集合中
|
||||
buildAndSendToAllAgent(AgentOperationType.VERSION);
|
||||
LocalDateTime currentTime = TimeUtils.currentTime();
|
||||
buildAndSendToAllAgent(
|
||||
AgentOperationType.VERSION,
|
||||
currentTime
|
||||
);
|
||||
|
||||
waitCollectAllAgentVersionInfo(result);
|
||||
waitCollectAllAgentVersionInfo(
|
||||
result,
|
||||
currentTime
|
||||
);
|
||||
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(3);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private void waitCollectAllAgentVersionInfo(HashMap<String, String> result) {
|
||||
private void waitCollectAllAgentVersionInfo(HashMap<String, String> result, LocalDateTime startTime) {
|
||||
|
||||
CompletableFuture<Void> getAllAgentVersionInfo = new CompletableFuture<>();
|
||||
try {
|
||||
ExecutorService pool = ServerCommonPool.pool;
|
||||
CountDownLatch countDownLatch = new CountDownLatch(ALL_AGENT_TOPIC_NAME_LIST.size());
|
||||
|
||||
// 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息
|
||||
getAllAgentVersionInfo = CompletableFuture.runAsync(
|
||||
() -> {
|
||||
while (true) {
|
||||
if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) {
|
||||
// 开始收集等待 200ms
|
||||
try {
|
||||
TimeUnit.MILLISECONDS.sleep(50);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll();
|
||||
|
||||
// 获取到OM
|
||||
// 判断信息是否是需要的类型
|
||||
// 根据 init_time + OctopusMessageType + AgentOperationMessage
|
||||
if (!judgyIsCurrentServerReplyMessage(
|
||||
message,
|
||||
startTime
|
||||
)) {
|
||||
// 不是当前应用需要的的OM,将信息放置与Cache队列的末尾
|
||||
try {
|
||||
OCTOPUS_MESSAGE_FROM_AGENT.put(message);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// 是当前需要的消息信息
|
||||
result.put(
|
||||
message.getUuid(),
|
||||
(String) message.getResult()
|
||||
);
|
||||
}
|
||||
},
|
||||
pool
|
||||
);
|
||||
|
||||
// 超时等待5秒钟
|
||||
countDownLatch.await(
|
||||
5,
|
||||
TimeUnit.SECONDS
|
||||
);
|
||||
} catch (InterruptedException e) {
|
||||
log.warn("存在部分Agent没有上报 版本信息!");
|
||||
getAllAgentVersionInfo.cancel(true);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void buildAndSendToAllAgent(AgentOperationType operationType) {
|
||||
/**
|
||||
* 判断信息是否是需要的类型
|
||||
* 根据 init_time + OctopusMessageType + AgentOperationMessage
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
private boolean judgyIsCurrentServerReplyMessage(OctopusMessage message, LocalDateTime startTime) {
|
||||
|
||||
// init_time 时间判断
|
||||
boolean startTimeEqual = startTime.isEqual(message.getInit_time());
|
||||
|
||||
// OctopusMessageType判断
|
||||
boolean OMTypeEqual = message
|
||||
.getType()
|
||||
.equals(CurrentAppOctopusMessageType);
|
||||
|
||||
return startTimeEqual && OMTypeEqual;
|
||||
}
|
||||
|
||||
private void buildAndSendToAllAgent(AgentOperationType operationType, LocalDateTime currentTime) {
|
||||
|
||||
List<OctopusMessage> octopusMessageList = ALL_AGENT_TOPIC_NAME_LIST
|
||||
.stream()
|
||||
@@ -59,7 +157,8 @@ public class OctopusAgentServiceImpl implements OctopusAgentService{
|
||||
agentTopicName ->
|
||||
ConstructAgentOperationMessage(
|
||||
agentTopicName,
|
||||
operationType
|
||||
operationType,
|
||||
currentTime
|
||||
)
|
||||
)
|
||||
.collect(Collectors.toList());
|
||||
@@ -74,9 +173,10 @@ public class OctopusAgentServiceImpl implements OctopusAgentService{
|
||||
*
|
||||
* @param agentTopicName
|
||||
* @param operationType
|
||||
* @param currentTime
|
||||
* @return
|
||||
*/
|
||||
private OctopusMessage ConstructAgentOperationMessage(String agentTopicName, AgentOperationType operationType) {
|
||||
private OctopusMessage ConstructAgentOperationMessage(String agentTopicName, AgentOperationType operationType, LocalDateTime currentTime) {
|
||||
|
||||
AgentOperationMessage operationMessage = AgentOperationMessage
|
||||
.builder()
|
||||
@@ -92,9 +192,9 @@ public class OctopusAgentServiceImpl implements OctopusAgentService{
|
||||
|
||||
return OctopusMessage
|
||||
.builder()
|
||||
.type(OctopusMessageType.AGENT)
|
||||
.type(CurrentAppOctopusMessageType)
|
||||
.uuid(agentTopicName)
|
||||
.init_time(TimeUtils.currentTime())
|
||||
.init_time(currentTime)
|
||||
.content(ops)
|
||||
.build();
|
||||
|
||||
|
||||
@@ -12,12 +12,19 @@ import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
|
||||
@Configuration
|
||||
@Slf4j(topic = "Octopus Message Handler")
|
||||
public class OctopusMessageHandlerServer {
|
||||
public class OctopusMessageHandler {
|
||||
|
||||
|
||||
/**
|
||||
* 存储所有的从 Agent过来的 OctopusMessage
|
||||
* 各个业务模块需要自己手动去获取自己需要的内容
|
||||
*/
|
||||
public static ArrayBlockingQueue<OctopusMessage> OCTOPUS_MESSAGE_FROM_AGENT = new ArrayBlockingQueue<>(128, true);
|
||||
|
||||
@Resource
|
||||
ObjectMapper objectMapper;
|
||||
|
||||
@@ -39,6 +46,8 @@ public class OctopusMessageHandlerServer {
|
||||
|
||||
|
||||
// todo what to do after received the result
|
||||
log.debug("cache the octopus message to inner cache list !");
|
||||
OCTOPUS_MESSAGE_FROM_AGENT.add(octopusMessage);
|
||||
|
||||
// collect all message from agent and log to somewhere
|
||||
|
||||
Reference in New Issue
Block a user