// Package rabbitmq wraps a single, process-wide RabbitMQ connection and exposes
// a small queue abstraction (declare, publish, consume, close) on top of it.
package rabbitmq

import (
	"fmt"
	"net/url"
	"strings"
	"sync"

	"github.com/streadway/amqp"

	"wdd.io/agent-common/logger"
	"wdd.io/agent-go/g"
)

// RabbitMQ is the full behaviour of a queue handle: sending/reading messages
// plus connection lifecycle management.
type RabbitMQ interface {
	RabbitSendWriter

	RabbitConnectCloser
}

// RabbitSendWriter publishes messages and exposes a delivery stream.
type RabbitSendWriter interface {
	Send(message []byte)

	Read(autoAck bool) <-chan amqp.Delivery
}

// RabbitConnectCloser opens and closes the underlying channel.
type RabbitConnectCloser interface {
	Connect()

	Close() error
}

// RabbitQueue binds one queue/exchange description (RabbitProp) to the
// connection and channel used to talk to it (RabbitConn).
// RabbitConn is nil until Connect succeeds.
type RabbitQueue struct {
	RabbitConn *RabbitMQConn

	RabbitProp *ConnectProperty
}

// Compile-time check that *RabbitQueue implements RabbitMQ.
var _ RabbitMQ = (*RabbitQueue)(nil)

// RabbitMQConn is a struct that holds the connection and channel objects.
type RabbitMQConn struct {
	Connection *amqp.Connection
	Channel    *amqp.Channel
}

// ConnectProperty names the exchange, the queue and the routing key that
// Connect declares and binds together.
type ConnectProperty struct {
	ExchangeName string
	QueueName    string
	ExchangeType string
	TopicKey     string
}

var log = logger.Log

// instance is the process-wide singleton connection. It stays nil when the
// one-time dial in createInstance fails.
var instance *amqp.Connection

// once guarantees that createInstance runs at most one time.
var once sync.Once

// createInstance dials RabbitMQ using the endpoint derived from the global
// agent configuration and stores the result in instance.
// On failure the error is logged and instance is left nil.
func createInstance() {
	endpoint := parseRabbitMQEndpointFromG()

	connection, err := amqp.Dial(endpoint)
	if err != nil {
		log.Error(fmt.Sprintf("failed to connect to RabbitMQ: %v", err))
		return
	}

	instance = connection
}

// GetInstance returns the singleton RabbitMQ connection, dialing it on the
// first call. The result is nil when that first dial failed; because the dial
// is guarded by sync.Once it is not retried on later calls.
func GetInstance() *amqp.Connection {
	once.Do(createInstance)
	return instance
}

// Connect opens a channel on the shared connection, declares the exchange and
// the queue described by r.RabbitProp, binds them with the topic key, and
// stores the result in r.RabbitConn.
//
// Any failure is logged and aborts the setup: r.RabbitConn is left untouched
// and a channel that was already opened is closed again.
func (r *RabbitQueue) Connect() {
	conn := GetInstance()
	if conn == nil {
		// The dial failed earlier; calling Channel on a nil connection would panic.
		log.Error("failed to create RabbitMQ channel: no RabbitMQ connection available")
		return
	}

	ch, err := conn.Channel()
	if err != nil {
		log.Error(fmt.Sprintf("failed to create RabbitMQ channel: %v", err))
		return
	}

	if err = r.declareTopology(ch); err != nil {
		log.Error(err.Error())
		// Best-effort cleanup so a half-configured channel is not leaked. The
		// close error is ignored on purpose: the setup error was already
		// reported and the channel may already be closed.
		_ = ch.Close()
		return
	}

	r.RabbitConn = &RabbitMQConn{
		Connection: conn,
		Channel:    ch,
	}
}

// declareTopology declares the durable exchange and queue from r.RabbitProp on
// ch and binds the queue to the exchange with the configured topic key.
// It stops at the first failing step and returns that error with context.
func (r *RabbitQueue) declareTopology(ch *amqp.Channel) error {
	if err := ch.ExchangeDeclare(
		r.RabbitProp.ExchangeName, // name of the exchange
		r.RabbitProp.ExchangeType, // type of the exchange
		true,                      // durable
		false,                     // delete when complete
		false,                     // internal
		false,                     // noWait
		nil,                       // arguments
	); err != nil {
		return fmt.Errorf("failed to declare exchange !: %w", err)
	}

	if _, err := ch.QueueDeclare(
		r.RabbitProp.QueueName, // name of the queue
		true,                   // durable
		false,                  // delete when unused
		false,                  // exclusive
		false,                  // noWait
		nil,                    // arguments
	); err != nil {
		return fmt.Errorf("failed to declare RabbitMQ queue: %w", err)
	}

	if err := ch.QueueBind(
		r.RabbitProp.QueueName,    // name of the queue
		r.RabbitProp.TopicKey,     // routing key
		r.RabbitProp.ExchangeName, // name of the exchange
		false,                     // noWait
		nil,                       // arguments
	); err != nil {
		return fmt.Errorf("failed to bind RabbitMQ queue: %w", err)
	}

	return nil
}

