Files
ProjectOctopus/agent-go/rabbitmq/RabbitMsgQueue.go

302 lines
7.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package rabbitmq
import (
"encoding/json"
"fmt"
"github.com/streadway/amqp"
"strings"
"sync"
"sync/atomic"
"wdd.io/agent-common/logger"
)
// BusinessRuntimeQueue octopus message from server
var BusinessRuntimeQueue = &RabbitQueue{}
// OctopusToServerQueue octopus message to server
var OctopusToServerQueue = &RabbitQueue{}
type RabbitMQ interface {
RabbitSendWriter
RabbitConnectCloser
RabbitQueueHandler
}
type RabbitSendWriter interface {
Send(message []byte)
Read(autoAck bool) <-chan amqp.Delivery
}
type RabbitConnectCloser interface {
Connect()
Close() error
}
type RabbitQueueHandler interface {
Handle() chan bool
}
type RabbitQueue struct {
// 连接实体
RabbitConn *RabbitMQConn
// 连接属性
RabbitProp *ConnectProperty
// 底层连接tcp信息
RabbitConnectInfo *RabbitTCPConnectInfo
// 返回消息队列
ReceiveChan *RabbitReceiveChan
// Consume 是否继续
ConsumeOK atomic.Bool
}
// RabbitMQConn is a struct that holds the connection and channel objects
type RabbitMQConn struct {
Connection *amqp.Connection
Channel *amqp.Channel
}
type RabbitReceiveChan struct {
AgentRChan chan *OctopusMessage
ExecutorRChan chan *OctopusMessage
StatusRChan chan *OctopusMessage
InitRChan chan *OctopusMessage
}
type ConnectProperty struct {
ExchangeName string
QueueName string
ExchangeType string
TopicKey string
}
type RabbitTCPConnectInfo struct {
UserName string
Password string
Host string
Port string
VirtualHost string
}
var log = logger.Log
// 定义全局唯一的 Singleton 实例
var instance *amqp.Connection
// 用 sync.Once 变量确保初始化函数只会被调用一次
var once sync.Once
// 初始化 Singleton 实例的函数
func createInstance(rabbitConnectInfo *RabbitTCPConnectInfo) {
// 在这里进行 Singleton 的初始化操作
// 获取RabbitMQ的连接地址
rabbitMQEndpoint := parseRabbitMQEndpoint(rabbitConnectInfo)
// 创建全局唯一连接 RabbitMQ连接
connection, err := amqp.Dial(rabbitMQEndpoint)
if err != nil {
log.Error(fmt.Sprintf("failed to connect to RabbitMQ: %v", err))
}
instance = connection
}
// GetInstance 获取全局唯一的 Singleton 实例的函数
func GetInstance(rabbitConnectInfo *RabbitTCPConnectInfo) *amqp.Connection {
// 使用 sync.Once 确保 createInstance 只会被调用一次
// todo 理解
once.Do(func() {
createInstance(rabbitConnectInfo)
})
return instance
}
// Connect creates a new RabbitMQ connection object
func (r *RabbitQueue) Connect() {
// 获取RabbitMQ的连接
conn := GetInstance(r.RabbitConnectInfo)
ch, err := conn.Channel()
if err != nil {
log.Error(fmt.Sprintf("failed to create RabbitMQ channel: %w", err))
}
if err = ch.ExchangeDeclare(
r.RabbitProp.ExchangeName, // name of the exchange
r.RabbitProp.ExchangeType, // type of the exchange
true, // durable
false, // delete when complete
false, // internal
false, // noWait
nil, // arguments
); err != nil {
log.Error(fmt.Sprintf("failed to declare exchange !: %w", err))
}
_, err = ch.QueueDeclare(
r.RabbitProp.QueueName, // name of the queue
true, // durable
false, // delete when unused
false, // exclusive
false, // noWait
nil, // arguments
)
if err != nil {
log.Error(fmt.Sprintf("failed to declare RabbitMQ queue: %w", err))
}
if err = ch.QueueBind(
r.RabbitProp.QueueName, // name of the queue
r.RabbitProp.TopicKey, // routing key - all topics
r.RabbitProp.ExchangeName, // name of the exchange
false, // noWait
nil, // arguments
); err != nil {
log.Error(fmt.Sprintf("failed to bind RabbitMQ queue: %w", err))
}
// build for receive chan
rabbitRCha := &RabbitReceiveChan{}
if strings.HasPrefix(r.RabbitProp.QueueName, "Init") {
// init queue
rabbitRCha.InitRChan = make(chan *OctopusMessage)
} else {
// business queue
rabbitRCha.AgentRChan = make(chan *OctopusMessage, 5)
rabbitRCha.ExecutorRChan = make(chan *OctopusMessage, 5)
rabbitRCha.StatusRChan = make(chan *OctopusMessage, 5)
}
// connection
r.RabbitConn = &RabbitMQConn{
Connection: conn,
Channel: ch,
}
// receive chan
r.ReceiveChan = rabbitRCha
}
func (r *RabbitQueue) Close() error {
var err error
if r.RabbitConn.Channel != nil {
if err = r.RabbitConn.Channel.Close(); err != nil {
log.Error(fmt.Sprintf("Failed to close RabbitMQ channel: %v", err))
}
}
return err
}
func (r *RabbitQueue) Handle() chan bool {
deliveries := r.Read(true)
// 2024年4月9日
r.ConsumeOK.Swap(true)
foreverHandle := make(chan bool)
// 死循环处理Octopus Message
P.Submit(func() {
for delivery := range deliveries {
var om *OctopusMessage
err := json.Unmarshal(delivery.Body, &om)
if err != nil {
log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body))
// 保存到某处
continue
}
// 策略模式 处理消息
P.Submit(func() {
om.HandleMsg(r.ReceiveChan)
})
// 2024年4月9日
if !r.ConsumeOK.Load() {
log.InfoF("Rabbit Queue of [%s] [%s[ disconnect!", r.RabbitProp.ExchangeName, r.RabbitProp.QueueName)
break
}
}
})
return foreverHandle
}
// Send 向RabbitMQ中发送消息
func (r *RabbitQueue) Send(message []byte) {
// 往哪里发
channel := r.RabbitConn.Channel
// 发送
err := channel.Publish(
r.RabbitProp.ExchangeName,
r.RabbitProp.TopicKey,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: message,
},
)
if err != nil {
log.Error(fmt.Sprintf("Failed to publish a message: %v", err))
}
}
func (r *RabbitQueue) SendOMsg(oMessage *OctopusMessage) {
bytes, err := json.Marshal(&oMessage)
if err != nil {
log.ErrorF("octopus message Marshal error %v", &oMessage)
}
r.Send(bytes)
}
func (r *RabbitQueue) Read(autoAck bool) <-chan amqp.Delivery {
// 拿到特定的Channel
channel := r.RabbitConn.Channel
// 开始读取队列中的全部消息
msgs, err := channel.Consume(
r.RabbitProp.QueueName, // 队列名称
"", // 消费者名称
autoAck, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // arguments
)
if err != nil {
log.Error(fmt.Sprintf("Failed to register a consumer: %v", err))
}
return msgs
}
func parseRabbitMQEndpoint(rabbitConnectInfo *RabbitTCPConnectInfo) string {
var res strings.Builder
// amqp://{username}:{password}@{hostname}:{port}/{virtual_host}
res.WriteString("amqp://")
res.WriteString(rabbitConnectInfo.UserName)
res.WriteString(":")
res.WriteString(rabbitConnectInfo.Password)
res.WriteString("@")
res.WriteString(rabbitConnectInfo.Host)
res.WriteString(":")
res.WriteString(rabbitConnectInfo.Port)
res.WriteString("/")
res.WriteString(rabbitConnectInfo.VirtualHost)
s := res.String()
log.Debug(fmt.Sprintf("generate RabbitMQ endpoint is %s", s))
return s
}