package rabbitmq import ( "agent-go/executor" "agent-go/g" "agent-go/status" "agent-go/utils" "encoding/json" "fmt" "strings" ) type IOctopusMessage interface { OctopusMsgHandler OctopusMsgSender OctopusMsgBuilder } type OctopusMsgHandler interface { Handle(octopusMessage *OctopusMessage) } type OctopusMsgSender interface { Send(rabbitQueue *RabbitQueue, msg []byte) SendToOctopusServer() } type OctopusMsgBuilder interface { Build(omType string, content interface{}) *OctopusMessage } type OctopusMessage struct { UUID string `json:"uuid"` InitTime string `json:"init_time" format:"2023-03-21 16:38:30"` OctopusMessageType string `json:"octopusMessageType"` Content interface{} `json:"content"` ACTime string `json:"ac_time" format:"2023-03-21 16:38:30"` Result interface{} `json:"result"` ResultCode string `json:"resultCode"` } func (om *OctopusMessage) Handle() { // 实际执行 OM handle进程 doHandleOctopusMessage(om) } func (om *OctopusMessage) Send(rabbitQueue *RabbitQueue, msg []byte) { rabbitQueue.Send(msg) } // SendToOctopusServer send octopus message back to octopusToServer queue func (om *OctopusMessage) SendToOctopusServer() { // write the octopus message to bytes octopusMessageReplayBytes, err := json.Marshal(om) if err != nil { log.ErrorF("replay octopus message write error => %v", err) } // Send back the result to queue OctopusToServerQueue.Send(octopusMessageReplayBytes) } func (om *OctopusMessage) Build(omType string, content interface{}) *OctopusMessage { // 当前时间 curTimeString := utils.ParseDateTimeTime() // must write to string format, otherwise it's very hard to deserialize bytes, err := json.Marshal(content) if err != nil { fmt.Sprintf("OctopusMessage Build Error ! %v", err) } return &OctopusMessage{ UUID: curTimeString, InitTime: curTimeString, OctopusMessageType: omType, Content: string(bytes), Result: nil, ACTime: curTimeString, } } func doHandleOctopusMessage(octopusMessage *OctopusMessage) { switch octopusMessage.OctopusMessageType { case g.InitOmType: go func() {}() case g.ExecOmType: P.Submit(func() { executorOMHandler(octopusMessage) }) case g.StatusOmType: P.Submit(func() { statusOMHandler(octopusMessage) }) case g.AgentOmType: P.Submit(func() { agentOMHandler(octopusMessage) }, ) default: P.Submit(func() { blackHoleOMHandler(octopusMessage) }) } } // agentOMHandler 处理Agent的核心操作指令 func agentOMHandler(octopusMessage *OctopusMessage) { } func executorOMHandler(octopusMessage *OctopusMessage) { executionMsgString := octopusMessage.Content.(string) var executionMessage *executor.ExecutionMessage err := json.Unmarshal([]byte(executionMsgString), &executionMessage) if err != nil { log.Error(fmt.Sprintf("execution message convert to json is wrong! msg is => %s", executionMsgString)) return } // 交给后端的实际处理器处理, 再次策略 resultLog, err := executor.Execute(executionMessage) if err != nil { octopusMessage.ResultCode = "200" } else { octopusMessage.ResultCode = "300" } // send back the result log octopusMessage.Result = resultLog octopusMessage.ACTime = utils.ParseISOLocalDateTime() // Send octopusMessage.SendToOctopusServer() } func statusOMHandler(octopusMessage *OctopusMessage) { v, ok := (octopusMessage.Content).(string) if !ok { log.ErrorF("convert to string is wrong %s", v) } statusMsgString := octopusMessage.Content.(string) var statusMessage *status.StatusMessage err := json.Unmarshal([]byte(statusMsgString), &statusMessage) if err != nil { fmt.Println(err.Error()) log.Error(fmt.Sprintf("status message convert to json is wrong! msg is => %s", octopusMessage)) return } var statusRes string if strings.HasPrefix(statusMessage.StatusType, "P") { // ping info statusRes = status.Ping() } else { // metric info agentStatusString, _ := json.Marshal(status.ReportAppStatus()) statusRes = string(agentStatusString) } // 返回消息 // 组装消息 octopusMessage.ACTime = utils.ParseDateTimeTime() octopusMessage.Result = statusRes // 发送回去 statusOctopusReplayMessage, _ := json.Marshal(octopusMessage) OctopusToServerQueue.Send(statusOctopusReplayMessage) // 输出日志 log.InfoF("接收到查询Agent状态的请求,结果为 => %s", statusRes) } func blackHoleOMHandler(octopusMessage *OctopusMessage) { log.Error(fmt.Sprintf("[BLACK HOLE] octopusMessage type wrong! msg is => %v", octopusMessage)) }