215 lines
5.0 KiB
Go
215 lines
5.0 KiB
Go
package rabbitmq
|
|
|
|
import (
|
|
"agent-go/g"
|
|
logger2 "agent-go/logger"
|
|
"fmt"
|
|
"github.com/streadway/amqp"
|
|
"strings"
|
|
"sync"
|
|
)
|
|
|
|
type RabbitMQ interface {
|
|
RabbitSendWriter
|
|
|
|
RabbitConnectCloser
|
|
}
|
|
|
|
type RabbitSendWriter interface {
|
|
Send(message []byte)
|
|
|
|
Read(autoAck bool) <-chan amqp.Delivery
|
|
}
|
|
|
|
// RabbitConnectCloser is the lifecycle half of the client contract.
type RabbitConnectCloser interface {
	// Connect prepares the client for use. It reports no error to the caller.
	Connect()

	// Close releases what Connect acquired and returns any error from doing so.
	Close() error
}
|
|
|
|
type RabbitQueue struct {
|
|
RabbitConn *RabbitMQConn
|
|
RabbitProp *ConnectProperty
|
|
}
|
|
|
|
// RabbitMQConn is a struct that holds the connection and channel objects
|
|
type RabbitMQConn struct {
|
|
Connection *amqp.Connection
|
|
Channel *amqp.Channel
|
|
}
|
|
|
|
// ConnectProperty describes the exchange/queue topology a RabbitQueue uses.
type ConnectProperty struct {
	// ExchangeName is the exchange that is declared, bound to and published to.
	ExchangeName string

	// QueueName is the queue that is declared, bound and consumed from.
	QueueName string

	// ExchangeType is the exchange kind passed to the exchange declaration.
	ExchangeType string

	// TopicKey is the routing key used for both the queue binding and publishing.
	TopicKey string
}
|
|
|
|
var log = logger2.Log
|
|
|
|
// 定义全局唯一的 Singleton 实例
|
|
var instance *amqp.Connection
|
|
|
|
// 用 sync.Once 变量确保初始化函数只会被调用一次
|
|
var once sync.Once
|
|
|
|
// 初始化 Singleton 实例的函数
|
|
func createInstance() {
|
|
// 在这里进行 Singleton 的初始化操作
|
|
|
|
// 获取RabbitMQ的连接地址
|
|
rabbitMQEndpointFromG := parseRabbitMQEndpointFromG()
|
|
|
|
// 创建全局唯一连接 RabbitMQ连接
|
|
connection, err := amqp.Dial(rabbitMQEndpointFromG)
|
|
if err != nil {
|
|
log.Error(fmt.Sprintf("failed to connect to RabbitMQ: %v", err))
|
|
}
|
|
|
|
instance = connection
|
|
}
|
|
|
|
// GetInstance 获取全局唯一的 Singleton 实例的函数
|
|
func GetInstance() *amqp.Connection {
|
|
// 使用 sync.Once 确保 createInstance 只会被调用一次
|
|
once.Do(createInstance)
|
|
return instance
|
|
}
|
|
|
|
// Connect creates a new RabbitMQ connection object
|
|
func (r *RabbitQueue) Connect() {
|
|
|
|
// 获取RabbitMQ的连接
|
|
conn := GetInstance()
|
|
|
|
ch, err := conn.Channel()
|
|
if err != nil {
|
|
log.Error(fmt.Sprintf("failed to create RabbitMQ channel: %w", err))
|
|
}
|
|
|
|
if err = ch.ExchangeDeclare(
|
|
r.RabbitProp.ExchangeName, // name of the exchange
|
|
r.RabbitProp.ExchangeType, // type of the exchange
|
|
true, // durable
|
|
false, // delete when complete
|
|
false, // internal
|
|
false, // noWait
|
|
nil, // arguments
|
|
); err != nil {
|
|
log.Error(fmt.Sprintf("failed to declare exchange !: %w", err))
|
|
}
|
|
|
|
_, err = ch.QueueDeclare(
|
|
r.RabbitProp.QueueName, // name of the queue
|
|
true, // durable
|
|
false, // delete when unused
|
|
false, // exclusive
|
|
false, // noWait
|
|
nil, // arguments
|
|
)
|
|
if err != nil {
|
|
log.Error(fmt.Sprintf("failed to declare RabbitMQ queue: %w", err))
|
|
}
|
|
|
|
if err = ch.QueueBind(
|
|
r.RabbitProp.QueueName, // name of the queue
|
|
r.RabbitProp.TopicKey, // routing key - all topics
|
|
r.RabbitProp.ExchangeName, // name of the exchange
|
|
false, // noWait
|
|
nil, // arguments
|
|
); err != nil {
|
|
log.Error(fmt.Sprintf("failed to bind RabbitMQ queue: %w", err))
|
|
}
|
|
|
|
r.RabbitConn = &RabbitMQConn{
|
|
Connection: conn,
|
|
Channel: ch,
|
|
}
|
|
}
|
|
|
|
func (r *RabbitQueue) Close() error {
|
|
var err error
|
|
|
|
if r.RabbitConn.Channel != nil {
|
|
if err = r.RabbitConn.Channel.Close(); err != nil {
|
|
log.Error(fmt.Sprintf("Failed to close RabbitMQ channel: %v", err))
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Send 向RabbitMQ中发送消息
|
|
func (r *RabbitQueue) Send(message []byte) {
|
|
// 往哪里发
|
|
channel := r.RabbitConn.Channel
|
|
|
|
// 发送
|
|
err := channel.Publish(
|
|
r.RabbitProp.ExchangeName,
|
|
r.RabbitProp.TopicKey,
|
|
false,
|
|
false,
|
|
amqp.Publishing{
|
|
ContentType: "text/plain",
|
|
Body: message,
|
|
},
|
|
)
|
|
if err != nil {
|
|
log.Error(fmt.Sprintf("Failed to publish a message: %v", err))
|
|
}
|
|
}
|
|
|
|
func (r *RabbitQueue) Read(autoAck bool) <-chan amqp.Delivery {
|
|
|
|
// 拿到特定的Channel
|
|
channel := r.RabbitConn.Channel
|
|
|
|
// 开始读取队列中的全部消息
|
|
msgs, err := channel.Consume(
|
|
r.RabbitProp.QueueName, // 队列名称
|
|
"", // 消费者名称
|
|
autoAck, // auto-ack
|
|
false, // exclusive
|
|
false, // no-local
|
|
false, // no-wait
|
|
nil, // arguments
|
|
)
|
|
if err != nil {
|
|
log.Error(fmt.Sprintf("Failed to register a consumer: %v", err))
|
|
}
|
|
|
|
return msgs
|
|
}
|
|
|
|
// parseRabbitMQEndpoint 根据全局变量NacosConfig解析出RabbitMQ的连接地址
|
|
func parseRabbitMQEndpointFromG() string {
|
|
|
|
nacosConfig := g.G.NacosConfig
|
|
|
|
var res strings.Builder
|
|
|
|
host := nacosConfig.GetString("spring.rabbitmq.host")
|
|
port := nacosConfig.GetString("spring.rabbitmq.port")
|
|
username := nacosConfig.GetString("spring.rabbitmq.username")
|
|
password := nacosConfig.GetString("spring.rabbitmq.password")
|
|
virtualHost := nacosConfig.GetString("spring.rabbitmq.virtual-host")
|
|
|
|
// amqp://{username}:{password}@{hostname}:{port}/{virtual_host}
|
|
res.WriteString("amqp://")
|
|
res.WriteString(username)
|
|
res.WriteString(":")
|
|
res.WriteString(password)
|
|
res.WriteString("@")
|
|
res.WriteString(host)
|
|
res.WriteString(":")
|
|
res.WriteString(port)
|
|
res.WriteString("/")
|
|
res.WriteString(virtualHost)
|
|
|
|
s := res.String()
|
|
|
|
log.Debug(fmt.Sprintf("generate RabbitMQ endpoint is %s", s))
|
|
|
|
return s
|
|
}
|