[ Cmii ] [ Octopus ] - reformat agent-go - 1

This commit is contained in:
zeaslity
2024-03-29 11:39:14 +08:00
parent aa4412f042
commit 1be48aaac2
52 changed files with 683 additions and 557 deletions

View File

@@ -1,77 +0,0 @@
package rabbitmq
import (
"encoding/json"
"fmt"
"wdd.io/agent-go/g"
)
var OctopusToServerQueue = &RabbitQueue{}
var P = g.G.P
func BuildOMsgRuntimeConnectorQueue(agentTopicName string) chan bool {
// 建立 业务消息 接收队列
// agentTopicName为名称的队列
agentConfig := g.G.AgentConfig
octopusExchangeName := agentConfig.GetString("octopus.message.octopus_exchange")
octopusConnectProp := &ConnectProperty{
ExchangeName: octopusExchangeName,
QueueName: agentTopicName,
ExchangeType: g.QueueTopic,
TopicKey: agentTopicName + "*",
}
octopusMsgQueue := &RabbitQueue{
RabbitProp: octopusConnectProp,
}
octopusMsgQueue.Connect()
// 建立 业务消息 返回队列
// 统一为 OctopusToServer
octopusToServerQueueName := agentConfig.GetString("octopus.message.octopus_to_server")
octopusToServerProp := &ConnectProperty{
ExchangeName: octopusExchangeName,
QueueName: octopusToServerQueueName,
ExchangeType: g.QueueTopic,
TopicKey: octopusToServerQueueName,
}
OctopusToServerQueue = &RabbitQueue{
RabbitProp: octopusToServerProp,
}
// 开启运行时消息返回队列
OctopusToServerQueue.Connect()
log.InfoF("Octopus Message Business Runtime Queue is established ! => %v", OctopusToServerQueue)
deliveries := octopusMsgQueue.Read(true)
businessForeverChan := make(chan bool)
P.Submit(
func() {
// 死循环处理Octopus Message
for delivery := range deliveries {
var om *OctopusMessage
err := json.Unmarshal(delivery.Body, &om)
if err != nil {
log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body))
// 保存到某处
continue
}
// 策略模式 处理消息
P.Submit(func() {
om.Handle()
})
}
})
return businessForeverChan
}

View File

@@ -3,11 +3,7 @@ package rabbitmq
import (
"encoding/json"
"fmt"
"strings"
"wdd.io/agent-common/utils"
"wdd.io/agent-go/executor"
"wdd.io/agent-go/g"
"wdd.io/agent-go/status"
)
type IOctopusMessage interface {
@@ -16,10 +12,6 @@ type IOctopusMessage interface {
OctopusMsgBuilder
}
type OctopusMsgHandler interface {
Handle(octopusMessage *OctopusMessage)
}
type OctopusMsgSender interface {
Send(rabbitQueue *RabbitQueue, msg []byte)
@@ -40,12 +32,6 @@ type OctopusMessage struct {
ResultCode string `json:"resultCode"`
}
func (om *OctopusMessage) Handle() {
// 实际执行 OM handle进程
log.Debug("接收到OctopusMessage,开始处理!")
doHandleOctopusMessage(om)
}
func (om *OctopusMessage) Send(rabbitQueue *RabbitQueue, msg []byte) {
rabbitQueue.Send(msg)
}
@@ -84,116 +70,3 @@ func (om *OctopusMessage) Build(omType string, content interface{}) *OctopusMess
ACTime: curTimeString,
}
}
func doHandleOctopusMessage(octopusMessage *OctopusMessage) {
switch octopusMessage.OctopusMessageType {
case g.InitOmType:
go func() {}()
case g.ExecOmType:
P.Submit(func() {
executorOMHandler(octopusMessage)
})
case g.StatusOmType:
P.Submit(func() {
statusOMHandler(octopusMessage)
})
case g.AgentOmType:
P.Submit(func() {
agentOMHandler(octopusMessage)
},
)
default:
P.Submit(func() {
blackHoleOMHandler(octopusMessage)
})
}
}
// agentOMHandler 处理Agent的核心操作指令
func agentOMHandler(octopusMessage *OctopusMessage) {
}
func executorOMHandler(octopusMessage *OctopusMessage) {
log.Debug("开始处理 Executor Octopus Message !")
// 转换类型
executionMsgString := octopusMessage.Content.(string)
// 解析 ExecutionMessage
var executionMessage *executor.ExecutionMessage
err := json.Unmarshal([]byte(executionMsgString), &executionMessage)
if err != nil {
log.Error(fmt.Sprintf("execution message convert to json is wrong! msg is => %s", executionMsgString))
return
}
// 执行命令
ok, resultLog := executor.Execute(executionMessage)
if ok {
octopusMessage.ResultCode = "200"
} else {
octopusMessage.ResultCode = "300"
}
// 返回结果
if executionMessage.NeedResultReplay {
// send back the result log
octopusMessage.Result = resultLog
}
// 返回时间
octopusMessage.ACTime = utils.ParseDateTimeTime()
// 返回结果
octopusMessage.SendToOctopusServer()
}
func statusOMHandler(octopusMessage *OctopusMessage) {
v, ok := (octopusMessage.Content).(string)
if !ok {
log.ErrorF("convert to string is wrong %s", v)
}
statusMsgString := octopusMessage.Content.(string)
var statusMessage *status.StatusMessage
err := json.Unmarshal([]byte(statusMsgString), &statusMessage)
if err != nil {
fmt.Println(err.Error())
log.Error(fmt.Sprintf("status message convert to json is wrong! msg is => %s", octopusMessage))
return
}
// OMessageStatusTypeEnum
var statusRes string
if strings.HasPrefix(statusMessage.StatusType, "PING") {
// ping info
statusRes = status.Ping()
} else if strings.HasPrefix(statusMessage.StatusType, "METRIC") {
// metric info
agentStatusString, _ := json.Marshal(status.ReportAgentMetric())
statusRes = string(agentStatusString)
} else if strings.HasPrefix(statusMessage.StatusType, "INFO") {
log.InfoF("[statusOMHandler] - call for agent info !")
} else {
log.WarnF("[statusOMHandler] - error octopus status message type of %s", statusMessage.StatusType)
}
// 返回消息
// 组装消息
octopusMessage.ACTime = utils.ParseDateTimeTime()
octopusMessage.Result = statusRes
// 发送回去
statusOctopusReplayMessage, _ := json.Marshal(octopusMessage)
OctopusToServerQueue.Send(statusOctopusReplayMessage)
// 输出日志
log.InfoF("接收到查询Agent状态的请求结果为 => %s", statusRes)
}
func blackHoleOMHandler(octopusMessage *OctopusMessage) {
log.Error(fmt.Sprintf("[BLACK HOLE] octopusMessage type wrong! msg is => %v", octopusMessage))
}

