78 lines
1.8 KiB
Go
78 lines
1.8 KiB
Go
package rabbitmq
|
||
|
||
import (
|
||
"encoding/json"
|
||
"fmt"
|
||
"wdd.io/agent-go/g"
|
||
)
|
||
|
||
var OctopusToServerQueue = &RabbitQueue{}
|
||
|
||
var P = g.G.P
|
||
|
||
func BuildOMsgRuntimeConnectorQueue(agentTopicName string) chan bool {
|
||
|
||
// 建立 业务消息 接收队列
|
||
// agentTopicName为名称的队列
|
||
agentConfig := g.G.AgentConfig
|
||
|
||
octopusExchangeName := agentConfig.GetString("octopus.message.octopus_exchange")
|
||
|
||
octopusConnectProp := &ConnectProperty{
|
||
ExchangeName: octopusExchangeName,
|
||
QueueName: agentTopicName,
|
||
ExchangeType: g.QueueTopic,
|
||
TopicKey: agentTopicName + "*",
|
||
}
|
||
|
||
octopusMsgQueue := &RabbitQueue{
|
||
RabbitProp: octopusConnectProp,
|
||
}
|
||
octopusMsgQueue.Connect()
|
||
|
||
// 建立 业务消息 返回队列
|
||
// 统一为 OctopusToServer
|
||
octopusToServerQueueName := agentConfig.GetString("octopus.message.octopus_to_server")
|
||
|
||
octopusToServerProp := &ConnectProperty{
|
||
ExchangeName: octopusExchangeName,
|
||
QueueName: octopusToServerQueueName,
|
||
ExchangeType: g.QueueTopic,
|
||
TopicKey: octopusToServerQueueName,
|
||
}
|
||
|
||
OctopusToServerQueue = &RabbitQueue{
|
||
RabbitProp: octopusToServerProp,
|
||
}
|
||
|
||
// 开启运行时消息返回队列
|
||
OctopusToServerQueue.Connect()
|
||
|
||
log.InfoF("Octopus Message Business Runtime Queue is established ! => %v", OctopusToServerQueue)
|
||
|
||
deliveries := octopusMsgQueue.Read(true)
|
||
businessForeverChan := make(chan bool)
|
||
P.Submit(
|
||
func() {
|
||
// 死循环,处理Octopus Message
|
||
for delivery := range deliveries {
|
||
|
||
var om *OctopusMessage
|
||
err := json.Unmarshal(delivery.Body, &om)
|
||
if err != nil {
|
||
log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body))
|
||
// 保存到某处
|
||
continue
|
||
}
|
||
|
||
// 策略模式 处理消息
|
||
P.Submit(func() {
|
||
om.Handle()
|
||
})
|
||
}
|
||
})
|
||
|
||
return businessForeverChan
|
||
|
||
}
|