From 5accd999f8ad64c82312f1af914c18090ed32b33 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Fri, 24 Mar 2023 17:02:28 +0800 Subject: [PATCH] =?UTF-8?q?[agent-go]=20=E8=A7=A3=E5=86=B3=E4=BB=A3?= =?UTF-8?q?=E7=A0=81bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent-go/g/NacosConfig.go | 52 ++++++++++++++---------- agent-go/g/global.go | 2 - agent-go/go.mod | 2 +- agent-go/main.go | 2 +- agent-go/octopus-agent-dev.yaml | 1 + agent-go/rabbitmq/MessageReaderWriter.go | 16 ++++---- agent-go/rabbitmq/RabbitMQConnector.go | 1 + agent-go/register/AgentIntitilization.go | 5 +-- 8 files changed, 44 insertions(+), 37 deletions(-) diff --git a/agent-go/g/NacosConfig.go b/agent-go/g/NacosConfig.go index 9573789..452cf5e 100644 --- a/agent-go/g/NacosConfig.go +++ b/agent-go/g/NacosConfig.go @@ -16,23 +16,26 @@ import ( var log = G.LOG var group = "" -func InitNacos(configFileName string) { +func InitNacos(configFileName string) *viper.Viper { v := parseAgentConfigFile(configFileName, nil) group = v.GetString("spring.cloud.nacos.config.group") + // build the nacos connection configClient := startNacosConnection(v) + // get all needed nacos config and merge allNacosConfig := getAllNacosConfig(v, group, configClient) for _, nacosConfigContent := range allNacosConfig { log.Debug(fmt.Sprintf("nacos config conetent is %s", nacosConfigContent)) - //parseNacosConfigContend(nacosConfigContent, v) + parseNacosConfigContend(nacosConfigContent, v) } log.Info(fmt.Sprintf("%s config read result are %v", configFileName, v.AllSettings())) + return v } func parseAgentConfigFile(configFileName string, v *viper.Viper) *viper.Viper { @@ -59,7 +62,10 @@ func parseAgentConfigFile(configFileName string, v *viper.Viper) *viper.Viper { func parseNacosConfigContend(configContent string, v *viper.Viper) *viper.Viper { v.SetConfigType("yaml") - err := v.ReadConfig(bytes.NewBuffer([]byte(configContent))) + + // use merge + + err := v.MergeConfig(bytes.NewBuffer([]byte(configContent))) if err != nil { log.Error("nacos config contend read error !", zap.Error(err)) } @@ -68,19 +74,21 @@ func parseNacosConfigContend(configContent string, v *viper.Viper) *viper.Viper } func startNacosConnection(v *viper.Viper) config_client.IConfigClient { + serverAddr := v.GetString("spring.cloud.nacos.config.server-addr") + clientConfig := constant.ClientConfig{ - NamespaceId: "public", - TimeoutMs: v.GetUint64("spring.cloud.nacos.config.timeout"), - NotLoadCacheAtStart: true, - AppendToStdout: true, - //UpdateCacheWhenEmpty: true, + //Endpoint: serverAddr, + NamespaceId: "", + TimeoutMs: v.GetUint64("spring.cloud.nacos.config.timeout"), + NotLoadCacheAtStart: true, + AppendToStdout: true, + UpdateCacheWhenEmpty: true, //LogDir: "/tmp/nacos/log", //CacheDir: "/tmp/nacos/cache", Username: "nacos", Password: "Superwmm.23", } - serverAddr := v.GetString("spring.cloud.nacos.config.server-addr") split := strings.Split(serverAddr, ":") if len(split) != 2 { log.Error("nacos server addr error!") @@ -131,15 +139,15 @@ func getAllNacosConfig(v *viper.Viper, group string, configClient config_client. for _, addConfigMap := range m { - real, _ := addConfigMap.(map[string]interface{}) + realMap, _ := addConfigMap.(map[string]interface{}) - for additionalNacosConfigFileName, additionalNacosConfigGroup := range real { - - s := additionalNacosConfigGroup.(string) - configContent := getConfig(additionalNacosConfigFileName, s, configClient) - result = append(result, configContent) - } + // 拿到配置的Key + dataId := realMap["data-id"].(string) + group := realMap["group"].(string) + // 查询 + config := getConfig(dataId, group, configClient) + result = append(result, config) } return result @@ -150,13 +158,13 @@ func getConfig(dataId string, group string, configClient config_client.IConfigCl log.Debug(fmt.Sprintf("nacos config get method dataID is %s, group is %s", dataId, group)) - content := "" - configClient.GetConfig(vo.ConfigParam{ - DataId: dataId, - Group: group, - Content: content, - Type: "yaml", + content, err := configClient.GetConfig(vo.ConfigParam{ + DataId: dataId, + Group: group, }) + if err != nil { + log.Error("nacos config get error !", zap.Error(err)) + } log.Debug(fmt.Sprintf("dataId %s , group %s, nacos config content is %s", dataId, group, content)) diff --git a/agent-go/g/global.go b/agent-go/g/global.go index 9980668..751b35c 100644 --- a/agent-go/g/global.go +++ b/agent-go/g/global.go @@ -17,8 +17,6 @@ const ( ExecOmType = "EXECUTOR" StatusOmType = "STATUS" InitOmType = "INIT" - - // write about ) var logger, _ = NewLogger() diff --git a/agent-go/go.mod b/agent-go/go.mod index 93eaf60..e7318f6 100644 --- a/agent-go/go.mod +++ b/agent-go/go.mod @@ -7,6 +7,7 @@ require ( github.com/spf13/viper v1.15.0 github.com/streadway/amqp v1.0.0 go.uber.org/zap v1.24.0 + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -48,5 +49,4 @@ require ( google.golang.org/protobuf v1.28.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/agent-go/main.go b/agent-go/main.go index 38fe12c..4b61e8b 100644 --- a/agent-go/main.go +++ b/agent-go/main.go @@ -20,7 +20,7 @@ func main() { println(filename) // 初始化Nacos的连接配置 - //config.InitNacos(filename) + g.G.NacosConfig = g.InitNacos(filename) // 执行初始化之策工作 g.G.AgentServerInfo = register.INIT() diff --git a/agent-go/octopus-agent-dev.yaml b/agent-go/octopus-agent-dev.yaml index 00f9398..87c7d61 100644 --- a/agent-go/octopus-agent-dev.yaml +++ b/agent-go/octopus-agent-dev.yaml @@ -11,6 +11,7 @@ spring: file-extension: yaml max-retry: 3 server-addr: "150.230.198.103:21060" +# server-addr: "42.192.52.227:21060" timeout: 5000 config-long-poll-timeout: 5000 extension-configs: diff --git a/agent-go/rabbitmq/MessageReaderWriter.go b/agent-go/rabbitmq/MessageReaderWriter.go index 9ab0c2a..ffb027b 100644 --- a/agent-go/rabbitmq/MessageReaderWriter.go +++ b/agent-go/rabbitmq/MessageReaderWriter.go @@ -1,7 +1,6 @@ package rabbitmq import ( - "agent-go/g" "fmt" "github.com/streadway/amqp" ) @@ -23,6 +22,7 @@ type ConnectProperty struct { func Send(conn *RabbitMQConn, connProp *ConnectProperty, message []byte) { // 往哪里发 channel := conn.Channel + // 发送 channel.Publish( connProp.ExchangeName, @@ -43,13 +43,13 @@ func Read(conn *RabbitMQConn, connProp *ConnectProperty, autoAck bool) <-chan am // 开始读取队列中的全部消息 msgs, err := channel.Consume( - connProp.QueueName, // 队列名称 - g.G.AgentServerInfo.AgentTopicName, // 消费者名称 - autoAck, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // arguments + connProp.QueueName, // 队列名称 + "", // 消费者名称 + autoAck, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // arguments ) if err != nil { log.Error(fmt.Sprintf("Failed to register a consumer: %v", err)) diff --git a/agent-go/rabbitmq/RabbitMQConnector.go b/agent-go/rabbitmq/RabbitMQConnector.go index afda22c..c0088f3 100644 --- a/agent-go/rabbitmq/RabbitMQConnector.go +++ b/agent-go/rabbitmq/RabbitMQConnector.go @@ -109,6 +109,7 @@ func parseRabbitMQEndpointFromG() string { res.WriteString(host) res.WriteString(":") res.WriteString(port) + res.WriteString("/") res.WriteString(virtualHost) s := res.String() diff --git a/agent-go/register/AgentIntitilization.go b/agent-go/register/AgentIntitilization.go index badd91f..56e71ef 100644 --- a/agent-go/register/AgentIntitilization.go +++ b/agent-go/register/AgentIntitilization.go @@ -44,7 +44,7 @@ func INIT() *config.AgentServerInfo { log.Error("init to server queue established error!") panic(err) } - defer rabbitmq.CloseChannel(initToServer) + //defer rabbitmq.CloseChannel(initToServer) initFromServer, err := rabbitmq.NewRabbitMQConn( initFromServerProp, @@ -53,7 +53,7 @@ func INIT() *config.AgentServerInfo { log.Error("init from server queue established error!") panic(err) } - defer rabbitmq.CloseChannel(initFromServer) + //defer rabbitmq.CloseChannel(initFromServer) // 组装OctopusMessage var octopusMsg *config.OctopusMessage @@ -77,7 +77,6 @@ func INIT() *config.AgentServerInfo { handleInitMsgFromServer(initFromServer, initFromServerProp, initToServer, initToServerProp) return agentServerInfo - } // handleInitMsgFromServer 处理从Server接收的注册信息