// Package rabbitmq wraps a shared AMQP connection and exposes a queue type
// that can declare its topology, publish messages and dispatch consumed
// Octopus messages.
package rabbitmq

import (
	"encoding/json"
	"fmt"
	"strings"
	"sync"

	"github.com/streadway/amqp"
	"wdd.io/agent-common/logger"
)

// RabbitMQ is the full set of operations a queue wrapper offers: sending and
// reading, connecting and closing, and message handling.
type RabbitMQ interface {
	RabbitSendWriter

	RabbitConnectCloser

	RabbitQueueHandler
}

// RabbitSendWriter publishes raw messages and opens a delivery stream.
type RabbitSendWriter interface {
	Send(message []byte)

	Read(autoAck bool) <-chan amqp.Delivery
}

// RabbitConnectCloser sets up and tears down the queue's AMQP resources.
type RabbitConnectCloser interface {
	Connect()

	Close() error
}

// RabbitQueueHandler starts message handling and returns a signalling channel.
type RabbitQueueHandler interface {
	Handle() chan bool
}

// RabbitQueue bundles everything needed to talk to one queue.
type RabbitQueue struct {
	// RabbitConn is the connection entity (connection plus channel); it is
	// populated by Connect.
	RabbitConn *RabbitMQConn
	// RabbitProp holds the exchange/queue/routing properties.
	RabbitProp *ConnectProperty
	// RabbitConnectInfo holds the underlying TCP connection information.
	RabbitConnectInfo *RabbitTCPConnectInfo
	// ReceiveChan holds the channels that received messages are handed to;
	// it is populated by Connect.
	ReceiveChan *RabbitReceiveChan
}

// RabbitMQConn is a struct that holds the connection and channel objects.
type RabbitMQConn struct {
	Connection *amqp.Connection
	Channel    *amqp.Channel
}

// RabbitReceiveChan groups the Go channels that decoded messages are passed to.
// Connect allocates either InitRChan alone or the three business channels,
// depending on the queue name.
type RabbitReceiveChan struct {
	AgentRChan    chan *OctopusMessage
	ExecutorRChan chan *OctopusMessage
	StatusRChan   chan *OctopusMessage
	InitRChan     chan *OctopusMessage
}

// ConnectProperty describes the exchange, queue and routing key to use.
type ConnectProperty struct {
	ExchangeName string
	QueueName    string
	ExchangeType string
	TopicKey     string
}

// RabbitTCPConnectInfo carries the pieces of the AMQP endpoint URL.
type RabbitTCPConnectInfo struct {
	UserName    string
	Password    string
	Host        string
	Port        string
	VirtualHost string
}

var log = logger.Log

// instance is the package-wide singleton AMQP connection.
var instance *amqp.Connection

// once ensures the initialisation function is only invoked a single time.
var once sync.Once

// createInstance dials RabbitMQ using rabbitConnectInfo and stores the
// resulting connection in instance. On failure the error is logged and
// instance is left untouched (nil), so callers can detect the failure.
func createInstance(rabbitConnectInfo *RabbitTCPConnectInfo) {
	// Build the RabbitMQ connection address.
	rabbitMQEndpoint := parseRabbitMQEndpoint(rabbitConnectInfo)

	// Create the single shared RabbitMQ connection.
	connection, err := amqp.Dial(rabbitMQEndpoint)
	if err != nil {
		log.Error(fmt.Sprintf("failed to connect to RabbitMQ: %v", err))
		// Do not publish a connection that failed to open.
		return
	}

	instance = connection
}

// GetInstance returns the package-wide singleton connection, creating it on
// the first call. It returns nil when that first dial failed.
//
// Because initialisation is guarded by sync.Once, rabbitConnectInfo is only
// used by the very first call, and a failed first dial is not retried.
func GetInstance(rabbitConnectInfo *RabbitTCPConnectInfo) *amqp.Connection {
	// sync.Once guarantees createInstance runs at most once.
	once.Do(func() {
		createInstance(rabbitConnectInfo)
	})

	return instance
}

// Connect obtains the shared connection, opens a channel on it, declares the
// exchange and queue, binds them, and allocates the receive channels.
//
// If the connection or the channel cannot be obtained, the error is logged
// and Connect returns without populating RabbitConn or ReceiveChan.
// Declaration and binding errors are logged and setup continues.
func (r *RabbitQueue) Connect() {
	// Fetch the shared RabbitMQ connection.
	conn := GetInstance(r.RabbitConnectInfo)
	if conn == nil {
		log.Error("failed to create RabbitMQ channel: connection is not available")
		return
	}

	ch, err := conn.Channel()
	if err != nil {
		log.Error(fmt.Sprintf("failed to create RabbitMQ channel: %v", err))
		// Without a channel none of the following calls can work.
		return
	}

	if err = ch.ExchangeDeclare(
		r.RabbitProp.ExchangeName, // name of the exchange
		r.RabbitProp.ExchangeType, // type of the exchange
		true,                      // durable
		false,                     // delete when complete
		false,                     // internal
		false,                     // noWait
		nil,                       // arguments
	); err != nil {
		log.Error(fmt.Sprintf("failed to declare exchange !: %v", err))
	}

	_, err = ch.QueueDeclare(
		r.RabbitProp.QueueName, // name of the queue
		true,                   // durable
		false,                  // delete when unused
		false,                  // exclusive
		false,                  // noWait
		nil,                    // arguments
	)
	if err != nil {
		log.Error(fmt.Sprintf("failed to declare RabbitMQ queue: %v", err))
	}

	if err = ch.QueueBind(
		r.RabbitProp.QueueName,    // name of the queue
		r.RabbitProp.TopicKey,     // routing key - all topics
		r.RabbitProp.ExchangeName, // name of the exchange
		false,                     // noWait
		nil,                       // arguments
	); err != nil {
		log.Error(fmt.Sprintf("failed to bind RabbitMQ queue: %v", err))
	}

	// Build the receive channels.
	rabbitRCha := &RabbitReceiveChan{}
	if strings.HasPrefix(r.RabbitProp.QueueName, "Init") {
		// init queue
		rabbitRCha.InitRChan = make(chan *OctopusMessage)
	} else {
		// business queue
		rabbitRCha.AgentRChan = make(chan *OctopusMessage, 5)
		rabbitRCha.ExecutorRChan = make(chan *OctopusMessage, 5)
		rabbitRCha.StatusRChan = make(chan *OctopusMessage, 5)
	}

	// connection
	r.RabbitConn = &RabbitMQConn{
		Connection: conn,
		Channel:    ch,
	}

	// receive chan
	r.ReceiveChan = rabbitRCha
}

