package rabbitmq import ( "agent-go/config" "agent-go/executor" "agent-go/g" "encoding/json" "fmt" ) func HandleOMsg(initOMsgFromServer *config.OctopusMessage) { agentTopicName := initOMsgFromServer.UUID OctopusExchange := g.G.NacosConfig.GetString("octopus.message.octopus_exchange") octopusConnectProp := &ConnectProperty{ ExchangeName: OctopusExchange, QueueName: agentTopicName, ExchangeType: g.QueueTopic, TopicKey: agentTopicName + "*", } octopusConn, err := NewRabbitMQConn(octopusConnectProp) if err != nil { log.Error(fmt.Sprintf("Octopus Message Queue create Error ! => %v", octopusConnectProp)) panic(err) } // 开始接收消息 channel := octopusConn.Channel deliveries, err := channel.Consume( agentTopicName, agentTopicName, true, false, false, false, nil, ) if err != nil { return } // 死循环,处理Ocotpus Message for delivery := range deliveries { var om *config.OctopusMessage err := json.Unmarshal(delivery.Body, &om) if err != nil { log.Error("Octopus Message Parse Error !") // 保存到某处 continue } // 策略模式 处理消息 doHandleOctopusMessage(om) } } func doHandleOctopusMessage(octopusMessage *config.OctopusMessage) { switch octopusMessage.Type { case g.InitOmType: go func() {}() case g.ExecOmType: go executorOMHandler(octopusMessage) case g.StatusOmType: go statusOMHandler(octopusMessage) default: go blackHoleOMHandler(octopusMessage) } } func executorOMHandler(octopusMessage *config.OctopusMessage) { executionMsgString := octopusMessage.Content.(string) var executionMessage *config.ExecutionMessage err := json.Unmarshal([]byte(executionMsgString), &executionMessage) if err != nil { return } // 交给后端的实际处理器处理, 再次策略 executor.Execute(octopusMessage, executionMessage) } func statusOMHandler(octopusMessage *config.OctopusMessage) { } func blackHoleOMHandler(octopusMessage *config.OctopusMessage) { log.Error(fmt.Sprintf("octopusMessage type wrong! msg is => %v", octopusMessage)) }