From 124143c6c6b847aad4b050e78325ec85b41ef717 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Thu, 30 Mar 2023 14:26:11 +0800 Subject: [PATCH] =?UTF-8?q?[agent-go]=20=E8=B0=83=E6=95=B4=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent-go/IRabbitSendWriter.go | 11 - agent-go/config/OctopusMessage.go | 50 ---- agent-go/executor/CommandExecutor.go | 32 +-- agent-go/g/Nacos.go | 1 - agent-go/g/global.go | 23 +- agent-go/{g => logger}/logger.go | 4 +- agent-go/main.go | 7 +- agent-go/rabbitmq/MessageReaderWriter.go | 63 ------ agent-go/rabbitmq/OMsgConnector.go | 72 ++++++ agent-go/rabbitmq/OctopusMessage.go | 113 +++++++++ agent-go/rabbitmq/OctopusMsgHandler.go | 106 --------- agent-go/rabbitmq/RabbitMQConnector.go | 154 ------------- agent-go/rabbitmq/RabbitMsgQueue.go | 214 ++++++++++++++++++ agent-go/register/AgentInitialization.go | 186 +++++++++++++++ agent-go/register/AgentIntitilization.go | 175 -------------- .../{config => register}/AgentServerInfo.go | 2 +- .../NacosInitalization.go} | 3 +- 17 files changed, 622 insertions(+), 594 deletions(-) delete mode 100644 agent-go/IRabbitSendWriter.go delete mode 100644 agent-go/config/OctopusMessage.go delete mode 100644 agent-go/g/Nacos.go rename agent-go/{g => logger}/logger.go (97%) delete mode 100644 agent-go/rabbitmq/MessageReaderWriter.go create mode 100644 agent-go/rabbitmq/OMsgConnector.go create mode 100644 agent-go/rabbitmq/OctopusMessage.go delete mode 100644 agent-go/rabbitmq/OctopusMsgHandler.go delete mode 100644 agent-go/rabbitmq/RabbitMQConnector.go create mode 100644 agent-go/rabbitmq/RabbitMsgQueue.go create mode 100644 agent-go/register/AgentInitialization.go delete mode 100644 agent-go/register/AgentIntitilization.go rename agent-go/{config => register}/AgentServerInfo.go (98%) rename agent-go/{g/NacosConfig.go => register/NacosInitalization.go} (99%) diff --git a/agent-go/IRabbitSendWriter.go b/agent-go/IRabbitSendWriter.go deleted file mode 100644 index 11186ab..0000000 --- a/agent-go/IRabbitSendWriter.go +++ /dev/null @@ -1,11 +0,0 @@ -package main - -/*type RabbitSendWriter interface { - - Send(conn *RabbitMQConn, connProp *ConnectProperty, message []byte) - - Read(conn *RabbitMQConn, connProp *ConnectProperty, autoAck bool) <-chan amqp.Delivery - -} - -*/ diff --git a/agent-go/config/OctopusMessage.go b/agent-go/config/OctopusMessage.go deleted file mode 100644 index cef83cd..0000000 --- a/agent-go/config/OctopusMessage.go +++ /dev/null @@ -1,50 +0,0 @@ -package config - -import ( - "agent-go/utils" - "encoding/json" - "fmt" - "time" -) - -type OctopusMessage struct { - UUID string `json:"uuid"` - InitTime time.Time `json:"init_time" format:"2023-03-21 16:38:30"` - Type string `json:"type"` - Content interface{} `json:"content"` - Result interface{} `json:"result"` - ACTime time.Time `json:"ac_time" format:"2023-03-21 16:38:30"` -} - -type ExecutionMessage struct { - NeedResultReplay bool `json:"needResultReplay"` - DurationTask bool `json:"durationTask,default:false"` - Type string `json:"type"` - SingleLineCommand []string `json:"singleLineCommand"` - MultiLineCommand [][]string `json:"multiLineCommand"` - PipeLineCommand [][]string `json:"pipeLineCommand"` - ResultKey string `json:"resultKey"` -} - -// BuildOctopusMsg 生成OctopusMessage -func (m *OctopusMessage) BuildOctopusMsg(omType string, content interface{}) *OctopusMessage { - - // 当前时间 - curTimeString := utils.CurTimeString() - - // must write to string format, otherwise it's very hard to deserialize - - bytes, err := json.Marshal(content) - if err != nil { - fmt.Sprintf("OctopusMessage Build Error ! %v", err) - } - - return &OctopusMessage{ - UUID: curTimeString, - InitTime: time.Now(), - Type: omType, - Content: string(bytes), - Result: nil, - ACTime: time.Time{}, - } -} diff --git a/agent-go/executor/CommandExecutor.go b/agent-go/executor/CommandExecutor.go index 076ffe6..a972fee 100644 --- a/agent-go/executor/CommandExecutor.go +++ b/agent-go/executor/CommandExecutor.go @@ -1,31 +1,43 @@ package executor import ( - "agent-go/config" - "agent-go/g" + logger2 "agent-go/logger" "bufio" "bytes" "fmt" "os/exec" - "time" ) -var log = g.G.LOG +type ExecutionMessage struct { + NeedResultReplay bool `json:"needResultReplay"` + DurationTask bool `json:"durationTask,default:false"` + Type string `json:"type"` + SingleLineCommand []string `json:"singleLineCommand"` + MultiLineCommand [][]string `json:"multiLineCommand"` + PipeLineCommand [][]string `json:"pipeLineCommand"` + ResultKey string `json:"resultKey"` +} -func Execute(om *config.OctopusMessage, em *config.ExecutionMessage) ([]string, error) { +var log = logger2.Log + +func Execute(em *ExecutionMessage) ([]string, error) { var resultLog []string var err error + var realCommand [][]string if em.PipeLineCommand != nil && len(em.PipeLineCommand) != 0 { // 管道命令 resultLog, err = PipeLineCommandExecutor(em.PipeLineCommand) + realCommand = em.PipeLineCommand } else if em.MultiLineCommand != nil && len(em.MultiLineCommand) != 0 { // 多行命令 resultLog, err = MultiLineCommandExecutor(em.MultiLineCommand) + realCommand = em.MultiLineCommand } else { // 单行命令 resultLog, err = SingleLineCommandExecutor(em.SingleLineCommand) + realCommand = [][]string{em.SingleLineCommand} } // 归一化错误和日志 @@ -33,15 +45,9 @@ func Execute(om *config.OctopusMessage, em *config.ExecutionMessage) ([]string, resultLog = append(resultLog, fmt.Sprintf("Error: %s", err.Error())) } - // 处理执行日志 - // 是否需要返回处理日志,现在默认返回 - if em.NeedResultReplay { - // 需要返回处理结果 - om.ACTime = time.Now() - om.Result = resultLog - } + commandResult := fmt.Sprintf("Excution Comand are=> %v, Executor Result: %v", realCommand, resultLog) - log.Info(fmt.Sprintf("Executor Result: %s", resultLog)) + log.Info(commandResult) return resultLog, err } diff --git a/agent-go/g/Nacos.go b/agent-go/g/Nacos.go deleted file mode 100644 index c43b680..0000000 --- a/agent-go/g/Nacos.go +++ /dev/null @@ -1 +0,0 @@ -package g diff --git a/agent-go/g/global.go b/agent-go/g/global.go index cdd3915..4cf4360 100644 --- a/agent-go/g/global.go +++ b/agent-go/g/global.go @@ -1,16 +1,15 @@ package g import ( - "agent-go/config" + logger2 "agent-go/logger" "github.com/panjf2000/ants/v2" "github.com/spf13/viper" ) type Global struct { - LOG *Logger - NacosConfig *viper.Viper - AgentServerInfo *config.AgentServerInfo - P *ants.Pool + AgentHasRegister bool + NacosConfig *viper.Viper + P *ants.Pool } const ( @@ -21,21 +20,17 @@ const ( InitOmType = "INIT" ) -var logger, _ = NewLogger() - -var pool, _ = ants.NewPool(100, ants.WithNonblocking(true), ants.WithLogger(logger)) +var pool, _ = ants.NewPool(100, ants.WithNonblocking(true), ants.WithLogger(logger2.Log)) var G = NewGlobal( - logger, pool, ) // NewGlobal NewGlobal构造函数返回一个新的Global实例,其中包含指定的Logger。 -func NewGlobal(logger *Logger, pool *ants.Pool) *Global { +func NewGlobal(pool *ants.Pool) *Global { return &Global{ - LOG: logger, - NacosConfig: nil, - AgentServerInfo: nil, - P: pool, + AgentHasRegister: false, + NacosConfig: nil, + P: pool, } } diff --git a/agent-go/g/logger.go b/agent-go/logger/logger.go similarity index 97% rename from agent-go/g/logger.go rename to agent-go/logger/logger.go index b099413..11177a8 100644 --- a/agent-go/g/logger.go +++ b/agent-go/logger/logger.go @@ -1,4 +1,4 @@ -package g +package logger import ( "fmt" @@ -11,6 +11,8 @@ type Logger struct { *zap.Logger } +var Log, _ = NewLogger() + // NewLogger creates a new Logger instance. func NewLogger() (*Logger, error) { config := zap.Config{ diff --git a/agent-go/main.go b/agent-go/main.go index 4b61e8b..8ed778d 100644 --- a/agent-go/main.go +++ b/agent-go/main.go @@ -2,12 +2,13 @@ package main import ( "agent-go/g" + logger2 "agent-go/logger" "agent-go/register" "flag" "fmt" ) -var log = g.G.LOG +var log = logger2.Log func main() { @@ -20,9 +21,9 @@ func main() { println(filename) // 初始化Nacos的连接配置 - g.G.NacosConfig = g.InitNacos(filename) + g.G.NacosConfig = register.InitNacos(filename) // 执行初始化之策工作 - g.G.AgentServerInfo = register.INIT() + register.AgentServerInfoCache = register.INIT() } diff --git a/agent-go/rabbitmq/MessageReaderWriter.go b/agent-go/rabbitmq/MessageReaderWriter.go deleted file mode 100644 index 87e1c63..0000000 --- a/agent-go/rabbitmq/MessageReaderWriter.go +++ /dev/null @@ -1,63 +0,0 @@ -package rabbitmq - -import ( - "fmt" - "github.com/nacos-group/nacos-sdk-go/v2/common/logger" - "github.com/streadway/amqp" -) - -// RabbitMQConn is a struct that holds the connection and channel objects -type RabbitMQConn struct { - Connection *amqp.Connection - Channel *amqp.Channel -} - -type ConnectProperty struct { - ExchangeName string - QueueName string - ExchangeType string - TopicKey string -} - -// Send 向RabbitMQ中发送消息 -func Send(conn *RabbitMQConn, connProp *ConnectProperty, message []byte) { - // 往哪里发 - channel := conn.Channel - - // 发送 - err := channel.Publish( - connProp.ExchangeName, - connProp.TopicKey, - false, - false, - amqp.Publishing{ - ContentType: "text/plain", - Body: message, - }, - ) - if err != nil { - logger.Error(fmt.Sprintf("Failed to publish a message: %v", err)) - } -} - -func Read(conn *RabbitMQConn, connProp *ConnectProperty, autoAck bool) <-chan amqp.Delivery { - - // 拿到特定的Channel - channel := conn.Channel - - // 开始读取队列中的全部消息 - msgs, err := channel.Consume( - connProp.QueueName, // 队列名称 - "", // 消费者名称 - autoAck, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // arguments - ) - if err != nil { - log.Error(fmt.Sprintf("Failed to register a consumer: %v", err)) - } - - return msgs -} diff --git a/agent-go/rabbitmq/OMsgConnector.go b/agent-go/rabbitmq/OMsgConnector.go new file mode 100644 index 0000000..ec016ff --- /dev/null +++ b/agent-go/rabbitmq/OMsgConnector.go @@ -0,0 +1,72 @@ +package rabbitmq + +import ( + "agent-go/g" + "encoding/json" + "fmt" +) + +var OctopusToServerQueue = &RabbitQueue{} + +func BuildOMsgRuntimeConnectorQueue(agentTopicName string) { + + // 建立 业务消息 接收队列 + // agentTopicName为名称的队列 + nacosConfig := g.G.NacosConfig + + octopusExchangeName := nacosConfig.GetString("octopus.message.octopus_exchange") + + octopusConnectProp := &ConnectProperty{ + ExchangeName: octopusExchangeName, + QueueName: agentTopicName, + ExchangeType: g.QueueTopic, + TopicKey: agentTopicName + "*", + } + + octopusMsgQueue := &RabbitQueue{ + RabbitProp: octopusConnectProp, + } + octopusMsgQueue.Connect() + + deliveries := octopusMsgQueue.Read(true) + + // 死循环,处理Octopus Message + P.Submit( + func() { + for delivery := range deliveries { + + var om *OctopusMessage + err := json.Unmarshal(delivery.Body, &om) + if err != nil { + log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body)) + // 保存到某处 + continue + } + + // 策略模式 处理消息 + om.Handle() + } + }) + + // 建立 业务消息 返回队列 + // 统一为 OctopusToServer + + octopusToServerQueueName := nacosConfig.GetString("octopus.message.octopus_to_server") + + octopusToServerProp := &ConnectProperty{ + ExchangeName: octopusExchangeName, + QueueName: octopusToServerQueueName, + ExchangeType: g.QueueTopic, + TopicKey: octopusToServerQueueName, + } + + OctopusToServerQueue = &RabbitQueue{ + RabbitProp: octopusToServerProp, + } + + // 开启运行时消息返回队列 + OctopusToServerQueue.Connect() + + log.InfoF("Octopus Message Replay Queue is established ! => %v", OctopusToServerQueue) + +} diff --git a/agent-go/rabbitmq/OctopusMessage.go b/agent-go/rabbitmq/OctopusMessage.go new file mode 100644 index 0000000..3b57302 --- /dev/null +++ b/agent-go/rabbitmq/OctopusMessage.go @@ -0,0 +1,113 @@ +package rabbitmq + +import ( + "agent-go/executor" + "agent-go/g" + "agent-go/utils" + "encoding/json" + "fmt" + "time" +) + +var P = g.G.P + +type IOctopusMessage interface { + OctopusMsgHandler + OctopusMsgSender + OctopusMsgBuilder +} + +type OctopusMsgHandler interface { + Handle(octopusMessage *OctopusMessage) +} + +type OctopusMsgSender interface { + Send(rabbitQueue *RabbitQueue, msg []byte) +} + +type OctopusMsgBuilder interface { + Build(omType string, content interface{}) *OctopusMessage +} + +type OctopusMessage struct { + UUID string `json:"uuid"` + InitTime time.Time `json:"init_time" format:"2023-03-21 16:38:30"` + Type string `json:"type"` + Content interface{} `json:"content"` + Result interface{} `json:"result"` + ACTime time.Time `json:"ac_time" format:"2023-03-21 16:38:30"` +} + +func (om *OctopusMessage) Handle() { + // 实际执行 OM handle进程 + doHandleOctopusMessage(om) +} + +func (om *OctopusMessage) Send(rabbitQueue *RabbitQueue, msg []byte) { + rabbitQueue.Send(msg) +} + +func (om *OctopusMessage) Build(omType string, content interface{}) *OctopusMessage { + + // 当前时间 + curTimeString := utils.CurTimeString() + + // must write to string format, otherwise it's very hard to deserialize + bytes, err := json.Marshal(content) + if err != nil { + fmt.Sprintf("OctopusMessage Build Error ! %v", err) + } + + return &OctopusMessage{ + UUID: curTimeString, + InitTime: time.Now(), + Type: omType, + Content: string(bytes), + Result: nil, + ACTime: time.Time{}, + } +} + +func doHandleOctopusMessage(octopusMessage *OctopusMessage) { + + switch octopusMessage.Type { + case g.InitOmType: + go func() {}() + case g.ExecOmType: + P.Submit(func() { + executorOMHandler(octopusMessage) + }) + case g.StatusOmType: + P.Submit(func() { + statusOMHandler(octopusMessage) + }) + default: + P.Submit(func() { + blackHoleOMHandler(octopusMessage) + }) + } + +} + +func executorOMHandler(octopusMessage *OctopusMessage) { + + executionMsgString := octopusMessage.Content.(string) + + var executionMessage *executor.ExecutionMessage + err := json.Unmarshal([]byte(executionMsgString), &executionMessage) + if err != nil { + log.Error(fmt.Sprintf("execution message convert to json is wrong! msg is => %s", executionMsgString)) + return + } + + // 交给后端的实际处理器处理, 再次策略 + executor.Execute(executionMessage) +} + +func statusOMHandler(octopusMessage *OctopusMessage) { + +} + +func blackHoleOMHandler(octopusMessage *OctopusMessage) { + log.Error(fmt.Sprintf("octopusMessage type wrong! msg is => %v", octopusMessage)) +} diff --git a/agent-go/rabbitmq/OctopusMsgHandler.go b/agent-go/rabbitmq/OctopusMsgHandler.go deleted file mode 100644 index cded014..0000000 --- a/agent-go/rabbitmq/OctopusMsgHandler.go +++ /dev/null @@ -1,106 +0,0 @@ -package rabbitmq - -import ( - "agent-go/config" - "agent-go/executor" - "agent-go/g" - "encoding/json" - "fmt" -) - -var P = g.G.P - -func HandleOMsg(initOMsgFromServer *config.OctopusMessage) { - - agentTopicName := initOMsgFromServer.Result.(string) - - OctopusExchange := g.G.NacosConfig.GetString("octopus.message.octopus_exchange") - - octopusConnectProp := &ConnectProperty{ - ExchangeName: OctopusExchange, - QueueName: agentTopicName, - ExchangeType: g.QueueTopic, - TopicKey: agentTopicName + "*", - } - - octopusConn, err := NewRabbitMQConn(octopusConnectProp) - if err != nil { - log.Error(fmt.Sprintf("Octopus Message Queue create Error ! => %v", octopusConnectProp)) - panic(err) - } - - // 开始接收消息 - channel := octopusConn.Channel - deliveries, err := channel.Consume( - agentTopicName, - agentTopicName, - true, - false, - false, - false, - nil, - ) - if err != nil { - return - } - - // 死循环,处理Ocotpus Message - for delivery := range deliveries { - - var om *config.OctopusMessage - err := json.Unmarshal(delivery.Body, &om) - if err != nil { - log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body)) - // 保存到某处 - continue - } - - // 策略模式 处理消息 - doHandleOctopusMessage(om) - } - -} - -func doHandleOctopusMessage(octopusMessage *config.OctopusMessage) { - - switch octopusMessage.Type { - case g.InitOmType: - go func() {}() - case g.ExecOmType: - P.Submit(func() { - executorOMHandler(octopusMessage) - }) - case g.StatusOmType: - P.Submit(func() { - statusOMHandler(octopusMessage) - }) - default: - P.Submit(func() { - blackHoleOMHandler(octopusMessage) - }) - } - -} - -func executorOMHandler(octopusMessage *config.OctopusMessage) { - - executionMsgString := octopusMessage.Content.(string) - - var executionMessage *config.ExecutionMessage - err := json.Unmarshal([]byte(executionMsgString), &executionMessage) - if err != nil { - log.Error(fmt.Sprintf("execution message convert to json is wrong! msg is => %s", executionMsgString)) - return - } - - // 交给后端的实际处理器处理, 再次策略 - executor.Execute(octopusMessage, executionMessage) -} - -func statusOMHandler(octopusMessage *config.OctopusMessage) { - -} - -func blackHoleOMHandler(octopusMessage *config.OctopusMessage) { - log.Error(fmt.Sprintf("octopusMessage type wrong! msg is => %v", octopusMessage)) -} diff --git a/agent-go/rabbitmq/RabbitMQConnector.go b/agent-go/rabbitmq/RabbitMQConnector.go deleted file mode 100644 index f079934..0000000 --- a/agent-go/rabbitmq/RabbitMQConnector.go +++ /dev/null @@ -1,154 +0,0 @@ -package rabbitmq - -import ( - "agent-go/g" - "fmt" - "github.com/streadway/amqp" - "strings" - "sync" -) - -var log = g.G.LOG - -// 定义全局唯一的 Singleton 实例 -var instance *amqp.Connection - -// 用 sync.Once 变量确保初始化函数只会被调用一次 -var once sync.Once - -// 初始化 Singleton 实例的函数 -func createInstance() { - // 在这里进行 Singleton 的初始化操作 - - // 获取RabbitMQ的连接地址 - rabbitMQEndpointFromG := parseRabbitMQEndpointFromG() - - // 创建全局唯一连接 RabbitMQ连接 - connection, err := amqp.Dial(rabbitMQEndpointFromG) - if err != nil { - log.Error(fmt.Sprintf("failed to connect to RabbitMQ: %v", err)) - } - - instance = connection -} - -// GetInstance 获取全局唯一的 Singleton 实例的函数 -func GetInstance() *amqp.Connection { - // 使用 sync.Once 确保 createInstance 只会被调用一次 - once.Do(createInstance) - return instance -} - -// NewRabbitMQConn creates a new RabbitMQ connection object -func NewRabbitMQConn(property *ConnectProperty) (*RabbitMQConn, error) { - - // 获取RabbitMQ的连接 - conn := GetInstance() - // 获取RabbitMQ的连接地址 - //rabbitMQEndpointFromG := parseRabbitMQEndpointFromG() - //conn, err := amqp.Dial(rabbitMQEndpointFromG) - //if err != nil { - // log.Error(fmt.Sprintf("failed to connect to RabbitMQ: %v", err)) - //} - - ch, err := conn.Channel() - if err != nil { - return nil, fmt.Errorf("failed to create RabbitMQ channel: %w", err) - } - - if err = ch.ExchangeDeclare( - property.ExchangeName, // name of the exchange - property.ExchangeType, // type of the exchange - false, // durable - false, // delete when complete - false, // internal - false, // noWait - nil, // arguments - ); err != nil { - return nil, fmt.Errorf("failed to declare RabbitMQ exchange: %w", err) - } - - _, err = ch.QueueDeclare( - property.QueueName, // name of the queue - false, // durable - false, // delete when unused - false, // exclusive - false, // noWait - nil, // arguments - ) - if err != nil { - return nil, fmt.Errorf("failed to declare RabbitMQ queue: %w", err) - } - - if err = ch.QueueBind( - property.QueueName, // name of the queue - property.TopicKey, // routing key - all topics - property.ExchangeName, // name of the exchange - false, // noWait - nil, // arguments - ); err != nil { - return nil, fmt.Errorf("failed to bind RabbitMQ queue: %w", err) - } - - return &RabbitMQConn{Connection: conn, Channel: ch}, nil -} - -// parseRabbitMQEndpoint 根据全局变量NacosConfig解析出RabbitMQ的连接地址 -func parseRabbitMQEndpointFromG() string { - - nacosConfig := g.G.NacosConfig - - var res strings.Builder - - host := nacosConfig.GetString("spring.rabbitmq.host") - port := nacosConfig.GetString("spring.rabbitmq.port") - username := nacosConfig.GetString("spring.rabbitmq.username") - password := nacosConfig.GetString("spring.rabbitmq.password") - virtualHost := nacosConfig.GetString("spring.rabbitmq.virtual-host") - - // amqp://{username}:{password}@{hostname}:{port}/{virtual_host} - res.WriteString("amqp://") - res.WriteString(username) - res.WriteString(":") - res.WriteString(password) - res.WriteString("@") - res.WriteString(host) - res.WriteString(":") - res.WriteString(port) - res.WriteString("/") - res.WriteString(virtualHost) - - s := res.String() - log.Debug(fmt.Sprintf("generate RabbitMQ endpoint is %s", s)) - return s -} - -func CloseChannel(conn *RabbitMQConn) error { - var err error - - if conn.Channel != nil { - if err = conn.Channel.Close(); err != nil { - log.Error(fmt.Sprintf("Failed to close RabbitMQ channel: %v", err)) - } - } - return err -} - -// CloseRabbitMQAll closes the RabbitMQ connection and channel -func (r *RabbitMQConn) CloseRabbitMQAll() error { - var err error - - if r.Channel != nil { - if err = r.Channel.Close(); err != nil { - log.Error(fmt.Sprintf("Failed to close RabbitMQ channel: %v", err)) - } - } - - if r.Connection != nil { - if err = r.Connection.Close(); err != nil { - log.Error(fmt.Sprintf("Failed to close RabbitMQ connection: %v", err)) - } - } - - return err -} diff --git a/agent-go/rabbitmq/RabbitMsgQueue.go b/agent-go/rabbitmq/RabbitMsgQueue.go new file mode 100644 index 0000000..d89111e --- /dev/null +++ b/agent-go/rabbitmq/RabbitMsgQueue.go @@ -0,0 +1,214 @@ +package rabbitmq + +import ( + "agent-go/g" + logger2 "agent-go/logger" + "fmt" + "github.com/streadway/amqp" + "strings" + "sync" +) + +type RabbitMQ interface { + RabbitSendWriter + + RabbitConnectCloser +} + +type RabbitSendWriter interface { + Send(message []byte) + + Read(autoAck bool) <-chan amqp.Delivery +} + +type RabbitConnectCloser interface { + Connect() + + Close() error +} + +type RabbitQueue struct { + RabbitConn *RabbitMQConn + RabbitProp *ConnectProperty +} + +// RabbitMQConn is a struct that holds the connection and channel objects +type RabbitMQConn struct { + Connection *amqp.Connection + Channel *amqp.Channel +} + +type ConnectProperty struct { + ExchangeName string + QueueName string + ExchangeType string + TopicKey string +} + +var log = logger2.Log + +// 定义全局唯一的 Singleton 实例 +var instance *amqp.Connection + +// 用 sync.Once 变量确保初始化函数只会被调用一次 +var once sync.Once + +// 初始化 Singleton 实例的函数 +func createInstance() { + // 在这里进行 Singleton 的初始化操作 + + // 获取RabbitMQ的连接地址 + rabbitMQEndpointFromG := parseRabbitMQEndpointFromG() + + // 创建全局唯一连接 RabbitMQ连接 + connection, err := amqp.Dial(rabbitMQEndpointFromG) + if err != nil { + log.Error(fmt.Sprintf("failed to connect to RabbitMQ: %v", err)) + } + + instance = connection +} + +// GetInstance 获取全局唯一的 Singleton 实例的函数 +func GetInstance() *amqp.Connection { + // 使用 sync.Once 确保 createInstance 只会被调用一次 + once.Do(createInstance) + return instance +} + +// Connect creates a new RabbitMQ connection object +func (r *RabbitQueue) Connect() { + + // 获取RabbitMQ的连接 + conn := GetInstance() + + ch, err := conn.Channel() + if err != nil { + log.Error(fmt.Sprintf("failed to create RabbitMQ channel: %w", err)) + } + + if err = ch.ExchangeDeclare( + r.RabbitProp.ExchangeName, // name of the exchange + r.RabbitProp.ExchangeType, // type of the exchange + false, // durable + false, // delete when complete + false, // internal + false, // noWait + nil, // arguments + ); err != nil { + log.Error(fmt.Sprintf("failed to declare exchange !: %w", err)) + } + + _, err = ch.QueueDeclare( + r.RabbitProp.QueueName, // name of the queue + false, // durable + false, // delete when unused + false, // exclusive + false, // noWait + nil, // arguments + ) + if err != nil { + log.Error(fmt.Sprintf("failed to declare RabbitMQ queue: %w", err)) + } + + if err = ch.QueueBind( + r.RabbitProp.QueueName, // name of the queue + r.RabbitProp.TopicKey, // routing key - all topics + r.RabbitProp.ExchangeName, // name of the exchange + false, // noWait + nil, // arguments + ); err != nil { + log.Error(fmt.Sprintf("failed to bind RabbitMQ queue: %w", err)) + } + + r.RabbitConn = &RabbitMQConn{ + Connection: conn, + Channel: ch, + } +} + +func (r *RabbitQueue) Close() error { + var err error + + if r.RabbitConn.Channel != nil { + if err = r.RabbitConn.Channel.Close(); err != nil { + log.Error(fmt.Sprintf("Failed to close RabbitMQ channel: %v", err)) + } + } + return err +} + +// Send 向RabbitMQ中发送消息 +func (r *RabbitQueue) Send(message []byte) { + // 往哪里发 + channel := r.RabbitConn.Channel + + // 发送 + err := channel.Publish( + r.RabbitProp.ExchangeName, + r.RabbitProp.TopicKey, + false, + false, + amqp.Publishing{ + ContentType: "text/plain", + Body: message, + }, + ) + if err != nil { + log.Error(fmt.Sprintf("Failed to publish a message: %v", err)) + } +} + +func (r *RabbitQueue) Read(autoAck bool) <-chan amqp.Delivery { + + // 拿到特定的Channel + channel := r.RabbitConn.Channel + + // 开始读取队列中的全部消息 + msgs, err := channel.Consume( + r.RabbitProp.QueueName, // 队列名称 + "", // 消费者名称 + autoAck, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // arguments + ) + if err != nil { + log.Error(fmt.Sprintf("Failed to register a consumer: %v", err)) + } + + return msgs +} + +// parseRabbitMQEndpoint 根据全局变量NacosConfig解析出RabbitMQ的连接地址 +func parseRabbitMQEndpointFromG() string { + + nacosConfig := g.G.NacosConfig + + var res strings.Builder + + host := nacosConfig.GetString("spring.rabbitmq.host") + port := nacosConfig.GetString("spring.rabbitmq.port") + username := nacosConfig.GetString("spring.rabbitmq.username") + password := nacosConfig.GetString("spring.rabbitmq.password") + virtualHost := nacosConfig.GetString("spring.rabbitmq.virtual-host") + + // amqp://{username}:{password}@{hostname}:{port}/{virtual_host} + res.WriteString("amqp://") + res.WriteString(username) + res.WriteString(":") + res.WriteString(password) + res.WriteString("@") + res.WriteString(host) + res.WriteString(":") + res.WriteString(port) + res.WriteString("/") + res.WriteString(virtualHost) + + s := res.String() + + log.Debug(fmt.Sprintf("generate RabbitMQ endpoint is %s", s)) + + return s +} diff --git a/agent-go/register/AgentInitialization.go b/agent-go/register/AgentInitialization.go new file mode 100644 index 0000000..fac6152 --- /dev/null +++ b/agent-go/register/AgentInitialization.go @@ -0,0 +1,186 @@ +package register + +import ( + "agent-go/g" + logger2 "agent-go/logger" + "agent-go/rabbitmq" + "encoding/json" + "fmt" + "gopkg.in/yaml.v3" + "io/ioutil" + "time" +) + +var omType = g.InitOmType +var log = logger2.Log +var P = g.G.P + +var AgentServerInfoCache = &AgentServerInfo{} + +func INIT() *AgentServerInfo { + + // 获取系统的环境变量 + agentServerInfo := parseAgentServerInfo() + + nacosConfig := g.G.NacosConfig + + initToServerProp := &rabbitmq.ConnectProperty{ + ExchangeName: nacosConfig.GetString("octopus.message.init_exchange"), + QueueName: nacosConfig.GetString("octopus.message.init_to_server"), + ExchangeType: g.QueueDirect, + TopicKey: nacosConfig.GetString("octopus.message.init_to_server_key"), + } + + initFromServerProp := &rabbitmq.ConnectProperty{ + ExchangeName: nacosConfig.GetString("octopus.message.init_exchange"), + QueueName: nacosConfig.GetString("octopus.message.init_from_server"), + ExchangeType: g.QueueDirect, + TopicKey: nacosConfig.GetString("octopus.message.init_from_server_key"), + } + + // 建立RabbitMQ的连接 + initToServerQueue := &rabbitmq.RabbitQueue{ + RabbitProp: initToServerProp, + } + defer initToServerQueue.Close() + + // 建立连接 + initToServerQueue.Connect() + + // 组装OctopusMessage + var octopusMsg *rabbitmq.OctopusMessage + octopusMsg = octopusMsg.Build( + omType, + agentServerInfo, + ) + msgBytes, err := json.Marshal(octopusMsg) + if err != nil { + log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %v", octopusMsg)) + } + log.Debug(fmt.Sprintf("Prepare to send init message to server! ==> %s", string(msgBytes))) + + // 发送OM至MQ中 + P.Submit( + func() { + for g.G.AgentHasRegister == false { + + //如果agent存活 而Server不存活 那么需要持续不断的向Server中发送消息 + initToServerQueue.Send( + msgBytes, + ) + + // 休眠 + time.Sleep(10 * time.Minute) + + } + + }) + + // 监听初始化连接中的信息 + initFromServerQueue := &rabbitmq.RabbitQueue{ + RabbitProp: initFromServerProp, + } + defer initFromServerQueue.Close() + + // 建立连接 + initFromServerQueue.Connect() + + // 建立运行时RabbitMQ连接 + handleInitMsgFromServer(initFromServerQueue, initToServerQueue, agentServerInfo) + + return agentServerInfo +} + +// handleInitMsgFromServer 处理从Server接收的 注册信息 +func handleInitMsgFromServer(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue, agentServerInfo *AgentServerInfo) { + + initOctopusMessageDeliveries := initFromServerQueue.Read(false) + + forever := make(chan bool) + + // use the ant goroutine pool + P.Submit( + func() { + + // 同步很多抢占注册的情况 + for delivery := range initOctopusMessageDeliveries { + + log.Debug(fmt.Sprintf("message received from server is %s", string(delivery.Body))) + + var initOctopusMsg *rabbitmq.OctopusMessage + err := json.Unmarshal(delivery.Body, &initOctopusMsg) + if err != nil { + log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ", + string(delivery.Body))) + } + + var serverInfo AgentServerInfo + + s, _ := initOctopusMsg.Content.(string) + cc := json.Unmarshal([]byte(s), &serverInfo) + if cc != nil { + log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", cc)) + } + serverName := serverInfo.ServerName + + // 处理OM信息 + if initOctopusMsg != nil && initOctopusMsg.Type == g.InitOmType && serverName == agentServerInfo.ServerName { + // 是本机的注册回复信息 + + // 建立 运行时 RabbitMQ连接 + agentTopicName := initOctopusMsg.Result.(string) + rabbitmq.BuildOMsgRuntimeConnectorQueue(agentTopicName) + + // 手动确认信息 + delivery.Ack(false) + + // 手动关闭 注册队列的连接 + shutdownRegisterQueueConnection(initFromServerQueue, initToServerQueue) + + return + } + + // 不是自身的 注册回复信息 -- 拒绝 + log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v, ==>%s", initOctopusMsg, delivery.Body)) + delivery.Nack(false, true) + } + + }, + ) + + // wait forever + <-forever + +} + +// shutdownRegisterQueueConnection 关闭初始化连接的两个队列 +func shutdownRegisterQueueConnection(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue) { + +} + +func parseAgentServerInfo() *AgentServerInfo { + + // 约定文件地址为 /etc/environment.d/octopus-agent.conf + // 目前使用 + var agentServerInfo *AgentServerInfo + //yamlFile, err := ioutil.ReadFile("C:\\Users\\wdd\\IdeaProjects\\ProjectOctopus\\agent-go\\server-env.yaml") + yamlFile, err := ioutil.ReadFile("server-env.yaml") + + if err != nil { + panic(fmt.Errorf("failed to read YAML file: %v", err)) + } + + err = yaml.Unmarshal(yamlFile, &agentServerInfo) + if err != nil { + panic(fmt.Errorf("failed to unmarshal YAML: %v", err)) + } + + jsonFormat, err := json.Marshal(agentServerInfo) + if err != nil { + log.Error(fmt.Sprintf("agent server info convert error ! agentserverinfo is %v", agentServerInfo)) + panic(err) + } + log.Info(fmt.Sprintf("agent server info is %v", string(jsonFormat))) + + return agentServerInfo +} diff --git a/agent-go/register/AgentIntitilization.go b/agent-go/register/AgentIntitilization.go deleted file mode 100644 index d833585..0000000 --- a/agent-go/register/AgentIntitilization.go +++ /dev/null @@ -1,175 +0,0 @@ -package register - -import ( - "agent-go/config" - "agent-go/g" - "agent-go/rabbitmq" - "encoding/json" - "fmt" - "gopkg.in/yaml.v3" - "io/ioutil" -) - -var omType = g.InitOmType -var log = g.G.LOG - -func INIT() *config.AgentServerInfo { - - // 获取系统的环境变量 - agentServerInfo := parseAgentServerInfo() - - nacosConfig := g.G.NacosConfig - - initToServerProp := &rabbitmq.ConnectProperty{ - ExchangeName: nacosConfig.GetString("octopus.message.init_exchange"), - QueueName: nacosConfig.GetString("octopus.message.init_to_server"), - ExchangeType: g.QueueDirect, - TopicKey: nacosConfig.GetString("octopus.message.init_to_server_key"), - } - - initFromServerProp := &rabbitmq.ConnectProperty{ - ExchangeName: nacosConfig.GetString("octopus.message.init_exchange"), - QueueName: nacosConfig.GetString("octopus.message.init_from_server"), - ExchangeType: g.QueueDirect, - TopicKey: nacosConfig.GetString("octopus.message.init_from_server_key"), - } - - // 建立RabbitMQ的连接 - // defer 关闭初始化连接 - initToServer, err := rabbitmq.NewRabbitMQConn( - initToServerProp, - ) - if err != nil { - log.Error("init to server queue established error!") - panic(err) - } - //defer rabbitmq.CloseChannel(initToServer) - //defer rabbitmq.CloseChannel(initFromServer) - - // 组装OctopusMessage - var octopusMsg *config.OctopusMessage - octopusMsg = octopusMsg.BuildOctopusMsg( - omType, - agentServerInfo, - ) - - msgBytes, err := json.Marshal(octopusMsg) - if err != nil { - log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %v", octopusMsg)) - } - - log.Debug(fmt.Sprintf("Prepare to send init message to server! ==> %s", string(msgBytes))) - - // 发送OM至MQ中 - rabbitmq.Send( - initToServer, - initToServerProp, - msgBytes, - ) - - // 监听初始化连接中的信息 - initFromServer, err := rabbitmq.NewRabbitMQConn( - initFromServerProp, - ) - if err != nil { - log.Error("init from server queue established error!") - panic(err) - } - - // 建立运行时RabbitMQ连接 - handleInitMsgFromServer(initFromServer, initFromServerProp, initToServer, initToServerProp, agentServerInfo) - - return agentServerInfo -} - -// handleInitMsgFromServer 处理从Server接收的注册信息 -func handleInitMsgFromServer(initFromServer *rabbitmq.RabbitMQConn, initFromServerProp *rabbitmq.ConnectProperty, initToServer *rabbitmq.RabbitMQConn, initToServerProp *rabbitmq.ConnectProperty, agentServerInfo *config.AgentServerInfo) { - - deliveries := rabbitmq.Read(initFromServer, initFromServerProp, false) - - forever := make(chan bool) - - go func() { - - // 同步很多抢占注册的情况 - for delivery := range deliveries { - - log.Debug(fmt.Sprintf("message received from server is %s", string(delivery.Body))) - - var om *config.OctopusMessage - err := json.Unmarshal(delivery.Body, &om) - if err != nil { - log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ", - string(delivery.Body))) - } - - var serverInfo config.AgentServerInfo - - s, _ := om.Content.(string) - cc := json.Unmarshal([]byte(s), &serverInfo) - if cc != nil { - log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", cc)) - } - - serverName := serverInfo.ServerName - - // 处理OM信息 - if om != nil && om.Type == g.InitOmType && serverName == agentServerInfo.ServerName { - // 是本机的注册回复信息 - - // 建立运行时RabbitMQ连接 - // change to async - go rabbitmq.HandleOMsg(om) - - // 手动确认信息 - delivery.Ack(false) - - // 手动关闭 注册队列的连接 - shutdownRegisterQueueConnection(initFromServer, initFromServerProp, initToServer, initToServerProp) - - return - } - - // 不是自身的 注册回复信息 -- 拒绝 - log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v, ==>%s", om, delivery.Body)) - delivery.Nack(false, true) - } - - }() - - // wait forever - <-forever - -} - -// shutdownRegisterQueueConnection 关闭初始化连接的两个队列 -func shutdownRegisterQueueConnection(initFromServer *rabbitmq.RabbitMQConn, initFromServerProp *rabbitmq.ConnectProperty, initToServer *rabbitmq.RabbitMQConn, initToServerProp *rabbitmq.ConnectProperty) { - -} - -func parseAgentServerInfo() *config.AgentServerInfo { - - // 约定文件地址为 /etc/environment.d/octopus-agent.conf - // 目前使用 - var agentServerInfo *config.AgentServerInfo - //yamlFile, err := ioutil.ReadFile("C:\\Users\\wdd\\IdeaProjects\\ProjectOctopus\\agent-go\\server-env.yaml") - yamlFile, err := ioutil.ReadFile("server-env.yaml") - - if err != nil { - panic(fmt.Errorf("failed to read YAML file: %v", err)) - } - - err = yaml.Unmarshal(yamlFile, &agentServerInfo) - if err != nil { - panic(fmt.Errorf("failed to unmarshal YAML: %v", err)) - } - - jsonFormat, err := json.Marshal(agentServerInfo) - if err != nil { - log.Error(fmt.Sprintf("agent server info convert error ! agentserverinfo is %v", agentServerInfo)) - panic(err) - } - log.Info(fmt.Sprintf("agent server info is %v", string(jsonFormat))) - - return agentServerInfo -} diff --git a/agent-go/config/AgentServerInfo.go b/agent-go/register/AgentServerInfo.go similarity index 98% rename from agent-go/config/AgentServerInfo.go rename to agent-go/register/AgentServerInfo.go index ad20396..3026134 100644 --- a/agent-go/config/AgentServerInfo.go +++ b/agent-go/register/AgentServerInfo.go @@ -1,4 +1,4 @@ -package config +package register type AgentServerInfo struct { ServerName string `json:"serverName" yaml:"serverName"` diff --git a/agent-go/g/NacosConfig.go b/agent-go/register/NacosInitalization.go similarity index 99% rename from agent-go/g/NacosConfig.go rename to agent-go/register/NacosInitalization.go index 452cf5e..455ebf1 100644 --- a/agent-go/g/NacosConfig.go +++ b/agent-go/register/NacosInitalization.go @@ -1,4 +1,4 @@ -package g +package register import ( "bytes" @@ -13,7 +13,6 @@ import ( "strings" ) -var log = G.LOG var group = "" func InitNacos(configFileName string) *viper.Viper {