[server][ xray]- 代理节点的数据库内容 - 1
This commit is contained in:
@@ -13,6 +13,7 @@ import io.wdd.server.utils.DaemonDatabaseOperator;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.Queue;
|
||||
import org.springframework.amqp.rabbit.annotation.*;
|
||||
import org.springframework.amqp.support.AmqpHeaders;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
@@ -22,10 +23,7 @@ import org.springframework.stereotype.Service;
|
||||
import javax.annotation.Resource;
|
||||
import java.io.IOException;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
@@ -35,14 +33,31 @@ import java.util.concurrent.TimeUnit;
|
||||
@Slf4j(topic = "octopus agent init ")
|
||||
public class AcceptAgentInitInfo {
|
||||
|
||||
public static Set<String> ALL_SERVER_CITY_INFO = new HashSet<>(
|
||||
Arrays.asList(
|
||||
"HongKong", "Tokyo", "Seoul", "Phoenix", "London", "Shanghai", "Chengdu"
|
||||
public static final HashMap<String, Integer> ALL_SERVER_CITY_INDEX = new HashMap<>(
|
||||
Map.of(
|
||||
"Chengdu",
|
||||
1,
|
||||
"Shanghai",
|
||||
2,
|
||||
"HongKong",
|
||||
3,
|
||||
"Seoul",
|
||||
4,
|
||||
"Tokyo",
|
||||
5,
|
||||
"Phoenix",
|
||||
6,
|
||||
"London",
|
||||
7
|
||||
)
|
||||
);
|
||||
public static Set<String> ALL_SERVER_ARCH_INFO = new HashSet<>(
|
||||
Arrays.asList(
|
||||
"amd64", "arm64", "arm32", "xia32", "miples"
|
||||
"amd64",
|
||||
"arm64",
|
||||
"arm32",
|
||||
"xia32",
|
||||
"miples"
|
||||
)
|
||||
);
|
||||
@Resource
|
||||
@@ -91,7 +106,10 @@ public class AcceptAgentInitInfo {
|
||||
|
||||
try {
|
||||
|
||||
serverInfoVO = objectMapper.readValue(message.getBody(), ServerInfoVO.class);
|
||||
serverInfoVO = objectMapper.readValue(
|
||||
message.getBody(),
|
||||
ServerInfoVO.class
|
||||
);
|
||||
|
||||
// 1. check if information is correct
|
||||
if (!validateServerInfo(serverInfoVO)) {
|
||||
@@ -132,7 +150,11 @@ public class AcceptAgentInitInfo {
|
||||
|
||||
// long deliveryTag, boolean multiple, boolean requeue
|
||||
|
||||
channel.basicNack(deliveryTag, false, true);
|
||||
channel.basicNack(
|
||||
deliveryTag,
|
||||
false,
|
||||
true
|
||||
);
|
||||
// long deliveryTag, boolean requeue
|
||||
// channel.basicReject(deliveryTag,true);
|
||||
|
||||
@@ -158,8 +180,14 @@ public class AcceptAgentInitInfo {
|
||||
*/
|
||||
// ack the rabbitmq info
|
||||
// If all logic is successful
|
||||
log.info("Agent [ {} ] has init successfully !", serverInfoVO.getTopicName());
|
||||
channel.basicAck(deliveryTag, false);
|
||||
log.info(
|
||||
"Agent [ {} ] has init successfully !",
|
||||
serverInfoVO.getTopicName()
|
||||
);
|
||||
channel.basicAck(
|
||||
deliveryTag,
|
||||
false
|
||||
);
|
||||
}
|
||||
|
||||
private void generateAgentStatusRedisStreamConsumerGroup(String agentTopicName) {
|
||||
@@ -170,7 +198,12 @@ public class AcceptAgentInitInfo {
|
||||
log.debug(" not find the group, recreate");
|
||||
|
||||
// not find the group, recreate
|
||||
redisTemplate.opsForStream().createGroup(statusStreamKey, "OctopusServer");
|
||||
redisTemplate
|
||||
.opsForStream()
|
||||
.createGroup(
|
||||
statusStreamKey,
|
||||
"OctopusServer"
|
||||
);
|
||||
}
|
||||
|
||||
// check for octopus-server consumer group
|
||||
@@ -185,12 +218,18 @@ public class AcceptAgentInitInfo {
|
||||
redisTemplate.opsForStream().createGroup(statusStreamKey, "OctopusServer");
|
||||
}*/
|
||||
|
||||
log.debug("octopus agent [ {} ] status report stream key [ {} ] has been created !", agentTopicName, statusStreamKey);
|
||||
log.debug(
|
||||
"octopus agent [ {} ] status report stream key [ {} ] has been created !",
|
||||
agentTopicName,
|
||||
statusStreamKey
|
||||
);
|
||||
}
|
||||
|
||||
private boolean checkAgentAlreadyRegister(String agentQueueTopic) {
|
||||
|
||||
Optional<String> first = databaseOperator.getAllServerName().stream().
|
||||
Optional<String> first = databaseOperator
|
||||
.getAllServerName()
|
||||
.stream().
|
||||
filter(serverName -> agentQueueTopic.startsWith(serverName))
|
||||
.findFirst();
|
||||
|
||||
@@ -199,7 +238,8 @@ public class AcceptAgentInitInfo {
|
||||
|
||||
private boolean sendInitMessageToAgent(ServerInfoVO serverInfoVO) {
|
||||
|
||||
OctopusMessage octopusMessage = OctopusMessage.builder()
|
||||
OctopusMessage octopusMessage = OctopusMessage
|
||||
.builder()
|
||||
.type(OctopusMessageType.INIT)
|
||||
// should be the OctopusExchange Name
|
||||
.content(String.valueOf(initRabbitMQConfig.OCTOPUS_EXCHANGE))
|
||||
@@ -230,17 +270,29 @@ public class AcceptAgentInitInfo {
|
||||
|
||||
// topic generate strategy
|
||||
String serverName = serverInfoVO.getServerName();
|
||||
serverName.replace(" ", "");
|
||||
serverName.replace(
|
||||
" ",
|
||||
""
|
||||
);
|
||||
serverInfoVO.setServerName(serverName);
|
||||
|
||||
// validate serverName
|
||||
String[] split = serverName.split("-");
|
||||
if (split.length <= 2 || !ALL_SERVER_CITY_INFO.contains(split[0]) || !ALL_SERVER_ARCH_INFO.contains(split[1])) {
|
||||
log.error("server info from agent are {}", serverInfoVO);
|
||||
if (split.length <= 2 || !ALL_SERVER_CITY_INDEX.containsKey(split[0]) || !ALL_SERVER_ARCH_INFO.contains(split[1])) {
|
||||
log.error(
|
||||
"server info from agent are {}",
|
||||
serverInfoVO
|
||||
);
|
||||
throw new MyRuntimeException("server name not validated !");
|
||||
}
|
||||
|
||||
String machineIdPrefixSixBytes = String.valueOf(serverInfoVO.getMachineId().toCharArray(), 0, 6);
|
||||
String machineIdPrefixSixBytes = String.valueOf(
|
||||
serverInfoVO
|
||||
.getMachineId()
|
||||
.toCharArray(),
|
||||
0,
|
||||
6
|
||||
);
|
||||
|
||||
return serverName + "-" + machineIdPrefixSixBytes;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user