From 6025620eea4f34df0ac2fc2d0d1ee6516c1a6faa Mon Sep 17 00:00:00 2001 From: zeaslity Date: Fri, 16 Jun 2023 14:22:58 +0800 Subject: [PATCH] [ Agent ] remove nacos in agent --- agent-go/g/global.go | 4 +- agent-go/main.go | 2 +- agent-go/octopus-agent-dev.yaml | 110 ++++++-- agent-go/rabbitmq/OMsgConnector.go | 6 +- agent-go/rabbitmq/RabbitMsgQueue.go | 14 +- agent-go/register/AgentInitialization.go | 2 +- agent-go/register/ConfigParser.go | 34 +++ agent-go/register/NacosInitalization.go | 339 ++++++++++++----------- 8 files changed, 307 insertions(+), 204 deletions(-) create mode 100644 agent-go/register/ConfigParser.go diff --git a/agent-go/g/global.go b/agent-go/g/global.go index 80c65eb..4c35d06 100644 --- a/agent-go/g/global.go +++ b/agent-go/g/global.go @@ -8,7 +8,7 @@ import ( type Global struct { AgentHasRegister bool - NacosConfig *viper.Viper + AgentConfig *viper.Viper P *ants.Pool } @@ -31,7 +31,7 @@ var G = NewGlobal( func NewGlobal(pool *ants.Pool) *Global { return &Global{ AgentHasRegister: false, - NacosConfig: nil, + AgentConfig: nil, P: pool, } } diff --git a/agent-go/main.go b/agent-go/main.go index 8ed778d..544b72b 100644 --- a/agent-go/main.go +++ b/agent-go/main.go @@ -21,7 +21,7 @@ func main() { println(filename) // 初始化Nacos的连接配置 - g.G.NacosConfig = register.InitNacos(filename) + g.G.AgentConfig = register.ParseConfiguration(filename) // 执行初始化之策工作 register.AgentServerInfoCache = register.INIT() diff --git a/agent-go/octopus-agent-dev.yaml b/agent-go/octopus-agent-dev.yaml index ac474f7..5179bdf 100644 --- a/agent-go/octopus-agent-dev.yaml +++ b/agent-go/octopus-agent-dev.yaml @@ -1,22 +1,90 @@ -spring: - application: - name: octopus-agent - profiles: - active: dev - cloud: - nacos: - config: - group: dev - config-retry-time: 3000 - file-extension: yaml - max-retry: 3 - # server-addr: "150.230.198.103:21060" - server-addr: "42.192.52.227:21060" - timeout: 5000 - config-long-poll-timeout: 5000 - extension-configs: - - group: dev - data-id: "common-dev.yaml" - server: - port: 8000 \ No newline at end of file + port: 8000 + +logging: + level: + web: info + +octopus: + message: + # agent boot up default common exchange + init_exchange: InitExchange + # server will send message to agent using this common queue + init_to_server: InitToServer + # agent boot up default common exchange routing key + init_to_server_key: InitToServerKey + # server will receive message from agent using this common queue + init_from_server: InitFromServer + # agent boot up default common exchange routing key + init_from_server_key: InitFromServerKey + # initialization register time out (unit ms) default is 5 min + init_ttl: "3000000" + # Octopus Exchange Name == server comunicate with agent + octopus_exchange: OctopusExchange + # Octopus Message To Server == all agent send info to server queue and topic + octopus_to_server: OctopusToServer + executor: + name: executor-functions + status: + name: octopus-agent + healthy: + type: cron + cron: 10 */1 * * * ? * + start-delay: 30 + metric: + pinch: 20 + agent: + executor: + # agent执行一条Command的最长超时时间 + processMaxTimeOut: 60 + status: + app: + - Nginx/nginx + - MySQL/mysql + - Xray/xray + - OctopusAgent/octopus-agent + - Redis/redis + - RabbitMQ/rabbitmq + +spring: + main: + allow-circular-references: true + allow-bean-definition-overriding: true + rabbitmq: + host: 42.192.52.227 + port: 20672 + username: boge + password: boge8tingH + virtual-host: / + listener: + simple: + retry: + # ack failed will reentrant the Rabbit Listener + max-attempts: 2 + enabled: true + # retry interval unit ms + max-interval: 65000 + initial-interval: 65000 + +#spring: +# application: +# name: octopus-agent +# profiles: +# active: dev +# cloud: +# nacos: +# config: +# group: dev +# config-retry-time: 3000 +# file-extension: yaml +# max-retry: 3 +# # server-addr: "150.230.198.103:21060" +# server-addr: "42.192.52.227:21060" +# timeout: 5000 +# config-long-poll-timeout: 5000 +# extension-configs: +# - group: dev +# data-id: "common-dev.yaml" +# +#server: +# port: 8000 \ No newline at end of file diff --git a/agent-go/rabbitmq/OMsgConnector.go b/agent-go/rabbitmq/OMsgConnector.go index 5d28d25..d02bc2b 100644 --- a/agent-go/rabbitmq/OMsgConnector.go +++ b/agent-go/rabbitmq/OMsgConnector.go @@ -12,9 +12,9 @@ func BuildOMsgRuntimeConnectorQueue(agentTopicName string) { // 建立 业务消息 接收队列 // agentTopicName为名称的队列 - nacosConfig := g.G.NacosConfig + agentConfig := g.G.AgentConfig - octopusExchangeName := nacosConfig.GetString("octopus.message.octopus_exchange") + octopusExchangeName := agentConfig.GetString("octopus.message.octopus_exchange") octopusConnectProp := &ConnectProperty{ ExchangeName: octopusExchangeName, @@ -31,7 +31,7 @@ func BuildOMsgRuntimeConnectorQueue(agentTopicName string) { // 建立 业务消息 返回队列 // 统一为 OctopusToServer - octopusToServerQueueName := nacosConfig.GetString("octopus.message.octopus_to_server") + octopusToServerQueueName := agentConfig.GetString("octopus.message.octopus_to_server") octopusToServerProp := &ConnectProperty{ ExchangeName: octopusExchangeName, diff --git a/agent-go/rabbitmq/RabbitMsgQueue.go b/agent-go/rabbitmq/RabbitMsgQueue.go index 0193f84..8edff5f 100644 --- a/agent-go/rabbitmq/RabbitMsgQueue.go +++ b/agent-go/rabbitmq/RabbitMsgQueue.go @@ -181,18 +181,18 @@ func (r *RabbitQueue) Read(autoAck bool) <-chan amqp.Delivery { return msgs } -// parseRabbitMQEndpoint 根据全局变量NacosConfig解析出RabbitMQ的连接地址 +// parseRabbitMQEndpoint 根据全局变量agentConfig解析出RabbitMQ的连接地址 func parseRabbitMQEndpointFromG() string { - nacosConfig := g.G.NacosConfig + agentConfig := g.G.AgentConfig var res strings.Builder - host := nacosConfig.GetString("spring.rabbitmq.host") - port := nacosConfig.GetString("spring.rabbitmq.port") - username := nacosConfig.GetString("spring.rabbitmq.username") - password := nacosConfig.GetString("spring.rabbitmq.password") - virtualHost := nacosConfig.GetString("spring.rabbitmq.virtual-host") + host := agentConfig.GetString("spring.rabbitmq.host") + port := agentConfig.GetString("spring.rabbitmq.port") + username := agentConfig.GetString("spring.rabbitmq.username") + password := agentConfig.GetString("spring.rabbitmq.password") + virtualHost := agentConfig.GetString("spring.rabbitmq.virtual-host") // amqp://{username}:{password}@{hostname}:{port}/{virtual_host} res.WriteString("amqp://") diff --git a/agent-go/register/AgentInitialization.go b/agent-go/register/AgentInitialization.go index c762d0c..62e72bf 100644 --- a/agent-go/register/AgentInitialization.go +++ b/agent-go/register/AgentInitialization.go @@ -22,7 +22,7 @@ func INIT() *AgentServerInfo { // 获取系统的环境变量 agentServerInfo := parseAgentServerInfo() - nacosConfig := g.G.NacosConfig + nacosConfig := g.G.AgentConfig initToServerProp := &rabbitmq.ConnectProperty{ ExchangeName: nacosConfig.GetString("octopus.message.init_exchange"), diff --git a/agent-go/register/ConfigParser.go b/agent-go/register/ConfigParser.go new file mode 100644 index 0000000..a876040 --- /dev/null +++ b/agent-go/register/ConfigParser.go @@ -0,0 +1,34 @@ +package register + +import ( + "fmt" + "github.com/spf13/viper" +) + +func ParseConfiguration(configFileName string) *viper.Viper { + + agentConfig := parseAgentConfigFile(configFileName, nil) + + return agentConfig +} + +func parseAgentConfigFile(configFileName string, v *viper.Viper) *viper.Viper { + + // 使用Viper框架读取 + if v == nil { + v = viper.New() + } + + // 设置配置文件路径和名称 + v.SetConfigName(configFileName) + v.AddConfigPath(".") + v.SetConfigType("yaml") + + // 读取默认的总配置文件 + err := v.ReadInConfig() + if err != nil { + panic(fmt.Errorf("fatal error config file: %s", err)) + } + + return v +} diff --git a/agent-go/register/NacosInitalization.go b/agent-go/register/NacosInitalization.go index 455ebf1..2aa11b3 100644 --- a/agent-go/register/NacosInitalization.go +++ b/agent-go/register/NacosInitalization.go @@ -1,171 +1,172 @@ package register -import ( - "bytes" - "fmt" - "github.com/nacos-group/nacos-sdk-go/v2/clients" - "github.com/nacos-group/nacos-sdk-go/v2/clients/config_client" - "github.com/nacos-group/nacos-sdk-go/v2/common/constant" - "github.com/nacos-group/nacos-sdk-go/v2/vo" - "github.com/spf13/viper" - "go.uber.org/zap" - "strconv" - "strings" -) - -var group = "" - -func InitNacos(configFileName string) *viper.Viper { - - v := parseAgentConfigFile(configFileName, nil) - group = v.GetString("spring.cloud.nacos.config.group") - - // build the nacos connection - configClient := startNacosConnection(v) - - // get all needed nacos config and merge - allNacosConfig := getAllNacosConfig(v, group, configClient) - - for _, nacosConfigContent := range allNacosConfig { - log.Debug(fmt.Sprintf("nacos config conetent is %s", nacosConfigContent)) - - parseNacosConfigContend(nacosConfigContent, v) - } - - log.Info(fmt.Sprintf("%s config read result are %v", configFileName, v.AllSettings())) - - return v -} - -func parseAgentConfigFile(configFileName string, v *viper.Viper) *viper.Viper { - - // 使用Viper框架读取 - if v == nil { - v = viper.New() - } - - // 设置配置文件路径和名称 - v.SetConfigName(configFileName) - v.AddConfigPath(".") - v.SetConfigType("yaml") - - // 读取默认的总配置文件 - err := v.ReadInConfig() - if err != nil { - panic(fmt.Errorf("fatal error config file: %s", err)) - } - - return v -} - -func parseNacosConfigContend(configContent string, v *viper.Viper) *viper.Viper { - - v.SetConfigType("yaml") - - // use merge - - err := v.MergeConfig(bytes.NewBuffer([]byte(configContent))) - if err != nil { - log.Error("nacos config contend read error !", zap.Error(err)) - } - - return v -} -func startNacosConnection(v *viper.Viper) config_client.IConfigClient { - - serverAddr := v.GetString("spring.cloud.nacos.config.server-addr") - - clientConfig := constant.ClientConfig{ - //Endpoint: serverAddr, - NamespaceId: "", - TimeoutMs: v.GetUint64("spring.cloud.nacos.config.timeout"), - NotLoadCacheAtStart: true, - AppendToStdout: true, - UpdateCacheWhenEmpty: true, - //LogDir: "/tmp/nacos/log", - //CacheDir: "/tmp/nacos/cache", - Username: "nacos", - Password: "Superwmm.23", - } - - split := strings.Split(serverAddr, ":") - if len(split) != 2 { - log.Error("nacos server addr error!") - } - - port, _ := strconv.ParseUint(split[1], 10, 64) - serverConfigs := []constant.ServerConfig{ - { - IpAddr: split[0], - Port: port, - GrpcPort: port + 1000, - }, - } - - // Another way of create config client for dynamic configuration (recommend) - configClient, err := clients.NewConfigClient( - vo.NacosClientParam{ - ClientConfig: &clientConfig, - ServerConfigs: serverConfigs, - }, - ) - if err != nil { - panic(err) - } - - return configClient -} - -func getAllNacosConfig(v *viper.Viper, group string, configClient config_client.IConfigClient) []string { - - result := make([]string, 0) - - // main nacos configs - mainNacosConfigFileName := v.GetString("spring.application.name") + "-" + v.GetString("spring.profiles.active") + "." + v.GetString("spring.cloud.nacos.config.file-extension") - - log.Debug(fmt.Sprintf("main nacos config file name is %s", mainNacosConfigFileName)) - configContent := getConfig(mainNacosConfigFileName, group, configClient) - result = append(result, configContent) - - // additional nacos config - additionalNacosConfig := v.Get("spring.cloud.nacos.config.extension-configs") - // 增加断言,判定map的类型 - m, ok := additionalNacosConfig.([]interface{}) - if !ok { - fmt.Println("additionalNacosConfig is not a slice") - return nil - } - - for _, addConfigMap := range m { - - realMap, _ := addConfigMap.(map[string]interface{}) - - // 拿到配置的Key - dataId := realMap["data-id"].(string) - group := realMap["group"].(string) - - // 查询 - config := getConfig(dataId, group, configClient) - result = append(result, config) - } - - return result -} - -// getConfig 从Nacos中获取相应的 -func getConfig(dataId string, group string, configClient config_client.IConfigClient) string { - - log.Debug(fmt.Sprintf("nacos config get method dataID is %s, group is %s", dataId, group)) - - content, err := configClient.GetConfig(vo.ConfigParam{ - DataId: dataId, - Group: group, - }) - if err != nil { - log.Error("nacos config get error !", zap.Error(err)) - } - - log.Debug(fmt.Sprintf("dataId %s , group %s, nacos config content is %s", dataId, group, content)) - - return content -} +// +//import ( +// "bytes" +// "fmt" +// "github.com/nacos-group/nacos-sdk-go/v2/clients" +// "github.com/nacos-group/nacos-sdk-go/v2/clients/config_client" +// "github.com/nacos-group/nacos-sdk-go/v2/common/constant" +// "github.com/nacos-group/nacos-sdk-go/v2/vo" +// "github.com/spf13/viper" +// "go.uber.org/zap" +// "strconv" +// "strings" +//) +// +//var group = "" +// +//func InitNacos(configFileName string) *viper.Viper { +// +// v := parseAgentConfigFile(configFileName, nil) +// group = v.GetString("spring.cloud.nacos.config.group") +// +// // build the nacos connection +// configClient := startNacosConnection(v) +// +// // get all needed nacos config and merge +// allNacosConfig := getAllNacosConfig(v, group, configClient) +// +// for _, nacosConfigContent := range allNacosConfig { +// log.Debug(fmt.Sprintf("nacos config conetent is %s", nacosConfigContent)) +// +// parseNacosConfigContend(nacosConfigContent, v) +// } +// +// log.Info(fmt.Sprintf("%s config read result are %v", configFileName, v.AllSettings())) +// +// return v +//} +// +//func parseAgentConfigFile(configFileName string, v *viper.Viper) *viper.Viper { +// +// // 使用Viper框架读取 +// if v == nil { +// v = viper.New() +// } +// +// // 设置配置文件路径和名称 +// v.SetConfigName(configFileName) +// v.AddConfigPath(".") +// v.SetConfigType("yaml") +// +// // 读取默认的总配置文件 +// err := v.ReadInConfig() +// if err != nil { +// panic(fmt.Errorf("fatal error config file: %s", err)) +// } +// +// return v +//} +// +//func parseNacosConfigContend(configContent string, v *viper.Viper) *viper.Viper { +// +// v.SetConfigType("yaml") +// +// // use merge +// +// err := v.MergeConfig(bytes.NewBuffer([]byte(configContent))) +// if err != nil { +// log.Error("nacos config contend read error !", zap.Error(err)) +// } +// +// return v +//} +//func startNacosConnection(v *viper.Viper) config_client.IConfigClient { +// +// serverAddr := v.GetString("spring.cloud.nacos.config.server-addr") +// +// clientConfig := constant.ClientConfig{ +// //Endpoint: serverAddr, +// NamespaceId: "", +// TimeoutMs: v.GetUint64("spring.cloud.nacos.config.timeout"), +// NotLoadCacheAtStart: true, +// AppendToStdout: true, +// UpdateCacheWhenEmpty: true, +// //LogDir: "/tmp/nacos/log", +// //CacheDir: "/tmp/nacos/cache", +// Username: "nacos", +// Password: "Superwmm.23", +// } +// +// split := strings.Split(serverAddr, ":") +// if len(split) != 2 { +// log.Error("nacos server addr error!") +// } +// +// port, _ := strconv.ParseUint(split[1], 10, 64) +// serverConfigs := []constant.ServerConfig{ +// { +// IpAddr: split[0], +// Port: port, +// GrpcPort: port + 1000, +// }, +// } +// +// // Another way of create config client for dynamic configuration (recommend) +// configClient, err := clients.NewConfigClient( +// vo.NacosClientParam{ +// ClientConfig: &clientConfig, +// ServerConfigs: serverConfigs, +// }, +// ) +// if err != nil { +// panic(err) +// } +// +// return configClient +//} +// +//func getAllNacosConfig(v *viper.Viper, group string, configClient config_client.IConfigClient) []string { +// +// result := make([]string, 0) +// +// // main nacos configs +// mainNacosConfigFileName := v.GetString("spring.application.name") + "-" + v.GetString("spring.profiles.active") + "." + v.GetString("spring.cloud.nacos.config.file-extension") +// +// log.Debug(fmt.Sprintf("main nacos config file name is %s", mainNacosConfigFileName)) +// configContent := getConfig(mainNacosConfigFileName, group, configClient) +// result = append(result, configContent) +// +// // additional nacos config +// additionalNacosConfig := v.Get("spring.cloud.nacos.config.extension-configs") +// // 增加断言,判定map的类型 +// m, ok := additionalNacosConfig.([]interface{}) +// if !ok { +// fmt.Println("additionalNacosConfig is not a slice") +// return nil +// } +// +// for _, addConfigMap := range m { +// +// realMap, _ := addConfigMap.(map[string]interface{}) +// +// // 拿到配置的Key +// dataId := realMap["data-id"].(string) +// group := realMap["group"].(string) +// +// // 查询 +// config := getConfig(dataId, group, configClient) +// result = append(result, config) +// } +// +// return result +//} +// +//// getConfig 从Nacos中获取相应的 +//func getConfig(dataId string, group string, configClient config_client.IConfigClient) string { +// +// log.Debug(fmt.Sprintf("nacos config get method dataID is %s, group is %s", dataId, group)) +// +// content, err := configClient.GetConfig(vo.ConfigParam{ +// DataId: dataId, +// Group: group, +// }) +// if err != nil { +// log.Error("nacos config get error !", zap.Error(err)) +// } +// +// log.Debug(fmt.Sprintf("dataId %s , group %s, nacos config content is %s", dataId, group, content)) +// +// return content +//}