View File

@@ -0,0 +1,43 @@
package rabbitmq
import (
"fmt"
"wdd.io/agent-go/g"
)
var OctopusToServerQueue = &RabbitQueue{}
var P = g.G.P
type OctopusMsgHandler interface {
HandleMsg(*RabbitReceiveChan)
}
func (om *OctopusMessage) HandleMsg(rChan *RabbitReceiveChan) {
// 实际执行 OM handle进程
log.Debug("接收到OctopusMessage, 开始处理!")
doHandleOctopusMessage(om, rChan)
}
func doHandleOctopusMessage(octopusMessage *OctopusMessage, rChan *RabbitReceiveChan) {
switch octopusMessage.OctopusMessageType {
case g.InitOmType:
rChan.InitRChan <- octopusMessage
case g.ExecOmType:
rChan.ExecutorRChan <- octopusMessage
case g.StatusOmType:
rChan.StatusRChan <- octopusMessage
case g.AgentOmType:
rChan.AgentRChan <- octopusMessage
default:
P.Submit(func() {
blackHoleOMHandler(octopusMessage)
})
}
}
func blackHoleOMHandler(octopusMessage *OctopusMessage) {
log.Error(fmt.Sprintf("[BLACK HOLE] octopusMessage type wrong! msg is => %v", octopusMessage))
}

View File

