[agent-go] 调通Executor的代码

This commit is contained in:
zeaslity
2023-03-27 16:06:33 +08:00
parent 5accd999f8
commit 31f9267401
9 changed files with 124 additions and 54 deletions

View File

@@ -2,6 +2,8 @@ package config
import ( import (
"agent-go/utils" "agent-go/utils"
"encoding/json"
"fmt"
"time" "time"
) )
@@ -30,11 +32,18 @@ func (m *OctopusMessage) BuildOctopusMsg(omType string, content interface{}) *Oc
// 当前时间 // 当前时间
curTimeString := utils.CurTimeString() curTimeString := utils.CurTimeString()
// must write to string format, otherwise it's very hard to deserialize
bytes, err := json.Marshal(content)
if err != nil {
fmt.Sprintf("OctopusMessage Build Error ! %v", err)
}
return &OctopusMessage{ return &OctopusMessage{
UUID: curTimeString, UUID: curTimeString,
InitTime: time.Now(), InitTime: time.Now(),
Type: omType, Type: omType,
Content: content, Content: string(bytes),
Result: nil, Result: nil,
ACTime: time.Time{}, ACTime: time.Time{},
} }

View File

@@ -7,6 +7,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"os/exec" "os/exec"
"time"
) )
var log = g.G.LOG var log = g.G.LOG
@@ -27,11 +28,17 @@ func Execute(om *config.OctopusMessage, em *config.ExecutionMessage) ([]string,
resultLog, err = SingleLineCommandExecutor(em.SingleLineCommand) resultLog, err = SingleLineCommandExecutor(em.SingleLineCommand)
} }
// 归一化错误和日志
if err != nil {
resultLog = append(resultLog, fmt.Sprintf("Error: %s", err.Error()))
}
// 处理执行日志 // 处理执行日志
// 是否需要返回处理日志,现在默认返回 // 是否需要返回处理日志,现在默认返回
if em.NeedResultReplay { if em.NeedResultReplay {
// 需要返回处理结果 // 需要返回处理结果
om.ACTime = time.Now()
om.Result = resultLog
} }
log.Info(fmt.Sprintf("Executor Result: %s", resultLog)) log.Info(fmt.Sprintf("Executor Result: %s", resultLog))
@@ -111,6 +118,7 @@ func SingleLineCommandExecutor(singleLineCommand []string) ([]string, error) {
var result []string var result []string
for scanner.Scan() { for scanner.Scan() {
result = append(result, scanner.Text()) result = append(result, scanner.Text())
} }
if err != nil { if err != nil {

View File

@@ -21,10 +21,10 @@ func NewLogger() (*Logger, error) {
MessageKey: "message", MessageKey: "message",
LevelKey: "level", LevelKey: "level",
TimeKey: "time", TimeKey: "time",
CallerKey: "caller", //CallerKey: "caller",
EncodeLevel: zapcore.CapitalLevelEncoder, EncodeLevel: zapcore.CapitalColorLevelEncoder,
EncodeTime: zapcore.ISO8601TimeEncoder, EncodeTime: zapcore.RFC3339TimeEncoder,
EncodeCaller: zapcore.ShortCallerEncoder, //EncodeCaller: zapcore.FullCallerEncoder,
}, },
} }
logger, err := config.Build() logger, err := config.Build()

View File

