[ server] [ agent ] - 版本信息控制

This commit is contained in:
zeaslity
2023-02-08 16:46:44 +08:00
parent 4382bcdc78
commit 1ad81e41c9
5 changed files with 72 additions and 13 deletions

View File

@@ -173,7 +173,7 @@ jobs:
# Name for the queue # Name for the queue
RABBIT_QUEUE_NAME: "OctopusToServer" RABBIT_QUEUE_NAME: "OctopusToServer"
# Message to be sent # Message to be sent
MESSAGE: "{\"uuid\":\"Octopus-Server\",\"init_time\": ${{ env.AGENT_VERSION }},\"type\":\"AGENT\",\"content\":${{ env.AGENT_VERSION }},\"result\": ${{ env.AGENT_VERSION }},\"ac_time\": ${{ env.AGENT_VERSION }}}" MESSAGE: "{\"uuid\":\"Octopus-Server\",\"init_time\": null,\"type\":\"AGENT\",\"content\": \"${{ env.AGENT_VERSION }}\",\"result\": \"${{ env.AGENT_VERSION }}\",\"ac_time\": null}"
# Durability for the queue # Durability for the queue
DURABLE: true DURABLE: true

View File

@@ -19,7 +19,7 @@ public class AgentOperationInfoService {
@Resource @Resource
OMessageToServerSender oMessageToServerSender; OMessageToServerSender oMessageToServerSender;
public void AgentOpInfo(OctopusMessage order){ public void AgentOpInfo(OctopusMessage octopusMessage){
} }

View File

@@ -57,11 +57,15 @@ public class OMHandlerAgent extends AbstractOctopusMessageHandler {
// update // update
agentRebootUpdateService.exAgentUpdate(operationMessage); agentRebootUpdateService.exAgentUpdate(operationMessage);
} else if (opType.equals(AgentOperationType.VERSION)) { } else if (opType.equals(AgentOperationType.VERSION)) {
// 收集Agent的版本信息 // 收集Agent的版本信息
agentOperationInfoService.AgentOpVersion(octopusMessage); agentOperationInfoService.AgentOpVersion(octopusMessage);
} else if (opType.equals(AgentOperationType.INFO)) { } else if (opType.equals(AgentOperationType.INFO)) {
// 收集Agent的核心Info信息 // 收集Agent的核心Info信息
agentOperationInfoService.AgentOpInfo(octopusMessage); agentOperationInfoService.AgentOpInfo(octopusMessage);
} else { } else {
// operation unknown // operation unknown
log.error("Command Agent Operation Unknown! "); log.error("Command Agent Operation Unknown! ");

View File

@@ -11,6 +11,7 @@ import io.wdd.rpc.message.sender.OMessageToAgentSender;
import io.wdd.server.beans.vo.ServerInfoVO; import io.wdd.server.beans.vo.ServerInfoVO;
import io.wdd.server.config.ServerCommonPool; import io.wdd.server.config.ServerCommonPool;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
@@ -25,6 +26,8 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_TOPIC_NAME_LIST; import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_TOPIC_NAME_LIST;
import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
import static io.wdd.rpc.message.handler.OctopusMessageHandler.AGENT_LATEST_VERSION;
import static io.wdd.rpc.message.handler.OctopusMessageHandler.OCTOPUS_MESSAGE_FROM_AGENT; import static io.wdd.rpc.message.handler.OctopusMessageHandler.OCTOPUS_MESSAGE_FROM_AGENT;
@Service @Service
@@ -37,6 +40,9 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
@Resource @Resource
ObjectMapper objectMapper; ObjectMapper objectMapper;
@Resource
RedisTemplate redisTemplate;
@Override @Override
public Map<String, String> getAllAgentVersion() { public Map<String, String> getAllAgentVersion() {
HashMap<String, String> result = new HashMap<>(); HashMap<String, String> result = new HashMap<>();
@@ -44,7 +50,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
// 查询获取到最新的Agent版本信息 // 查询获取到最新的Agent版本信息
result.put( result.put(
"LATEST_VERSION", "LATEST_VERSION",
"2023-02-06-09-23-00" getRealAgentLatestVersion()
); );
// 获取Agent的版本信息 -- 超时时间 // 获取Agent的版本信息 -- 超时时间
@@ -63,14 +69,27 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
); );
try { try {
TimeUnit.SECONDS.sleep(3); TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
System.out.println("result = " + result);
return result; return result;
} }
private String getRealAgentLatestVersion() {
String latestVersion = (String) redisTemplate
.opsForValue()
.get(
AGENT_LATEST_VERSION
);
return latestVersion;
}
@Override @Override
public Map<String, ServerInfoVO> getAllAgentCoreInfo() { public Map<String, ServerInfoVO> getAllAgentCoreInfo() {
return null; return null;
@@ -87,7 +106,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
CompletableFuture<Void> getAllAgentVersionInfo = new CompletableFuture<>(); CompletableFuture<Void> getAllAgentVersionInfo = new CompletableFuture<>();
try { try {
ExecutorService pool = ServerCommonPool.pool; ExecutorService pool = ServerCommonPool.pool;
CountDownLatch countDownLatch = new CountDownLatch(ALL_AGENT_TOPIC_NAME_LIST.size()); CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size());
// 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息 // 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息
getAllAgentVersionInfo = CompletableFuture.runAsync( getAllAgentVersionInfo = CompletableFuture.runAsync(
@@ -163,7 +182,7 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
private void buildAndSendToAllAgent(AgentOperationType operationType, LocalDateTime currentTime) { private void buildAndSendToAllAgent(AgentOperationType operationType, LocalDateTime currentTime) {
List<OctopusMessage> octopusMessageList = ALL_AGENT_TOPIC_NAME_LIST List<OctopusMessage> octopusMessageList = ALL_HEALTHY_AGENT_TOPIC_NAME_LIST
.stream() .stream()
.map( .map(
agentTopicName -> agentTopicName ->

View File

@@ -9,6 +9,7 @@ import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.IOException; import java.io.IOException;
@@ -18,33 +19,68 @@ import java.util.concurrent.ArrayBlockingQueue;
@Slf4j(topic = "Octopus Message Handler") @Slf4j(topic = "Octopus Message Handler")
public class OctopusMessageHandler { public class OctopusMessageHandler {
/**
* Redis Key 用于保存Agent的最新版本
* 由 GitHubAction发送至 RabbitMQ中然后此处获取处理发送至Redis中
*/
public static final String AGENT_LATEST_VERSION = "AGENT_LATEST_VERSION";
/** /**
* 存储所有的从 Agent过来的 OctopusMessage * 存储所有的从 Agent过来的 OctopusMessage
* 各个业务模块需要自己手动去获取自己需要的内容 * 各个业务模块需要自己手动去获取自己需要的内容
* TODO 数据一致性问题当AgentShutDown可能有一些信息会消失
*/ */
public static ArrayBlockingQueue<OctopusMessage> OCTOPUS_MESSAGE_FROM_AGENT = new ArrayBlockingQueue<>(128, true); public static ArrayBlockingQueue<OctopusMessage> OCTOPUS_MESSAGE_FROM_AGENT = new ArrayBlockingQueue<>(
128,
true
);
@Resource
RedisTemplate redisTemplate;
@Resource @Resource
ObjectMapper objectMapper; ObjectMapper objectMapper;
@RabbitHandler @RabbitHandler
@RabbitListener(queues = "${octopus.message.octopus_to_server}" @RabbitListener(queues = "${octopus.message.octopus_to_server}"
) )
public void HandleOctopusMessageFromAgent(Message message){ public void HandleOctopusMessageFromAgent(Message message) {
OctopusMessage octopusMessage; OctopusMessage octopusMessage;
try { try {
octopusMessage = objectMapper.readValue(message.getBody(), OctopusMessage.class); octopusMessage = objectMapper.readValue(
message.getBody(),
OctopusMessage.class
);
} catch (IOException e) { } catch (IOException e) {
throw new MyRuntimeException("Octopus Message Wrong !"); throw new MyRuntimeException("Octopus Message Wrong !");
} }
// Octopus Message Handler // Octopus Message Handler
log.info("received from agent : {} ", octopusMessage); log.info(
"received from agent : {} ",
octopusMessage
);
// 获取Agent的版本信息
if (octopusMessage
.getUuid()
.equals("Octopus-Server")) {
// 更新缓存Agent的最新版本信息
String latestVersion = (String) octopusMessage.getResult();
log.info(
"开始向Redis中缓存Agent的最新版本 => {}",
latestVersion
);
redisTemplate
.opsForValue()
.set(
AGENT_LATEST_VERSION,
latestVersion
);
}
// todo what to do after received the result // todo what to do after received the result
log.debug("cache the octopus message to inner cache list !"); log.debug("cache the octopus message to inner cache list !");
OCTOPUS_MESSAGE_FROM_AGENT.add(octopusMessage); OCTOPUS_MESSAGE_FROM_AGENT.add(octopusMessage);