package rabbitmq import ( "agent-go/executor" "agent-go/g" "agent-go/utils" "encoding/json" "fmt" "time" ) var P = g.G.P type IOctopusMessage interface { OctopusMsgHandler OctopusMsgSender OctopusMsgBuilder } type OctopusMsgHandler interface { Handle(octopusMessage *OctopusMessage) } type OctopusMsgSender interface { Send(rabbitQueue *RabbitQueue, msg []byte) } type OctopusMsgBuilder interface { Build(omType string, content interface{}) *OctopusMessage } type OctopusMessage struct { UUID string `json:"uuid"` InitTime time.Time `json:"init_time" format:"2023-03-21 16:38:30"` Type string `json:"type"` Content interface{} `json:"content"` Result interface{} `json:"result"` ACTime time.Time `json:"ac_time" format:"2023-03-21 16:38:30"` } func (om *OctopusMessage) Handle() { // 实际执行 OM handle进程 doHandleOctopusMessage(om) } func (om *OctopusMessage) Send(rabbitQueue *RabbitQueue, msg []byte) { rabbitQueue.Send(msg) } func (om *OctopusMessage) Build(omType string, content interface{}) *OctopusMessage { // 当前时间 curTimeString := utils.CurTimeString() // must write to string format, otherwise it's very hard to deserialize bytes, err := json.Marshal(content) if err != nil { fmt.Sprintf("OctopusMessage Build Error ! %v", err) } return &OctopusMessage{ UUID: curTimeString, InitTime: time.Now(), Type: omType, Content: string(bytes), Result: nil, ACTime: time.Time{}, } } func doHandleOctopusMessage(octopusMessage *OctopusMessage) { switch octopusMessage.Type { case g.InitOmType: go func() {}() case g.ExecOmType: P.Submit(func() { executorOMHandler(octopusMessage) }) case g.StatusOmType: P.Submit(func() { statusOMHandler(octopusMessage) }) default: P.Submit(func() { blackHoleOMHandler(octopusMessage) }) } } func executorOMHandler(octopusMessage *OctopusMessage) { executionMsgString := octopusMessage.Content.(string) var executionMessage *executor.ExecutionMessage err := json.Unmarshal([]byte(executionMsgString), &executionMessage) if err != nil { log.Error(fmt.Sprintf("execution message convert to json is wrong! msg is => %s", executionMsgString)) return } // 交给后端的实际处理器处理, 再次策略 executor.Execute(executionMessage) } func statusOMHandler(octopusMessage *OctopusMessage) { } func blackHoleOMHandler(octopusMessage *OctopusMessage) { log.Error(fmt.Sprintf("octopusMessage type wrong! msg is => %v", octopusMessage)) }