// channel returns the channel set up by Connect, or nil when the queue is not
// connected.
func (r *RabbitQueue) channel() *amqp.Channel {
	if r.RabbitConn == nil {
		return nil
	}
	return r.RabbitConn.Channel
}

// Close closes the queue's channel. The shared connection is left open.
// It returns nil when there is no channel to close; otherwise it logs and
// returns the error reported by the channel.
func (r *RabbitQueue) Close() error {
	ch := r.channel()
	if ch == nil {
		return nil
	}

	err := ch.Close()
	if err != nil {
		log.Error(fmt.Sprintf("Failed to close RabbitMQ channel: %v", err))
	}
	return err
}

// Send publishes message as text/plain to the configured exchange using the
// configured topic key. Failures are logged, not returned.
func (r *RabbitQueue) Send(message []byte) {
	ch := r.channel()
	if ch == nil {
		log.Error("Failed to publish a message: RabbitMQ channel is not connected")
		return
	}

	err := ch.Publish(
		r.RabbitProp.ExchangeName,
		r.RabbitProp.TopicKey,
		false, // mandatory
		false, // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        message,
		},
	)
	if err != nil {
		log.Error(fmt.Sprintf("Failed to publish a message: %v", err))
	}
}

// Read registers a consumer on the configured queue and returns its delivery
// channel. autoAck is forwarded to the broker as the auto-ack flag.
// When the queue is not connected or the consumer cannot be registered, the
// failure is logged and a nil channel is returned.
func (r *RabbitQueue) Read(autoAck bool) <-chan amqp.Delivery {
	ch := r.channel()
	if ch == nil {
		log.Error("Failed to register a consumer: RabbitMQ channel is not connected")
		return nil
	}

	msgs, err := ch.Consume(
		r.RabbitProp.QueueName, // queue name
		"",                     // consumer name
		autoAck,                // auto-ack
		false,                  // exclusive
		false,                  // no-local
		false,                  // no-wait
		nil,                    // arguments
	)
	if err != nil {
		log.Error(fmt.Sprintf("Failed to register a consumer: %v", err))
	}

	return msgs
}

// parseRabbitMQEndpointFromG builds the RabbitMQ connection URL from the
// spring.rabbitmq.* keys of the global agent configuration.
// The URL is logged at debug level with the password masked.
func parseRabbitMQEndpointFromG() string {
	agentConfig := g.G.AgentConfig

	host := agentConfig.GetString("spring.rabbitmq.host")
	port := agentConfig.GetString("spring.rabbitmq.port")
	username := agentConfig.GetString("spring.rabbitmq.username")
	password := agentConfig.GetString("spring.rabbitmq.password")
	virtualHost := agentConfig.GetString("spring.rabbitmq.virtual-host")

	endpoint, redacted := buildRabbitMQEndpoint(username, password, host, port, virtualHost)

	log.Debug(fmt.Sprintf("generate RabbitMQ endpoint is %s", redacted))

	return endpoint
}

// buildRabbitMQEndpoint assembles
// amqp://{username}:{password}@{hostname}:{port}/{virtual_host}.
// The username and password are percent-encoded so that reserved characters
// such as '@', '/' or ':' cannot corrupt the URL. redacted is the same URL
// with the password replaced by a fixed mask, suitable for logging.
func buildRabbitMQEndpoint(username, password, host, port, virtualHost string) (endpoint, redacted string) {
	var tail strings.Builder
	tail.WriteString("@")
	tail.WriteString(host)
	tail.WriteString(":")
	tail.WriteString(port)
	tail.WriteString("/")
	tail.WriteString(virtualHost)

	endpoint = "amqp://" + url.UserPassword(username, password).String() + tail.String()
	redacted = "amqp://" + url.User(username).String() + ":******" + tail.String()

	return endpoint, redacted
}