200 lines
5.0 KiB
Go
200 lines
5.0 KiB
Go
package rabbitmq
|
||
|
||
import (
|
||
"agent-go/executor"
|
||
"agent-go/g"
|
||
"agent-go/status"
|
||
"agent-go/utils"
|
||
"encoding/json"
|
||
"fmt"
|
||
"strings"
|
||
)
|
||
|
||
type IOctopusMessage interface {
|
||
OctopusMsgHandler
|
||
OctopusMsgSender
|
||
OctopusMsgBuilder
|
||
}
|
||
|
||
type OctopusMsgHandler interface {
|
||
Handle(octopusMessage *OctopusMessage)
|
||
}
|
||
|
||
type OctopusMsgSender interface {
|
||
Send(rabbitQueue *RabbitQueue, msg []byte)
|
||
|
||
SendToOctopusServer()
|
||
}
|
||
|
||
type OctopusMsgBuilder interface {
|
||
Build(omType string, content interface{}) *OctopusMessage
|
||
}
|
||
|
||
// OctopusMessage is the JSON envelope exchanged over the queues used in this
// package. The field order below determines the key order produced by
// json.Marshal.
type OctopusMessage struct {
	// UUID identifies the message.
	UUID string `json:"uuid"`
	// InitTime is the creation time as a formatted string.
	InitTime string `json:"init_time" format:"2023-03-21 16:38:30"`
	// OctopusMessageType selects which handler processes the message.
	OctopusMessageType string `json:"octopusMessageType"`
	// Content is the payload; the handlers in this file expect it to hold a
	// JSON-encoded string.
	Content interface{} `json:"content"`
	// ACTime is refreshed by the handlers before a reply is sent.
	ACTime string `json:"ac_time" format:"2023-03-21 16:38:30"`
	// Result carries the handler output sent back with the reply.
	Result interface{} `json:"result"`
	// ResultCode is the outcome code; the executor handler writes "200" on
	// success and "300" on failure.
	ResultCode string `json:"resultCode"`
}
|
||
|
||
func (om *OctopusMessage) Handle() {
|
||
// 实际执行 OM handle进程
|
||
log.Debug("接收到OctopusMessage,开始处理!")
|
||
doHandleOctopusMessage(om)
|
||
}
|
||
|
||
func (om *OctopusMessage) Send(rabbitQueue *RabbitQueue, msg []byte) {
|
||
rabbitQueue.Send(msg)
|
||
}
|
||
|
||
// SendToOctopusServer send octopus message back to octopusToServer queue
|
||
func (om *OctopusMessage) SendToOctopusServer() {
|
||
|
||
// write the octopus message to bytes
|
||
octopusMessageReplayBytes, err := json.Marshal(om)
|
||
if err != nil {
|
||
log.ErrorF("replay octopus message write error => %v", err)
|
||
}
|
||
|
||
// Send back the result to queue
|
||
OctopusToServerQueue.Send(octopusMessageReplayBytes)
|
||
|
||
}
|
||
|
||
func (om *OctopusMessage) Build(omType string, content interface{}) *OctopusMessage {
|
||
|
||
// 当前时间
|
||
curTimeString := utils.ParseDateTimeTime()
|
||
|
||
// must write to string format, otherwise it's very hard to deserialize
|
||
bytes, err := json.Marshal(content)
|
||
if err != nil {
|
||
fmt.Sprintf("OctopusMessage Build Error ! %v", err)
|
||
}
|
||
|
||
return &OctopusMessage{
|
||
UUID: curTimeString,
|
||
InitTime: curTimeString,
|
||
OctopusMessageType: omType,
|
||
Content: string(bytes),
|
||
Result: nil,
|
||
ACTime: curTimeString,
|
||
}
|
||
}
|
||
|
||
func doHandleOctopusMessage(octopusMessage *OctopusMessage) {
|
||
|
||
switch octopusMessage.OctopusMessageType {
|
||
case g.InitOmType:
|
||
go func() {}()
|
||
case g.ExecOmType:
|
||
P.Submit(func() {
|
||
executorOMHandler(octopusMessage)
|
||
})
|
||
case g.StatusOmType:
|
||
P.Submit(func() {
|
||
statusOMHandler(octopusMessage)
|
||
})
|
||
case g.AgentOmType:
|
||
P.Submit(func() {
|
||
agentOMHandler(octopusMessage)
|
||
},
|
||
)
|
||
default:
|
||
P.Submit(func() {
|
||
blackHoleOMHandler(octopusMessage)
|
||
})
|
||
}
|
||
|
||
}
|
||
|
||
// agentOMHandler 处理Agent的核心操作指令
|
||
func agentOMHandler(octopusMessage *OctopusMessage) {
|
||
|
||
}
|
||
|
||
func executorOMHandler(octopusMessage *OctopusMessage) {
|
||
log.Debug("开始处理 Executor Octopus Message !")
|
||
|
||
// 转换类型
|
||
executionMsgString := octopusMessage.Content.(string)
|
||
|
||
// 解析 ExecutionMessage
|
||
var executionMessage *executor.ExecutionMessage
|
||
err := json.Unmarshal([]byte(executionMsgString), &executionMessage)
|
||
if err != nil {
|
||
log.Error(fmt.Sprintf("execution message convert to json is wrong! msg is => %s", executionMsgString))
|
||
return
|
||
}
|
||
|
||
// 执行命令
|
||
ok, resultLog := executor.Execute(executionMessage)
|
||
if ok {
|
||
octopusMessage.ResultCode = "200"
|
||
} else {
|
||
octopusMessage.ResultCode = "300"
|
||
}
|
||
|
||
// 返回结果
|
||
if executionMessage.NeedResultReplay {
|
||
// send back the result log
|
||
octopusMessage.Result = resultLog
|
||
}
|
||
// 返回时间
|
||
octopusMessage.ACTime = utils.ParseDateTimeTime()
|
||
|
||
// 返回结果
|
||
octopusMessage.SendToOctopusServer()
|
||
}
|
||
|
||
func statusOMHandler(octopusMessage *OctopusMessage) {
|
||
|
||
v, ok := (octopusMessage.Content).(string)
|
||
if !ok {
|
||
log.ErrorF("convert to string is wrong %s", v)
|
||
}
|
||
|
||
statusMsgString := octopusMessage.Content.(string)
|
||
|
||
var statusMessage *status.StatusMessage
|
||
err := json.Unmarshal([]byte(statusMsgString), &statusMessage)
|
||
if err != nil {
|
||
fmt.Println(err.Error())
|
||
log.Error(fmt.Sprintf("status message convert to json is wrong! msg is => %s", octopusMessage))
|
||
return
|
||
}
|
||
|
||
// OMessageStatusTypeEnum
|
||
var statusRes string
|
||
if strings.HasPrefix(statusMessage.StatusType, "PING") {
|
||
// ping info
|
||
statusRes = status.Ping()
|
||
} else if strings.HasPrefix(statusMessage.StatusType, "METRIC") {
|
||
// metric info
|
||
agentStatusString, _ := json.Marshal(status.ReportAppStatus())
|
||
statusRes = string(agentStatusString)
|
||
} else if strings.HasPrefix(statusMessage.StatusType, "INFO") {
|
||
|
||
} else {
|
||
log.WarnF("[statusOMHandler] - error octopus status message type of %s", statusMessage.StatusType)
|
||
}
|
||
|
||
// 返回消息
|
||
// 组装消息
|
||
octopusMessage.ACTime = utils.ParseDateTimeTime()
|
||
octopusMessage.Result = statusRes
|
||
// 发送回去
|
||
statusOctopusReplayMessage, _ := json.Marshal(octopusMessage)
|
||
OctopusToServerQueue.Send(statusOctopusReplayMessage)
|
||
|
||
// 输出日志
|
||
log.InfoF("接收到查询Agent状态的请求,结果为 => %s", statusRes)
|
||
}
|
||
|
||
func blackHoleOMHandler(octopusMessage *OctopusMessage) {
|
||
log.Error(fmt.Sprintf("[BLACK HOLE] octopusMessage type wrong! msg is => %v", octopusMessage))
|
||
}
|