// Close closes this queue's AMQP channel and returns the close error, if any.
// The shared connection is not closed. Close is a no-op returning nil when
// the queue was never connected.
func (r *RabbitQueue) Close() error {
	if r.RabbitConn == nil || r.RabbitConn.Channel == nil {
		return nil
	}

	err := r.RabbitConn.Channel.Close()
	if err != nil {
		log.Error(fmt.Sprintf("Failed to close RabbitMQ channel: %v", err))
	}

	return err
}

// Handle starts consuming the queue with auto-ack enabled and submits a task
// to P that decodes every delivery into an OctopusMessage and submits its
// handling as a further task to P.
//
// The returned channel is never written to or closed by this function.
// NOTE(review): callers presumably receive on it to block indefinitely — confirm.
func (r *RabbitQueue) Handle() chan bool {
	deliveries := r.Read(true)

	foreverHandle := make(chan bool)

	// Loop over deliveries, processing each Octopus message.
	P.Submit(func() {
		for delivery := range deliveries {
			// om is declared per iteration, so each submitted closure below
			// captures its own message.
			var om *OctopusMessage
			err := json.Unmarshal(delivery.Body, &om)
			if err != nil {
				log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body))
				// TODO: persist the undecodable message somewhere.
				continue
			}

			// Strategy pattern: the message decides how it is handled.
			P.Submit(func() {
				om.HandleMsg(r.ReceiveChan)
			})
		}
	})

	return foreverHandle
}

// Send publishes message to the configured exchange with the configured
// routing key. Failures are logged, not returned.
func (r *RabbitQueue) Send(message []byte) {
	if r.RabbitConn == nil || r.RabbitConn.Channel == nil {
		log.Error("Failed to publish a message: RabbitMQ channel is not connected")
		return
	}

	// Where to publish.
	channel := r.RabbitConn.Channel

	// Publish.
	err := channel.Publish(
		r.RabbitProp.ExchangeName,
		r.RabbitProp.TopicKey,
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        message,
		},
	)
	if err != nil {
		log.Error(fmt.Sprintf("Failed to publish a message: %v", err))
	}
}

// SendOMsg JSON-encodes oMessage and publishes it via Send. If encoding
// fails the error is logged and nothing is published.
func (r *RabbitQueue) SendOMsg(oMessage *OctopusMessage) {
	payload, err := json.Marshal(oMessage)
	if err != nil {
		log.ErrorF("octopus message Marshal error %v", err)
		return
	}

	r.Send(payload)
}

// Read registers a consumer on the configured queue and returns its delivery
// channel. autoAck is passed through to Consume.
//
// When the queue is not connected or the consumer cannot be registered, the
// error is logged and a nil channel is returned; receiving from a nil channel
// blocks forever.
func (r *RabbitQueue) Read(autoAck bool) <-chan amqp.Delivery {
	if r.RabbitConn == nil || r.RabbitConn.Channel == nil {
		log.Error("Failed to register a consumer: RabbitMQ channel is not connected")
		return nil
	}

	// Get this queue's channel.
	channel := r.RabbitConn.Channel

	// Start reading all messages from the queue.
	msgs, err := channel.Consume(
		r.RabbitProp.QueueName, // queue name
		"",                     // consumer name
		autoAck,                // auto-ack
		false,                  // exclusive
		false,                  // no-local
		false,                  // no-wait
		nil,                    // arguments
	)
	if err != nil {
		log.Error(fmt.Sprintf("Failed to register a consumer: %v", err))
	}

	return msgs
}

// parseRabbitMQEndpoint builds the AMQP URL
//
//	amqp://{username}:{password}@{hostname}:{port}/{virtual_host}
//
// from rabbitConnectInfo. The fields are concatenated as-is (no URL
// escaping). The debug log shows the endpoint with the password masked so
// credentials do not end up in log files.
func parseRabbitMQEndpoint(rabbitConnectInfo *RabbitTCPConnectInfo) string {
	hostPart := rabbitConnectInfo.Host + ":" + rabbitConnectInfo.Port + "/" + rabbitConnectInfo.VirtualHost

	endpoint := "amqp://" + rabbitConnectInfo.UserName + ":" + rabbitConnectInfo.Password + "@" + hostPart

	// Log a redacted copy only; the real endpoint contains the password.
	redacted := "amqp://" + rabbitConnectInfo.UserName + ":******@" + hostPart
	log.Debug(fmt.Sprintf("generate RabbitMQ endpoint is %s", redacted))

	return endpoint
}