@@ -2,6 +2,7 @@ package rabbitmq
import ( import (
"fmt" "fmt"
"github.com/nacos-group/nacos-sdk-go/v2/common/logger"
"github.com/streadway/amqp" "github.com/streadway/amqp"
) )
@@ -24,16 +25,19 @@ func Send(conn *RabbitMQConn, connProp *ConnectProperty, message []byte) {
channel := conn.Channel channel := conn.Channel
// 发送 // 发送
channel.Publish( err := channel.Publish(
connProp.ExchangeName, connProp.ExchangeName,
connProp.TopicKey, connProp.TopicKey,
false, false,
true, false,
amqp.Publishing{ amqp.Publishing{
ContentType: "text/plain", ContentType: "text/plain",
Body: message, Body: message,
}, },
) )
if err != nil {
logger.Error(fmt.Sprintf("Failed to publish a message: %v", err))
}
} }
func Read(conn *RabbitMQConn, connProp *ConnectProperty, autoAck bool) <-chan amqp.Delivery { func Read(conn *RabbitMQConn, connProp *ConnectProperty, autoAck bool) <-chan amqp.Delivery {

View File

@@ -10,7 +10,8 @@ import (
func HandleOMsg(initOMsgFromServer *config.OctopusMessage) { func HandleOMsg(initOMsgFromServer *config.OctopusMessage) {
agentTopicName := initOMsgFromServer.UUID agentTopicName := initOMsgFromServer.Result.(string)
OctopusExchange := g.G.NacosConfig.GetString("octopus.message.octopus_exchange") OctopusExchange := g.G.NacosConfig.GetString("octopus.message.octopus_exchange")
octopusConnectProp := &ConnectProperty{ octopusConnectProp := &ConnectProperty{
@@ -47,7 +48,7 @@ func HandleOMsg(initOMsgFromServer *config.OctopusMessage) {
var om *config.OctopusMessage var om *config.OctopusMessage
err := json.Unmarshal(delivery.Body, &om) err := json.Unmarshal(delivery.Body, &om)
if err != nil { if err != nil {
log.Error("Octopus Message Parse Error !") log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body))
// 保存到某处 // 保存到某处
continue continue
} }
@@ -80,12 +81,12 @@ func executorOMHandler(octopusMessage *config.OctopusMessage) {
var executionMessage *config.ExecutionMessage var executionMessage *config.ExecutionMessage
err := json.Unmarshal([]byte(executionMsgString), &executionMessage) err := json.Unmarshal([]byte(executionMsgString), &executionMessage)
if err != nil { if err != nil {
log.Error(fmt.Sprintf("execution message convert to json is wrong! msg is => %s", executionMsgString))
return return
} }
// 交给后端的实际处理器处理, 再次策略 // 交给后端的实际处理器处理, 再次策略
executor.Execute(octopusMessage, executionMessage) executor.Execute(octopusMessage, executionMessage)
} }
func statusOMHandler(octopusMessage *config.OctopusMessage) { func statusOMHandler(octopusMessage *config.OctopusMessage) {

View File

@@ -44,6 +44,12 @@ func NewRabbitMQConn(property *ConnectProperty) (*RabbitMQConn, error) {
// 获取RabbitMQ的连接 // 获取RabbitMQ的连接
conn := GetInstance() conn := GetInstance()
// 获取RabbitMQ的连接地址
//rabbitMQEndpointFromG := parseRabbitMQEndpointFromG()
//conn, err := amqp.Dial(rabbitMQEndpointFromG)
//if err != nil {
// log.Error(fmt.Sprintf("failed to connect to RabbitMQ: %v", err))
//}
ch, err := conn.Channel() ch, err := conn.Channel()
if err != nil { if err != nil {
@@ -53,7 +59,7 @@ func NewRabbitMQConn(property *ConnectProperty) (*RabbitMQConn, error) {
if err = ch.ExchangeDeclare( if err = ch.ExchangeDeclare(
property.ExchangeName, // name of the exchange property.ExchangeName, // name of the exchange
property.ExchangeType, // type of the exchange property.ExchangeType, // type of the exchange
true, // durable false, // durable
false, // delete when complete false, // delete when complete
false, // internal false, // internal
false, // noWait false, // noWait
@@ -64,7 +70,7 @@ func NewRabbitMQConn(property *ConnectProperty) (*RabbitMQConn, error) {
_, err = ch.QueueDeclare( _, err = ch.QueueDeclare(
property.QueueName, // name of the queue property.QueueName, // name of the queue
true, // durable false, // durable
false, // delete when unused false, // delete when unused
false, // exclusive false, // exclusive
false, // noWait false, // noWait

View File

@@ -37,7 +37,6 @@ func INIT() *config.AgentServerInfo {
// 建立RabbitMQ的连接 // 建立RabbitMQ的连接
// defer 关闭初始化连接 // defer 关闭初始化连接
initToServer, err := rabbitmq.NewRabbitMQConn( initToServer, err := rabbitmq.NewRabbitMQConn(
initToServerProp, initToServerProp,
) )
if err != nil { if err != nil {
@@ -45,14 +44,6 @@ func INIT() *config.AgentServerInfo {
panic(err) panic(err)
} }
//defer rabbitmq.CloseChannel(initToServer) //defer rabbitmq.CloseChannel(initToServer)
initFromServer, err := rabbitmq.NewRabbitMQConn(
initFromServerProp,
)
if err != nil {
log.Error("init from server queue established error!")
panic(err)
}
//defer rabbitmq.CloseChannel(initFromServer) //defer rabbitmq.CloseChannel(initFromServer)
// 组装OctopusMessage // 组装OctopusMessage
@@ -61,11 +52,15 @@ func INIT() *config.AgentServerInfo {
omType, omType,
agentServerInfo, agentServerInfo,
) )
msgBytes, err := json.Marshal(octopusMsg) msgBytes, err := json.Marshal(octopusMsg)
if err != nil { if err != nil {
log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %v", octopusMsg)) log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %v", octopusMsg))
} }
// 发送OM至MQ中O
log.Debug(fmt.Sprintf("Prepare to send init message to server! ==> %s", string(msgBytes)))
// 发送OM至MQ中
rabbitmq.Send( rabbitmq.Send(
initToServer, initToServer,
initToServerProp, initToServerProp,
@@ -73,20 +68,34 @@ func INIT() *config.AgentServerInfo {
) )
// 监听初始化连接中的信息 // 监听初始化连接中的信息
initFromServer, err := rabbitmq.NewRabbitMQConn(
initFromServerProp,
)
if err != nil {
log.Error("init from server queue established error!")
panic(err)
}
// 建立运行时RabbitMQ连接 // 建立运行时RabbitMQ连接
handleInitMsgFromServer(initFromServer, initFromServerProp, initToServer, initToServerProp) handleInitMsgFromServer(initFromServer, initFromServerProp, initToServer, initToServerProp, agentServerInfo)
return agentServerInfo return agentServerInfo
} }
// handleInitMsgFromServer 处理从Server接收的注册信息 // handleInitMsgFromServer 处理从Server接收的注册信息
func handleInitMsgFromServer(initFromServer *rabbitmq.RabbitMQConn, initFromServerProp *rabbitmq.ConnectProperty, initToServer *rabbitmq.RabbitMQConn, initToServerProp *rabbitmq.ConnectProperty) { func handleInitMsgFromServer(initFromServer *rabbitmq.RabbitMQConn, initFromServerProp *rabbitmq.ConnectProperty, initToServer *rabbitmq.RabbitMQConn, initToServerProp *rabbitmq.ConnectProperty, agentServerInfo *config.AgentServerInfo) {
deliveries := rabbitmq.Read(initFromServer, initFromServerProp, false) deliveries := rabbitmq.Read(initFromServer, initFromServerProp, false)
forever := make(chan bool)
go func() {
// 同步很多抢占注册的情况 // 同步很多抢占注册的情况
for delivery := range deliveries { for delivery := range deliveries {
log.Debug(fmt.Sprintf("message received from server is %s", string(delivery.Body)))
var om *config.OctopusMessage var om *config.OctopusMessage
err := json.Unmarshal(delivery.Body, &om) err := json.Unmarshal(delivery.Body, &om)
if err != nil { if err != nil {
@@ -94,12 +103,23 @@ func handleInitMsgFromServer(initFromServer *rabbitmq.RabbitMQConn, initFromServ
string(delivery.Body))) string(delivery.Body)))
} }
var serverInfo config.AgentServerInfo
s, _ := om.Content.(string)
cc := json.Unmarshal([]byte(s), &serverInfo)
if cc != nil {
log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", cc))
}
serverName := serverInfo.ServerName
// 处理OM信息 // 处理OM信息
if om.UUID == g.G.AgentServerInfo.AgentTopicName { if om != nil && om.Type == g.InitOmType && serverName == agentServerInfo.ServerName {
// 是本机的注册回复信息 // 是本机的注册回复信息
// 建立运行时RabbitMQ连接 // 建立运行时RabbitMQ连接
rabbitmq.HandleOMsg(om) // change to async
go rabbitmq.HandleOMsg(om)
// 手动确认信息 // 手动确认信息
delivery.Ack(false) delivery.Ack(false)
@@ -111,10 +131,15 @@ func handleInitMsgFromServer(initFromServer *rabbitmq.RabbitMQConn, initFromServ
} }
// 不是自身的 注册回复信息 -- 拒绝 // 不是自身的 注册回复信息 -- 拒绝
log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v", om)) log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v, ==>%s", om, delivery.Body))
delivery.Nack(false, true) delivery.Nack(false, true)
} }
}()
// wait forever
<-forever
} }
// shutdownRegisterQueueConnection 关闭初始化连接的两个队列 // shutdownRegisterQueueConnection 关闭初始化连接的两个队列

View File

@@ -0,0 +1,8 @@
{
"uuid": "2023-03-27 14:38:49",
"init_time": "2023-03-27T14:38:49.8162801+08:00",
"type": "EXECUTOR",
"content": "{\n \"needResultReplay\": true,\n \"durationTask,default:false\": false,\n \"type\": \"command\",\n \"singleLineCommand\": [\n \"ls\",\n \"-la\"\n ],\n \"multiLineCommand\": null,\n \"pipeLineCommand\": null,\n \"resultKey\": \"output\"\n}\n",
"result": "",
"ac_time": "0001-01-01T00:00:00Z"
}

View File

@@ -0,0 +1,9 @@
{
"uuid": "2023-03-27 14:38:49",
"init_time": "2023-03-27T14:38:49.8162801+08:00",
"type": "INIT",
"content": "{\"serverName\":\"Chengdu-amd64-98\",\"serverIpPbV4\":\"183.220.149.17\",\"serverIpInV4\":\"\",\"serverIpPbV6\":\"\",\"serverIpInV6\":\"\",\"location\":\"Chengdu Sichuan CN\",\"provider\":\"AS139080 The Internet Data Center of Sichuan Mobile Communication Company Limited\",\"managePort\":\"22\",\"cpuCore\":\"12 @ 4299.998 MHz\",\"cpuBrand\":\"Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz\",\"osInfo\":\"Ubuntu 20.04.5 LTS\",\"osKernelInfo\":\"5.4.0-135-generic\",\"tcpControl\":\"cubic\",\"virtualization\":\"Dedicated\",\"ioSpeed\":\"150 MB/s\",\"memoryTotal\":\"7.6 GB\",\"diskTotal\":\"914.9 GB\",\"diskUsage\":\"12.3 GB\",\"comment\":\"\",\"machineId\":\"\",\"agentVersion\":\"\",\"agentTopicName\":\"\"}",
"result": "Chengdu-amd64-98-agentGo",
"ac_time": "0001-01-01T00:00:00Z"
}