From 31f92674011bad8a68ca579611b4e3d00de61c79 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Mon, 27 Mar 2023 16:06:33 +0800 Subject: [PATCH] =?UTF-8?q?[agent-go]=20=E8=B0=83=E9=80=9AExecutor?= =?UTF-8?q?=E7=9A=84=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent-go/config/OctopusMessage.go | 11 ++- agent-go/executor/CommandExecutor.go | 10 ++- agent-go/g/logger.go | 14 ++-- agent-go/rabbitmq/MessageReaderWriter.go | 8 +- agent-go/rabbitmq/OctopusMsgHandler.go | 7 +- agent-go/rabbitmq/RabbitMQConnector.go | 10 ++- agent-go/register/AgentIntitilization.go | 101 ++++++++++++++--------- agent-go/tmp/executor-om-single.json | 8 ++ agent-go/tmp/init-octopus-message.json | 9 ++ 9 files changed, 124 insertions(+), 54 deletions(-) create mode 100644 agent-go/tmp/executor-om-single.json create mode 100644 agent-go/tmp/init-octopus-message.json diff --git a/agent-go/config/OctopusMessage.go b/agent-go/config/OctopusMessage.go index b4b9b3a..cef83cd 100644 --- a/agent-go/config/OctopusMessage.go +++ b/agent-go/config/OctopusMessage.go @@ -2,6 +2,8 @@ package config import ( "agent-go/utils" + "encoding/json" + "fmt" "time" ) @@ -30,11 +32,18 @@ func (m *OctopusMessage) BuildOctopusMsg(omType string, content interface{}) *Oc // 当前时间 curTimeString := utils.CurTimeString() + // must write to string format, otherwise it's very hard to deserialize + + bytes, err := json.Marshal(content) + if err != nil { + fmt.Sprintf("OctopusMessage Build Error ! %v", err) + } + return &OctopusMessage{ UUID: curTimeString, InitTime: time.Now(), Type: omType, - Content: content, + Content: string(bytes), Result: nil, ACTime: time.Time{}, } diff --git a/agent-go/executor/CommandExecutor.go b/agent-go/executor/CommandExecutor.go index 4205fa5..076ffe6 100644 --- a/agent-go/executor/CommandExecutor.go +++ b/agent-go/executor/CommandExecutor.go @@ -7,6 +7,7 @@ import ( "bytes" "fmt" "os/exec" + "time" ) var log = g.G.LOG @@ -27,11 +28,17 @@ func Execute(om *config.OctopusMessage, em *config.ExecutionMessage) ([]string, resultLog, err = SingleLineCommandExecutor(em.SingleLineCommand) } + // 归一化错误和日志 + if err != nil { + resultLog = append(resultLog, fmt.Sprintf("Error: %s", err.Error())) + } + // 处理执行日志 // 是否需要返回处理日志,现在默认返回 if em.NeedResultReplay { // 需要返回处理结果 - + om.ACTime = time.Now() + om.Result = resultLog } log.Info(fmt.Sprintf("Executor Result: %s", resultLog)) @@ -111,6 +118,7 @@ func SingleLineCommandExecutor(singleLineCommand []string) ([]string, error) { var result []string for scanner.Scan() { result = append(result, scanner.Text()) + } if err != nil { diff --git a/agent-go/g/logger.go b/agent-go/g/logger.go index dd228a7..8f11c9c 100644 --- a/agent-go/g/logger.go +++ b/agent-go/g/logger.go @@ -18,13 +18,13 @@ func NewLogger() (*Logger, error) { OutputPaths: []string{"stdout"}, // 输出到控制台 ErrorOutputPaths: []string{"stderr"}, EncoderConfig: zapcore.EncoderConfig{ - MessageKey: "message", - LevelKey: "level", - TimeKey: "time", - CallerKey: "caller", - EncodeLevel: zapcore.CapitalLevelEncoder, - EncodeTime: zapcore.ISO8601TimeEncoder, - EncodeCaller: zapcore.ShortCallerEncoder, + MessageKey: "message", + LevelKey: "level", + TimeKey: "time", + //CallerKey: "caller", + EncodeLevel: zapcore.CapitalColorLevelEncoder, + EncodeTime: zapcore.RFC3339TimeEncoder, + //EncodeCaller: zapcore.FullCallerEncoder, }, } logger, err := config.Build() diff --git a/agent-go/rabbitmq/MessageReaderWriter.go b/agent-go/rabbitmq/MessageReaderWriter.go index ffb027b..87e1c63 100644 --- a/agent-go/rabbitmq/MessageReaderWriter.go +++ b/agent-go/rabbitmq/MessageReaderWriter.go @@ -2,6 +2,7 @@ package rabbitmq import ( "fmt" + "github.com/nacos-group/nacos-sdk-go/v2/common/logger" "github.com/streadway/amqp" ) @@ -24,16 +25,19 @@ func Send(conn *RabbitMQConn, connProp *ConnectProperty, message []byte) { channel := conn.Channel // 发送 - channel.Publish( + err := channel.Publish( connProp.ExchangeName, connProp.TopicKey, false, - true, + false, amqp.Publishing{ ContentType: "text/plain", Body: message, }, ) + if err != nil { + logger.Error(fmt.Sprintf("Failed to publish a message: %v", err)) + } } func Read(conn *RabbitMQConn, connProp *ConnectProperty, autoAck bool) <-chan amqp.Delivery { diff --git a/agent-go/rabbitmq/OctopusMsgHandler.go b/agent-go/rabbitmq/OctopusMsgHandler.go index 7ef4f23..fa8b423 100644 --- a/agent-go/rabbitmq/OctopusMsgHandler.go +++ b/agent-go/rabbitmq/OctopusMsgHandler.go @@ -10,7 +10,8 @@ import ( func HandleOMsg(initOMsgFromServer *config.OctopusMessage) { - agentTopicName := initOMsgFromServer.UUID + agentTopicName := initOMsgFromServer.Result.(string) + OctopusExchange := g.G.NacosConfig.GetString("octopus.message.octopus_exchange") octopusConnectProp := &ConnectProperty{ @@ -47,7 +48,7 @@ func HandleOMsg(initOMsgFromServer *config.OctopusMessage) { var om *config.OctopusMessage err := json.Unmarshal(delivery.Body, &om) if err != nil { - log.Error("Octopus Message Parse Error !") + log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body)) // 保存到某处 continue } @@ -80,12 +81,12 @@ func executorOMHandler(octopusMessage *config.OctopusMessage) { var executionMessage *config.ExecutionMessage err := json.Unmarshal([]byte(executionMsgString), &executionMessage) if err != nil { + log.Error(fmt.Sprintf("execution message convert to json is wrong! msg is => %s", executionMsgString)) return } // 交给后端的实际处理器处理, 再次策略 executor.Execute(octopusMessage, executionMessage) - } func statusOMHandler(octopusMessage *config.OctopusMessage) { diff --git a/agent-go/rabbitmq/RabbitMQConnector.go b/agent-go/rabbitmq/RabbitMQConnector.go index c0088f3..f079934 100644 --- a/agent-go/rabbitmq/RabbitMQConnector.go +++ b/agent-go/rabbitmq/RabbitMQConnector.go @@ -44,6 +44,12 @@ func NewRabbitMQConn(property *ConnectProperty) (*RabbitMQConn, error) { // 获取RabbitMQ的连接 conn := GetInstance() + // 获取RabbitMQ的连接地址 + //rabbitMQEndpointFromG := parseRabbitMQEndpointFromG() + //conn, err := amqp.Dial(rabbitMQEndpointFromG) + //if err != nil { + // log.Error(fmt.Sprintf("failed to connect to RabbitMQ: %v", err)) + //} ch, err := conn.Channel() if err != nil { @@ -53,7 +59,7 @@ func NewRabbitMQConn(property *ConnectProperty) (*RabbitMQConn, error) { if err = ch.ExchangeDeclare( property.ExchangeName, // name of the exchange property.ExchangeType, // type of the exchange - true, // durable + false, // durable false, // delete when complete false, // internal false, // noWait @@ -64,7 +70,7 @@ func NewRabbitMQConn(property *ConnectProperty) (*RabbitMQConn, error) { _, err = ch.QueueDeclare( property.QueueName, // name of the queue - true, // durable + false, // durable false, // delete when unused false, // exclusive false, // noWait diff --git a/agent-go/register/AgentIntitilization.go b/agent-go/register/AgentIntitilization.go index 56e71ef..36ad407 100644 --- a/agent-go/register/AgentIntitilization.go +++ b/agent-go/register/AgentIntitilization.go @@ -37,7 +37,6 @@ func INIT() *config.AgentServerInfo { // 建立RabbitMQ的连接 // defer 关闭初始化连接 initToServer, err := rabbitmq.NewRabbitMQConn( - initToServerProp, ) if err != nil { @@ -45,14 +44,6 @@ func INIT() *config.AgentServerInfo { panic(err) } //defer rabbitmq.CloseChannel(initToServer) - - initFromServer, err := rabbitmq.NewRabbitMQConn( - initFromServerProp, - ) - if err != nil { - log.Error("init from server queue established error!") - panic(err) - } //defer rabbitmq.CloseChannel(initFromServer) // 组装OctopusMessage @@ -61,11 +52,15 @@ func INIT() *config.AgentServerInfo { omType, agentServerInfo, ) + msgBytes, err := json.Marshal(octopusMsg) if err != nil { log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %v", octopusMsg)) } - // 发送OM至MQ中O + + log.Debug(fmt.Sprintf("Prepare to send init message to server! ==> %s", string(msgBytes))) + + // 发送OM至MQ中 rabbitmq.Send( initToServer, initToServerProp, @@ -73,47 +68,77 @@ func INIT() *config.AgentServerInfo { ) // 监听初始化连接中的信息 + initFromServer, err := rabbitmq.NewRabbitMQConn( + initFromServerProp, + ) + if err != nil { + log.Error("init from server queue established error!") + panic(err) + } + // 建立运行时RabbitMQ连接 - handleInitMsgFromServer(initFromServer, initFromServerProp, initToServer, initToServerProp) + handleInitMsgFromServer(initFromServer, initFromServerProp, initToServer, initToServerProp, agentServerInfo) return agentServerInfo } // handleInitMsgFromServer 处理从Server接收的注册信息 -func handleInitMsgFromServer(initFromServer *rabbitmq.RabbitMQConn, initFromServerProp *rabbitmq.ConnectProperty, initToServer *rabbitmq.RabbitMQConn, initToServerProp *rabbitmq.ConnectProperty) { +func handleInitMsgFromServer(initFromServer *rabbitmq.RabbitMQConn, initFromServerProp *rabbitmq.ConnectProperty, initToServer *rabbitmq.RabbitMQConn, initToServerProp *rabbitmq.ConnectProperty, agentServerInfo *config.AgentServerInfo) { deliveries := rabbitmq.Read(initFromServer, initFromServerProp, false) - // 同步很多抢占注册的情况 - for delivery := range deliveries { + forever := make(chan bool) - var om *config.OctopusMessage - err := json.Unmarshal(delivery.Body, &om) - if err != nil { - log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ", - string(delivery.Body))) + go func() { + + // 同步很多抢占注册的情况 + for delivery := range deliveries { + + log.Debug(fmt.Sprintf("message received from server is %s", string(delivery.Body))) + + var om *config.OctopusMessage + err := json.Unmarshal(delivery.Body, &om) + if err != nil { + log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ", + string(delivery.Body))) + } + + var serverInfo config.AgentServerInfo + + s, _ := om.Content.(string) + cc := json.Unmarshal([]byte(s), &serverInfo) + if cc != nil { + log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", cc)) + } + + serverName := serverInfo.ServerName + + // 处理OM信息 + if om != nil && om.Type == g.InitOmType && serverName == agentServerInfo.ServerName { + // 是本机的注册回复信息 + + // 建立运行时RabbitMQ连接 + // change to async + go rabbitmq.HandleOMsg(om) + + // 手动确认信息 + delivery.Ack(false) + + // 手动关闭 注册队列的连接 + shutdownRegisterQueueConnection(initFromServer, initFromServerProp, initToServer, initToServerProp) + + return + } + + // 不是自身的 注册回复信息 -- 拒绝 + log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v, ==>%s", om, delivery.Body)) + delivery.Nack(false, true) } - // 处理OM信息 - if om.UUID == g.G.AgentServerInfo.AgentTopicName { - // 是本机的注册回复信息 + }() - // 建立运行时RabbitMQ连接 - rabbitmq.HandleOMsg(om) - - // 手动确认信息 - delivery.Ack(false) - - // 手动关闭 注册队列的连接 - shutdownRegisterQueueConnection(initFromServer, initFromServerProp, initToServer, initToServerProp) - - return - } - - // 不是自身的 注册回复信息 -- 拒绝 - log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v", om)) - delivery.Nack(false, true) - } + // wait forever + <-forever } diff --git a/agent-go/tmp/executor-om-single.json b/agent-go/tmp/executor-om-single.json new file mode 100644 index 0000000..f48b02e --- /dev/null +++ b/agent-go/tmp/executor-om-single.json @@ -0,0 +1,8 @@ +{ + "uuid": "2023-03-27 14:38:49", + "init_time": "2023-03-27T14:38:49.8162801+08:00", + "type": "EXECUTOR", + "content": "{\n \"needResultReplay\": true,\n \"durationTask,default:false\": false,\n \"type\": \"command\",\n \"singleLineCommand\": [\n \"ls\",\n \"-la\"\n ],\n \"multiLineCommand\": null,\n \"pipeLineCommand\": null,\n \"resultKey\": \"output\"\n}\n", + "result": "", + "ac_time": "0001-01-01T00:00:00Z" +} diff --git a/agent-go/tmp/init-octopus-message.json b/agent-go/tmp/init-octopus-message.json new file mode 100644 index 0000000..c1fc9c9 --- /dev/null +++ b/agent-go/tmp/init-octopus-message.json @@ -0,0 +1,9 @@ +{ + "uuid": "2023-03-27 14:38:49", + "init_time": "2023-03-27T14:38:49.8162801+08:00", + "type": "INIT", + "content": "{\"serverName\":\"Chengdu-amd64-98\",\"serverIpPbV4\":\"183.220.149.17\",\"serverIpInV4\":\"\",\"serverIpPbV6\":\"\",\"serverIpInV6\":\"\",\"location\":\"Chengdu Sichuan CN\",\"provider\":\"AS139080 The Internet Data Center of Sichuan Mobile Communication Company Limited\",\"managePort\":\"22\",\"cpuCore\":\"12 @ 4299.998 MHz\",\"cpuBrand\":\"Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz\",\"osInfo\":\"Ubuntu 20.04.5 LTS\",\"osKernelInfo\":\"5.4.0-135-generic\",\"tcpControl\":\"cubic\",\"virtualization\":\"Dedicated\",\"ioSpeed\":\"150 MB/s\",\"memoryTotal\":\"7.6 GB\",\"diskTotal\":\"914.9 GB\",\"diskUsage\":\"12.3 GB\",\"comment\":\"\",\"machineId\":\"\",\"agentVersion\":\"\",\"agentTopicName\":\"\"}", + "result": "Chengdu-amd64-98-agentGo", + "ac_time": "0001-01-01T00:00:00Z" +} +