[agent] start rabbitmq , accomplish init queue procedure
This commit is contained in:
@@ -4,5 +4,5 @@ import org.springframework.context.annotation.Configuration;
|
||||
|
||||
|
||||
@Configuration
|
||||
public class MyRabbitMqConfig {
|
||||
public class RuntimeMessageConfig {
|
||||
}
|
||||
@@ -2,30 +2,38 @@ package io.wdd.agent.initial.bootup;
|
||||
|
||||
|
||||
import io.wdd.agent.initial.beans.ServerInfo;
|
||||
import io.wdd.common.handler.MyRuntimeException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.core.env.Environment;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
import javax.security.sasl.SaslServer;
|
||||
import java.net.InetAddress;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
@Configuration
|
||||
@Slf4j
|
||||
public class collectSystemInfo implements ApplicationContextAware {
|
||||
public class CollectSystemInfo implements ApplicationContextAware {
|
||||
|
||||
@Resource
|
||||
Environment environment;
|
||||
|
||||
private ApplicationContext context;
|
||||
|
||||
@Resource
|
||||
InitConfiguration initConfiguration;
|
||||
|
||||
@Bean
|
||||
@Lazy
|
||||
public void initialReadingEnvironment(){
|
||||
|
||||
// https://zhuanlan.zhihu.com/p/449416472
|
||||
@@ -79,12 +87,19 @@ public class collectSystemInfo implements ApplicationContextAware {
|
||||
@PostConstruct
|
||||
private void getInjectServerInfo(){
|
||||
|
||||
log.info("getInjectServerInfo");
|
||||
log.info("Starting getInjectServerInfo");
|
||||
|
||||
ServerInfo serverInfo = (ServerInfo) context.getBean("serverInfo");
|
||||
|
||||
if (ObjectUtils.isEmpty(serverInfo)) {
|
||||
throw new MyRuntimeException(" Collect server info error !");
|
||||
}
|
||||
|
||||
System.out.println("serverInfo = " + serverInfo);
|
||||
log.info("host server info has been collected == {}", serverInfo);
|
||||
|
||||
// start to send message to Octopus Server
|
||||
initConfiguration.SendInfoToServer(serverInfo);
|
||||
log.info("init server info has been send to octopus server !");
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,95 @@
|
||||
package io.wdd.agent.initial.bootup;
|
||||
|
||||
import io.wdd.agent.initial.beans.ServerInfo;
|
||||
import io.wdd.agent.initial.rabbitmq.InitialRabbitMqConnector;
|
||||
import org.springframework.amqp.AmqpException;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.core.MessagePostProcessor;
|
||||
import org.springframework.amqp.core.MessageProperties;
|
||||
import org.springframework.amqp.rabbit.annotation.*;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
@Service
|
||||
@Lazy
|
||||
public class InitConfiguration {
|
||||
|
||||
|
||||
@Resource
|
||||
RabbitTemplate rabbitTemplate;
|
||||
|
||||
@Resource
|
||||
InitialRabbitMqConnector initialRabbitMqConnector;
|
||||
|
||||
@Value("${octopus.message.init_ttl}")
|
||||
String defaultInitRegisterTimeOut;
|
||||
|
||||
|
||||
private class InitMessagePostProcessor implements MessagePostProcessor {
|
||||
|
||||
private String initMessageTTL;
|
||||
|
||||
public InitMessagePostProcessor(Long initMessageTTL) {
|
||||
this.initMessageTTL = String.valueOf(initMessageTTL);
|
||||
}
|
||||
|
||||
public InitMessagePostProcessor(String initMessageTTL) {
|
||||
this.initMessageTTL = initMessageTTL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message postProcessMessage(Message message) throws AmqpException {
|
||||
// set init register expiration time
|
||||
MessageProperties messageProperties = message.getMessageProperties();
|
||||
messageProperties.setExpiration(initMessageTTL);
|
||||
return message;
|
||||
}
|
||||
}
|
||||
|
||||
public void SendInfoToServer(ServerInfo serverInfo){
|
||||
|
||||
// set init agent register ttl
|
||||
InitMessagePostProcessor initMessagePostProcessor = new InitMessagePostProcessor(defaultInitRegisterTimeOut);
|
||||
|
||||
rabbitTemplate.convertAndSend("hello wmm !");
|
||||
|
||||
// send the register server info to EXCHANGE:INIT_EXCHANGE QUEUE: init_to_server
|
||||
rabbitTemplate.convertAndSend(initialRabbitMqConnector.INIT_EXCHANGE, initialRabbitMqConnector.INIT_TO_SERVER_KEY, String.valueOf(serverInfo).getBytes(StandardCharsets.UTF_8), initMessagePostProcessor);
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* listen to the init queue from octopus server
|
||||
*
|
||||
* @RabbitListener : 用于标记当前方法是一个RabbitMQ的消息监听方法,可以持续性的自动接收消息
|
||||
* @param message
|
||||
* 该方法不需要手动调用,Spring会自动运行这个监听方法
|
||||
*
|
||||
* 注意:如果该监听方法正常结束,那么Spring会自动确认消息
|
||||
* 如果出现异常,则Spring不会确认消息,该消息一直存在于消息队列中
|
||||
*/
|
||||
@RabbitHandler
|
||||
@RabbitListener(
|
||||
bindings =
|
||||
@QueueBinding(
|
||||
value = @Queue(name = "${octopus.message.init_from_server}" ),
|
||||
exchange = @Exchange(name = "${octopus.message.init_exchange}", type = "direct"),
|
||||
key = {"${octopus.message.init_from_server_key}"}
|
||||
)
|
||||
,
|
||||
ackMode = "MANUAL"
|
||||
)
|
||||
public void ReceiveInitInfoFromServer(String message){
|
||||
|
||||
System.out.println("message = " + message);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,8 +1,60 @@
|
||||
package io.wdd.agent.initial.rabbitmq;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.DirectExchange;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.PropertySource;
|
||||
|
||||
@Component
|
||||
/**
|
||||
* agent send server init info to octopus server
|
||||
* <p>
|
||||
* agent boot up should send message to server using this queue
|
||||
*/
|
||||
@Configuration
|
||||
public class InitialRabbitMqConnector {
|
||||
|
||||
@Value("${octopus.message.init_exchange}")
|
||||
public String INIT_EXCHANGE;
|
||||
|
||||
@Value("${octopus.message.init_from_server}")
|
||||
public String INIT_FROM_SERVER;
|
||||
|
||||
@Value("${octopus.message.init_to_server}")
|
||||
public String INIT_TO_SERVER;
|
||||
|
||||
@Value("${octopus.message.init_from_server_key}")
|
||||
public String INIT_FROM_SERVER_KEY;
|
||||
|
||||
@Value("${octopus.message.init_to_server_key}")
|
||||
public String INIT_TO_SERVER_KEY;
|
||||
|
||||
@Bean
|
||||
public DirectExchange initDirectExchange() {
|
||||
return new DirectExchange(INIT_EXCHANGE);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue initDirectQueue() {
|
||||
return new Queue(INIT_TO_SERVER);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 配置一个队列和交换机的绑定
|
||||
*
|
||||
* @param initDirectQueue : 需要绑定的队列对象,参数名必须和某个@Bean的方法名完全相同,这样就会进行自动注入,对应 .bind()
|
||||
* @param initDirectExchange : 需要绑定的交换机对象,参数名必须和某个@Bean的方法名完全相同,这样就会进行自动注入,对应 .to()
|
||||
* .with() 方法对应的RoutingKey
|
||||
* @return
|
||||
*/
|
||||
@Bean
|
||||
public Binding initBinding(DirectExchange initDirectExchange, Queue initDirectQueue) {
|
||||
return BindingBuilder.bind(initDirectQueue).to(initDirectExchange).with(INIT_TO_SERVER_KEY);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -1,2 +1,25 @@
|
||||
server:
|
||||
port: 8000
|
||||
port: 8000
|
||||
|
||||
spring:
|
||||
rabbitmq:
|
||||
host: 127.0.0.1
|
||||
port: 35672
|
||||
username: boge
|
||||
password: boge14@Level5
|
||||
virtual-host: /wddserver
|
||||
|
||||
octopus:
|
||||
message:
|
||||
# agent boot up default common exchange
|
||||
init_exchange: InitExchange
|
||||
# server will send message to agent using this common queue
|
||||
init_to_server: InitToServer
|
||||
# agent boot up default common exchange routing key
|
||||
init_to_server_key: InitToServerKey
|
||||
# server will receive message from agent using this common queue
|
||||
init_from_server: InitFromServer
|
||||
# agent boot up default common exchange routing key
|
||||
init_from_server_key: InitFromServerKey
|
||||
# initialization register time out (unit ms) default is 5 min
|
||||
init_ttl: "300000"
|
||||
20
agent/src/test/java/io/wdd/agent/InitRabbitMQTest.java
Normal file
20
agent/src/test/java/io/wdd/agent/InitRabbitMQTest.java
Normal file
@@ -0,0 +1,20 @@
|
||||
package io.wdd.agent;
|
||||
|
||||
import io.wdd.agent.initial.bootup.InitConfiguration;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
@SpringBootTest
|
||||
public class InitRabbitMQTest {
|
||||
|
||||
@Resource
|
||||
InitConfiguration initConfiguration;
|
||||
|
||||
@Test
|
||||
void testInitSendInfo(){
|
||||
|
||||
initConfiguration.SendInfoToServer();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user