Files
ProjectOctopus/agent-go/rabbitmq/RabbitMsgQueue.go
2023-06-16 14:22:58 +08:00

215 lines
5.0 KiB
Go

package rabbitmq
import (
"agent-go/g"
logger2 "agent-go/logger"
"fmt"
"github.com/streadway/amqp"
"strings"
"sync"
)
type RabbitMQ interface {
RabbitSendWriter
RabbitConnectCloser
}
type RabbitSendWriter interface {
Send(message []byte)
Read(autoAck bool) <-chan amqp.Delivery
}
type RabbitConnectCloser interface {
Connect()
Close() error
}
type RabbitQueue struct {
RabbitConn *RabbitMQConn
RabbitProp *ConnectProperty
}
// RabbitMQConn is a struct that holds the connection and channel objects
type RabbitMQConn struct {
Connection *amqp.Connection
Channel *amqp.Channel
}
type ConnectProperty struct {
ExchangeName string
QueueName string
ExchangeType string
TopicKey string
}
var log = logger2.Log
// 定义全局唯一的 Singleton 实例
var instance *amqp.Connection
// 用 sync.Once 变量确保初始化函数只会被调用一次
var once sync.Once
// 初始化 Singleton 实例的函数
func createInstance() {
// 在这里进行 Singleton 的初始化操作
// 获取RabbitMQ的连接地址
rabbitMQEndpointFromG := parseRabbitMQEndpointFromG()
// 创建全局唯一连接 RabbitMQ连接
connection, err := amqp.Dial(rabbitMQEndpointFromG)
if err != nil {
log.Error(fmt.Sprintf("failed to connect to RabbitMQ: %v", err))
}
instance = connection
}
// GetInstance 获取全局唯一的 Singleton 实例的函数
func GetInstance() *amqp.Connection {
// 使用 sync.Once 确保 createInstance 只会被调用一次
once.Do(createInstance)
return instance
}
// Connect creates a new RabbitMQ connection object
func (r *RabbitQueue) Connect() {
// 获取RabbitMQ的连接
conn := GetInstance()
ch, err := conn.Channel()
if err != nil {
log.Error(fmt.Sprintf("failed to create RabbitMQ channel: %w", err))
}
if err = ch.ExchangeDeclare(
r.RabbitProp.ExchangeName, // name of the exchange
r.RabbitProp.ExchangeType, // type of the exchange
true, // durable
false, // delete when complete
false, // internal
false, // noWait
nil, // arguments
); err != nil {
log.Error(fmt.Sprintf("failed to declare exchange !: %w", err))
}
_, err = ch.QueueDeclare(
r.RabbitProp.QueueName, // name of the queue
true, // durable
false, // delete when unused
false, // exclusive
false, // noWait
nil, // arguments
)
if err != nil {
log.Error(fmt.Sprintf("failed to declare RabbitMQ queue: %w", err))
}
if err = ch.QueueBind(
r.RabbitProp.QueueName, // name of the queue
r.RabbitProp.TopicKey, // routing key - all topics
r.RabbitProp.ExchangeName, // name of the exchange
false, // noWait
nil, // arguments
); err != nil {
log.Error(fmt.Sprintf("failed to bind RabbitMQ queue: %w", err))
}
r.RabbitConn = &RabbitMQConn{
Connection: conn,
Channel: ch,
}
}
func (r *RabbitQueue) Close() error {
var err error
if r.RabbitConn.Channel != nil {
if err = r.RabbitConn.Channel.Close(); err != nil {
log.Error(fmt.Sprintf("Failed to close RabbitMQ channel: %v", err))
}
}
return err
}
// Send 向RabbitMQ中发送消息
func (r *RabbitQueue) Send(message []byte) {
// 往哪里发
channel := r.RabbitConn.Channel
// 发送
err := channel.Publish(
r.RabbitProp.ExchangeName,
r.RabbitProp.TopicKey,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: message,
},
)
if err != nil {
log.Error(fmt.Sprintf("Failed to publish a message: %v", err))
}
}
func (r *RabbitQueue) Read(autoAck bool) <-chan amqp.Delivery {
// 拿到特定的Channel
channel := r.RabbitConn.Channel
// 开始读取队列中的全部消息
msgs, err := channel.Consume(
r.RabbitProp.QueueName, // 队列名称
"", // 消费者名称
autoAck, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // arguments
)
if err != nil {
log.Error(fmt.Sprintf("Failed to register a consumer: %v", err))
}
return msgs
}
// parseRabbitMQEndpoint 根据全局变量agentConfig解析出RabbitMQ的连接地址
func parseRabbitMQEndpointFromG() string {
agentConfig := g.G.AgentConfig
var res strings.Builder
host := agentConfig.GetString("spring.rabbitmq.host")
port := agentConfig.GetString("spring.rabbitmq.port")
username := agentConfig.GetString("spring.rabbitmq.username")
password := agentConfig.GetString("spring.rabbitmq.password")
virtualHost := agentConfig.GetString("spring.rabbitmq.virtual-host")
// amqp://{username}:{password}@{hostname}:{port}/{virtual_host}
res.WriteString("amqp://")
res.WriteString(username)
res.WriteString(":")
res.WriteString(password)
res.WriteString("@")
res.WriteString(host)
res.WriteString(":")
res.WriteString(port)
res.WriteString("/")
res.WriteString(virtualHost)
s := res.String()
log.Debug(fmt.Sprintf("generate RabbitMQ endpoint is %s", s))
return s
}