package rabbitmq import ( "agent-go/executor" "agent-go/g" "agent-go/status" "agent-go/utils" "encoding/json" "fmt" "strings" ) type IOctopusMessage interface { OctopusMsgHandler OctopusMsgSender OctopusMsgBuilder } type OctopusMsgHandler interface { Handle(octopusMessage *OctopusMessage) } type OctopusMsgSender interface { Send(rabbitQueue *RabbitQueue, msg []byte) SendToOctopusServer() } type OctopusMsgBuilder interface { Build(omType string, content interface{}) *OctopusMessage } type OctopusMessage struct { UUID string `json:"uuid"` InitTime string `json:"init_time" format:"2023-03-21 16:38:30"` OctopusMessageType string `json:"octopusMessageType"` Content interface{} `json:"content"` ACTime string `json:"ac_time" format:"2023-03-21 16:38:30"` Result interface{} `json:"result"` ResultCode string `json:"resultCode"` } func (om *OctopusMessage) Handle() { // 实际执行 OM handle进程 log.Debug("接收到OctopusMessage,开始处理!") doHandleOctopusMessage(om) } func (om *OctopusMessage) Send(rabbitQueue *RabbitQueue, msg []byte) { rabbitQueue.Send(msg) } // SendToOctopusServer send octopus message back to octopusToServer queue func (om *OctopusMessage) SendToOctopusServer() { // write the octopus message to bytes octopusMessageReplayBytes, err := json.Marshal(om) if err != nil { log.ErrorF("replay octopus message write error => %v", err) } // Send back the result to queue OctopusToServerQueue.Send(octopusMessageReplayBytes) } func (om *OctopusMessage) Build(omType string, content interface{}) *OctopusMessage { // 当前时间 curTimeString := utils.ParseDateTimeTime() // must write to string format, otherwise it's very hard to deserialize bytes, err := json.Marshal(content) if err != nil { fmt.Sprintf("OctopusMessage Build Error ! %v", err) } return &OctopusMessage{ UUID: curTimeString, InitTime: curTimeString, OctopusMessageType: omType, Content: string(bytes), Result: nil, ACTime: curTimeString, } } func doHandleOctopusMessage(octopusMessage *OctopusMessage) { switch octopusMessage.OctopusMessageType { case g.InitOmType: go func() {}() case g.ExecOmType: P.Submit(func() { executorOMHandler(octopusMessage) }) case g.StatusOmType: P.Submit(func() { statusOMHandler(octopusMessage) }) case g.AgentOmType: P.Submit(func() { agentOMHandler(octopusMessage) }, ) default: P.Submit(func() { blackHoleOMHandler(octopusMessage) }) } } // agentOMHandler 处理Agent的核心操作指令 func agentOMHandler(octopusMessage *OctopusMessage) { } func executorOMHandler(octopusMessage *OctopusMessage) { log.Debug("开始处理 Executor Octopus Message !") // 转换类型 executionMsgString := octopusMessage.Content.(string) // 解析 ExecutionMessage var executionMessage *executor.ExecutionMessage err := json.Unmarshal([]byte(executionMsgString), &executionMessage) if err != nil { log.Error(fmt.Sprintf("execution message convert to json is wrong! msg is => %s", executionMsgString)) return } // 执行命令 ok, resultLog := executor.Execute(executionMessage) if ok { octopusMessage.ResultCode = "200" } else { octopusMessage.ResultCode = "300" } // 返回结果 if executionMessage.NeedResultReplay { // send back the result log octopusMessage.Result = resultLog } // 返回时间 octopusMessage.ACTime = utils.ParseDateTimeTime() // 返回结果 octopusMessage.SendToOctopusServer() } func statusOMHandler(octopusMessage *OctopusMessage) { v, ok := (octopusMessage.Content).(string) if !ok { log.ErrorF("convert to string is wrong %s", v) } statusMsgString := octopusMessage.Content.(string) var statusMessage *status.StatusMessage err := json.Unmarshal([]byte(statusMsgString), &statusMessage) if err != nil { fmt.Println(err.Error()) log.Error(fmt.Sprintf("status message convert to json is wrong! msg is => %s", octopusMessage)) return } var statusRes string if strings.HasPrefix(statusMessage.StatusType, "P") { // ping info statusRes = status.Ping() } else { // metric info agentStatusString, _ := json.Marshal(status.ReportAppStatus()) statusRes = string(agentStatusString) } // 返回消息 // 组装消息 octopusMessage.ACTime = utils.ParseDateTimeTime() octopusMessage.Result = statusRes // 发送回去 statusOctopusReplayMessage, _ := json.Marshal(octopusMessage) OctopusToServerQueue.Send(statusOctopusReplayMessage) // 输出日志 log.InfoF("接收到查询Agent状态的请求,结果为 => %s", statusRes) } func blackHoleOMHandler(octopusMessage *OctopusMessage) { log.Error(fmt.Sprintf("[BLACK HOLE] octopusMessage type wrong! msg is => %v", octopusMessage)) }