[ Agent ] remove nacos in agent

This commit is contained in:
zeaslity
2023-06-16 14:22:58 +08:00
parent 93692eb0af
commit 6025620eea
8 changed files with 307 additions and 204 deletions

View File

@@ -8,7 +8,7 @@ import (
type Global struct { type Global struct {
AgentHasRegister bool AgentHasRegister bool
NacosConfig *viper.Viper AgentConfig *viper.Viper
P *ants.Pool P *ants.Pool
} }
@@ -31,7 +31,7 @@ var G = NewGlobal(
func NewGlobal(pool *ants.Pool) *Global { func NewGlobal(pool *ants.Pool) *Global {
return &Global{ return &Global{
AgentHasRegister: false, AgentHasRegister: false,
NacosConfig: nil, AgentConfig: nil,
P: pool, P: pool,
} }
} }

View File

@@ -21,7 +21,7 @@ func main() {
println(filename) println(filename)
// 初始化Nacos的连接配置 // 初始化Nacos的连接配置
g.G.NacosConfig = register.InitNacos(filename) g.G.AgentConfig = register.ParseConfiguration(filename)
// 执行初始化之策工作 // 执行初始化之策工作
register.AgentServerInfoCache = register.INIT() register.AgentServerInfoCache = register.INIT()

View File

@@ -1,22 +1,90 @@
spring:
application:
name: octopus-agent
profiles:
active: dev
cloud:
nacos:
config:
group: dev
config-retry-time: 3000
file-extension: yaml
max-retry: 3
# server-addr: "150.230.198.103:21060"
server-addr: "42.192.52.227:21060"
timeout: 5000
config-long-poll-timeout: 5000
extension-configs:
- group: dev
data-id: "common-dev.yaml"
server: server:
port: 8000 port: 8000
logging:
level:
web: info
octopus:
message:
# agent boot up default common exchange
init_exchange: InitExchange
# server will send message to agent using this common queue
init_to_server: InitToServer
# agent boot up default common exchange routing key
init_to_server_key: InitToServerKey
# server will receive message from agent using this common queue
init_from_server: InitFromServer
# agent boot up default common exchange routing key
init_from_server_key: InitFromServerKey
# initialization register time out (unit ms) default is 5 min
init_ttl: "3000000"
# Octopus Exchange Name == server comunicate with agent
octopus_exchange: OctopusExchange
# Octopus Message To Server == all agent send info to server queue and topic
octopus_to_server: OctopusToServer
executor:
name: executor-functions
status:
name: octopus-agent
healthy:
type: cron
cron: 10 */1 * * * ? *
start-delay: 30
metric:
pinch: 20
agent:
executor:
# agent执行一条Command的最长超时时间
processMaxTimeOut: 60
status:
app:
- Nginx/nginx
- MySQL/mysql
- Xray/xray
- OctopusAgent/octopus-agent
- Redis/redis
- RabbitMQ/rabbitmq
spring:
main:
allow-circular-references: true
allow-bean-definition-overriding: true
rabbitmq:
host: 42.192.52.227
port: 20672
username: boge
password: boge8tingH
virtual-host: /
listener:
simple:
retry:
# ack failed will reentrant the Rabbit Listener
max-attempts: 2
enabled: true
# retry interval unit ms
max-interval: 65000
initial-interval: 65000
#spring:
# application:
# name: octopus-agent
# profiles:
# active: dev
# cloud:
# nacos:
# config:
# group: dev
# config-retry-time: 3000
# file-extension: yaml
# max-retry: 3
# # server-addr: "150.230.198.103:21060"
# server-addr: "42.192.52.227:21060"
# timeout: 5000
# config-long-poll-timeout: 5000
# extension-configs:
# - group: dev
# data-id: "common-dev.yaml"
#
#server:
# port: 8000

View File

@@ -12,9 +12,9 @@ func BuildOMsgRuntimeConnectorQueue(agentTopicName string) {
// 建立 业务消息 接收队列 // 建立 业务消息 接收队列
// agentTopicName为名称的队列 // agentTopicName为名称的队列
nacosConfig := g.G.NacosConfig agentConfig := g.G.AgentConfig
octopusExchangeName := nacosConfig.GetString("octopus.message.octopus_exchange") octopusExchangeName := agentConfig.GetString("octopus.message.octopus_exchange")
octopusConnectProp := &ConnectProperty{ octopusConnectProp := &ConnectProperty{
ExchangeName: octopusExchangeName, ExchangeName: octopusExchangeName,
@@ -31,7 +31,7 @@ func BuildOMsgRuntimeConnectorQueue(agentTopicName string) {
// 建立 业务消息 返回队列 // 建立 业务消息 返回队列
// 统一为 OctopusToServer // 统一为 OctopusToServer
octopusToServerQueueName := nacosConfig.GetString("octopus.message.octopus_to_server") octopusToServerQueueName := agentConfig.GetString("octopus.message.octopus_to_server")
octopusToServerProp := &ConnectProperty{ octopusToServerProp := &ConnectProperty{
ExchangeName: octopusExchangeName, ExchangeName: octopusExchangeName,

View File

@@ -181,18 +181,18 @@ func (r *RabbitQueue) Read(autoAck bool) <-chan amqp.Delivery {
return msgs return msgs
} }
// parseRabbitMQEndpoint 根据全局变量NacosConfig解析出RabbitMQ的连接地址 // parseRabbitMQEndpoint 根据全局变量agentConfig解析出RabbitMQ的连接地址
func parseRabbitMQEndpointFromG() string { func parseRabbitMQEndpointFromG() string {
nacosConfig := g.G.NacosConfig agentConfig := g.G.AgentConfig
var res strings.Builder var res strings.Builder
host := nacosConfig.GetString("spring.rabbitmq.host") host := agentConfig.GetString("spring.rabbitmq.host")
port := nacosConfig.GetString("spring.rabbitmq.port") port := agentConfig.GetString("spring.rabbitmq.port")
username := nacosConfig.GetString("spring.rabbitmq.username") username := agentConfig.GetString("spring.rabbitmq.username")
password := nacosConfig.GetString("spring.rabbitmq.password") password := agentConfig.GetString("spring.rabbitmq.password")
virtualHost := nacosConfig.GetString("spring.rabbitmq.virtual-host") virtualHost := agentConfig.GetString("spring.rabbitmq.virtual-host")
// amqp://{username}:{password}@{hostname}:{port}/{virtual_host} // amqp://{username}:{password}@{hostname}:{port}/{virtual_host}
res.WriteString("amqp://") res.WriteString("amqp://")

View File

@@ -22,7 +22,7 @@ func INIT() *AgentServerInfo {
// 获取系统的环境变量 // 获取系统的环境变量
agentServerInfo := parseAgentServerInfo() agentServerInfo := parseAgentServerInfo()
nacosConfig := g.G.NacosConfig nacosConfig := g.G.AgentConfig
initToServerProp := &rabbitmq.ConnectProperty{ initToServerProp := &rabbitmq.ConnectProperty{
ExchangeName: nacosConfig.GetString("octopus.message.init_exchange"), ExchangeName: nacosConfig.GetString("octopus.message.init_exchange"),

View File

@@ -0,0 +1,34 @@
package register
import (
"fmt"
"github.com/spf13/viper"
)
func ParseConfiguration(configFileName string) *viper.Viper {
agentConfig := parseAgentConfigFile(configFileName, nil)
return agentConfig
}
func parseAgentConfigFile(configFileName string, v *viper.Viper) *viper.Viper {
// 使用Viper框架读取
if v == nil {
v = viper.New()
}
// 设置配置文件路径和名称
v.SetConfigName(configFileName)
v.AddConfigPath(".")
v.SetConfigType("yaml")
// 读取默认的总配置文件
err := v.ReadInConfig()
if err != nil {
panic(fmt.Errorf("fatal error config file: %s", err))
}
return v
}

View File

@@ -1,171 +1,172 @@
package register package register
import ( //
"bytes" //import (
"fmt" // "bytes"
"github.com/nacos-group/nacos-sdk-go/v2/clients" // "fmt"
"github.com/nacos-group/nacos-sdk-go/v2/clients/config_client" // "github.com/nacos-group/nacos-sdk-go/v2/clients"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant" // "github.com/nacos-group/nacos-sdk-go/v2/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/v2/vo" // "github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/spf13/viper" // "github.com/nacos-group/nacos-sdk-go/v2/vo"
"go.uber.org/zap" // "github.com/spf13/viper"
"strconv" // "go.uber.org/zap"
"strings" // "strconv"
) // "strings"
//)
var group = "" //
//var group = ""
func InitNacos(configFileName string) *viper.Viper { //
//func InitNacos(configFileName string) *viper.Viper {
v := parseAgentConfigFile(configFileName, nil) //
group = v.GetString("spring.cloud.nacos.config.group") // v := parseAgentConfigFile(configFileName, nil)
// group = v.GetString("spring.cloud.nacos.config.group")
// build the nacos connection //
configClient := startNacosConnection(v) // // build the nacos connection
// configClient := startNacosConnection(v)
// get all needed nacos config and merge //
allNacosConfig := getAllNacosConfig(v, group, configClient) // // get all needed nacos config and merge
// allNacosConfig := getAllNacosConfig(v, group, configClient)
for _, nacosConfigContent := range allNacosConfig { //
log.Debug(fmt.Sprintf("nacos config conetent is %s", nacosConfigContent)) // for _, nacosConfigContent := range allNacosConfig {
// log.Debug(fmt.Sprintf("nacos config conetent is %s", nacosConfigContent))
parseNacosConfigContend(nacosConfigContent, v) //
} // parseNacosConfigContend(nacosConfigContent, v)
// }
log.Info(fmt.Sprintf("%s config read result are %v", configFileName, v.AllSettings())) //
// log.Info(fmt.Sprintf("%s config read result are %v", configFileName, v.AllSettings()))
return v //
} // return v
//}
func parseAgentConfigFile(configFileName string, v *viper.Viper) *viper.Viper { //
//func parseAgentConfigFile(configFileName string, v *viper.Viper) *viper.Viper {
// 使用Viper框架读取 //
if v == nil { // // 使用Viper框架读取
v = viper.New() // if v == nil {
} // v = viper.New()
// }
// 设置配置文件路径和名称 //
v.SetConfigName(configFileName) // // 设置配置文件路径和名称
v.AddConfigPath(".") // v.SetConfigName(configFileName)
v.SetConfigType("yaml") // v.AddConfigPath(".")
// v.SetConfigType("yaml")
// 读取默认的总配置文件 //
err := v.ReadInConfig() // // 读取默认的总配置文件
if err != nil { // err := v.ReadInConfig()
panic(fmt.Errorf("fatal error config file: %s", err)) // if err != nil {
} // panic(fmt.Errorf("fatal error config file: %s", err))
// }
return v //
} // return v
//}
func parseNacosConfigContend(configContent string, v *viper.Viper) *viper.Viper { //
//func parseNacosConfigContend(configContent string, v *viper.Viper) *viper.Viper {
v.SetConfigType("yaml") //
// v.SetConfigType("yaml")
// use merge //
// // use merge
err := v.MergeConfig(bytes.NewBuffer([]byte(configContent))) //
if err != nil { // err := v.MergeConfig(bytes.NewBuffer([]byte(configContent)))
log.Error("nacos config contend read error !", zap.Error(err)) // if err != nil {
} // log.Error("nacos config contend read error !", zap.Error(err))
// }
return v //
} // return v
func startNacosConnection(v *viper.Viper) config_client.IConfigClient { //}
//func startNacosConnection(v *viper.Viper) config_client.IConfigClient {
serverAddr := v.GetString("spring.cloud.nacos.config.server-addr") //
// serverAddr := v.GetString("spring.cloud.nacos.config.server-addr")
clientConfig := constant.ClientConfig{ //
//Endpoint: serverAddr, // clientConfig := constant.ClientConfig{
NamespaceId: "", // //Endpoint: serverAddr,
TimeoutMs: v.GetUint64("spring.cloud.nacos.config.timeout"), // NamespaceId: "",
NotLoadCacheAtStart: true, // TimeoutMs: v.GetUint64("spring.cloud.nacos.config.timeout"),
AppendToStdout: true, // NotLoadCacheAtStart: true,
UpdateCacheWhenEmpty: true, // AppendToStdout: true,
//LogDir: "/tmp/nacos/log", // UpdateCacheWhenEmpty: true,
//CacheDir: "/tmp/nacos/cache", // //LogDir: "/tmp/nacos/log",
Username: "nacos", // //CacheDir: "/tmp/nacos/cache",
Password: "Superwmm.23", // Username: "nacos",
} // Password: "Superwmm.23",
// }
split := strings.Split(serverAddr, ":") //
if len(split) != 2 { // split := strings.Split(serverAddr, ":")
log.Error("nacos server addr error!") // if len(split) != 2 {
} // log.Error("nacos server addr error!")
// }
port, _ := strconv.ParseUint(split[1], 10, 64) //
serverConfigs := []constant.ServerConfig{ // port, _ := strconv.ParseUint(split[1], 10, 64)
{ // serverConfigs := []constant.ServerConfig{
IpAddr: split[0], // {
Port: port, // IpAddr: split[0],
GrpcPort: port + 1000, // Port: port,
}, // GrpcPort: port + 1000,
} // },
// }
// Another way of create config client for dynamic configuration (recommend) //
configClient, err := clients.NewConfigClient( // // Another way of create config client for dynamic configuration (recommend)
vo.NacosClientParam{ // configClient, err := clients.NewConfigClient(
ClientConfig: &clientConfig, // vo.NacosClientParam{
ServerConfigs: serverConfigs, // ClientConfig: &clientConfig,
}, // ServerConfigs: serverConfigs,
) // },
if err != nil { // )
panic(err) // if err != nil {
} // panic(err)
// }
return configClient //
} // return configClient
//}
func getAllNacosConfig(v *viper.Viper, group string, configClient config_client.IConfigClient) []string { //
//func getAllNacosConfig(v *viper.Viper, group string, configClient config_client.IConfigClient) []string {
result := make([]string, 0) //
// result := make([]string, 0)
// main nacos configs //
mainNacosConfigFileName := v.GetString("spring.application.name") + "-" + v.GetString("spring.profiles.active") + "." + v.GetString("spring.cloud.nacos.config.file-extension") // // main nacos configs
// mainNacosConfigFileName := v.GetString("spring.application.name") + "-" + v.GetString("spring.profiles.active") + "." + v.GetString("spring.cloud.nacos.config.file-extension")
log.Debug(fmt.Sprintf("main nacos config file name is %s", mainNacosConfigFileName)) //
configContent := getConfig(mainNacosConfigFileName, group, configClient) // log.Debug(fmt.Sprintf("main nacos config file name is %s", mainNacosConfigFileName))
result = append(result, configContent) // configContent := getConfig(mainNacosConfigFileName, group, configClient)
// result = append(result, configContent)
// additional nacos config //
additionalNacosConfig := v.Get("spring.cloud.nacos.config.extension-configs") // // additional nacos config
// 增加断言判定map的类型 // additionalNacosConfig := v.Get("spring.cloud.nacos.config.extension-configs")
m, ok := additionalNacosConfig.([]interface{}) // // 增加断言判定map的类型
if !ok { // m, ok := additionalNacosConfig.([]interface{})
fmt.Println("additionalNacosConfig is not a slice") // if !ok {
return nil // fmt.Println("additionalNacosConfig is not a slice")
} // return nil
// }
for _, addConfigMap := range m { //
// for _, addConfigMap := range m {
realMap, _ := addConfigMap.(map[string]interface{}) //
// realMap, _ := addConfigMap.(map[string]interface{})
// 拿到配置的Key //
dataId := realMap["data-id"].(string) // // 拿到配置的Key
group := realMap["group"].(string) // dataId := realMap["data-id"].(string)
// group := realMap["group"].(string)
// 查询 //
config := getConfig(dataId, group, configClient) // // 查询
result = append(result, config) // config := getConfig(dataId, group, configClient)
} // result = append(result, config)
// }
return result //
} // return result
//}
// getConfig 从Nacos中获取相应的 //
func getConfig(dataId string, group string, configClient config_client.IConfigClient) string { //// getConfig 从Nacos中获取相应的
//func getConfig(dataId string, group string, configClient config_client.IConfigClient) string {
log.Debug(fmt.Sprintf("nacos config get method dataID is %s, group is %s", dataId, group)) //
// log.Debug(fmt.Sprintf("nacos config get method dataID is %s, group is %s", dataId, group))
content, err := configClient.GetConfig(vo.ConfigParam{ //
DataId: dataId, // content, err := configClient.GetConfig(vo.ConfigParam{
Group: group, // DataId: dataId,
}) // Group: group,
if err != nil { // })
log.Error("nacos config get error !", zap.Error(err)) // if err != nil {
} // log.Error("nacos config get error !", zap.Error(err))
// }
log.Debug(fmt.Sprintf("dataId %s , group %s, nacos config content is %s", dataId, group, content)) //
// log.Debug(fmt.Sprintf("dataId %s , group %s, nacos config content is %s", dataId, group, content))
return content //
} // return content
//}