[server-agent] accomplish the register procedure
This commit is contained in:
@@ -1,22 +0,0 @@
|
||||
package io.wdd.agent.config.rabbitmq;
|
||||
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
|
||||
|
||||
|
||||
|
||||
public class OMHandlerInit extends AbstractOctopusMessageHandler {
|
||||
|
||||
@Override
|
||||
public boolean handle(OctopusMessage octopusMessage) {
|
||||
if (!octopusMessage.getType().equals(OctopusMessageType.INIT)) {
|
||||
this.getNextHandler().handle(octopusMessage);
|
||||
}
|
||||
|
||||
// handle the init message
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package io.wdd.agent.config.rabbitmq.config;
|
||||
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class OctopusRabbitMQAdminConfig {
|
||||
|
||||
@Autowired
|
||||
ConnectionFactory connectionFactory;
|
||||
|
||||
@Bean
|
||||
public RabbitAdmin rabbitAdmin(){
|
||||
|
||||
return new RabbitAdmin(connectionFactory);
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,8 @@
|
||||
package io.wdd.agent.config.rabbitmq;
|
||||
package io.wdd.agent.config.rabbitmq.handler;
|
||||
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
|
||||
|
||||
/**
|
||||
@@ -10,10 +11,11 @@ import org.springframework.context.annotation.Configuration;
|
||||
@Configuration
|
||||
public abstract class AbstractOctopusMessageHandler {
|
||||
|
||||
private AbstractOctopusMessageHandler next;
|
||||
protected AbstractOctopusMessageHandler next;
|
||||
|
||||
public void addHandler(AbstractOctopusMessageHandler handler) {
|
||||
this.next = handler;
|
||||
|
||||
}
|
||||
|
||||
public AbstractOctopusMessageHandler getNextHandler() {
|
||||
@@ -1,14 +1,17 @@
|
||||
package io.wdd.agent.config.rabbitmq;
|
||||
package io.wdd.agent.config.rabbitmq.handler;
|
||||
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class OMHandlerAgent extends AbstractOctopusMessageHandler {
|
||||
@Override
|
||||
public boolean handle(OctopusMessage octopusMessage) {
|
||||
if (!octopusMessage.getType().equals(OctopusMessageType.AGENT)) {
|
||||
this.getNextHandler().handle(octopusMessage);
|
||||
return next.handle(octopusMessage);
|
||||
}
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package io.wdd.agent.config.rabbitmq;
|
||||
package io.wdd.agent.config.rabbitmq.handler;
|
||||
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -1,15 +1,17 @@
|
||||
package io.wdd.agent.config.rabbitmq;
|
||||
package io.wdd.agent.config.rabbitmq.handler;
|
||||
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class OMHandlerExecutor extends AbstractOctopusMessageHandler {
|
||||
@Override
|
||||
public boolean handle(OctopusMessage octopusMessage) {
|
||||
|
||||
if (!octopusMessage.getType().equals(OctopusMessageType.EXECUTOR)) {
|
||||
this.getNextHandler().handle(octopusMessage);
|
||||
return next.handle(octopusMessage);
|
||||
}
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,58 @@
|
||||
package io.wdd.agent.config.rabbitmq.handler;
|
||||
|
||||
import io.wdd.agent.initialization.beans.AgentServerInfo;
|
||||
import io.wdd.agent.initialization.rabbitmq.GenerateOctopusConnection;
|
||||
import io.wdd.agent.message.ToServerMessage;
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
|
||||
/**
|
||||
* handle the agent PassThroughTopicName info
|
||||
* 1. generator the unique topic queue for agent itself
|
||||
* 2. send PassThroughTopicName successful info to the server
|
||||
*/
|
||||
@Lazy
|
||||
@Component
|
||||
@Slf4j
|
||||
public class OMHandlerInit extends AbstractOctopusMessageHandler {
|
||||
|
||||
@Resource
|
||||
GenerateOctopusConnection generateOctopusConnection;
|
||||
|
||||
@Resource
|
||||
ToServerMessage toServerMessage;
|
||||
|
||||
@Resource
|
||||
AgentServerInfo agentServerInfo;
|
||||
|
||||
@Override
|
||||
public boolean handle(OctopusMessage octopusMessage) {
|
||||
if (!octopusMessage.getType().equals(OctopusMessageType.INIT)) {
|
||||
return next.handle(octopusMessage);
|
||||
}
|
||||
|
||||
// handle the PassThroughTopicName message
|
||||
// 1. generator the unique topic queue for agent itself
|
||||
// 1.1 initial the specific topic queue listener
|
||||
generateOctopusConnection.ManualGenerate(octopusMessage);
|
||||
|
||||
|
||||
// 2. send PassThroughTopicName successful info to the server
|
||||
String success = String.format("Octopus Agent [ %s ] has successfully PassThroughTopicName with server [ %s ] !", agentServerInfo, octopusMessage);
|
||||
|
||||
octopusMessage.setResult(success);
|
||||
log.info(success);
|
||||
|
||||
toServerMessage.send(octopusMessage);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -1,16 +1,18 @@
|
||||
package io.wdd.agent.config.rabbitmq;
|
||||
package io.wdd.agent.config.rabbitmq.handler;
|
||||
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class OMHandlerStatus extends AbstractOctopusMessageHandler {
|
||||
@Override
|
||||
public boolean handle(OctopusMessage octopusMessage) {
|
||||
|
||||
if (!octopusMessage.getType().equals(OctopusMessageType.STATUS)) {
|
||||
this.getNextHandler().handle(octopusMessage);
|
||||
return next.handle(octopusMessage);
|
||||
}
|
||||
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -100,7 +100,7 @@ public class CollectSystemInfo implements ApplicationContextAware {
|
||||
|
||||
// start to send message to Octopus Server
|
||||
octopusAgentInitService.SendInfoToServer(agentServerInfo);
|
||||
log.info("init server info has been send to octopus server !");
|
||||
log.info("PassThroughTopicName server info has been send to octopus server !");
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -3,18 +3,14 @@ package io.wdd.agent.initialization.bootup;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import io.wdd.agent.initialization.beans.AgentServerInfo;
|
||||
import io.wdd.agent.initialization.rabbitmq.InitialRabbitMqConnector;
|
||||
import io.wdd.agent.message.ToServerMessage;
|
||||
import io.wdd.agent.message.handler.OctopusMessageHandler;
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||
import io.wdd.common.handler.MyRuntimeException;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.AmqpException;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.core.MessagePostProcessor;
|
||||
import org.springframework.amqp.core.MessageProperties;
|
||||
import org.springframework.amqp.rabbit.annotation.*;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.support.AmqpHeaders;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
@@ -23,44 +19,36 @@ import org.springframework.messaging.handler.annotation.Header;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Service
|
||||
@Lazy
|
||||
@Slf4j
|
||||
public class OctopusAgentInitService {
|
||||
|
||||
|
||||
@Resource
|
||||
RabbitTemplate rabbitTemplate;
|
||||
|
||||
@Resource
|
||||
InitialRabbitMqConnector initialRabbitMqConnector;
|
||||
|
||||
@Autowired
|
||||
ObjectMapper objectMapper;
|
||||
ToServerMessage toServerMessage;
|
||||
|
||||
@Autowired
|
||||
OctopusMessageHandler octopusMessageHandler;
|
||||
|
||||
@Resource
|
||||
AgentServerInfo agentServerInfo;
|
||||
|
||||
@Value("${octopus.message.init_ttl}")
|
||||
String defaultInitRegisterTimeOut;
|
||||
|
||||
@SneakyThrows
|
||||
@Resource
|
||||
ObjectMapper objectMapper;
|
||||
|
||||
public void SendInfoToServer(AgentServerInfo agentServerInfo) {
|
||||
|
||||
// set init agent register ttl
|
||||
InitMessagePostProcessor initMessagePostProcessor = new InitMessagePostProcessor(defaultInitRegisterTimeOut);
|
||||
|
||||
log.info("send INIT AgentServerInfo to Server = {}", agentServerInfo);
|
||||
|
||||
// send the register server info to EXCHANGE:INIT_EXCHANGE QUEUE: init_to_server
|
||||
rabbitTemplate.convertAndSend(initialRabbitMqConnector.INIT_EXCHANGE, initialRabbitMqConnector.INIT_TO_SERVER_KEY, objectMapper.writeValueAsString(agentServerInfo), initMessagePostProcessor);
|
||||
toServerMessage.sendInitInfo(agentServerInfo, defaultInitRegisterTimeOut);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* listen to the init queue from octopus server
|
||||
* listen to the PassThroughTopicName queue from octopus server
|
||||
*
|
||||
* @param message 该方法不需要手动调用,Spring会自动运行这个监听方法
|
||||
* <p>
|
||||
@@ -82,11 +70,17 @@ public class OctopusAgentInitService {
|
||||
)
|
||||
public void ReceiveInitInfoFromServer(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
|
||||
|
||||
|
||||
try {
|
||||
|
||||
OctopusMessage octopusMessage = objectMapper.readValue(message.getBody(), OctopusMessage.class);
|
||||
|
||||
// consider the multi-agents register situation
|
||||
// judge the machineID begin
|
||||
String[] split = octopusMessage.getUuid().split("-");
|
||||
if (!agentServerInfo.getMachineId().startsWith(split[split.length - 1])) {
|
||||
throw new MyRuntimeException("INIT Message not for this agent !");
|
||||
}
|
||||
|
||||
// response chain to handle all kind of type of octopus message
|
||||
if (!octopusMessageHandler.handle(octopusMessage)) {
|
||||
throw new MyRuntimeException(" Handle Octopus Message Error !");
|
||||
@@ -99,8 +93,8 @@ public class OctopusAgentInitService {
|
||||
// long deliveryTag, boolean requeue
|
||||
// channel.basicReject(deliveryTag,true);
|
||||
|
||||
Thread.sleep(1000); // 这里只是便于出现死循环时查看
|
||||
|
||||
// 这里只是便于出现死循环时查看
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
|
||||
throw new MyRuntimeException("Octopus Agent Initialization Error, please check !");
|
||||
}
|
||||
@@ -109,25 +103,5 @@ public class OctopusAgentInitService {
|
||||
channel.basicAck(deliveryTag, false);
|
||||
}
|
||||
|
||||
private class InitMessagePostProcessor implements MessagePostProcessor {
|
||||
|
||||
private final String initMessageTTL;
|
||||
|
||||
public InitMessagePostProcessor(Long initMessageTTL) {
|
||||
this.initMessageTTL = String.valueOf(initMessageTTL);
|
||||
}
|
||||
|
||||
public InitMessagePostProcessor(String initMessageTTL) {
|
||||
this.initMessageTTL = initMessageTTL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message postProcessMessage(Message message) throws AmqpException {
|
||||
// set init register expiration time
|
||||
MessageProperties messageProperties = message.getMessageProperties();
|
||||
messageProperties.setExpiration(initMessageTTL);
|
||||
return message;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,113 @@
|
||||
package io.wdd.agent.initialization.rabbitmq;
|
||||
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.wdd.agent.message.handler.OctopusMessageHandler;
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||
import io.wdd.common.handler.MyRuntimeException;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.core.QueueInformation;
|
||||
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
|
||||
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
|
||||
import org.springframework.context.Lifecycle;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
import javax.annotation.Resource;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@Lazy
|
||||
@Component
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class GenerateOctopusConnection {
|
||||
|
||||
private final List<MessageListenerContainer> messageListenerContainerList = new ArrayList<>();
|
||||
private final SimpleRabbitListenerContainerFactory containerFactory;
|
||||
@Resource
|
||||
RabbitAdmin rabbitAdmin;
|
||||
@Resource
|
||||
ObjectMapper objectMapper;
|
||||
@Resource
|
||||
OctopusMessageHandler octopusMessageHandler;
|
||||
|
||||
public void ManualGenerate(OctopusMessage octopusMessage) {
|
||||
|
||||
// generate the ne topic queue for unique agent
|
||||
String agentTopicName = octopusMessage.getUuid();
|
||||
|
||||
// reboot judgyment of existing exchange
|
||||
QueueInformation queueInfo = rabbitAdmin.getQueueInfo(agentTopicName);
|
||||
|
||||
if (ObjectUtils.isNotEmpty(queueInfo) && queueInfo.getConsumerCount() > 0 ) {
|
||||
log.info("Octopus Agent Specific Topic Queue Already Existed ! == {}", agentTopicName);
|
||||
return;
|
||||
}
|
||||
|
||||
Queue queue = new Queue(agentTopicName, true, false, false);
|
||||
Binding binding = new Binding(
|
||||
agentTopicName,
|
||||
Binding.DestinationType.QUEUE,
|
||||
octopusMessage.getContent().toString(),
|
||||
agentTopicName + "*",
|
||||
null
|
||||
);
|
||||
|
||||
// Exchange are created by Octopus Server at server BootUP
|
||||
rabbitAdmin.declareQueue(queue);
|
||||
rabbitAdmin.declareBinding(binding);
|
||||
|
||||
|
||||
// create the listener
|
||||
SimpleMessageListenerContainer listenerContainer = containerFactory.createListenerContainer();
|
||||
listenerContainer.addQueues(queue);
|
||||
listenerContainer.setMessageListener(this::AgentListenToSpecificTopicOctopusMessage);
|
||||
listenerContainer.start();
|
||||
|
||||
|
||||
log.info("Specific Octopus Topic Queue Generate Successfully !");
|
||||
messageListenerContainerList.add(listenerContainer);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Maunally Generated Octopus Message Listener
|
||||
*
|
||||
* @param message Rabbitmq Message
|
||||
*/
|
||||
public void AgentListenToSpecificTopicOctopusMessage(Message message) {
|
||||
|
||||
OctopusMessage octopusMessage;
|
||||
|
||||
|
||||
try {
|
||||
octopusMessage = objectMapper.readValue(message.getBody(), OctopusMessage.class);
|
||||
|
||||
if (!octopusMessageHandler.handle(octopusMessage)) {
|
||||
throw new MyRuntimeException("Octopus Message Handle Err");
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
|
||||
throw new MyRuntimeException("Octopus Message Wrong !");
|
||||
}
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
messageListenerContainerList.forEach(Lifecycle::stop);
|
||||
log.info("- stop all message listeners...");
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -9,12 +9,12 @@ import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* agent send server init info to octopus server
|
||||
* agent send server PassThroughTopicName info to octopus server
|
||||
* <p>
|
||||
* agent boot up should send message to server using this queue
|
||||
*/
|
||||
@Configuration
|
||||
public class InitialRabbitMqConnector {
|
||||
public class InitRabbitMQConnector {
|
||||
|
||||
@Value("${octopus.message.init_exchange}")
|
||||
public String INIT_EXCHANGE;
|
||||
@@ -31,6 +31,15 @@ public class InitialRabbitMqConnector {
|
||||
@Value("${octopus.message.init_to_server_key}")
|
||||
public String INIT_TO_SERVER_KEY;
|
||||
|
||||
@Value("${octopus.message.octopus_exchange}")
|
||||
public String OCTOPUS_EXCHANGE;
|
||||
|
||||
@Value("${octopus.message.octopus_to_server}")
|
||||
public String OCTOPUS_TO_SERVER;
|
||||
|
||||
//
|
||||
public static String SPECIFIC_AGENT_TOPIC_NAME;
|
||||
|
||||
@Bean
|
||||
public DirectExchange initDirectExchange() {
|
||||
return new DirectExchange(INIT_EXCHANGE);
|
||||
101
agent/src/main/java/io/wdd/agent/message/ToServerMessage.java
Normal file
101
agent/src/main/java/io/wdd/agent/message/ToServerMessage.java
Normal file
@@ -0,0 +1,101 @@
|
||||
package io.wdd.agent.message;
|
||||
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.wdd.agent.initialization.beans.AgentServerInfo;
|
||||
import io.wdd.agent.initialization.rabbitmq.InitRabbitMQConnector;
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||
import io.wdd.common.handler.MyRuntimeException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.AmqpException;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.core.MessagePostProcessor;
|
||||
import org.springframework.amqp.core.MessageProperties;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Service
|
||||
@Slf4j(topic = "To Octopus Server Message")
|
||||
public class ToServerMessage {
|
||||
|
||||
@Resource
|
||||
InitRabbitMQConnector initRabbitMqConnector;
|
||||
|
||||
@Resource
|
||||
ObjectMapper objectMapper;
|
||||
|
||||
@Resource
|
||||
RabbitTemplate rabbitTemplate;
|
||||
|
||||
|
||||
|
||||
public boolean send(OctopusMessage octopusMessage) {
|
||||
|
||||
octopusMessage.setAc_time(LocalDateTime.now());
|
||||
|
||||
// send to Queue -- InitToServer
|
||||
|
||||
log.info("send Message to Server = {}", octopusMessage);
|
||||
|
||||
try {
|
||||
|
||||
rabbitTemplate.convertAndSend(
|
||||
initRabbitMqConnector.OCTOPUS_EXCHANGE,
|
||||
initRabbitMqConnector.OCTOPUS_TO_SERVER,
|
||||
objectMapper.writeValueAsBytes(octopusMessage)
|
||||
);
|
||||
|
||||
|
||||
} catch (JsonProcessingException e) {
|
||||
|
||||
log.error("Failed to send message to Serv er ! = {}", octopusMessage);
|
||||
throw new MyRuntimeException(e);
|
||||
}
|
||||
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
public void sendInitInfo(AgentServerInfo agentServerInfo, String defaultInitRegisterTimeOut) {
|
||||
|
||||
// set PassThroughTopicName agent register ttl
|
||||
InitMessagePostProcessor initMessagePostProcessor = new InitMessagePostProcessor(defaultInitRegisterTimeOut);
|
||||
|
||||
log.info("send INIT AgentServerInfo to Server = {}", agentServerInfo);
|
||||
|
||||
// send the register server info to EXCHANGE:INIT_EXCHANGE QUEUE: init_to_server
|
||||
try {
|
||||
rabbitTemplate.convertAndSend(initRabbitMqConnector.INIT_EXCHANGE, initRabbitMqConnector.INIT_TO_SERVER_KEY, objectMapper.writeValueAsBytes(agentServerInfo), initMessagePostProcessor);
|
||||
} catch (JsonProcessingException e) {
|
||||
log.error("Failed to send INIT message to Server ! = {}", agentServerInfo);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class InitMessagePostProcessor implements MessagePostProcessor {
|
||||
|
||||
private final String initMessageTTL;
|
||||
|
||||
// public InitMessagePostProcessor(Long initMessageTTL) {
|
||||
// this.initMessageTTL = String.valueOf(initMessageTTL);
|
||||
// }
|
||||
|
||||
public InitMessagePostProcessor(String initMessageTTL) {
|
||||
this.initMessageTTL = initMessageTTL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message postProcessMessage(Message message) throws AmqpException {
|
||||
// set PassThroughTopicName register expiration time
|
||||
MessageProperties messageProperties = message.getMessageProperties();
|
||||
messageProperties.setExpiration(initMessageTTL);
|
||||
return message;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,11 +1,12 @@
|
||||
package io.wdd.agent.message.handler;
|
||||
|
||||
|
||||
import io.wdd.agent.config.rabbitmq.*;
|
||||
import io.wdd.agent.config.rabbitmq.handler.*;
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
|
||||
|
||||
@Service
|
||||
@@ -14,16 +15,28 @@ public class OctopusMessageHandler {
|
||||
private AbstractOctopusMessageHandler octopusMessageHandler;
|
||||
|
||||
|
||||
@Resource
|
||||
OMHandlerAgent omHandlerAgent;
|
||||
|
||||
@Resource
|
||||
OMHandlerExecutor omHandlerExecutor;
|
||||
|
||||
@Resource
|
||||
OMHandlerInit omHandlerInit;
|
||||
|
||||
@Resource
|
||||
OMHandlerStatus omHandlerStatus;
|
||||
|
||||
@PostConstruct
|
||||
private void registerAllHandler() {
|
||||
|
||||
AbstractOctopusMessageHandler.Builder builder = new AbstractOctopusMessageHandler.Builder();
|
||||
|
||||
octopusMessageHandler = builder
|
||||
.addHandler(new OMHandlerExecutor())
|
||||
.addHandler(new OMHandlerAgent())
|
||||
.addHandler(new OMHandlerStatus())
|
||||
.addHandler(new OMHandlerInit())
|
||||
.addHandler(omHandlerExecutor)
|
||||
.addHandler(omHandlerAgent)
|
||||
.addHandler(omHandlerStatus)
|
||||
.addHandler(omHandlerInit)
|
||||
.build();
|
||||
|
||||
}
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
server:
|
||||
port: 8000
|
||||
|
||||
|
||||
spring:
|
||||
main:
|
||||
allow-circular-references: true
|
||||
allow-bean-definition-overriding: true
|
||||
rabbitmq:
|
||||
host: 127.0.0.1
|
||||
port: 35672
|
||||
@@ -22,4 +26,12 @@ octopus:
|
||||
# agent boot up default common exchange routing key
|
||||
init_from_server_key: InitFromServerKey
|
||||
# initialization register time out (unit ms) default is 5 min
|
||||
init_ttl: "3000000"
|
||||
init_ttl: "3000000"
|
||||
# Octopus Exchange Name == server comunicate with agent
|
||||
octopus_exchange: OctopusExchange
|
||||
# Octopus Message To Server == all agent send info to server queue and topic
|
||||
octopus_to_server: OctopusToServer
|
||||
|
||||
logging:
|
||||
level:
|
||||
web: debug
|
||||
@@ -6,7 +6,7 @@ import com.rabbitmq.client.Channel;
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
|
||||
import io.wdd.common.handler.MyRuntimeException;
|
||||
import io.wdd.rpc.message.ToAgentOrder;
|
||||
import io.wdd.rpc.message.sender.ToAgentOrder;
|
||||
import io.wdd.server.beans.vo.ServerInfoVO;
|
||||
import io.wdd.server.utils.DaemonDatabaseOperator;
|
||||
import lombok.SneakyThrows;
|
||||
@@ -24,13 +24,17 @@ import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* The type Accept boot up info message.
|
||||
*/
|
||||
@Service
|
||||
@Slf4j(topic = "octopus agent init ")
|
||||
public class AcceptBootUpInfoMessage {
|
||||
public class AcceptAgentInitInfo {
|
||||
|
||||
@Resource
|
||||
InitRabbitMQConfig initRabbitMQConfig;
|
||||
|
||||
|
||||
public static Set<String> ALL_SERVER_CITY_INFO = new HashSet<>(
|
||||
@@ -125,7 +129,8 @@ public class AcceptBootUpInfoMessage {
|
||||
// long deliveryTag, boolean requeue
|
||||
// channel.basicReject(deliveryTag,true);
|
||||
|
||||
Thread.sleep(1000); // 这里只是便于出现死循环时查看
|
||||
// 这里只是便于出现死循环时查看
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
|
||||
/*
|
||||
* 一般实际异常情况下的处理过程:记录出现异常的业务数据,将它单独插入到一个单独的模块,
|
||||
@@ -137,6 +142,7 @@ public class AcceptBootUpInfoMessage {
|
||||
}
|
||||
|
||||
/**
|
||||
* 无异常就确认消息
|
||||
* 无异常就确认消息
|
||||
* basicAck(long deliveryTag, boolean multiple)
|
||||
* deliveryTag:取出来当前消息在队列中的的索引;
|
||||
@@ -145,6 +151,7 @@ public class AcceptBootUpInfoMessage {
|
||||
*/
|
||||
// ack the rabbitmq info
|
||||
// If all logic is successful
|
||||
log.info("Agent [ {} ] has init successfully !", serverInfoVO.getTopicName());
|
||||
channel.basicAck(deliveryTag, false);
|
||||
}
|
||||
|
||||
@@ -154,14 +161,15 @@ public class AcceptBootUpInfoMessage {
|
||||
filter(serverName -> agentQueueTopic.startsWith(serverName))
|
||||
.findFirst();
|
||||
|
||||
return first.isEmpty();
|
||||
return first.isPresent();
|
||||
}
|
||||
|
||||
private boolean sendInitMessageToAgent(ServerInfoVO serverInfoVO) {
|
||||
|
||||
OctopusMessage octopusMessage = OctopusMessage.builder()
|
||||
.type(OctopusMessageType.INIT)
|
||||
.content(serverInfoVO.getTopicName())
|
||||
// should be the OctopusExchange Name
|
||||
.content(String.valueOf(initRabbitMQConfig.OCTOPUS_EXCHANGE))
|
||||
.init_time(LocalDateTime.now())
|
||||
.uuid(serverInfoVO.getTopicName())
|
||||
.build();
|
||||
@@ -9,7 +9,7 @@ import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class FromServerMessageBinding {
|
||||
public class InitRabbitMQConfig {
|
||||
|
||||
@Value("${octopus.message.init_exchange}")
|
||||
public String INIT_EXCHANGE;
|
||||
@@ -26,6 +26,13 @@ public class FromServerMessageBinding {
|
||||
@Value("${octopus.message.init_to_server_key}")
|
||||
public String INIT_TO_SERVER_KEY;
|
||||
|
||||
@Value("${octopus.message.octopus_exchange}")
|
||||
public String OCTOPUS_EXCHANGE;
|
||||
|
||||
|
||||
@Value("${octopus.message.octopus_to_server}")
|
||||
public String OCTOPUS_TO_SERVER;
|
||||
|
||||
@Bean
|
||||
public DirectExchange initDirectExchange() {
|
||||
return new DirectExchange(INIT_EXCHANGE);
|
||||
@@ -40,7 +47,7 @@ public class FromServerMessageBinding {
|
||||
/**
|
||||
* 配置一个队列和交换机的绑定
|
||||
*
|
||||
* @param initDirectQueue : 需要绑定的队列对象,参数名必须和某个@Bean的方法名完全相同,这样就会进行自动注入,对应 .bind()
|
||||
* @param initFromServerQueue : 需要绑定的队列对象,参数名必须和某个@Bean的方法名完全相同,这样就会进行自动注入,对应 .bind()
|
||||
* @param initDirectExchange : 需要绑定的交换机对象,参数名必须和某个@Bean的方法名完全相同,这样就会进行自动注入,对应 .to()
|
||||
* .with() 方法对应的RoutingKey
|
||||
* @return
|
||||
@@ -0,0 +1,43 @@
|
||||
package io.wdd.rpc.init;
|
||||
|
||||
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.core.TopicExchange;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* generate the OCTOPUS_EXCHANGE at the beginning
|
||||
*/
|
||||
@Configuration
|
||||
public class OctopusExchangeConfig {
|
||||
|
||||
@Value("${octopus.message.octopus_exchange}")
|
||||
public String OCTOPUS_EXCHANGE;
|
||||
|
||||
@Value("${octopus.message.octopus_to_server}")
|
||||
public String OCTOPUS_TO_SERVER;
|
||||
|
||||
|
||||
@Bean
|
||||
public TopicExchange octopusExchange(){
|
||||
return new TopicExchange(OCTOPUS_EXCHANGE,true,false);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue octopusAgentToServerQueue(){
|
||||
return new Queue(OCTOPUS_TO_SERVER);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding bindingToServerTopicQueue(TopicExchange octopusExchange, Queue octopusAgentToServerQueue){
|
||||
return BindingBuilder
|
||||
.bind(octopusAgentToServerQueue)
|
||||
.to(octopusExchange)
|
||||
.with(OCTOPUS_TO_SERVER);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
package io.wdd.rpc.message.handler;
|
||||
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||
import io.wdd.common.handler.MyRuntimeException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.io.IOException;
|
||||
|
||||
@Configuration
|
||||
@Slf4j(topic = "Octopus Message Handler")
|
||||
public class OctopusMessageHandlerServer {
|
||||
|
||||
|
||||
@Resource
|
||||
ObjectMapper objectMapper;
|
||||
|
||||
@RabbitHandler
|
||||
@RabbitListener(queues = "${octopus.message.octopus_to_server}"
|
||||
)
|
||||
public void HandleOctopusMessageFromAgent(Message message){
|
||||
|
||||
OctopusMessage octopusMessage;
|
||||
|
||||
try {
|
||||
octopusMessage = objectMapper.readValue(message.getBody(), OctopusMessage.class);
|
||||
} catch (IOException e) {
|
||||
throw new MyRuntimeException("Octopus Message Wrong !");
|
||||
}
|
||||
|
||||
// Octopus Message Handler
|
||||
log.info("received from agent : {} ", octopusMessage);
|
||||
|
||||
|
||||
// todo what to do after received the result
|
||||
|
||||
// log info ?
|
||||
// update the database
|
||||
// handle the result
|
||||
}
|
||||
}
|
||||
@@ -1,11 +1,11 @@
|
||||
package io.wdd.rpc.message;
|
||||
package io.wdd.rpc.message.sender;
|
||||
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
|
||||
import io.wdd.common.handler.MyRuntimeException;
|
||||
import io.wdd.rpc.init.FromServerMessageBinding;
|
||||
import io.wdd.rpc.init.InitRabbitMQConfig;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
@@ -18,14 +18,14 @@ import javax.annotation.Resource;
|
||||
* provide override method to convert Object and send to rabbitmq
|
||||
*/
|
||||
@Component
|
||||
@Slf4j(topic = "order to agent ")
|
||||
@Slf4j(topic = "To Octopus Server Message ")
|
||||
public class ToAgentOrder {
|
||||
|
||||
@Resource
|
||||
RabbitTemplate rabbitTemplate;
|
||||
|
||||
@Resource
|
||||
FromServerMessageBinding fromServerMessageBinding;
|
||||
InitRabbitMQConfig initRabbitMQConfig;
|
||||
|
||||
|
||||
@Resource
|
||||
@@ -47,7 +47,7 @@ public class ToAgentOrder {
|
||||
// send to Queue -- InitFromServer
|
||||
log.info("send INIT OrderCommand to Agent = {}", message);
|
||||
|
||||
rabbitTemplate.convertAndSend(fromServerMessageBinding.INIT_EXCHANGE, fromServerMessageBinding.INIT_FROM_SERVER_KEY, writeData(message));
|
||||
rabbitTemplate.convertAndSend(initRabbitMQConfig.INIT_EXCHANGE, initRabbitMQConfig.INIT_FROM_SERVER_KEY, writeData(message));
|
||||
|
||||
}
|
||||
|
||||
@@ -74,8 +74,8 @@ public class CoreServerServiceImpl implements CoreServerService {
|
||||
@Override
|
||||
public boolean serverCreate(ServerInfoVO serverInfoVO) {
|
||||
|
||||
ServerInfoPO serverInfoPO = new ServerInfoPO();
|
||||
BeanUtils.copyProperties(serverInfoVO, serverInfoPO);
|
||||
// BeanUtils.copyProperties(serverInfoVO, serverInfoPO);
|
||||
ServerInfoPO serverInfoPO = EntityUtils.cvToTarget(serverInfoVO, ServerInfoPO.class);
|
||||
|
||||
return serverInfoService.save(serverInfoPO);
|
||||
}
|
||||
|
||||
@@ -20,8 +20,10 @@ public class MyBatisAutoInsertInterceptor implements MetaObjectHandler {
|
||||
public void insertFill(MetaObject metaObject) {
|
||||
log.info("MyBaitsPlus start to insert manually !");
|
||||
|
||||
this.strictInsertFill(metaObject, "registerTime", () -> LocalDateTime.now(), LocalDateTime.class); // 起始版本 3.3.3(推荐)
|
||||
this.strictInsertFill(metaObject, "create_time", () -> LocalDateTime.now(), LocalDateTime.class); // 起始版本 3.3.3(推荐)
|
||||
this.strictInsertFill(metaObject, "createTime", () -> LocalDateTime.now(), LocalDateTime.class); // 起始版本 3.3.3(推荐)
|
||||
|
||||
this.strictInsertFill(metaObject, "updateTime", () -> LocalDateTime.now(), LocalDateTime.class); // 起始版本 3.3.3(推荐)
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -30,7 +32,5 @@ public class MyBatisAutoInsertInterceptor implements MetaObjectHandler {
|
||||
log.info("MyBaitsPlus start to update manually !");
|
||||
|
||||
this.strictInsertFill(metaObject, "updateTime", () -> LocalDateTime.now(), LocalDateTime.class); // 起始版本 3.3.3(推荐)
|
||||
|
||||
this.strictInsertFill(metaObject, "update_time", () -> LocalDateTime.now(), LocalDateTime.class); // 起始版本 3.3.3(推荐)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,10 +39,10 @@ public class DaemonDatabaseOperator {
|
||||
*/
|
||||
public boolean saveInitOctopusAgentInfo(ServerInfoVO serverInfoVO) {
|
||||
|
||||
log.info("simulate store the Octopus Agent Server info");
|
||||
// log.info("simulate store the Octopus Agent Server info");
|
||||
|
||||
return true;
|
||||
// return coreServerService.serverCreate(serverInfoVO);
|
||||
// return true;
|
||||
return coreServerService.serverCreate(serverInfoVO);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -15,6 +15,11 @@ octopus:
|
||||
init_from_server_key: InitFromServerKey
|
||||
# initialization register time out (unit ms) default is 5 min
|
||||
init_ttl: "300000"
|
||||
# Octopus Exchange Name == server comunicate with agent
|
||||
octopus_exchange: OctopusExchange
|
||||
# Octopus Message To Server == all agent send info to server queue and topic
|
||||
octopus_to_server: OctopusToServer
|
||||
|
||||
|
||||
spring:
|
||||
rabbitmq:
|
||||
|
||||
Reference in New Issue
Block a user