From 7e2450d30ab7b1cdce085b02a3f7d055dcfdacab Mon Sep 17 00:00:00 2001 From: zeaslity Date: Fri, 24 Mar 2023 15:46:00 +0800 Subject: [PATCH] =?UTF-8?q?[agent-go]=20=E5=88=9D=E6=AD=A5=E5=AE=8C?= =?UTF-8?q?=E6=88=90Executor=E9=83=A8=E5=88=86=E7=9A=84=E4=BB=A3=E7=A0=81-?= =?UTF-8?q?=201?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent-go/IRabbitSendWriter.go | 11 +++++++++++ .../{register => config}/AgentServerInfo.go | 2 +- .../{rabbitmq => config}/OctopusMessage.go | 6 +++--- agent-go/executor/CommandExecutor.go | 4 ++-- agent-go/{config => g}/NacosConfig.go | 5 ++--- agent-go/g/global.go | 6 ++++-- agent-go/main.go | 5 ++++- agent-go/rabbitmq/MessageReaderWriter.go | 13 +++++++++++++ agent-go/rabbitmq/OctopusMsgHandler.go | 15 ++++++++------- agent-go/rabbitmq/RabbitMQConnector.go | 13 ------------- agent-go/register/AgentIntitilization.go | 19 +++++++++++-------- agent-go/{g => utils}/TimeUtils.go | 2 +- 12 files changed, 60 insertions(+), 41 deletions(-) create mode 100644 agent-go/IRabbitSendWriter.go rename agent-go/{register => config}/AgentServerInfo.go (98%) rename agent-go/{rabbitmq => config}/OctopusMessage.go (93%) rename agent-go/{config => g}/NacosConfig.go (98%) rename agent-go/{g => utils}/TimeUtils.go (96%) diff --git a/agent-go/IRabbitSendWriter.go b/agent-go/IRabbitSendWriter.go new file mode 100644 index 0000000..11186ab --- /dev/null +++ b/agent-go/IRabbitSendWriter.go @@ -0,0 +1,11 @@ +package main + +/*type RabbitSendWriter interface { + + Send(conn *RabbitMQConn, connProp *ConnectProperty, message []byte) + + Read(conn *RabbitMQConn, connProp *ConnectProperty, autoAck bool) <-chan amqp.Delivery + +} + +*/ diff --git a/agent-go/register/AgentServerInfo.go b/agent-go/config/AgentServerInfo.go similarity index 98% rename from agent-go/register/AgentServerInfo.go rename to agent-go/config/AgentServerInfo.go index 3026134..ad20396 100644 --- a/agent-go/register/AgentServerInfo.go +++ b/agent-go/config/AgentServerInfo.go @@ -1,4 +1,4 @@ -package register +package config type AgentServerInfo struct { ServerName string `json:"serverName" yaml:"serverName"` diff --git a/agent-go/rabbitmq/OctopusMessage.go b/agent-go/config/OctopusMessage.go similarity index 93% rename from agent-go/rabbitmq/OctopusMessage.go rename to agent-go/config/OctopusMessage.go index d24f329..b4b9b3a 100644 --- a/agent-go/rabbitmq/OctopusMessage.go +++ b/agent-go/config/OctopusMessage.go @@ -1,7 +1,7 @@ -package rabbitmq +package config import ( - "agent-go/g" + "agent-go/utils" "time" ) @@ -28,7 +28,7 @@ type ExecutionMessage struct { func (m *OctopusMessage) BuildOctopusMsg(omType string, content interface{}) *OctopusMessage { // 当前时间 - curTimeString := g.CurTimeString() + curTimeString := utils.CurTimeString() return &OctopusMessage{ UUID: curTimeString, diff --git a/agent-go/executor/CommandExecutor.go b/agent-go/executor/CommandExecutor.go index f3957ca..4205fa5 100644 --- a/agent-go/executor/CommandExecutor.go +++ b/agent-go/executor/CommandExecutor.go @@ -1,8 +1,8 @@ package executor import ( + "agent-go/config" "agent-go/g" - "agent-go/rabbitmq" "bufio" "bytes" "fmt" @@ -11,7 +11,7 @@ import ( var log = g.G.LOG -func Execute(om *rabbitmq.OctopusMessage, em *rabbitmq.ExecutionMessage) ([]string, error) { +func Execute(om *config.OctopusMessage, em *config.ExecutionMessage) ([]string, error) { var resultLog []string var err error diff --git a/agent-go/config/NacosConfig.go b/agent-go/g/NacosConfig.go similarity index 98% rename from agent-go/config/NacosConfig.go rename to agent-go/g/NacosConfig.go index bb732fa..9573789 100644 --- a/agent-go/config/NacosConfig.go +++ b/agent-go/g/NacosConfig.go @@ -1,7 +1,6 @@ -package config +package g import ( - "agent-go/g" "bytes" "fmt" "github.com/nacos-group/nacos-sdk-go/v2/clients" @@ -14,7 +13,7 @@ import ( "strings" ) -var log = g.G.LOG +var log = G.LOG var group = "" func InitNacos(configFileName string) { diff --git a/agent-go/g/global.go b/agent-go/g/global.go index cd0a01b..9980668 100644 --- a/agent-go/g/global.go +++ b/agent-go/g/global.go @@ -1,14 +1,14 @@ package g import ( - "agent-go/register" + "agent-go/config" "github.com/spf13/viper" ) type Global struct { LOG *Logger NacosConfig *viper.Viper - AgentServerInfo *register.AgentServerInfo + AgentServerInfo *config.AgentServerInfo } const ( @@ -17,6 +17,8 @@ const ( ExecOmType = "EXECUTOR" StatusOmType = "STATUS" InitOmType = "INIT" + + // write about ) var logger, _ = NewLogger() diff --git a/agent-go/main.go b/agent-go/main.go index a8c49d0..38fe12c 100644 --- a/agent-go/main.go +++ b/agent-go/main.go @@ -1,11 +1,14 @@ package main import ( + "agent-go/g" "agent-go/register" "flag" "fmt" ) +var log = g.G.LOG + func main() { // 解析命令行参数 @@ -20,6 +23,6 @@ func main() { //config.InitNacos(filename) // 执行初始化之策工作 - register.INIT() + g.G.AgentServerInfo = register.INIT() } diff --git a/agent-go/rabbitmq/MessageReaderWriter.go b/agent-go/rabbitmq/MessageReaderWriter.go index a6b6e5a..9ab0c2a 100644 --- a/agent-go/rabbitmq/MessageReaderWriter.go +++ b/agent-go/rabbitmq/MessageReaderWriter.go @@ -6,6 +6,19 @@ import ( "github.com/streadway/amqp" ) +// RabbitMQConn is a struct that holds the connection and channel objects +type RabbitMQConn struct { + Connection *amqp.Connection + Channel *amqp.Channel +} + +type ConnectProperty struct { + ExchangeName string + QueueName string + ExchangeType string + TopicKey string +} + // Send 向RabbitMQ中发送消息 func Send(conn *RabbitMQConn, connProp *ConnectProperty, message []byte) { // 往哪里发 diff --git a/agent-go/rabbitmq/OctopusMsgHandler.go b/agent-go/rabbitmq/OctopusMsgHandler.go index 13e67f6..7ef4f23 100644 --- a/agent-go/rabbitmq/OctopusMsgHandler.go +++ b/agent-go/rabbitmq/OctopusMsgHandler.go @@ -1,13 +1,14 @@ package rabbitmq import ( + "agent-go/config" "agent-go/executor" "agent-go/g" "encoding/json" "fmt" ) -func HandleOMsg(initOMsgFromServer *OctopusMessage) { +func HandleOMsg(initOMsgFromServer *config.OctopusMessage) { agentTopicName := initOMsgFromServer.UUID OctopusExchange := g.G.NacosConfig.GetString("octopus.message.octopus_exchange") @@ -43,7 +44,7 @@ func HandleOMsg(initOMsgFromServer *OctopusMessage) { // 死循环,处理Ocotpus Message for delivery := range deliveries { - var om *OctopusMessage + var om *config.OctopusMessage err := json.Unmarshal(delivery.Body, &om) if err != nil { log.Error("Octopus Message Parse Error !") @@ -57,7 +58,7 @@ func HandleOMsg(initOMsgFromServer *OctopusMessage) { } -func doHandleOctopusMessage(octopusMessage *OctopusMessage) { +func doHandleOctopusMessage(octopusMessage *config.OctopusMessage) { switch octopusMessage.Type { case g.InitOmType: @@ -72,11 +73,11 @@ func doHandleOctopusMessage(octopusMessage *OctopusMessage) { } -func executorOMHandler(octopusMessage *OctopusMessage) { +func executorOMHandler(octopusMessage *config.OctopusMessage) { executionMsgString := octopusMessage.Content.(string) - var executionMessage *ExecutionMessage + var executionMessage *config.ExecutionMessage err := json.Unmarshal([]byte(executionMsgString), &executionMessage) if err != nil { return @@ -87,10 +88,10 @@ func executorOMHandler(octopusMessage *OctopusMessage) { } -func statusOMHandler(octopusMessage *OctopusMessage) { +func statusOMHandler(octopusMessage *config.OctopusMessage) { } -func blackHoleOMHandler(octopusMessage *OctopusMessage) { +func blackHoleOMHandler(octopusMessage *config.OctopusMessage) { log.Error(fmt.Sprintf("octopusMessage type wrong! msg is => %v", octopusMessage)) } diff --git a/agent-go/rabbitmq/RabbitMQConnector.go b/agent-go/rabbitmq/RabbitMQConnector.go index acf9412..afda22c 100644 --- a/agent-go/rabbitmq/RabbitMQConnector.go +++ b/agent-go/rabbitmq/RabbitMQConnector.go @@ -10,19 +10,6 @@ import ( var log = g.G.LOG -// RabbitMQConn is a struct that holds the connection and channel objects -type RabbitMQConn struct { - Connection *amqp.Connection - Channel *amqp.Channel -} - -type ConnectProperty struct { - ExchangeName string - QueueName string - ExchangeType string - TopicKey string -} - // 定义全局唯一的 Singleton 实例 var instance *amqp.Connection diff --git a/agent-go/register/AgentIntitilization.go b/agent-go/register/AgentIntitilization.go index 5419b44..badd91f 100644 --- a/agent-go/register/AgentIntitilization.go +++ b/agent-go/register/AgentIntitilization.go @@ -1,6 +1,7 @@ package register import ( + "agent-go/config" "agent-go/g" "agent-go/rabbitmq" "encoding/json" @@ -9,14 +10,13 @@ import ( "io/ioutil" ) -var log = g.G.LOG var omType = g.InitOmType -var agentServerInfo = g.G.AgentServerInfo +var log = g.G.LOG -func INIT() { +func INIT() *config.AgentServerInfo { // 获取系统的环境变量 - g.G.AgentServerInfo = parseAgentServerInfo() + agentServerInfo := parseAgentServerInfo() nacosConfig := g.G.NacosConfig @@ -37,6 +37,7 @@ func INIT() { // 建立RabbitMQ的连接 // defer 关闭初始化连接 initToServer, err := rabbitmq.NewRabbitMQConn( + initToServerProp, ) if err != nil { @@ -55,7 +56,7 @@ func INIT() { defer rabbitmq.CloseChannel(initFromServer) // 组装OctopusMessage - var octopusMsg *rabbitmq.OctopusMessage + var octopusMsg *config.OctopusMessage octopusMsg = octopusMsg.BuildOctopusMsg( omType, agentServerInfo, @@ -75,6 +76,8 @@ func INIT() { // 建立运行时RabbitMQ连接 handleInitMsgFromServer(initFromServer, initFromServerProp, initToServer, initToServerProp) + return agentServerInfo + } // handleInitMsgFromServer 处理从Server接收的注册信息 @@ -85,7 +88,7 @@ func handleInitMsgFromServer(initFromServer *rabbitmq.RabbitMQConn, initFromServ // 同步很多抢占注册的情况 for delivery := range deliveries { - var om *rabbitmq.OctopusMessage + var om *config.OctopusMessage err := json.Unmarshal(delivery.Body, &om) if err != nil { log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ", @@ -120,11 +123,11 @@ func shutdownRegisterQueueConnection(initFromServer *rabbitmq.RabbitMQConn, init } -func parseAgentServerInfo() *AgentServerInfo { +func parseAgentServerInfo() *config.AgentServerInfo { // 约定文件地址为 /etc/environment.d/octopus-agent.conf // 目前使用 - var agentServerInfo *AgentServerInfo + var agentServerInfo *config.AgentServerInfo yamlFile, err := ioutil.ReadFile("C:\\Users\\wdd\\IdeaProjects\\ProjectOctopus\\agent-go\\server-env.yaml") if err != nil { panic(fmt.Errorf("failed to read YAML file: %v", err)) diff --git a/agent-go/g/TimeUtils.go b/agent-go/utils/TimeUtils.go similarity index 96% rename from agent-go/g/TimeUtils.go rename to agent-go/utils/TimeUtils.go index 9faec70..0981dca 100644 --- a/agent-go/g/TimeUtils.go +++ b/agent-go/utils/TimeUtils.go @@ -1,4 +1,4 @@ -package g +package utils import ( "time"