Files
ProjectOctopus/agent-go/rabbitmq/OctopusMessage.go
2023-06-20 10:02:49 +08:00

160 lines
3.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package rabbitmq
import (
"agent-go/executor"
"agent-go/g"
"agent-go/status"
"agent-go/utils"
"encoding/json"
"fmt"
"strings"
)
var P = g.G.P
type IOctopusMessage interface {
OctopusMsgHandler
OctopusMsgSender
OctopusMsgBuilder
}
type OctopusMsgHandler interface {
Handle(octopusMessage *OctopusMessage)
}
type OctopusMsgSender interface {
Send(rabbitQueue *RabbitQueue, msg []byte)
}
type OctopusMsgBuilder interface {
Build(omType string, content interface{}) *OctopusMessage
}
type OctopusMessage struct {
UUID string `json:"uuid"`
InitTime string `json:"init_time" format:"2023-03-21 16:38:30"`
Type string `json:"type"`
Content interface{} `json:"content"`
Result interface{} `json:"result"`
ACTime string `json:"ac_time" format:"2023-03-21 16:38:30"`
}
func (om *OctopusMessage) Handle() {
// 实际执行 OM handle进程
doHandleOctopusMessage(om)
}
func (om *OctopusMessage) Send(rabbitQueue *RabbitQueue, msg []byte) {
rabbitQueue.Send(msg)
}
func (om *OctopusMessage) Build(omType string, content interface{}) *OctopusMessage {
// 当前时间
curTimeString := utils.ParseDateTimeTime()
// must write to string format, otherwise it's very hard to deserialize
bytes, err := json.Marshal(content)
if err != nil {
fmt.Sprintf("OctopusMessage Build Error ! %v", err)
}
return &OctopusMessage{
UUID: curTimeString,
InitTime: curTimeString,
Type: omType,
Content: string(bytes),
Result: nil,
ACTime: curTimeString,
}
}
func doHandleOctopusMessage(octopusMessage *OctopusMessage) {
switch octopusMessage.Type {
case g.InitOmType:
go func() {}()
case g.ExecOmType:
P.Submit(func() {
executorOMHandler(octopusMessage)
})
case g.StatusOmType:
P.Submit(func() {
statusOMHandler(octopusMessage)
})
case g.AgentOmType:
P.Submit(func() {
agentOMHandler(octopusMessage)
},
)
default:
P.Submit(func() {
blackHoleOMHandler(octopusMessage)
})
}
}
// agentOMHandler 处理Agent的核心操作指令
func agentOMHandler(octopusMessage *OctopusMessage) {
}
func executorOMHandler(octopusMessage *OctopusMessage) {
executionMsgString := octopusMessage.Content.(string)
var executionMessage *executor.ExecutionMessage
err := json.Unmarshal([]byte(executionMsgString), &executionMessage)
if err != nil {
log.Error(fmt.Sprintf("execution message convert to json is wrong! msg is => %s", executionMsgString))
return
}
// 交给后端的实际处理器处理, 再次策略
executor.Execute(executionMessage)
}
func statusOMHandler(octopusMessage *OctopusMessage) {
v, ok := (octopusMessage.Content).(string)
if !ok {
log.ErrorF("convert to string is wrong %s", v)
}
statusMsgString := octopusMessage.Content.(string)
var statusMessage *status.StatusMessage
err := json.Unmarshal([]byte(statusMsgString), &statusMessage)
if err != nil {
fmt.Println(err.Error())
log.Error(fmt.Sprintf("status message convert to json is wrong! msg is => %s", octopusMessage))
return
}
var statusRes string
if strings.HasPrefix(statusMessage.StatusType, "P") {
// ping info
statusRes = status.Ping()
} else {
// status info
agentStatusString, _ := json.Marshal(status.ReportAppStatus())
statusRes = string(agentStatusString)
}
// 返回消息
// 组装消息
octopusMessage.ACTime = utils.ParseDateTimeTime()
octopusMessage.Result = statusRes
// 发送回去
statusOctopusReplayMessage, _ := json.Marshal(octopusMessage)
OctopusToServerQueue.Send(statusOctopusReplayMessage)
// 输出日志
log.InfoF("接收到查询Agent状态的请求结果为 => %s", statusRes)
}
func blackHoleOMHandler(octopusMessage *OctopusMessage) {
log.Error(fmt.Sprintf("[BLACK HOLE] octopusMessage type wrong! msg is => %v", octopusMessage))
}