@@ -1,17 +1,18 @@
package rabbitmq
import (
"encoding/json"
"fmt"
"github.com/streadway/amqp"
"strings"
"sync"
"wdd.io/agent-common/logger"
"wdd.io/agent-go/g"
)
type RabbitMQ interface {
RabbitSendWriter
RabbitConnectCloser
RabbitQueueHandler
}
type RabbitSendWriter interface {
@@ -26,10 +27,19 @@ type RabbitConnectCloser interface {
Close() error
}
type RabbitQueue struct {
RabbitConn *RabbitMQConn
type RabbitQueueHandler interface {
Handle() chan bool
}
type RabbitQueue struct {
// 连接实体
RabbitConn *RabbitMQConn
// 连接属性
RabbitProp *ConnectProperty
// 底层连接tcp信息
RabbitConnectInfo RabbitTCPConnectInfo
// 返回消息队列
ReceiveChan *RabbitReceiveChan
}
// RabbitMQConn is a struct that holds the connection and channel objects
@@ -38,6 +48,13 @@ type RabbitMQConn struct {
Channel *amqp.Channel
}
type RabbitReceiveChan struct {
AgentRChan chan *OctopusMessage
ExecutorRChan chan *OctopusMessage
StatusRChan chan *OctopusMessage
InitRChan chan *OctopusMessage
}
type ConnectProperty struct {
ExchangeName string
QueueName string
@@ -45,6 +62,14 @@ type ConnectProperty struct {
TopicKey string
}
type RabbitTCPConnectInfo struct {
UserName string
Password string
Host string
Port string
VirtualHost string
}
var log = logger.Log
// 定义全局唯一的 Singleton 实例
@@ -54,25 +79,28 @@ var instance *amqp.Connection
var once sync.Once
// 初始化 Singleton 实例的函数
func createInstance() {
func createInstance(rabbitConnectInfo RabbitTCPConnectInfo) func() {
// 在这里进行 Singleton 的初始化操作
// 获取RabbitMQ的连接地址
rabbitMQEndpointFromG := parseRabbitMQEndpointFromG()
rabbitMQEndpoint := parseRabbitMQEndpoint(rabbitConnectInfo)
// 创建全局唯一连接 RabbitMQ连接
connection, err := amqp.Dial(rabbitMQEndpointFromG)
connection, err := amqp.Dial(rabbitMQEndpoint)
if err != nil {
log.Error(fmt.Sprintf("failed to connect to RabbitMQ: %v", err))
}
instance = connection
return nil
}
// GetInstance 获取全局唯一的 Singleton 实例的函数
func GetInstance() *amqp.Connection {
func GetInstance(rabbitConnectInfo RabbitTCPConnectInfo) *amqp.Connection {
// 使用 sync.Once 确保 createInstance 只会被调用一次
once.Do(createInstance)
// todo 理解
once.Do(createInstance(rabbitConnectInfo))
return instance
}
@@ -80,7 +108,7 @@ func GetInstance() *amqp.Connection {
func (r *RabbitQueue) Connect() {
// 获取RabbitMQ的连接
conn := GetInstance()
conn := GetInstance(r.RabbitConnectInfo)
ch, err := conn.Channel()
if err != nil {
@@ -121,10 +149,26 @@ func (r *RabbitQueue) Connect() {
log.Error(fmt.Sprintf("failed to bind RabbitMQ queue: %w", err))
}
// build for receive chan
rabbitRCha := &RabbitReceiveChan{}
if strings.HasPrefix(r.RabbitProp.QueueName, "Init") {
// init queue
rabbitRCha.InitRChan = make(chan *OctopusMessage)
} else {
// business queue
rabbitRCha.AgentRChan = make(chan *OctopusMessage, 5)
rabbitRCha.ExecutorRChan = make(chan *OctopusMessage, 5)
rabbitRCha.StatusRChan = make(chan *OctopusMessage, 5)
}
// connection
r.RabbitConn = &RabbitMQConn{
Connection: conn,
Channel: ch,
}
// receive chan
r.ReceiveChan = rabbitRCha
}
func (r *RabbitQueue) Close() error {
@@ -138,6 +182,29 @@ func (r *RabbitQueue) Close() error {
return err
}
func (r *RabbitQueue) Handle() chan bool {
deliveries := r.Read(true)
forverHandle := make(chan bool)
// 死循环处理Octopus Message
for delivery := range deliveries {
var om *OctopusMessage
err := json.Unmarshal(delivery.Body, &om)
if err != nil {
log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body))
// 保存到某处
continue
}
// 策略模式 处理消息
P.Submit(func() {
om.HandleMsg(r.ReceiveChan)
})
}
return forverHandle
}
// Send 向RabbitMQ中发送消息
func (r *RabbitQueue) Send(message []byte) {
// 往哪里发
@@ -159,6 +226,15 @@ func (r *RabbitQueue) Send(message []byte) {
}
}
func (r *RabbitQueue) SendOMsg(oMessage *OctopusMessage) {
bytes, err := json.Marshal(&oMessage)
if err != nil {
log.ErrorF("octopus message Marshal error %v", &oMessage)
}
r.Send(bytes)
}
func (r *RabbitQueue) Read(autoAck bool) <-chan amqp.Delivery {
// 拿到特定的Channel
@@ -181,30 +257,21 @@ func (r *RabbitQueue) Read(autoAck bool) <-chan amqp.Delivery {
return msgs
}
// parseRabbitMQEndpoint 根据全局变量agentConfig解析出RabbitMQ的连接地址
func parseRabbitMQEndpointFromG() string {
agentConfig := g.G.AgentConfig
func parseRabbitMQEndpoint(rabbitConnectInfo RabbitTCPConnectInfo) string {
var res strings.Builder
host := agentConfig.GetString("spring.rabbitmq.host")
port := agentConfig.GetString("spring.rabbitmq.port")
username := agentConfig.GetString("spring.rabbitmq.username")
password := agentConfig.GetString("spring.rabbitmq.password")
virtualHost := agentConfig.GetString("spring.rabbitmq.virtual-host")
// amqp://{username}:{password}@{hostname}:{port}/{virtual_host}
res.WriteString("amqp://")
res.WriteString(username)
res.WriteString(rabbitConnectInfo.UserName)
res.WriteString(":")
res.WriteString(password)
res.WriteString(rabbitConnectInfo.Password)
res.WriteString("@")
res.WriteString(host)
res.WriteString(rabbitConnectInfo.Host)
res.WriteString(":")
res.WriteString(port)
res.WriteString(rabbitConnectInfo.Port)
res.WriteString("/")
res.WriteString(virtualHost)
res.WriteString(rabbitConnectInfo.VirtualHost)
s := res.String()