[ server] [ agent ] - 版本信息全流程-完成

This commit is contained in:
zeaslity
2023-02-08 17:32:02 +08:00
parent 1ad81e41c9
commit 8231dd21e4
4 changed files with 77 additions and 62 deletions

View File

@@ -70,6 +70,21 @@ public class TimeUtils {
return LocalDateTime.now(SYSTEM_TIME_ZONE_ID); return LocalDateTime.now(SYSTEM_TIME_ZONE_ID);
} }
/**
* @return 格式化 去掉时间中的毫秒数
*/
public static LocalDateTime currentFormatTime() {
DateTimeFormatter ofPattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
String format = LocalDateTime
.now(SYSTEM_TIME_ZONE_ID)
.format(ofPattern);
return LocalDateTime.parse(format, ofPattern);
}
public static LocalDateTime cvFromDate(Date date) { public static LocalDateTime cvFromDate(Date date) {
// fix bug // fix bug

View File

@@ -25,7 +25,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_TOPIC_NAME_LIST;
import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
import static io.wdd.rpc.message.handler.OctopusMessageHandler.AGENT_LATEST_VERSION; import static io.wdd.rpc.message.handler.OctopusMessageHandler.AGENT_LATEST_VERSION;
import static io.wdd.rpc.message.handler.OctopusMessageHandler.OCTOPUS_MESSAGE_FROM_AGENT; import static io.wdd.rpc.message.handler.OctopusMessageHandler.OCTOPUS_MESSAGE_FROM_AGENT;
@@ -57,25 +56,34 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
// 发送OctopusMessage-Agent // 发送OctopusMessage-Agent
// 从OctopusToServer中收集到所有Agent的版本信息 // 从OctopusToServer中收集到所有Agent的版本信息
// 组装信息至集合中 // 组装信息至集合中
LocalDateTime currentTime = TimeUtils.currentTime(); LocalDateTime currentTime = TimeUtils.currentFormatTime();
buildAndSendToAllAgent( buildAndSendToAllAgent(
AgentOperationType.VERSION, AgentOperationType.VERSION,
currentTime currentTime
); );
waitCollectAllAgentVersionInfo( CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size());
CompletableFuture<Void> getAllAgentVersionInfoFuture = waitCollectAllAgentVersionInfo(
result, result,
currentTime currentTime,
countDownLatch
); );
try { try {
TimeUnit.SECONDS.sleep(5); // 超时等待5秒钟, 或者所有的Agent均已经完成上报
countDownLatch.await(
5,
TimeUnit.SECONDS
);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); log.warn("存在部分Agent没有上报 版本信息!");
} finally {
// 必须关闭释放线程
getAllAgentVersionInfoFuture.cancel(true);
} }
System.out.println("result = " + result);
return result; return result;
} }
@@ -101,63 +109,56 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
return null; return null;
} }
private void waitCollectAllAgentVersionInfo(HashMap<String, String> result, LocalDateTime startTime) { private CompletableFuture<Void> waitCollectAllAgentVersionInfo(HashMap<String, String> result, LocalDateTime startTime, CountDownLatch countDownLatch) {
CompletableFuture<Void> getAllAgentVersionInfo = new CompletableFuture<>(); ExecutorService pool = ServerCommonPool.pool;
try {
ExecutorService pool = ServerCommonPool.pool;
CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size());
// 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息 // 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息
getAllAgentVersionInfo = CompletableFuture.runAsync( return CompletableFuture.runAsync(
() -> { () -> {
while (true) { while (true) {
if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) { if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) {
// 开始收集等待 200ms // 开始收集等待 200ms
try { try {
TimeUnit.MILLISECONDS.sleep(50); TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
}
} }
// 返回,继续死循环
OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll(); continue;
// 获取到OM
// 判断信息是否是需要的类型
// 根据 init_time + OctopusMessageType + AgentOperationMessage
if (!judgyIsCurrentServerReplyMessage(
message,
startTime
)) {
// 不是当前应用需要的的OM将信息放置与Cache队列的末尾
try {
OCTOPUS_MESSAGE_FROM_AGENT.put(message);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return;
}
// 是当前需要的消息信息
result.put(
message.getUuid(),
(String) message.getResult()
);
} }
},
pool
);
// 超时等待5秒钟 OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll();
countDownLatch.await(
5, // 获取到OM
TimeUnit.SECONDS // 判断信息是否是需要的类型
); // 根据 init_time + OctopusMessageType + AgentOperationMessage
} catch (InterruptedException e) { if (!judgyIsCurrentServerReplyMessage(
log.warn("存在部分Agent没有上报 版本信息!"); message,
getAllAgentVersionInfo.cancel(true); startTime
} )) {
// 不是当前应用需要的的OM将信息放置与Cache队列的末尾
try {
OCTOPUS_MESSAGE_FROM_AGENT.put(message);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 返回,继续死循环
continue;
}
// 是当前需要的消息信息
result.put(
message.getUuid(),
(String) message.getResult()
);
countDownLatch.countDown();
}
},
pool
);
} }

View File

@@ -83,7 +83,7 @@ public class OctopusMessageHandler {
// todo what to do after received the result // todo what to do after received the result
log.debug("cache the octopus message to inner cache list !"); log.debug("cache the octopus message to inner cache list !");
OCTOPUS_MESSAGE_FROM_AGENT.add(octopusMessage); OCTOPUS_MESSAGE_FROM_AGENT.offer(octopusMessage);
// collect all message from agent and log to somewhere // collect all message from agent and log to somewhere

View File

@@ -8,7 +8,6 @@ public class ServerCommonPool {
public static ExecutorService pool; public static ExecutorService pool;
static { static {
ThreadFactory threadFactory = new ThreadFactoryBuilder() ThreadFactory threadFactory = new ThreadFactoryBuilder()