diff --git a/agent/src/main/java/io/wdd/agent/config/rabbitmq/OMHandlerInit.java b/agent/src/main/java/io/wdd/agent/config/rabbitmq/OMHandlerInit.java deleted file mode 100644 index c3c494a..0000000 --- a/agent/src/main/java/io/wdd/agent/config/rabbitmq/OMHandlerInit.java +++ /dev/null @@ -1,22 +0,0 @@ -package io.wdd.agent.config.rabbitmq; - -import io.wdd.common.beans.rabbitmq.OctopusMessage; -import io.wdd.common.beans.rabbitmq.OctopusMessageType; - - - -public class OMHandlerInit extends AbstractOctopusMessageHandler { - - @Override - public boolean handle(OctopusMessage octopusMessage) { - if (!octopusMessage.getType().equals(OctopusMessageType.INIT)) { - this.getNextHandler().handle(octopusMessage); - } - - // handle the init message - - return false; - } - - -} diff --git a/agent/src/main/java/io/wdd/agent/config/rabbitmq/config/OctopusRabbitMQAdminConfig.java b/agent/src/main/java/io/wdd/agent/config/rabbitmq/config/OctopusRabbitMQAdminConfig.java new file mode 100644 index 0000000..1494f92 --- /dev/null +++ b/agent/src/main/java/io/wdd/agent/config/rabbitmq/config/OctopusRabbitMQAdminConfig.java @@ -0,0 +1,20 @@ +package io.wdd.agent.config.rabbitmq.config; + +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class OctopusRabbitMQAdminConfig { + + @Autowired + ConnectionFactory connectionFactory; + + @Bean + public RabbitAdmin rabbitAdmin(){ + + return new RabbitAdmin(connectionFactory); + } +} diff --git a/agent/src/main/java/io/wdd/agent/config/rabbitmq/AbstractOctopusMessageHandler.java b/agent/src/main/java/io/wdd/agent/config/rabbitmq/handler/AbstractOctopusMessageHandler.java similarity index 87% rename from agent/src/main/java/io/wdd/agent/config/rabbitmq/AbstractOctopusMessageHandler.java rename to agent/src/main/java/io/wdd/agent/config/rabbitmq/handler/AbstractOctopusMessageHandler.java index 0f0e1f0..7c639fe 100644 --- a/agent/src/main/java/io/wdd/agent/config/rabbitmq/AbstractOctopusMessageHandler.java +++ b/agent/src/main/java/io/wdd/agent/config/rabbitmq/handler/AbstractOctopusMessageHandler.java @@ -1,7 +1,8 @@ -package io.wdd.agent.config.rabbitmq; +package io.wdd.agent.config.rabbitmq.handler; import io.wdd.common.beans.rabbitmq.OctopusMessage; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Lazy; /** @@ -10,10 +11,11 @@ import org.springframework.context.annotation.Configuration; @Configuration public abstract class AbstractOctopusMessageHandler { - private AbstractOctopusMessageHandler next; + protected AbstractOctopusMessageHandler next; public void addHandler(AbstractOctopusMessageHandler handler) { this.next = handler; + } public AbstractOctopusMessageHandler getNextHandler() { diff --git a/agent/src/main/java/io/wdd/agent/config/rabbitmq/OMHandlerAgent.java b/agent/src/main/java/io/wdd/agent/config/rabbitmq/handler/OMHandlerAgent.java similarity index 66% rename from agent/src/main/java/io/wdd/agent/config/rabbitmq/OMHandlerAgent.java rename to agent/src/main/java/io/wdd/agent/config/rabbitmq/handler/OMHandlerAgent.java index 6804594..3547c9c 100644 --- a/agent/src/main/java/io/wdd/agent/config/rabbitmq/OMHandlerAgent.java +++ b/agent/src/main/java/io/wdd/agent/config/rabbitmq/handler/OMHandlerAgent.java @@ -1,14 +1,17 @@ -package io.wdd.agent.config.rabbitmq; +package io.wdd.agent.config.rabbitmq.handler; import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessageType; +import org.springframework.stereotype.Component; +@Component public class OMHandlerAgent extends AbstractOctopusMessageHandler { @Override public boolean handle(OctopusMessage octopusMessage) { if (!octopusMessage.getType().equals(OctopusMessageType.AGENT)) { - this.getNextHandler().handle(octopusMessage); + return next.handle(octopusMessage); } - return false; + + return true; } } diff --git a/agent/src/main/java/io/wdd/agent/config/rabbitmq/OMHandlerBlackHole.java b/agent/src/main/java/io/wdd/agent/config/rabbitmq/handler/OMHandlerBlackHole.java similarity index 88% rename from agent/src/main/java/io/wdd/agent/config/rabbitmq/OMHandlerBlackHole.java rename to agent/src/main/java/io/wdd/agent/config/rabbitmq/handler/OMHandlerBlackHole.java index 194d712..8e200a0 100644 --- a/agent/src/main/java/io/wdd/agent/config/rabbitmq/OMHandlerBlackHole.java +++ b/agent/src/main/java/io/wdd/agent/config/rabbitmq/handler/OMHandlerBlackHole.java @@ -1,4 +1,4 @@ -package io.wdd.agent.config.rabbitmq; +package io.wdd.agent.config.rabbitmq.handler; import io.wdd.common.beans.rabbitmq.OctopusMessage; import lombok.extern.slf4j.Slf4j; diff --git a/agent/src/main/java/io/wdd/agent/config/rabbitmq/OMHandlerExecutor.java b/agent/src/main/java/io/wdd/agent/config/rabbitmq/handler/OMHandlerExecutor.java similarity index 66% rename from agent/src/main/java/io/wdd/agent/config/rabbitmq/OMHandlerExecutor.java rename to agent/src/main/java/io/wdd/agent/config/rabbitmq/handler/OMHandlerExecutor.java index fde4501..1c4f32f 100644 --- a/agent/src/main/java/io/wdd/agent/config/rabbitmq/OMHandlerExecutor.java +++ b/agent/src/main/java/io/wdd/agent/config/rabbitmq/handler/OMHandlerExecutor.java @@ -1,15 +1,17 @@ -package io.wdd.agent.config.rabbitmq; +package io.wdd.agent.config.rabbitmq.handler; import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessageType; +import org.springframework.stereotype.Component; +@Component public class OMHandlerExecutor extends AbstractOctopusMessageHandler { @Override public boolean handle(OctopusMessage octopusMessage) { if (!octopusMessage.getType().equals(OctopusMessageType.EXECUTOR)) { - this.getNextHandler().handle(octopusMessage); + return next.handle(octopusMessage); } - return false; + return true; } } diff --git a/agent/src/main/java/io/wdd/agent/config/rabbitmq/handler/OMHandlerInit.java b/agent/src/main/java/io/wdd/agent/config/rabbitmq/handler/OMHandlerInit.java new file mode 100644 index 0000000..402aab2 --- /dev/null +++ b/agent/src/main/java/io/wdd/agent/config/rabbitmq/handler/OMHandlerInit.java @@ -0,0 +1,58 @@ +package io.wdd.agent.config.rabbitmq.handler; + +import io.wdd.agent.initialization.beans.AgentServerInfo; +import io.wdd.agent.initialization.rabbitmq.GenerateOctopusConnection; +import io.wdd.agent.message.ToServerMessage; +import io.wdd.common.beans.rabbitmq.OctopusMessage; +import io.wdd.common.beans.rabbitmq.OctopusMessageType; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + + +/** + * handle the agent PassThroughTopicName info + * 1. generator the unique topic queue for agent itself + * 2. send PassThroughTopicName successful info to the server + */ +@Lazy +@Component +@Slf4j +public class OMHandlerInit extends AbstractOctopusMessageHandler { + + @Resource + GenerateOctopusConnection generateOctopusConnection; + + @Resource + ToServerMessage toServerMessage; + + @Resource + AgentServerInfo agentServerInfo; + + @Override + public boolean handle(OctopusMessage octopusMessage) { + if (!octopusMessage.getType().equals(OctopusMessageType.INIT)) { + return next.handle(octopusMessage); + } + + // handle the PassThroughTopicName message + // 1. generator the unique topic queue for agent itself + // 1.1 initial the specific topic queue listener + generateOctopusConnection.ManualGenerate(octopusMessage); + + + // 2. send PassThroughTopicName successful info to the server + String success = String.format("Octopus Agent [ %s ] has successfully PassThroughTopicName with server [ %s ] !", agentServerInfo, octopusMessage); + + octopusMessage.setResult(success); + log.info(success); + + toServerMessage.send(octopusMessage); + + return true; + } + + +} diff --git a/agent/src/main/java/io/wdd/agent/config/rabbitmq/OMHandlerStatus.java b/agent/src/main/java/io/wdd/agent/config/rabbitmq/handler/OMHandlerStatus.java similarity index 66% rename from agent/src/main/java/io/wdd/agent/config/rabbitmq/OMHandlerStatus.java rename to agent/src/main/java/io/wdd/agent/config/rabbitmq/handler/OMHandlerStatus.java index c5b4a3f..8999e5c 100644 --- a/agent/src/main/java/io/wdd/agent/config/rabbitmq/OMHandlerStatus.java +++ b/agent/src/main/java/io/wdd/agent/config/rabbitmq/handler/OMHandlerStatus.java @@ -1,16 +1,18 @@ -package io.wdd.agent.config.rabbitmq; +package io.wdd.agent.config.rabbitmq.handler; import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessageType; +import org.springframework.stereotype.Component; +@Component public class OMHandlerStatus extends AbstractOctopusMessageHandler { @Override public boolean handle(OctopusMessage octopusMessage) { if (!octopusMessage.getType().equals(OctopusMessageType.STATUS)) { - this.getNextHandler().handle(octopusMessage); + return next.handle(octopusMessage); } - return false; + return true; } } diff --git a/agent/src/main/java/io/wdd/agent/initialization/bootup/CollectSystemInfo.java b/agent/src/main/java/io/wdd/agent/initialization/bootup/CollectSystemInfo.java index ea762c0..f2636d4 100644 --- a/agent/src/main/java/io/wdd/agent/initialization/bootup/CollectSystemInfo.java +++ b/agent/src/main/java/io/wdd/agent/initialization/bootup/CollectSystemInfo.java @@ -100,7 +100,7 @@ public class CollectSystemInfo implements ApplicationContextAware { // start to send message to Octopus Server octopusAgentInitService.SendInfoToServer(agentServerInfo); - log.info("init server info has been send to octopus server !"); + log.info("PassThroughTopicName server info has been send to octopus server !"); } diff --git a/agent/src/main/java/io/wdd/agent/initialization/bootup/OctopusAgentInitService.java b/agent/src/main/java/io/wdd/agent/initialization/bootup/OctopusAgentInitService.java index a2cb34a..8681352 100644 --- a/agent/src/main/java/io/wdd/agent/initialization/bootup/OctopusAgentInitService.java +++ b/agent/src/main/java/io/wdd/agent/initialization/bootup/OctopusAgentInitService.java @@ -3,18 +3,14 @@ package io.wdd.agent.initialization.bootup; import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.Channel; import io.wdd.agent.initialization.beans.AgentServerInfo; -import io.wdd.agent.initialization.rabbitmq.InitialRabbitMqConnector; +import io.wdd.agent.message.ToServerMessage; import io.wdd.agent.message.handler.OctopusMessageHandler; import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.handler.MyRuntimeException; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; -import org.springframework.amqp.core.MessagePostProcessor; -import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.annotation.*; -import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -23,44 +19,36 @@ import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Service; import javax.annotation.Resource; +import java.util.concurrent.TimeUnit; @Service @Lazy @Slf4j public class OctopusAgentInitService { - @Resource - RabbitTemplate rabbitTemplate; - - @Resource - InitialRabbitMqConnector initialRabbitMqConnector; - - @Autowired - ObjectMapper objectMapper; + ToServerMessage toServerMessage; @Autowired OctopusMessageHandler octopusMessageHandler; + @Resource + AgentServerInfo agentServerInfo; @Value("${octopus.message.init_ttl}") String defaultInitRegisterTimeOut; - @SneakyThrows + @Resource + ObjectMapper objectMapper; + public void SendInfoToServer(AgentServerInfo agentServerInfo) { - // set init agent register ttl - InitMessagePostProcessor initMessagePostProcessor = new InitMessagePostProcessor(defaultInitRegisterTimeOut); - - log.info("send INIT AgentServerInfo to Server = {}", agentServerInfo); - - // send the register server info to EXCHANGE:INIT_EXCHANGE QUEUE: init_to_server - rabbitTemplate.convertAndSend(initialRabbitMqConnector.INIT_EXCHANGE, initialRabbitMqConnector.INIT_TO_SERVER_KEY, objectMapper.writeValueAsString(agentServerInfo), initMessagePostProcessor); + toServerMessage.sendInitInfo(agentServerInfo, defaultInitRegisterTimeOut); } /** - * listen to the init queue from octopus server + * listen to the PassThroughTopicName queue from octopus server * * @param message 该方法不需要手动调用,Spring会自动运行这个监听方法 *

@@ -82,11 +70,17 @@ public class OctopusAgentInitService { ) public void ReceiveInitInfoFromServer(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { - try { OctopusMessage octopusMessage = objectMapper.readValue(message.getBody(), OctopusMessage.class); + // consider the multi-agents register situation + // judge the machineID begin + String[] split = octopusMessage.getUuid().split("-"); + if (!agentServerInfo.getMachineId().startsWith(split[split.length - 1])) { + throw new MyRuntimeException("INIT Message not for this agent !"); + } + // response chain to handle all kind of type of octopus message if (!octopusMessageHandler.handle(octopusMessage)) { throw new MyRuntimeException(" Handle Octopus Message Error !"); @@ -99,8 +93,8 @@ public class OctopusAgentInitService { // long deliveryTag, boolean requeue // channel.basicReject(deliveryTag,true); - Thread.sleep(1000); // 这里只是便于出现死循环时查看 - + // 这里只是便于出现死循环时查看 + TimeUnit.SECONDS.sleep(5); throw new MyRuntimeException("Octopus Agent Initialization Error, please check !"); } @@ -109,25 +103,5 @@ public class OctopusAgentInitService { channel.basicAck(deliveryTag, false); } - private class InitMessagePostProcessor implements MessagePostProcessor { - - private final String initMessageTTL; - - public InitMessagePostProcessor(Long initMessageTTL) { - this.initMessageTTL = String.valueOf(initMessageTTL); - } - - public InitMessagePostProcessor(String initMessageTTL) { - this.initMessageTTL = initMessageTTL; - } - - @Override - public Message postProcessMessage(Message message) throws AmqpException { - // set init register expiration time - MessageProperties messageProperties = message.getMessageProperties(); - messageProperties.setExpiration(initMessageTTL); - return message; - } - } } diff --git a/agent/src/main/java/io/wdd/agent/initialization/rabbitmq/GenerateOctopusConnection.java b/agent/src/main/java/io/wdd/agent/initialization/rabbitmq/GenerateOctopusConnection.java new file mode 100644 index 0000000..a87a38a --- /dev/null +++ b/agent/src/main/java/io/wdd/agent/initialization/rabbitmq/GenerateOctopusConnection.java @@ -0,0 +1,113 @@ +package io.wdd.agent.initialization.rabbitmq; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.wdd.agent.message.handler.OctopusMessageHandler; +import io.wdd.common.beans.rabbitmq.OctopusMessage; +import io.wdd.common.handler.MyRuntimeException; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueInformation; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.listener.MessageListenerContainer; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.context.Lifecycle; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; +import javax.annotation.Resource; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +@Lazy +@Component +@Slf4j +@RequiredArgsConstructor +public class GenerateOctopusConnection { + + private final List messageListenerContainerList = new ArrayList<>(); + private final SimpleRabbitListenerContainerFactory containerFactory; + @Resource + RabbitAdmin rabbitAdmin; + @Resource + ObjectMapper objectMapper; + @Resource + OctopusMessageHandler octopusMessageHandler; + + public void ManualGenerate(OctopusMessage octopusMessage) { + + // generate the ne topic queue for unique agent + String agentTopicName = octopusMessage.getUuid(); + + // reboot judgyment of existing exchange + QueueInformation queueInfo = rabbitAdmin.getQueueInfo(agentTopicName); + + if (ObjectUtils.isNotEmpty(queueInfo) && queueInfo.getConsumerCount() > 0 ) { + log.info("Octopus Agent Specific Topic Queue Already Existed ! == {}", agentTopicName); + return; + } + + Queue queue = new Queue(agentTopicName, true, false, false); + Binding binding = new Binding( + agentTopicName, + Binding.DestinationType.QUEUE, + octopusMessage.getContent().toString(), + agentTopicName + "*", + null + ); + + // Exchange are created by Octopus Server at server BootUP + rabbitAdmin.declareQueue(queue); + rabbitAdmin.declareBinding(binding); + + + // create the listener + SimpleMessageListenerContainer listenerContainer = containerFactory.createListenerContainer(); + listenerContainer.addQueues(queue); + listenerContainer.setMessageListener(this::AgentListenToSpecificTopicOctopusMessage); + listenerContainer.start(); + + + log.info("Specific Octopus Topic Queue Generate Successfully !"); + messageListenerContainerList.add(listenerContainer); + } + + + /** + * Maunally Generated Octopus Message Listener + * + * @param message Rabbitmq Message + */ + public void AgentListenToSpecificTopicOctopusMessage(Message message) { + + OctopusMessage octopusMessage; + + + try { + octopusMessage = objectMapper.readValue(message.getBody(), OctopusMessage.class); + + if (!octopusMessageHandler.handle(octopusMessage)) { + throw new MyRuntimeException("Octopus Message Handle Err"); + } + + } catch (IOException e) { + + throw new MyRuntimeException("Octopus Message Wrong !"); + } + } + + @PreDestroy + public void destroy() { + messageListenerContainerList.forEach(Lifecycle::stop); + log.info("- stop all message listeners..."); + } + + +} diff --git a/agent/src/main/java/io/wdd/agent/initialization/rabbitmq/InitialRabbitMqConnector.java b/agent/src/main/java/io/wdd/agent/initialization/rabbitmq/InitRabbitMQConnector.java similarity index 84% rename from agent/src/main/java/io/wdd/agent/initialization/rabbitmq/InitialRabbitMqConnector.java rename to agent/src/main/java/io/wdd/agent/initialization/rabbitmq/InitRabbitMQConnector.java index 05a96ad..2d1aa04 100644 --- a/agent/src/main/java/io/wdd/agent/initialization/rabbitmq/InitialRabbitMqConnector.java +++ b/agent/src/main/java/io/wdd/agent/initialization/rabbitmq/InitRabbitMQConnector.java @@ -9,12 +9,12 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** - * agent send server init info to octopus server + * agent send server PassThroughTopicName info to octopus server *

* agent boot up should send message to server using this queue */ @Configuration -public class InitialRabbitMqConnector { +public class InitRabbitMQConnector { @Value("${octopus.message.init_exchange}") public String INIT_EXCHANGE; @@ -31,6 +31,15 @@ public class InitialRabbitMqConnector { @Value("${octopus.message.init_to_server_key}") public String INIT_TO_SERVER_KEY; + @Value("${octopus.message.octopus_exchange}") + public String OCTOPUS_EXCHANGE; + + @Value("${octopus.message.octopus_to_server}") + public String OCTOPUS_TO_SERVER; + + // + public static String SPECIFIC_AGENT_TOPIC_NAME; + @Bean public DirectExchange initDirectExchange() { return new DirectExchange(INIT_EXCHANGE); diff --git a/agent/src/main/java/io/wdd/agent/message/ToServerMessage.java b/agent/src/main/java/io/wdd/agent/message/ToServerMessage.java new file mode 100644 index 0000000..6695677 --- /dev/null +++ b/agent/src/main/java/io/wdd/agent/message/ToServerMessage.java @@ -0,0 +1,101 @@ +package io.wdd.agent.message; + + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.wdd.agent.initialization.beans.AgentServerInfo; +import io.wdd.agent.initialization.rabbitmq.InitRabbitMQConnector; +import io.wdd.common.beans.rabbitmq.OctopusMessage; +import io.wdd.common.handler.MyRuntimeException; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.AmqpException; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessagePostProcessor; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.time.LocalDateTime; + +@Service +@Slf4j(topic = "To Octopus Server Message") +public class ToServerMessage { + + @Resource + InitRabbitMQConnector initRabbitMqConnector; + + @Resource + ObjectMapper objectMapper; + + @Resource + RabbitTemplate rabbitTemplate; + + + + public boolean send(OctopusMessage octopusMessage) { + + octopusMessage.setAc_time(LocalDateTime.now()); + + // send to Queue -- InitToServer + + log.info("send Message to Server = {}", octopusMessage); + + try { + + rabbitTemplate.convertAndSend( + initRabbitMqConnector.OCTOPUS_EXCHANGE, + initRabbitMqConnector.OCTOPUS_TO_SERVER, + objectMapper.writeValueAsBytes(octopusMessage) + ); + + + } catch (JsonProcessingException e) { + + log.error("Failed to send message to Serv er ! = {}", octopusMessage); + throw new MyRuntimeException(e); + } + + + return true; + } + + + public void sendInitInfo(AgentServerInfo agentServerInfo, String defaultInitRegisterTimeOut) { + + // set PassThroughTopicName agent register ttl + InitMessagePostProcessor initMessagePostProcessor = new InitMessagePostProcessor(defaultInitRegisterTimeOut); + + log.info("send INIT AgentServerInfo to Server = {}", agentServerInfo); + + // send the register server info to EXCHANGE:INIT_EXCHANGE QUEUE: init_to_server + try { + rabbitTemplate.convertAndSend(initRabbitMqConnector.INIT_EXCHANGE, initRabbitMqConnector.INIT_TO_SERVER_KEY, objectMapper.writeValueAsBytes(agentServerInfo), initMessagePostProcessor); + } catch (JsonProcessingException e) { + log.error("Failed to send INIT message to Server ! = {}", agentServerInfo); + throw new RuntimeException(e); + } + + } + + private static class InitMessagePostProcessor implements MessagePostProcessor { + + private final String initMessageTTL; + +// public InitMessagePostProcessor(Long initMessageTTL) { +// this.initMessageTTL = String.valueOf(initMessageTTL); +// } + + public InitMessagePostProcessor(String initMessageTTL) { + this.initMessageTTL = initMessageTTL; + } + + @Override + public Message postProcessMessage(Message message) throws AmqpException { + // set PassThroughTopicName register expiration time + MessageProperties messageProperties = message.getMessageProperties(); + messageProperties.setExpiration(initMessageTTL); + return message; + } + } +} diff --git a/agent/src/main/java/io/wdd/agent/message/handler/OctopusMessageHandler.java b/agent/src/main/java/io/wdd/agent/message/handler/OctopusMessageHandler.java index 89a3c2e..16ca279 100644 --- a/agent/src/main/java/io/wdd/agent/message/handler/OctopusMessageHandler.java +++ b/agent/src/main/java/io/wdd/agent/message/handler/OctopusMessageHandler.java @@ -1,11 +1,12 @@ package io.wdd.agent.message.handler; -import io.wdd.agent.config.rabbitmq.*; +import io.wdd.agent.config.rabbitmq.handler.*; import io.wdd.common.beans.rabbitmq.OctopusMessage; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; +import javax.annotation.Resource; @Service @@ -14,16 +15,28 @@ public class OctopusMessageHandler { private AbstractOctopusMessageHandler octopusMessageHandler; + @Resource + OMHandlerAgent omHandlerAgent; + + @Resource + OMHandlerExecutor omHandlerExecutor; + + @Resource + OMHandlerInit omHandlerInit; + + @Resource + OMHandlerStatus omHandlerStatus; + @PostConstruct private void registerAllHandler() { AbstractOctopusMessageHandler.Builder builder = new AbstractOctopusMessageHandler.Builder(); octopusMessageHandler = builder - .addHandler(new OMHandlerExecutor()) - .addHandler(new OMHandlerAgent()) - .addHandler(new OMHandlerStatus()) - .addHandler(new OMHandlerInit()) + .addHandler(omHandlerExecutor) + .addHandler(omHandlerAgent) + .addHandler(omHandlerStatus) + .addHandler(omHandlerInit) .build(); } diff --git a/agent/src/main/resources/application.yml b/agent/src/main/resources/application.yml index abd2979..14c8c79 100644 --- a/agent/src/main/resources/application.yml +++ b/agent/src/main/resources/application.yml @@ -1,7 +1,11 @@ server: port: 8000 + spring: + main: + allow-circular-references: true + allow-bean-definition-overriding: true rabbitmq: host: 127.0.0.1 port: 35672 @@ -22,4 +26,12 @@ octopus: # agent boot up default common exchange routing key init_from_server_key: InitFromServerKey # initialization register time out (unit ms) default is 5 min - init_ttl: "3000000" \ No newline at end of file + init_ttl: "3000000" + # Octopus Exchange Name == server comunicate with agent + octopus_exchange: OctopusExchange + # Octopus Message To Server == all agent send info to server queue and topic + octopus_to_server: OctopusToServer + +logging: + level: + web: debug \ No newline at end of file diff --git a/server/src/main/java/io/wdd/rpc/init/AcceptBootUpInfoMessage.java b/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java similarity index 92% rename from server/src/main/java/io/wdd/rpc/init/AcceptBootUpInfoMessage.java rename to server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java index f463a12..372d063 100644 --- a/server/src/main/java/io/wdd/rpc/init/AcceptBootUpInfoMessage.java +++ b/server/src/main/java/io/wdd/rpc/init/AcceptAgentInitInfo.java @@ -6,7 +6,7 @@ import com.rabbitmq.client.Channel; import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessageType; import io.wdd.common.handler.MyRuntimeException; -import io.wdd.rpc.message.ToAgentOrder; +import io.wdd.rpc.message.sender.ToAgentOrder; import io.wdd.server.beans.vo.ServerInfoVO; import io.wdd.server.utils.DaemonDatabaseOperator; import lombok.SneakyThrows; @@ -24,13 +24,17 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; /** * The type Accept boot up info message. */ @Service @Slf4j(topic = "octopus agent init ") -public class AcceptBootUpInfoMessage { +public class AcceptAgentInitInfo { + + @Resource + InitRabbitMQConfig initRabbitMQConfig; public static Set ALL_SERVER_CITY_INFO = new HashSet<>( @@ -125,7 +129,8 @@ public class AcceptBootUpInfoMessage { // long deliveryTag, boolean requeue // channel.basicReject(deliveryTag,true); - Thread.sleep(1000); // 这里只是便于出现死循环时查看 + // 这里只是便于出现死循环时查看 + TimeUnit.SECONDS.sleep(5); /* * 一般实际异常情况下的处理过程:记录出现异常的业务数据,将它单独插入到一个单独的模块, @@ -137,6 +142,7 @@ public class AcceptBootUpInfoMessage { } /** + * 无异常就确认消息 * 无异常就确认消息 * basicAck(long deliveryTag, boolean multiple) * deliveryTag:取出来当前消息在队列中的的索引; @@ -145,6 +151,7 @@ public class AcceptBootUpInfoMessage { */ // ack the rabbitmq info // If all logic is successful + log.info("Agent [ {} ] has init successfully !", serverInfoVO.getTopicName()); channel.basicAck(deliveryTag, false); } @@ -154,14 +161,15 @@ public class AcceptBootUpInfoMessage { filter(serverName -> agentQueueTopic.startsWith(serverName)) .findFirst(); - return first.isEmpty(); + return first.isPresent(); } private boolean sendInitMessageToAgent(ServerInfoVO serverInfoVO) { OctopusMessage octopusMessage = OctopusMessage.builder() .type(OctopusMessageType.INIT) - .content(serverInfoVO.getTopicName()) + // should be the OctopusExchange Name + .content(String.valueOf(initRabbitMQConfig.OCTOPUS_EXCHANGE)) .init_time(LocalDateTime.now()) .uuid(serverInfoVO.getTopicName()) .build(); diff --git a/server/src/main/java/io/wdd/rpc/init/FromServerMessageBinding.java b/server/src/main/java/io/wdd/rpc/init/InitRabbitMQConfig.java similarity index 80% rename from server/src/main/java/io/wdd/rpc/init/FromServerMessageBinding.java rename to server/src/main/java/io/wdd/rpc/init/InitRabbitMQConfig.java index 949c3ba..b851b6e 100644 --- a/server/src/main/java/io/wdd/rpc/init/FromServerMessageBinding.java +++ b/server/src/main/java/io/wdd/rpc/init/InitRabbitMQConfig.java @@ -9,7 +9,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration -public class FromServerMessageBinding { +public class InitRabbitMQConfig { @Value("${octopus.message.init_exchange}") public String INIT_EXCHANGE; @@ -26,6 +26,13 @@ public class FromServerMessageBinding { @Value("${octopus.message.init_to_server_key}") public String INIT_TO_SERVER_KEY; + @Value("${octopus.message.octopus_exchange}") + public String OCTOPUS_EXCHANGE; + + + @Value("${octopus.message.octopus_to_server}") + public String OCTOPUS_TO_SERVER; + @Bean public DirectExchange initDirectExchange() { return new DirectExchange(INIT_EXCHANGE); @@ -40,7 +47,7 @@ public class FromServerMessageBinding { /** * 配置一个队列和交换机的绑定 * - * @param initDirectQueue : 需要绑定的队列对象,参数名必须和某个@Bean的方法名完全相同,这样就会进行自动注入,对应 .bind() + * @param initFromServerQueue : 需要绑定的队列对象,参数名必须和某个@Bean的方法名完全相同,这样就会进行自动注入,对应 .bind() * @param initDirectExchange : 需要绑定的交换机对象,参数名必须和某个@Bean的方法名完全相同,这样就会进行自动注入,对应 .to() * .with() 方法对应的RoutingKey * @return diff --git a/server/src/main/java/io/wdd/rpc/init/OctopusExchangeConfig.java b/server/src/main/java/io/wdd/rpc/init/OctopusExchangeConfig.java new file mode 100644 index 0000000..4536b63 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/init/OctopusExchangeConfig.java @@ -0,0 +1,43 @@ +package io.wdd.rpc.init; + + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.TopicExchange; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * generate the OCTOPUS_EXCHANGE at the beginning + */ +@Configuration +public class OctopusExchangeConfig { + + @Value("${octopus.message.octopus_exchange}") + public String OCTOPUS_EXCHANGE; + + @Value("${octopus.message.octopus_to_server}") + public String OCTOPUS_TO_SERVER; + + + @Bean + public TopicExchange octopusExchange(){ + return new TopicExchange(OCTOPUS_EXCHANGE,true,false); + } + + @Bean + public Queue octopusAgentToServerQueue(){ + return new Queue(OCTOPUS_TO_SERVER); + } + + @Bean + public Binding bindingToServerTopicQueue(TopicExchange octopusExchange, Queue octopusAgentToServerQueue){ + return BindingBuilder + .bind(octopusAgentToServerQueue) + .to(octopusExchange) + .with(OCTOPUS_TO_SERVER); + } + +} diff --git a/server/src/main/java/io/wdd/rpc/message/handler/OctopusMessageHandlerServer.java b/server/src/main/java/io/wdd/rpc/message/handler/OctopusMessageHandlerServer.java new file mode 100644 index 0000000..affac43 --- /dev/null +++ b/server/src/main/java/io/wdd/rpc/message/handler/OctopusMessageHandlerServer.java @@ -0,0 +1,47 @@ +package io.wdd.rpc.message.handler; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.wdd.common.beans.rabbitmq.OctopusMessage; +import io.wdd.common.handler.MyRuntimeException; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.context.annotation.Configuration; + +import javax.annotation.Resource; +import java.io.IOException; + +@Configuration +@Slf4j(topic = "Octopus Message Handler") +public class OctopusMessageHandlerServer { + + + @Resource + ObjectMapper objectMapper; + + @RabbitHandler + @RabbitListener(queues = "${octopus.message.octopus_to_server}" + ) + public void HandleOctopusMessageFromAgent(Message message){ + + OctopusMessage octopusMessage; + + try { + octopusMessage = objectMapper.readValue(message.getBody(), OctopusMessage.class); + } catch (IOException e) { + throw new MyRuntimeException("Octopus Message Wrong !"); + } + + // Octopus Message Handler + log.info("received from agent : {} ", octopusMessage); + + + // todo what to do after received the result + + // log info ? + // update the database + // handle the result + } +} diff --git a/server/src/main/java/io/wdd/rpc/message/ToAgentOrder.java b/server/src/main/java/io/wdd/rpc/message/sender/ToAgentOrder.java similarity index 80% rename from server/src/main/java/io/wdd/rpc/message/ToAgentOrder.java rename to server/src/main/java/io/wdd/rpc/message/sender/ToAgentOrder.java index 4369683..8cd6085 100644 --- a/server/src/main/java/io/wdd/rpc/message/ToAgentOrder.java +++ b/server/src/main/java/io/wdd/rpc/message/sender/ToAgentOrder.java @@ -1,11 +1,11 @@ -package io.wdd.rpc.message; +package io.wdd.rpc.message.sender; import com.fasterxml.jackson.databind.ObjectMapper; import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.rabbitmq.OctopusMessageType; import io.wdd.common.handler.MyRuntimeException; -import io.wdd.rpc.init.FromServerMessageBinding; +import io.wdd.rpc.init.InitRabbitMQConfig; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; @@ -18,14 +18,14 @@ import javax.annotation.Resource; * provide override method to convert Object and send to rabbitmq */ @Component -@Slf4j(topic = "order to agent ") +@Slf4j(topic = "To Octopus Server Message ") public class ToAgentOrder { @Resource RabbitTemplate rabbitTemplate; @Resource - FromServerMessageBinding fromServerMessageBinding; + InitRabbitMQConfig initRabbitMQConfig; @Resource @@ -47,7 +47,7 @@ public class ToAgentOrder { // send to Queue -- InitFromServer log.info("send INIT OrderCommand to Agent = {}", message); - rabbitTemplate.convertAndSend(fromServerMessageBinding.INIT_EXCHANGE, fromServerMessageBinding.INIT_FROM_SERVER_KEY, writeData(message)); + rabbitTemplate.convertAndSend(initRabbitMQConfig.INIT_EXCHANGE, initRabbitMQConfig.INIT_FROM_SERVER_KEY, writeData(message)); } diff --git a/server/src/main/java/io/wdd/server/coreService/impl/CoreServerServiceImpl.java b/server/src/main/java/io/wdd/server/coreService/impl/CoreServerServiceImpl.java index d926137..b743f4b 100644 --- a/server/src/main/java/io/wdd/server/coreService/impl/CoreServerServiceImpl.java +++ b/server/src/main/java/io/wdd/server/coreService/impl/CoreServerServiceImpl.java @@ -74,8 +74,8 @@ public class CoreServerServiceImpl implements CoreServerService { @Override public boolean serverCreate(ServerInfoVO serverInfoVO) { - ServerInfoPO serverInfoPO = new ServerInfoPO(); - BeanUtils.copyProperties(serverInfoVO, serverInfoPO); +// BeanUtils.copyProperties(serverInfoVO, serverInfoPO); + ServerInfoPO serverInfoPO = EntityUtils.cvToTarget(serverInfoVO, ServerInfoPO.class); return serverInfoService.save(serverInfoPO); } diff --git a/server/src/main/java/io/wdd/server/handler/MyBatisAutoInsertInterceptor.java b/server/src/main/java/io/wdd/server/handler/MyBatisAutoInsertInterceptor.java index 91e3b87..4fc69ac 100644 --- a/server/src/main/java/io/wdd/server/handler/MyBatisAutoInsertInterceptor.java +++ b/server/src/main/java/io/wdd/server/handler/MyBatisAutoInsertInterceptor.java @@ -20,8 +20,10 @@ public class MyBatisAutoInsertInterceptor implements MetaObjectHandler { public void insertFill(MetaObject metaObject) { log.info("MyBaitsPlus start to insert manually !"); - this.strictInsertFill(metaObject, "registerTime", () -> LocalDateTime.now(), LocalDateTime.class); // 起始版本 3.3.3(推荐) - this.strictInsertFill(metaObject, "create_time", () -> LocalDateTime.now(), LocalDateTime.class); // 起始版本 3.3.3(推荐) + this.strictInsertFill(metaObject, "createTime", () -> LocalDateTime.now(), LocalDateTime.class); // 起始版本 3.3.3(推荐) + + this.strictInsertFill(metaObject, "updateTime", () -> LocalDateTime.now(), LocalDateTime.class); // 起始版本 3.3.3(推荐) + } @Override @@ -30,7 +32,5 @@ public class MyBatisAutoInsertInterceptor implements MetaObjectHandler { log.info("MyBaitsPlus start to update manually !"); this.strictInsertFill(metaObject, "updateTime", () -> LocalDateTime.now(), LocalDateTime.class); // 起始版本 3.3.3(推荐) - - this.strictInsertFill(metaObject, "update_time", () -> LocalDateTime.now(), LocalDateTime.class); // 起始版本 3.3.3(推荐) } } diff --git a/server/src/main/java/io/wdd/server/utils/DaemonDatabaseOperator.java b/server/src/main/java/io/wdd/server/utils/DaemonDatabaseOperator.java index 94d1918..af097ff 100644 --- a/server/src/main/java/io/wdd/server/utils/DaemonDatabaseOperator.java +++ b/server/src/main/java/io/wdd/server/utils/DaemonDatabaseOperator.java @@ -39,10 +39,10 @@ public class DaemonDatabaseOperator { */ public boolean saveInitOctopusAgentInfo(ServerInfoVO serverInfoVO) { - log.info("simulate store the Octopus Agent Server info"); +// log.info("simulate store the Octopus Agent Server info"); - return true; -// return coreServerService.serverCreate(serverInfoVO); +// return true; + return coreServerService.serverCreate(serverInfoVO); } diff --git a/server/src/main/resources/application.yml b/server/src/main/resources/application.yml index 4027d76..5f56b98 100644 --- a/server/src/main/resources/application.yml +++ b/server/src/main/resources/application.yml @@ -15,6 +15,11 @@ octopus: init_from_server_key: InitFromServerKey # initialization register time out (unit ms) default is 5 min init_ttl: "300000" + # Octopus Exchange Name == server comunicate with agent + octopus_exchange: OctopusExchange + # Octopus Message To Server == all agent send info to server queue and topic + octopus_to_server: OctopusToServer + spring: rabbitmq: