diff --git a/agent-go/AgentInitialization.go b/agent-go/AgentInitialization.go deleted file mode 100644 index e480828..0000000 --- a/agent-go/AgentInitialization.go +++ /dev/null @@ -1,264 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "gopkg.in/yaml.v3" - "io/ioutil" - "reflect" - "strings" - "time" - "wdd.io/agent-go/executor" - "wdd.io/agent-go/g" - "wdd.io/agent-go/rabbitmq" - "wdd.io/agent-go/register" - "wdd.io/agent-go/status" -) - -var omType = g.InitOmType -var P = g.G.P - -//var AgentServerInfoCache = ®ister.AgentServerInfo{} - -func INIT(agentServerInfoConf string) chan bool { - - // 获取系统的环境变量 - agentServerInfo := parseAgentServerInfo(agentServerInfoConf) - - // re-get agentInfo from status module - agentInfo := status.ReportAgentInfo() - refreshAgentInfoByStatusInfo(agentInfo, agentServerInfo) - - // build operator cache - BuildAgentOsOperator(agentInfo, agentServerInfo) - - // 缓存此内容 - //AgentServerInfoCache = agentServerInfo - - agentConfig := g.G.AgentConfig - - initToServerProp := &rabbitmq.ConnectProperty{ - ExchangeName: agentConfig.GetString("octopus.message.init_exchange"), - QueueName: agentConfig.GetString("octopus.message.init_to_server"), - ExchangeType: g.QueueDirect, - TopicKey: agentConfig.GetString("octopus.message.init_to_server_key"), - } - - initFromServerProp := &rabbitmq.ConnectProperty{ - ExchangeName: agentConfig.GetString("octopus.message.init_exchange"), - QueueName: agentConfig.GetString("octopus.message.init_from_server"), - ExchangeType: g.QueueDirect, - TopicKey: agentConfig.GetString("octopus.message.init_from_server_key"), - } - - // 建立RabbitMQ的连接 - initToServerQueue := &rabbitmq.RabbitQueue{ - RabbitProp: initToServerProp, - } - //defer initToServerQueue.Close() - - // 建立连接 - initToServerQueue.Connect() - - // 组装OctopusMessage - var octopusMsg *rabbitmq.OctopusMessage - octopusMsg = octopusMsg.Build( - omType, - agentServerInfo, - ) - msgBytes, err := json.Marshal(octopusMsg) - if err != nil { - log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %v", octopusMsg)) - } - - // 发送OM至MQ中 - P.Submit( - func() { - for g.G.AgentHasRegister == false { - - log.Debug(fmt.Sprintf("Send init message to server! ==> %s", string(msgBytes))) - - //如果agent存活 而Server不存活 那么需要持续不断的向Server中发送消息 - initToServerQueue.Send( - msgBytes, - ) - // 休眠 - time.Sleep(10 * time.Minute) - - } - }) - - // 监听初始化连接中的信息 - initFromServerQueue := &rabbitmq.RabbitQueue{ - RabbitProp: initFromServerProp, - } - //defer initFromServerQueue.Close() - - // 建立连接 - initFromServerQueue.Connect() - - // 建立运行时RabbitMQ连接 - businessForeverChan := handleInitMsgFromServer(initFromServerQueue, initToServerQueue, agentServerInfo) - - return businessForeverChan -} - -func refreshAgentInfoByStatusInfo(agentInfo *status.AgentInfo, agentServerInfo *register.AgentServerInfo) { - - agentServerInfo.Platform = agentInfo.HostInfo.Platform - agentServerInfo.PlatformFamily = agentInfo.HostInfo.PlatformFamily - agentServerInfo.PlatformVersion = agentInfo.HostInfo.PlatformVersion - agentServerInfo.KernelVersion = agentInfo.HostInfo.KernelVersion - agentServerInfo.KernelArch = agentInfo.HostInfo.KernelArch - - log.DebugF("[refreshAgentInfoByStatusInfo] - ok !") -} - -// handleInitMsgFromServer 处理从Server接收的 注册信息 -func handleInitMsgFromServer(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue, agentServerInfo *register.AgentServerInfo) chan bool { - - initOctopusMessageDeliveries := initFromServerQueue.Read(false) - - // 2023年6月19日 修复注册信息一直没有完全消费的问题 - findRealAgentTopicName := "" - - // 同步很多抢占注册的情况 - for delivery := range initOctopusMessageDeliveries { - - log.Debug(fmt.Sprintf("message received from server is %s", string(delivery.Body))) - - var initOctopusMsg *rabbitmq.OctopusMessage - err := json.Unmarshal(delivery.Body, &initOctopusMsg) - if err != nil { - log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ", - string(delivery.Body))) - } - - var serverInfo register.AgentServerInfo - - s, er := initOctopusMsg.Content.(string) - if !er { - log.ErrorF("convet to string error! => %v", er) - } - cc := json.Unmarshal([]byte(s), &serverInfo) - if cc != nil { - log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", cc)) - } - serverName := serverInfo.ServerName - - // 处理OM信息 - if initOctopusMsg != nil && strings.HasPrefix(initOctopusMsg.OctopusMessageType, g.InitOmType) && strings.HasPrefix(serverName, agentServerInfo.ServerName) { - // 是本机的注册回复信息 - log.InfoF("OctopusMessage INIT from server is this agent !") - - // 手动确认信息 - delivery.Ack(false) - - // 修改系统参数 - g.G.AgentHasRegister = true - - // 保存真实的AgentTopicName - findRealAgentTopicName = serverInfo.TopicName - - // 手动关闭 注册队列的连接 - shutdownRegisterQueueConnection(initFromServerQueue, initToServerQueue) - - } else { - // 不是自身的 注册回复信息 -- 拒绝 2023年6月19日 此处存在错误! 会死循环Nack 导致异常 - log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v, ", initOctopusMsg)) - delivery.Ack(false) - - // 需要休眠等待不再获取相应的信息 - time.Sleep(5 * time.Second) - } - } - - // 建立 运行时 RabbitMQ连接 - runtimeConnectorQueue := rabbitmq.BuildOMsgRuntimeConnectorQueue(findRealAgentTopicName) - - return runtimeConnectorQueue -} - -// shutdownRegisterQueueConnection 关闭初始化连接的两个队列 -func shutdownRegisterQueueConnection(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue) { - - initFromServerQueue.Close() - initToServerQueue.Close() - - log.InfoF("Pretend to Shutdown register queue connection !") -} - -func parseAgentServerInfo(agentServerInfoConf string) *register.AgentServerInfo { - - // 约定文件地址为 /octopus-agent/octopus-agent.conf - var agentServerInfo *register.AgentServerInfo - yamlFile, err := ioutil.ReadFile(agentServerInfoConf) - - if err != nil { - panic(fmt.Errorf("failed to read YAML file: %v", err)) - } - - err = yaml.Unmarshal(yamlFile, &agentServerInfo) - if err != nil { - panic(fmt.Errorf("failed to unmarshal YAML: %v", err)) - } - - // uniform agent server info - UniformAgentServerInfo(agentServerInfo) - - jsonFormat, err := json.Marshal(&agentServerInfo) - if err != nil { - log.Error(fmt.Sprintf("agent server info convert error ! agentserverinfo is %v", agentServerInfo)) - panic(err) - } - log.Info(fmt.Sprintf("agent server info is %v", string(jsonFormat))) - - return agentServerInfo -} - -// UniformAgentServerInfo uniform deal with ip -func UniformAgentServerInfo(agentServerInfo *register.AgentServerInfo) { - - // reflect to iterator all field - log.Info("[Initialization] - UniformAgentServerInfo !") - value := reflect.ValueOf(agentServerInfo).Elem() - - for i := 0; i < value.NumField(); i++ { - field := value.Field(i) - if field.Kind() == reflect.String && field.CanSet() { - field.SetString(strings.TrimSpace(field.String())) - } - } - - log.Debug("[Initialization] - uniform ip address !") - if strings.Contains(agentServerInfo.ServerIPInV4, "/") { - agentServerInfo.ServerIPInV4 = strings.Split(agentServerInfo.ServerIPInV4, "/")[0] - } - - if strings.Contains(agentServerInfo.ServerIPInV6, "/") { - agentServerInfo.ServerIPInV6 = strings.Split(agentServerInfo.ServerIPInV6, "/")[0] - } - -} - -func BuildAgentOsOperator(agentInfo *status.AgentInfo, agentServerInfo *register.AgentServerInfo) { - - // 2023年8月4日 pass through some key information - ossOfflinePrefix := "http://bastion.io" - if g.G.AgentConfig != nil { - ossOfflinePrefix := g.G.AgentConfig.GetString("octopus.agent.executor.ossOfflinePrefix") - if !strings.HasSuffix(ossOfflinePrefix, "/") { - ossOfflinePrefix += "/" - } - } - - // call the init exec function - agentOsOperator := executor.BuildAgentOsOperator(agentInfo, ossOfflinePrefix) - - // assign the agentServerInfo - agentOsOperator.AgentServerInfo = agentServerInfo - - // debug - marshal, _ := json.Marshal(agentOsOperator) - log.DebugF("[Agent INIT] cached agent operator is %s", marshal) -} diff --git a/agent-go/BastionInitializaion.go b/agent-go/BastionInitializaion.go index f519492..69e9fad 100644 --- a/agent-go/BastionInitializaion.go +++ b/agent-go/BastionInitializaion.go @@ -1,15 +1,16 @@ package main import ( - "wdd.io/agent-go/executor" - "wdd.io/agent-go/register" - "wdd.io/agent-go/status" + "wdd.io/agent-go/a_agent" + "wdd.io/agent-go/a_executor" + "wdd.io/agent-go/a_init" + "wdd.io/agent-go/a_status" ) func BastionModeInit() { // Build For Operator - agentServerInfo := ®ister.AgentServerInfo{ + agentServerInfo := &a_agent.AgentServerInfo{ ServerName: "BastionSingle", ServerIPPbV4: "127.0.0.1", ServerIPInV4: "127.0.0.1", @@ -40,12 +41,12 @@ func BastionModeInit() { } // re-get agentInfo from status module - agentInfo := status.ReportAgentInfo() - refreshAgentInfoByStatusInfo(agentInfo, agentServerInfo) - BuildAgentOsOperator(agentInfo, agentServerInfo) + agentInfo := a_status.ReportAgentInfo() + a_init.refreshAgentInfoByStatusInfo(agentInfo, agentServerInfo) + a_init.BuildAgentOsOperator(agentInfo, agentServerInfo) // install docker - agentOsOperator := executor.AgentOsOperatorCache + agentOsOperator := a_executor.AgentOsOperatorCache // boot up minio & rabbitmq agentOsOperator.InstallDockerFromLocalExec(nil) agentOsOperator.InstallDockerComposeFromLocalExec() diff --git a/agent-go/a_agent/AgentHandler.go b/agent-go/a_agent/AgentHandler.go new file mode 100644 index 0000000..2a76295 --- /dev/null +++ b/agent-go/a_agent/AgentHandler.go @@ -0,0 +1,9 @@ +package a_agent + +import "wdd.io/agent-common/logger" + +var log = logger.Log + +func Activate() { + log.Info("Module [ AGENT ] activated !") +} diff --git a/agent-go/register/AgentServerInfo.go b/agent-go/a_agent/AgentServerInfo.go similarity index 93% rename from agent-go/register/AgentServerInfo.go rename to agent-go/a_agent/AgentServerInfo.go index 89ff1eb..3f64541 100644 --- a/agent-go/register/AgentServerInfo.go +++ b/agent-go/a_agent/AgentServerInfo.go @@ -1,4 +1,10 @@ -package register +package a_agent + +import "github.com/spf13/viper" + +var AgentServerInfoCache = &AgentServerInfo{} + +var AgentConfig *viper.Viper type AgentServerInfo struct { ServerName string `json:"serverName" yaml:"serverName"` diff --git a/agent-go/executor/AppFunction.go b/agent-go/a_executor/AppFunction.go similarity index 99% rename from agent-go/executor/AppFunction.go rename to agent-go/a_executor/AppFunction.go index 93d1add..1277067 100644 --- a/agent-go/executor/AppFunction.go +++ b/agent-go/a_executor/AppFunction.go @@ -1,4 +1,4 @@ -package executor +package a_executor import ( "fmt" diff --git a/agent-go/executor/BaseFunction.go b/agent-go/a_executor/BaseFunction.go similarity index 99% rename from agent-go/executor/BaseFunction.go rename to agent-go/a_executor/BaseFunction.go index 4da1343..4e8b9c1 100644 --- a/agent-go/executor/BaseFunction.go +++ b/agent-go/a_executor/BaseFunction.go @@ -1,10 +1,10 @@ -package executor +package a_executor import ( "fmt" "net" "strings" - "wdd.io/agent-go/register" + "wdd.io/agent-go/a_agent" ) type BaseFunc interface { @@ -35,7 +35,7 @@ type AgentOsOperator struct { AgentOSReleaseCode string `json:"agent_os_release_code",comment:"主机操作系统的发行版代号, focal之类的"` - AgentServerInfo *register.AgentServerInfo `json:"agent_server_info"` + AgentServerInfo *a_agent.AgentServerInfo `json:"agent_server_info"` // 离线下载URL地址 OssOfflinePrefix string `comment:"必须要用 / 结尾"` diff --git a/agent-go/executor/BaseFunction_test.go b/agent-go/a_executor/BaseFunction_test.go similarity index 95% rename from agent-go/executor/BaseFunction_test.go rename to agent-go/a_executor/BaseFunction_test.go index 217b9de..c7d950d 100644 --- a/agent-go/executor/BaseFunction_test.go +++ b/agent-go/a_executor/BaseFunction_test.go @@ -1,9 +1,9 @@ -package executor +package a_executor import ( "testing" "wdd.io/agent-common/assert" - "wdd.io/agent-go/register" + "wdd.io/agent-go/a_agent" ) var agentOP = &AgentOsOperator{ @@ -18,7 +18,7 @@ var agentOP = &AgentOsOperator{ IsAgentInnerWall: true, AgentArch: "amd64", AgentOSReleaseCode: "focal", - AgentServerInfo: ®ister.AgentServerInfo{ + AgentServerInfo: &a_agent.AgentServerInfo{ ServerName: "", ServerIPPbV4: "", ServerIPInV4: "10.250.0.100", diff --git a/agent-go/executor/BasicFunction.go b/agent-go/a_executor/BasicFunction.go similarity index 99% rename from agent-go/executor/BasicFunction.go rename to agent-go/a_executor/BasicFunction.go index 3259f85..81d786a 100644 --- a/agent-go/executor/BasicFunction.go +++ b/agent-go/a_executor/BasicFunction.go @@ -1,4 +1,4 @@ -package executor +package a_executor import ( "bufio" diff --git a/agent-go/executor/BasicFunction_test.go b/agent-go/a_executor/BasicFunction_test.go similarity index 99% rename from agent-go/executor/BasicFunction_test.go rename to agent-go/a_executor/BasicFunction_test.go index 0002b8b..a8f6b80 100644 --- a/agent-go/executor/BasicFunction_test.go +++ b/agent-go/a_executor/BasicFunction_test.go @@ -1,4 +1,4 @@ -package executor +package a_executor import ( "strconv" diff --git a/agent-go/executor/CommandExecutor.go b/agent-go/a_executor/CommandExecutor.go similarity index 84% rename from agent-go/executor/CommandExecutor.go rename to agent-go/a_executor/CommandExecutor.go index 8587270..2041fc2 100644 --- a/agent-go/executor/CommandExecutor.go +++ b/agent-go/a_executor/CommandExecutor.go @@ -1,4 +1,4 @@ -package executor +package a_executor import ( "bufio" @@ -25,6 +25,40 @@ var log = logger.Log var AgentOsOperatorCache = &AgentOsOperator{} +func Activate() { + log.Info("Module [ EXECUTOR ] activated !") + + //// 转换类型 + //executionMsgString := octopusMessage.Content.(string) + // + //// 解析 ExecutionMessage + //var executionMessage *a_executor.ExecutionMessage + //err := json.Unmarshal([]byte(executionMsgString), &executionMessage) + //if err != nil { + // log.Error(fmt.Sprintf("execution message convert to json is wrong! msg is => %s", executionMsgString)) + // return + //} + // + //// 执行命令 + //ok, resultLog := a_executor.Execute(executionMessage) + //if ok { + // octopusMessage.ResultCode = "200" + //} else { + // octopusMessage.ResultCode = "300" + //} + // + //// 返回结果 + //if executionMessage.NeedResultReplay { + // // send back the result log + // octopusMessage.Result = resultLog + //} + //// 返回时间 + //octopusMessage.ACTime = utils.ParseDateTimeTime() + // + //// 返回结果 + //octopusMessage.SendToOctopusServer() +} + func Execute(em *ExecutionMessage) (bool, []string) { var resultLog []string diff --git a/agent-go/executor/FunctionalExecutor.go b/agent-go/a_executor/FunctionalExecutor.go similarity index 99% rename from agent-go/executor/FunctionalExecutor.go rename to agent-go/a_executor/FunctionalExecutor.go index 8d28bd6..a00df94 100644 --- a/agent-go/executor/FunctionalExecutor.go +++ b/agent-go/a_executor/FunctionalExecutor.go @@ -1,4 +1,4 @@ -package executor +package a_executor import ( "bufio" diff --git a/agent-go/executor/FunctionalExecutor_test.go b/agent-go/a_executor/FunctionalExecutor_test.go similarity index 99% rename from agent-go/executor/FunctionalExecutor_test.go rename to agent-go/a_executor/FunctionalExecutor_test.go index d869743..0856210 100644 --- a/agent-go/executor/FunctionalExecutor_test.go +++ b/agent-go/a_executor/FunctionalExecutor_test.go @@ -1,4 +1,4 @@ -package executor +package a_executor import ( "bytes" diff --git a/agent-go/executor/HarborExecutor.go b/agent-go/a_executor/HarborExecutor.go similarity index 99% rename from agent-go/executor/HarborExecutor.go rename to agent-go/a_executor/HarborExecutor.go index b81c616..8bc4634 100644 --- a/agent-go/executor/HarborExecutor.go +++ b/agent-go/a_executor/HarborExecutor.go @@ -1,4 +1,4 @@ -package executor +package a_executor import ( "context" diff --git a/agent-go/executor/ImageFunction.go b/agent-go/a_executor/ImageFunction.go similarity index 99% rename from agent-go/executor/ImageFunction.go rename to agent-go/a_executor/ImageFunction.go index 0619163..c92a6ef 100644 --- a/agent-go/executor/ImageFunction.go +++ b/agent-go/a_executor/ImageFunction.go @@ -1,4 +1,4 @@ -package executor +package a_executor import ( "strings" diff --git a/agent-go/executor/ImageFunction_test.go b/agent-go/a_executor/ImageFunction_test.go similarity index 97% rename from agent-go/executor/ImageFunction_test.go rename to agent-go/a_executor/ImageFunction_test.go index e2c8c54..2bffaef 100644 --- a/agent-go/executor/ImageFunction_test.go +++ b/agent-go/a_executor/ImageFunction_test.go @@ -1,4 +1,4 @@ -package executor +package a_executor import ( "fmt" diff --git a/agent-go/executor/InitFunction.go b/agent-go/a_executor/InitFunction.go similarity index 94% rename from agent-go/executor/InitFunction.go rename to agent-go/a_executor/InitFunction.go index 21da7e4..2a61998 100644 --- a/agent-go/executor/InitFunction.go +++ b/agent-go/a_executor/InitFunction.go @@ -1,13 +1,13 @@ -package executor +package a_executor import ( "encoding/json" "strconv" "strings" - "wdd.io/agent-go/status" + "wdd.io/agent-go/a_status" ) -func BuildAgentOsOperator(agentInfo *status.AgentInfo, ossOfflinePrefix string) *AgentOsOperator { +func BuildAgentOsOperator(agentInfo *a_status.AgentInfo, ossOfflinePrefix string) *AgentOsOperator { AgentOsOperatorCache = &AgentOsOperator{ InstallCommandPrefix: []string{ @@ -34,7 +34,7 @@ func BuildAgentOsOperator(agentInfo *status.AgentInfo, ossOfflinePrefix string) return AgentOsOperatorCache } -func detectByAgentStatusInfo(agentInfo *status.AgentInfo, os *AgentOsOperator) { +func detectByAgentStatusInfo(agentInfo *a_status.AgentInfo, os *AgentOsOperator) { if agentInfo == nil { log.WarnF("[detectByAgentStatusInfo] - agentInfo from status module is nil, roll back to traditional way!") // detectByOsType() diff --git a/agent-go/executor/K8sFunction.go b/agent-go/a_executor/K8sFunction.go similarity index 99% rename from agent-go/executor/K8sFunction.go rename to agent-go/a_executor/K8sFunction.go index fad63f7..14dc749 100644 --- a/agent-go/executor/K8sFunction.go +++ b/agent-go/a_executor/K8sFunction.go @@ -1,4 +1,4 @@ -package executor +package a_executor import ( "fmt" diff --git a/agent-go/executor/MySqlFunction.go b/agent-go/a_executor/MySqlFunction.go similarity index 99% rename from agent-go/executor/MySqlFunction.go rename to agent-go/a_executor/MySqlFunction.go index 3c9a6b7..0627537 100644 --- a/agent-go/executor/MySqlFunction.go +++ b/agent-go/a_executor/MySqlFunction.go @@ -1,4 +1,4 @@ -package executor +package a_executor //import ( // _ "github.com/go-sql-driver/mysql" diff --git a/agent-go/executor/MySqlFunction_test.go b/agent-go/a_executor/MySqlFunction_test.go similarity index 95% rename from agent-go/executor/MySqlFunction_test.go rename to agent-go/a_executor/MySqlFunction_test.go index 3bc3639..ccc7d84 100644 --- a/agent-go/executor/MySqlFunction_test.go +++ b/agent-go/a_executor/MySqlFunction_test.go @@ -1,4 +1,4 @@ -package executor +package a_executor //func TestMySqlConnection(t *testing.T) { // diff --git a/agent-go/executor/WindowsFunction.go b/agent-go/a_executor/WindowsFunction.go similarity index 98% rename from agent-go/executor/WindowsFunction.go rename to agent-go/a_executor/WindowsFunction.go index 6959651..5f4e3b0 100644 --- a/agent-go/executor/WindowsFunction.go +++ b/agent-go/a_executor/WindowsFunction.go @@ -1,4 +1,4 @@ -package executor +package a_executor //import ( // "fmt" diff --git a/agent-go/executor/WindowsFunction_test.go b/agent-go/a_executor/WindowsFunction_test.go similarity index 82% rename from agent-go/executor/WindowsFunction_test.go rename to agent-go/a_executor/WindowsFunction_test.go index 307deae..d773953 100644 --- a/agent-go/executor/WindowsFunction_test.go +++ b/agent-go/a_executor/WindowsFunction_test.go @@ -1,4 +1,4 @@ -package executor +package a_executor //import "testing" // diff --git a/agent-go/executor/script/123 b/agent-go/a_executor/script/123 similarity index 100% rename from agent-go/executor/script/123 rename to agent-go/a_executor/script/123 diff --git a/agent-go/executor/script/1_node_important.sh b/agent-go/a_executor/script/1_node_important.sh similarity index 100% rename from agent-go/executor/script/1_node_important.sh rename to agent-go/a_executor/script/1_node_important.sh diff --git a/agent-go/executor/script/install_golang_on_host.sh b/agent-go/a_executor/script/install_golang_on_host.sh similarity index 100% rename from agent-go/executor/script/install_golang_on_host.sh rename to agent-go/a_executor/script/install_golang_on_host.sh diff --git a/agent-go/executor/script/shutdownFirewall.txt b/agent-go/a_executor/script/shutdownFirewall.txt similarity index 100% rename from agent-go/executor/script/shutdownFirewall.txt rename to agent-go/a_executor/script/shutdownFirewall.txt diff --git a/agent-go/a_init/AgentInitialization.go b/agent-go/a_init/AgentInitialization.go new file mode 100644 index 0000000..b9d767f --- /dev/null +++ b/agent-go/a_init/AgentInitialization.go @@ -0,0 +1,357 @@ +package a_init + +import ( + "encoding/json" + "fmt" + "github.com/spf13/viper" + "gopkg.in/yaml.v3" + "io/ioutil" + "reflect" + "strings" + "time" + "wdd.io/agent-common/logger" + "wdd.io/agent-go/a_agent" + "wdd.io/agent-go/a_executor" + "wdd.io/agent-go/a_status" + "wdd.io/agent-go/g" + "wdd.io/agent-go/rabbitmq" +) + +var initOmType = g.InitOmType +var P = g.G.P + +var log = logger.Log + +//var AgentServerInfoCache = ®ister.AgentServerInfo{} + +func INIT(octopusAgentConfigFileName string, agentServerInfoConf string) chan bool { + + // 获取系统的环境变量 + agentServerInfo := parseAgentServerInfo(agentServerInfoConf) + // todo totally get from a_status module + + // re-get agentInfo from status module + agentInfo := a_status.ReportAgentInfo() + refreshAgentInfoByStatusInfo(agentInfo, agentServerInfo) + + // build operator cache + BuildAgentOsOperator(agentInfo, agentServerInfo) + + // 缓存此内容 + a_agent.AgentServerInfoCache = agentServerInfo + + // 初始化Nacos的连接配置 + agentConfig := ParseConfiguration(octopusAgentConfigFileName) + a_agent.AgentConfig = agentConfig + + // build for octopus tcp connect info struct + rabbitTCPConnectInfo := BuildOctopusTCPConnect(agentConfig) + + initToServerProp := &rabbitmq.ConnectProperty{ + ExchangeName: agentConfig.GetString("octopus.message.init_exchange"), + QueueName: agentConfig.GetString("octopus.message.init_to_server"), + ExchangeType: g.QueueDirect, + TopicKey: agentConfig.GetString("octopus.message.init_to_server_key"), + } + + initFromServerProp := &rabbitmq.ConnectProperty{ + ExchangeName: agentConfig.GetString("octopus.message.init_exchange"), + QueueName: agentConfig.GetString("octopus.message.init_from_server"), + ExchangeType: g.QueueDirect, + TopicKey: agentConfig.GetString("octopus.message.init_from_server_key"), + } + + // 建立RabbitMQ的连接 + initToServerQueue := &rabbitmq.RabbitQueue{ + RabbitProp: initToServerProp, + RabbitConnectInfo: rabbitTCPConnectInfo, + } + //defer initToServerQueue.Close() + + // 建立连接 + initToServerQueue.Connect() + + // 监听初始化连接中的信息 + initFromServerQueue := &rabbitmq.RabbitQueue{ + RabbitProp: initFromServerProp, + } + //defer initFromServerQueue.Close() + + // 建立连接 + initFromServerQueue.Connect() + + initForeverHandle := initFromServerQueue.Handle() + + buildAndSendInitMsgToServer(agentServerInfo, initToServerQueue) + + // receive from server + for g.G.AgentHasRegister == false { + select { + case <-initFromServerQueue.ReceiveChan.InitRChan: + initFromServerMsg := <-initFromServerQueue.ReceiveChan.InitRChan + handleInitMsgFromServer(initFromServerMsg, initToServerQueue, agentServerInfo) + default: + //log.Debug("") + time.Sleep(time.Second * 10) + } + } + + <-initForeverHandle + close(initFromServerQueue.ReceiveChan.InitRChan) + + // 建立 运行时 RabbitMQ连接 + runtimeConnectorQueue := buildAndStartBusinessRuntimeQueue(a_agent.AgentServerInfoCache.TopicName) + + // 激活子模块 + activatedOctopusAgentModules() + + return runtimeConnectorQueue +} + +func buildAndStartBusinessRuntimeQueue(agentTopicName string) chan bool { + + // 建立 业务消息 接收队列 + // agentTopicName为名称的队列 + agentConfig := a_agent.AgentConfig + + octopusExchangeName := agentConfig.GetString("octopus.message.octopus_exchange") + + octopusConnectProp := &rabbitmq.ConnectProperty{ + ExchangeName: octopusExchangeName, + QueueName: agentTopicName, + ExchangeType: g.QueueTopic, + TopicKey: agentTopicName + "*", + } + + octopusMsgQueue := &rabbitmq.RabbitQueue{ + RabbitProp: octopusConnectProp, + } + octopusMsgQueue.Connect() + + // 建立 业务消息 返回队列 + // 统一为 OctopusToServer + octopusToServerQueueName := agentConfig.GetString("octopus.message.octopus_to_server") + + octopusToServerProp := &rabbitmq.ConnectProperty{ + ExchangeName: octopusExchangeName, + QueueName: octopusToServerQueueName, + ExchangeType: g.QueueTopic, + TopicKey: octopusToServerQueueName, + } + + octopusToServerQueue := &rabbitmq.RabbitQueue{ + RabbitProp: octopusToServerProp, + } + + // 开启运行时消息返回队列 + octopusToServerQueue.Connect() + + log.InfoF("Octopus Message Business Runtime Queue is established ! => %v", octopusToServerQueue) + + // 开始处理Runtime的OM消息 + businessForeverChan := octopusMsgQueue.Handle() + + return businessForeverChan +} + +// activatedOctopusAgentModules 激活Octopus Agent的所有子模块 +func activatedOctopusAgentModules() { + + // Agent + a_agent.Activate() + // Executor + a_executor.Activate() + // Status + a_status.Activate() +} + +func buildAndSendInitMsgToServer(agentServerInfo *a_agent.AgentServerInfo, initToServerQueue *rabbitmq.RabbitQueue) { + // 组装OctopusMessage + var octopusMsg *rabbitmq.OctopusMessage + octopusMsg = octopusMsg.Build( + initOmType, + agentServerInfo, + ) + msgBytes, err := json.Marshal(octopusMsg) + if err != nil { + log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %v", octopusMsg)) + } + + // 发送OM至MQ中 + P.Submit( + func() { + for g.G.AgentHasRegister == false { + + log.Debug(fmt.Sprintf("Send init message to server! ==> %s", string(msgBytes))) + + //如果agent存活 而Server不存活 那么需要持续不断的向Server中发送消息 + initToServerQueue.Send( + msgBytes, + ) + // 休眠 + time.Sleep(10 * time.Minute) + } + }) +} + +func BuildOctopusTCPConnect(agentConfig *viper.Viper) rabbitmq.RabbitTCPConnectInfo { + host := agentConfig.GetString("spring.rabbitmq.host") + port := agentConfig.GetString("spring.rabbitmq.port") + username := agentConfig.GetString("spring.rabbitmq.username") + password := agentConfig.GetString("spring.rabbitmq.password") + virtualHost := agentConfig.GetString("spring.rabbitmq.virtual-host") + + //todo + return rabbitmq.RabbitTCPConnectInfo{ + UserName: username, + Password: password, + Host: host, + Port: port, + VirtualHost: virtualHost, + } +} + +func refreshAgentInfoByStatusInfo(agentInfo *a_status.AgentInfo, agentServerInfo *a_agent.AgentServerInfo) { + + agentServerInfo.Platform = agentInfo.HostInfo.Platform + agentServerInfo.PlatformFamily = agentInfo.HostInfo.PlatformFamily + agentServerInfo.PlatformVersion = agentInfo.HostInfo.PlatformVersion + agentServerInfo.KernelVersion = agentInfo.HostInfo.KernelVersion + agentServerInfo.KernelArch = agentInfo.HostInfo.KernelArch + + log.DebugF("[refreshAgentInfoByStatusInfo] - ok !") +} + +// handleInitMsgFromServer 处理从Server接收的 注册信息 +func handleInitMsgFromServer(initFromServerMsg *rabbitmq.OctopusMessage, initToServerQueue *rabbitmq.RabbitQueue, agentServerInfo *a_agent.AgentServerInfo) bool { + + //initOctopusMessageDeliveries := initFromServerQueue.Read(false) + + log.Debug(fmt.Sprintf("message received from server is %s", &initFromServerMsg)) + + var serverInfo a_agent.AgentServerInfo + + s, er := initFromServerMsg.Content.(string) + if !er { + log.ErrorF("convet to string error! => %v", er) + } + cc := json.Unmarshal([]byte(s), &serverInfo) + if cc != nil { + log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", cc)) + } + serverName := serverInfo.ServerName + + // 处理OM信息 + if strings.HasPrefix(initFromServerMsg.OctopusMessageType, g.InitOmType) && strings.HasPrefix(serverName, agentServerInfo.ServerName) { + // 是本机的注册回复信息 + log.InfoF("OctopusMessage INIT from server is this agent !") + + // 手动确认信息 + //delivery.Ack(false) + + // 修改系统参数 + g.G.AgentHasRegister = true + + // 保存真实的AgentTopicName + a_agent.AgentServerInfoCache.TopicName = serverInfo.TopicName + + // 手动关闭 注册队列的连接 + //shutdownRegisterQueueConnection(initFromServerQueue, initToServerQueue) + + } else { + // 不是自身的 注册回复信息 -- 拒绝 2023年6月19日 此处存在错误! 会死循环Nack 导致异常 + log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v, ", &initFromServerMsg)) + + // 需要休眠等待不再获取相应的信息 + P.Submit(func() { + time.Sleep(time.Second * 5) + initToServerQueue.SendOMsg(initFromServerMsg) + }) + } + + return g.G.AgentHasRegister +} + +// shutdownRegisterQueueConnection 关闭初始化连接的两个队列 +func shutdownRegisterQueueConnection(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue) { + + initFromServerQueue.Close() + initToServerQueue.Close() + + log.InfoF("Pretend to Shutdown register queue connection !") +} + +func parseAgentServerInfo(agentServerInfoConf string) *a_agent.AgentServerInfo { + + // 约定文件地址为 /octopus-agent/octopus-agent.conf + var agentServerInfo *a_agent.AgentServerInfo + yamlFile, err := ioutil.ReadFile(agentServerInfoConf) + + if err != nil { + panic(fmt.Errorf("failed to read YAML file: %v", err)) + } + + err = yaml.Unmarshal(yamlFile, &agentServerInfo) + if err != nil { + panic(fmt.Errorf("failed to unmarshal YAML: %v", err)) + } + + // uniform agent server info + UniformAgentServerInfo(agentServerInfo) + + jsonFormat, err := json.Marshal(&agentServerInfo) + if err != nil { + log.Error(fmt.Sprintf("agent server info convert error ! agentserverinfo is %v", agentServerInfo)) + panic(err) + } + log.Info(fmt.Sprintf("agent server info is %v", string(jsonFormat))) + + return agentServerInfo +} + +// UniformAgentServerInfo uniform deal with ip +func UniformAgentServerInfo(agentServerInfo *a_agent.AgentServerInfo) { + + // reflect to iterator all field + log.Info("[Initialization] - UniformAgentServerInfo !") + value := reflect.ValueOf(agentServerInfo).Elem() + + for i := 0; i < value.NumField(); i++ { + field := value.Field(i) + if field.Kind() == reflect.String && field.CanSet() { + field.SetString(strings.TrimSpace(field.String())) + } + } + + log.Debug("[Initialization] - uniform ip address !") + if strings.Contains(agentServerInfo.ServerIPInV4, "/") { + agentServerInfo.ServerIPInV4 = strings.Split(agentServerInfo.ServerIPInV4, "/")[0] + } + + if strings.Contains(agentServerInfo.ServerIPInV6, "/") { + agentServerInfo.ServerIPInV6 = strings.Split(agentServerInfo.ServerIPInV6, "/")[0] + } + +} + +func BuildAgentOsOperator(agentInfo *a_status.AgentInfo, agentServerInfo *a_agent.AgentServerInfo) { + + // 2023年8月4日 pass through some key information + ossOfflinePrefix := "http://bastion.io" + if a_agent.AgentConfig != nil { + ossOfflinePrefix := a_agent.AgentConfig.GetString("octopus.agent.executor.ossOfflinePrefix") + if !strings.HasSuffix(ossOfflinePrefix, "/") { + ossOfflinePrefix += "/" + } + } + + // call the init exec function + agentOsOperator := a_executor.BuildAgentOsOperator(agentInfo, ossOfflinePrefix) + + // assign the agentServerInfo + agentOsOperator.AgentServerInfo = agentServerInfo + + // debug + marshal, _ := json.Marshal(agentOsOperator) + log.DebugF("[Agent INIT] cached agent operator is %s", marshal) +} diff --git a/agent-go/register/ConfigParser.go b/agent-go/a_init/ConfigParser.go similarity index 97% rename from agent-go/register/ConfigParser.go rename to agent-go/a_init/ConfigParser.go index a876040..3cce561 100644 --- a/agent-go/register/ConfigParser.go +++ b/agent-go/a_init/ConfigParser.go @@ -1,4 +1,4 @@ -package register +package a_init import ( "fmt" diff --git a/agent-go/register/NacosInitalizationDeprecated.go b/agent-go/a_init/NacosInitalizationDeprecated.go similarity index 99% rename from agent-go/register/NacosInitalizationDeprecated.go rename to agent-go/a_init/NacosInitalizationDeprecated.go index 2aa11b3..202d31d 100644 --- a/agent-go/register/NacosInitalizationDeprecated.go +++ b/agent-go/a_init/NacosInitalizationDeprecated.go @@ -1,4 +1,4 @@ -package register +package a_init // //import ( diff --git a/agent-go/status/CPU.go b/agent-go/a_status/CPU.go similarity index 98% rename from agent-go/status/CPU.go rename to agent-go/a_status/CPU.go index d4f2b1f..b5705c8 100644 --- a/agent-go/status/CPU.go +++ b/agent-go/a_status/CPU.go @@ -1,4 +1,4 @@ -package status +package a_status import ( "github.com/shirou/gopsutil/v3/cpu" diff --git a/agent-go/status/CPU_test.go b/agent-go/a_status/CPU_test.go similarity index 96% rename from agent-go/status/CPU_test.go rename to agent-go/a_status/CPU_test.go index 138f6b9..b313812 100644 --- a/agent-go/status/CPU_test.go +++ b/agent-go/a_status/CPU_test.go @@ -1,4 +1,4 @@ -package status +package a_status import ( "encoding/json" diff --git a/agent-go/status/Disk.go b/agent-go/a_status/Disk.go similarity index 99% rename from agent-go/status/Disk.go rename to agent-go/a_status/Disk.go index eaad69b..6d56701 100644 --- a/agent-go/status/Disk.go +++ b/agent-go/a_status/Disk.go @@ -1,4 +1,4 @@ -package status +package a_status import ( "fmt" diff --git a/agent-go/status/Disk_test.go b/agent-go/a_status/Disk_test.go similarity index 96% rename from agent-go/status/Disk_test.go rename to agent-go/a_status/Disk_test.go index a39fe1c..90f461d 100644 --- a/agent-go/status/Disk_test.go +++ b/agent-go/a_status/Disk_test.go @@ -1,4 +1,4 @@ -package status +package a_status import ( "encoding/json" diff --git a/agent-go/status/Docker_test.go b/agent-go/a_status/Docker_test.go similarity index 92% rename from agent-go/status/Docker_test.go rename to agent-go/a_status/Docker_test.go index 27087cd..b7af216 100644 --- a/agent-go/status/Docker_test.go +++ b/agent-go/a_status/Docker_test.go @@ -1,4 +1,4 @@ -package status +package a_status import ( "testing" diff --git a/agent-go/status/Dokcer.go b/agent-go/a_status/Dokcer.go similarity index 98% rename from agent-go/status/Dokcer.go rename to agent-go/a_status/Dokcer.go index c67f665..1fcc3e4 100644 --- a/agent-go/status/Dokcer.go +++ b/agent-go/a_status/Dokcer.go @@ -1,4 +1,4 @@ -package status +package a_status import ( "github.com/shirou/gopsutil/v3/docker" diff --git a/agent-go/status/Host.go b/agent-go/a_status/Host.go similarity index 94% rename from agent-go/status/Host.go rename to agent-go/a_status/Host.go index 0e3898e..8881ab2 100644 --- a/agent-go/status/Host.go +++ b/agent-go/a_status/Host.go @@ -1,4 +1,4 @@ -package status +package a_status import ( "github.com/shirou/gopsutil/v3/host" diff --git a/agent-go/status/Host_test.go b/agent-go/a_status/Host_test.go similarity index 92% rename from agent-go/status/Host_test.go rename to agent-go/a_status/Host_test.go index 146c99e..040a37a 100644 --- a/agent-go/status/Host_test.go +++ b/agent-go/a_status/Host_test.go @@ -1,4 +1,4 @@ -package status +package a_status import ( "testing" diff --git a/agent-go/status/Memory.go b/agent-go/a_status/Memory.go similarity index 99% rename from agent-go/status/Memory.go rename to agent-go/a_status/Memory.go index dfe3153..a0fca54 100644 --- a/agent-go/status/Memory.go +++ b/agent-go/a_status/Memory.go @@ -1,4 +1,4 @@ -package status +package a_status import ( "fmt" diff --git a/agent-go/status/Memory_test.go b/agent-go/a_status/Memory_test.go similarity index 98% rename from agent-go/status/Memory_test.go rename to agent-go/a_status/Memory_test.go index b5c5575..885f174 100644 --- a/agent-go/status/Memory_test.go +++ b/agent-go/a_status/Memory_test.go @@ -1,4 +1,4 @@ -package status +package a_status import ( "encoding/json" diff --git a/agent-go/status/Network.go b/agent-go/a_status/Network.go similarity index 99% rename from agent-go/status/Network.go rename to agent-go/a_status/Network.go index e05777f..2ca05ca 100644 --- a/agent-go/status/Network.go +++ b/agent-go/a_status/Network.go @@ -1,4 +1,4 @@ -package status +package a_status import ( "encoding/json" diff --git a/agent-go/status/Network_test.go b/agent-go/a_status/Network_test.go similarity index 98% rename from agent-go/status/Network_test.go rename to agent-go/a_status/Network_test.go index 02b8c3c..15ba992 100644 --- a/agent-go/status/Network_test.go +++ b/agent-go/a_status/Network_test.go @@ -1,4 +1,4 @@ -package status +package a_status import ( "encoding/json" diff --git a/agent-go/status/Status.go b/agent-go/a_status/Status.go similarity index 81% rename from agent-go/status/Status.go rename to agent-go/a_status/Status.go index 48d3e95..ccb2afc 100644 --- a/agent-go/status/Status.go +++ b/agent-go/a_status/Status.go @@ -1,4 +1,4 @@ -package status +package a_status import ( "fmt" @@ -39,6 +39,50 @@ type AgentInfo struct { DockerInfo *DockerMetric } +func Activate() { + log.Info("Module [ STATUS ] activated !") + //v, ok := (octopusMessage.Content).(string) + //if !ok { + // log.ErrorF("convert to string is wrong %s", v) + //} + // + //statusMsgString := octopusMessage.Content.(string) + // + //var statusMessage *a_status.StatusMessage + //err := json.Unmarshal([]byte(statusMsgString), &statusMessage) + //if err != nil { + // fmt.Println(err.Error()) + // log.Error(fmt.Sprintf("status message convert to json is wrong! msg is => %s", octopusMessage)) + // return + //} + // + //// OMessageStatusTypeEnum + //var statusRes string + //if strings.HasPrefix(statusMessage.StatusType, "PING") { + // // ping info + // statusRes = a_status.Ping() + //} else if strings.HasPrefix(statusMessage.StatusType, "METRIC") { + // // metric info + // agentStatusString, _ := json.Marshal(a_status.ReportAgentMetric()) + // statusRes = string(agentStatusString) + //} else if strings.HasPrefix(statusMessage.StatusType, "INFO") { + // log.InfoF("[statusOMHandler] - call for agent info !") + //} else { + // log.WarnF("[statusOMHandler] - error octopus status message type of %s", statusMessage.StatusType) + //} + // + //// 返回消息 + //// 组装消息 + //octopusMessage.ACTime = utils.ParseDateTimeTime() + //octopusMessage.Result = statusRes + //// 发送回去 + //statusOctopusReplayMessage, _ := json.Marshal(octopusMessage) + //OctopusToServerQueue.Send(statusOctopusReplayMessage) + // + //// 输出日志 + //log.InfoF("接收到查询Agent状态的请求,结果为 => %s", statusRes) +} + func Ping() string { return "PONG" } diff --git a/agent-go/status/Status_test.go b/agent-go/a_status/Status_test.go similarity index 97% rename from agent-go/status/Status_test.go rename to agent-go/a_status/Status_test.go index 98fcf07..58ae042 100644 --- a/agent-go/status/Status_test.go +++ b/agent-go/a_status/Status_test.go @@ -1,4 +1,4 @@ -package status +package a_status import ( "fmt" diff --git a/agent-go/status/tmp/AgentInfo.json b/agent-go/a_status/tmp/AgentInfo.json similarity index 100% rename from agent-go/status/tmp/AgentInfo.json rename to agent-go/a_status/tmp/AgentInfo.json diff --git a/agent-go/status/tmp/AgentStatus.json b/agent-go/a_status/tmp/AgentStatus.json similarity index 100% rename from agent-go/status/tmp/AgentStatus.json rename to agent-go/a_status/tmp/AgentStatus.json diff --git a/agent-go/g/global.go b/agent-go/g/global.go index df1c4f7..69294fd 100644 --- a/agent-go/g/global.go +++ b/agent-go/g/global.go @@ -2,13 +2,11 @@ package g import ( "github.com/panjf2000/ants/v2" - "github.com/spf13/viper" "wdd.io/agent-common/logger" ) type Global struct { AgentHasRegister bool - AgentConfig *viper.Viper P *ants.Pool } @@ -41,7 +39,6 @@ var G = NewGlobal( func NewGlobal(pool *ants.Pool) *Global { return &Global{ AgentHasRegister: false, - AgentConfig: nil, P: pool, } } diff --git a/agent-go/main.go b/agent-go/main.go index 9e719ad..498fd7b 100644 --- a/agent-go/main.go +++ b/agent-go/main.go @@ -4,8 +4,7 @@ import ( "flag" "fmt" "wdd.io/agent-common/logger" - "wdd.io/agent-go/g" - "wdd.io/agent-go/register" + "wdd.io/agent-go/a_init" ) var log = logger.Log @@ -14,26 +13,25 @@ func main() { // 解析命令行参数 var version string - var agentServerInfoConf string + var agentServerInfoConfFile string var mode string flag.StringVar(&version, "version", "", "config file version") flag.StringVar(&mode, "mode", "agent", "agent run mode") - flag.StringVar(&agentServerInfoConf, "agentServerInfoConf", "", "agent server info conf file") + flag.StringVar(&agentServerInfoConfFile, "agentServerInfoConfFile", "", "agent server info conf file") flag.Parse() + if mode == "bastion" { BastionModeInit() return } - // 读取对应版本的配置文件 - filename := fmt.Sprintf("octopus-agent-%s.yaml", version) - println("config file name is => " + filename) - println("agent server info file is => " + agentServerInfoConf) - // 初始化Nacos的连接配置 - g.G.AgentConfig = register.ParseConfiguration(filename) + // 读取对应版本的配置文件 + octopusAgentConfigFileName := fmt.Sprintf("octopus-agent-%s.yaml", version) + fmt.Println("config file name is => " + octopusAgentConfigFileName) + fmt.Println("agent server info file is => " + agentServerInfoConfFile) // 执行初始化之策工作 - businessForeverChan := INIT(agentServerInfoConf) + businessForeverChan := a_init.INIT(octopusAgentConfigFileName, agentServerInfoConfFile) // 永远等待 runtime的队列消息 <-businessForeverChan diff --git a/agent-go/rabbitmq/OMsgConnector.go b/agent-go/rabbitmq/OMsgConnector.go deleted file mode 100644 index 258879c..0000000 --- a/agent-go/rabbitmq/OMsgConnector.go +++ /dev/null @@ -1,77 +0,0 @@ -package rabbitmq - -import ( - "encoding/json" - "fmt" - "wdd.io/agent-go/g" -) - -var OctopusToServerQueue = &RabbitQueue{} - -var P = g.G.P - -func BuildOMsgRuntimeConnectorQueue(agentTopicName string) chan bool { - - // 建立 业务消息 接收队列 - // agentTopicName为名称的队列 - agentConfig := g.G.AgentConfig - - octopusExchangeName := agentConfig.GetString("octopus.message.octopus_exchange") - - octopusConnectProp := &ConnectProperty{ - ExchangeName: octopusExchangeName, - QueueName: agentTopicName, - ExchangeType: g.QueueTopic, - TopicKey: agentTopicName + "*", - } - - octopusMsgQueue := &RabbitQueue{ - RabbitProp: octopusConnectProp, - } - octopusMsgQueue.Connect() - - // 建立 业务消息 返回队列 - // 统一为 OctopusToServer - octopusToServerQueueName := agentConfig.GetString("octopus.message.octopus_to_server") - - octopusToServerProp := &ConnectProperty{ - ExchangeName: octopusExchangeName, - QueueName: octopusToServerQueueName, - ExchangeType: g.QueueTopic, - TopicKey: octopusToServerQueueName, - } - - OctopusToServerQueue = &RabbitQueue{ - RabbitProp: octopusToServerProp, - } - - // 开启运行时消息返回队列 - OctopusToServerQueue.Connect() - - log.InfoF("Octopus Message Business Runtime Queue is established ! => %v", OctopusToServerQueue) - - deliveries := octopusMsgQueue.Read(true) - businessForeverChan := make(chan bool) - P.Submit( - func() { - // 死循环,处理Octopus Message - for delivery := range deliveries { - - var om *OctopusMessage - err := json.Unmarshal(delivery.Body, &om) - if err != nil { - log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body)) - // 保存到某处 - continue - } - - // 策略模式 处理消息 - P.Submit(func() { - om.Handle() - }) - } - }) - - return businessForeverChan - -} diff --git a/agent-go/rabbitmq/OctopusMessage.go b/agent-go/rabbitmq/OctopusMessage.go index bcfd3d6..8c33ef1 100644 --- a/agent-go/rabbitmq/OctopusMessage.go +++ b/agent-go/rabbitmq/OctopusMessage.go @@ -3,11 +3,7 @@ package rabbitmq import ( "encoding/json" "fmt" - "strings" "wdd.io/agent-common/utils" - "wdd.io/agent-go/executor" - "wdd.io/agent-go/g" - "wdd.io/agent-go/status" ) type IOctopusMessage interface { @@ -16,10 +12,6 @@ type IOctopusMessage interface { OctopusMsgBuilder } -type OctopusMsgHandler interface { - Handle(octopusMessage *OctopusMessage) -} - type OctopusMsgSender interface { Send(rabbitQueue *RabbitQueue, msg []byte) @@ -40,12 +32,6 @@ type OctopusMessage struct { ResultCode string `json:"resultCode"` } -func (om *OctopusMessage) Handle() { - // 实际执行 OM handle进程 - log.Debug("接收到OctopusMessage,开始处理!") - doHandleOctopusMessage(om) -} - func (om *OctopusMessage) Send(rabbitQueue *RabbitQueue, msg []byte) { rabbitQueue.Send(msg) } @@ -84,116 +70,3 @@ func (om *OctopusMessage) Build(omType string, content interface{}) *OctopusMess ACTime: curTimeString, } } - -func doHandleOctopusMessage(octopusMessage *OctopusMessage) { - - switch octopusMessage.OctopusMessageType { - case g.InitOmType: - go func() {}() - case g.ExecOmType: - P.Submit(func() { - executorOMHandler(octopusMessage) - }) - case g.StatusOmType: - P.Submit(func() { - statusOMHandler(octopusMessage) - }) - case g.AgentOmType: - P.Submit(func() { - agentOMHandler(octopusMessage) - }, - ) - default: - P.Submit(func() { - blackHoleOMHandler(octopusMessage) - }) - } - -} - -// agentOMHandler 处理Agent的核心操作指令 -func agentOMHandler(octopusMessage *OctopusMessage) { - -} - -func executorOMHandler(octopusMessage *OctopusMessage) { - log.Debug("开始处理 Executor Octopus Message !") - - // 转换类型 - executionMsgString := octopusMessage.Content.(string) - - // 解析 ExecutionMessage - var executionMessage *executor.ExecutionMessage - err := json.Unmarshal([]byte(executionMsgString), &executionMessage) - if err != nil { - log.Error(fmt.Sprintf("execution message convert to json is wrong! msg is => %s", executionMsgString)) - return - } - - // 执行命令 - ok, resultLog := executor.Execute(executionMessage) - if ok { - octopusMessage.ResultCode = "200" - } else { - octopusMessage.ResultCode = "300" - } - - // 返回结果 - if executionMessage.NeedResultReplay { - // send back the result log - octopusMessage.Result = resultLog - } - // 返回时间 - octopusMessage.ACTime = utils.ParseDateTimeTime() - - // 返回结果 - octopusMessage.SendToOctopusServer() -} - -func statusOMHandler(octopusMessage *OctopusMessage) { - - v, ok := (octopusMessage.Content).(string) - if !ok { - log.ErrorF("convert to string is wrong %s", v) - } - - statusMsgString := octopusMessage.Content.(string) - - var statusMessage *status.StatusMessage - err := json.Unmarshal([]byte(statusMsgString), &statusMessage) - if err != nil { - fmt.Println(err.Error()) - log.Error(fmt.Sprintf("status message convert to json is wrong! msg is => %s", octopusMessage)) - return - } - - // OMessageStatusTypeEnum - var statusRes string - if strings.HasPrefix(statusMessage.StatusType, "PING") { - // ping info - statusRes = status.Ping() - } else if strings.HasPrefix(statusMessage.StatusType, "METRIC") { - // metric info - agentStatusString, _ := json.Marshal(status.ReportAgentMetric()) - statusRes = string(agentStatusString) - } else if strings.HasPrefix(statusMessage.StatusType, "INFO") { - log.InfoF("[statusOMHandler] - call for agent info !") - } else { - log.WarnF("[statusOMHandler] - error octopus status message type of %s", statusMessage.StatusType) - } - - // 返回消息 - // 组装消息 - octopusMessage.ACTime = utils.ParseDateTimeTime() - octopusMessage.Result = statusRes - // 发送回去 - statusOctopusReplayMessage, _ := json.Marshal(octopusMessage) - OctopusToServerQueue.Send(statusOctopusReplayMessage) - - // 输出日志 - log.InfoF("接收到查询Agent状态的请求,结果为 => %s", statusRes) -} - -func blackHoleOMHandler(octopusMessage *OctopusMessage) { - log.Error(fmt.Sprintf("[BLACK HOLE] octopusMessage type wrong! msg is => %v", octopusMessage)) -} diff --git a/agent-go/rabbitmq/OctopusMessageHandler.go b/agent-go/rabbitmq/OctopusMessageHandler.go new file mode 100644 index 0000000..816612f --- /dev/null +++ b/agent-go/rabbitmq/OctopusMessageHandler.go @@ -0,0 +1,43 @@ +package rabbitmq + +import ( + "fmt" + "wdd.io/agent-go/g" +) + +var OctopusToServerQueue = &RabbitQueue{} + +var P = g.G.P + +type OctopusMsgHandler interface { + HandleMsg(*RabbitReceiveChan) +} + +func (om *OctopusMessage) HandleMsg(rChan *RabbitReceiveChan) { + // 实际执行 OM handle进程 + log.Debug("接收到OctopusMessage, 开始处理!") + doHandleOctopusMessage(om, rChan) +} + +func doHandleOctopusMessage(octopusMessage *OctopusMessage, rChan *RabbitReceiveChan) { + + switch octopusMessage.OctopusMessageType { + case g.InitOmType: + rChan.InitRChan <- octopusMessage + case g.ExecOmType: + rChan.ExecutorRChan <- octopusMessage + case g.StatusOmType: + rChan.StatusRChan <- octopusMessage + case g.AgentOmType: + rChan.AgentRChan <- octopusMessage + default: + P.Submit(func() { + blackHoleOMHandler(octopusMessage) + }) + } + +} + +func blackHoleOMHandler(octopusMessage *OctopusMessage) { + log.Error(fmt.Sprintf("[BLACK HOLE] octopusMessage type wrong! msg is => %v", octopusMessage)) +} diff --git a/agent-go/rabbitmq/RabbitMsgQueue.go b/agent-go/rabbitmq/RabbitMsgQueue.go index ea185d1..82e8c72 100644 --- a/agent-go/rabbitmq/RabbitMsgQueue.go +++ b/agent-go/rabbitmq/RabbitMsgQueue.go @@ -1,17 +1,18 @@ package rabbitmq import ( + "encoding/json" "fmt" "github.com/streadway/amqp" "strings" "sync" "wdd.io/agent-common/logger" - "wdd.io/agent-go/g" ) type RabbitMQ interface { RabbitSendWriter RabbitConnectCloser + RabbitQueueHandler } type RabbitSendWriter interface { @@ -26,10 +27,19 @@ type RabbitConnectCloser interface { Close() error } -type RabbitQueue struct { - RabbitConn *RabbitMQConn +type RabbitQueueHandler interface { + Handle() chan bool +} +type RabbitQueue struct { + // 连接实体 + RabbitConn *RabbitMQConn + // 连接属性 RabbitProp *ConnectProperty + // 底层连接tcp信息 + RabbitConnectInfo RabbitTCPConnectInfo + // 返回消息队列 + ReceiveChan *RabbitReceiveChan } // RabbitMQConn is a struct that holds the connection and channel objects @@ -38,6 +48,13 @@ type RabbitMQConn struct { Channel *amqp.Channel } +type RabbitReceiveChan struct { + AgentRChan chan *OctopusMessage + ExecutorRChan chan *OctopusMessage + StatusRChan chan *OctopusMessage + InitRChan chan *OctopusMessage +} + type ConnectProperty struct { ExchangeName string QueueName string @@ -45,6 +62,14 @@ type ConnectProperty struct { TopicKey string } +type RabbitTCPConnectInfo struct { + UserName string + Password string + Host string + Port string + VirtualHost string +} + var log = logger.Log // 定义全局唯一的 Singleton 实例 @@ -54,25 +79,28 @@ var instance *amqp.Connection var once sync.Once // 初始化 Singleton 实例的函数 -func createInstance() { +func createInstance(rabbitConnectInfo RabbitTCPConnectInfo) func() { // 在这里进行 Singleton 的初始化操作 // 获取RabbitMQ的连接地址 - rabbitMQEndpointFromG := parseRabbitMQEndpointFromG() + rabbitMQEndpoint := parseRabbitMQEndpoint(rabbitConnectInfo) // 创建全局唯一连接 RabbitMQ连接 - connection, err := amqp.Dial(rabbitMQEndpointFromG) + connection, err := amqp.Dial(rabbitMQEndpoint) if err != nil { log.Error(fmt.Sprintf("failed to connect to RabbitMQ: %v", err)) } instance = connection + return nil } // GetInstance 获取全局唯一的 Singleton 实例的函数 -func GetInstance() *amqp.Connection { +func GetInstance(rabbitConnectInfo RabbitTCPConnectInfo) *amqp.Connection { // 使用 sync.Once 确保 createInstance 只会被调用一次 - once.Do(createInstance) + + // todo 理解 + once.Do(createInstance(rabbitConnectInfo)) return instance } @@ -80,7 +108,7 @@ func GetInstance() *amqp.Connection { func (r *RabbitQueue) Connect() { // 获取RabbitMQ的连接 - conn := GetInstance() + conn := GetInstance(r.RabbitConnectInfo) ch, err := conn.Channel() if err != nil { @@ -121,10 +149,26 @@ func (r *RabbitQueue) Connect() { log.Error(fmt.Sprintf("failed to bind RabbitMQ queue: %w", err)) } + // build for receive chan + rabbitRCha := &RabbitReceiveChan{} + if strings.HasPrefix(r.RabbitProp.QueueName, "Init") { + // init queue + rabbitRCha.InitRChan = make(chan *OctopusMessage) + } else { + // business queue + rabbitRCha.AgentRChan = make(chan *OctopusMessage, 5) + rabbitRCha.ExecutorRChan = make(chan *OctopusMessage, 5) + rabbitRCha.StatusRChan = make(chan *OctopusMessage, 5) + } + + // connection r.RabbitConn = &RabbitMQConn{ Connection: conn, Channel: ch, } + + // receive chan + r.ReceiveChan = rabbitRCha } func (r *RabbitQueue) Close() error { @@ -138,6 +182,29 @@ func (r *RabbitQueue) Close() error { return err } +func (r *RabbitQueue) Handle() chan bool { + + deliveries := r.Read(true) + forverHandle := make(chan bool) + // 死循环,处理Octopus Message + for delivery := range deliveries { + + var om *OctopusMessage + err := json.Unmarshal(delivery.Body, &om) + if err != nil { + log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body)) + // 保存到某处 + continue + } + // 策略模式 处理消息 + P.Submit(func() { + om.HandleMsg(r.ReceiveChan) + }) + } + + return forverHandle +} + // Send 向RabbitMQ中发送消息 func (r *RabbitQueue) Send(message []byte) { // 往哪里发 @@ -159,6 +226,15 @@ func (r *RabbitQueue) Send(message []byte) { } } +func (r *RabbitQueue) SendOMsg(oMessage *OctopusMessage) { + bytes, err := json.Marshal(&oMessage) + if err != nil { + log.ErrorF("octopus message Marshal error %v", &oMessage) + } + + r.Send(bytes) +} + func (r *RabbitQueue) Read(autoAck bool) <-chan amqp.Delivery { // 拿到特定的Channel @@ -181,30 +257,21 @@ func (r *RabbitQueue) Read(autoAck bool) <-chan amqp.Delivery { return msgs } -// parseRabbitMQEndpoint 根据全局变量agentConfig解析出RabbitMQ的连接地址 -func parseRabbitMQEndpointFromG() string { - - agentConfig := g.G.AgentConfig +func parseRabbitMQEndpoint(rabbitConnectInfo RabbitTCPConnectInfo) string { var res strings.Builder - host := agentConfig.GetString("spring.rabbitmq.host") - port := agentConfig.GetString("spring.rabbitmq.port") - username := agentConfig.GetString("spring.rabbitmq.username") - password := agentConfig.GetString("spring.rabbitmq.password") - virtualHost := agentConfig.GetString("spring.rabbitmq.virtual-host") - // amqp://{username}:{password}@{hostname}:{port}/{virtual_host} res.WriteString("amqp://") - res.WriteString(username) + res.WriteString(rabbitConnectInfo.UserName) res.WriteString(":") - res.WriteString(password) + res.WriteString(rabbitConnectInfo.Password) res.WriteString("@") - res.WriteString(host) + res.WriteString(rabbitConnectInfo.Host) res.WriteString(":") - res.WriteString(port) + res.WriteString(rabbitConnectInfo.Port) res.WriteString("/") - res.WriteString(virtualHost) + res.WriteString(rabbitConnectInfo.VirtualHost) s := res.String() diff --git a/server/src/test/java/io/wdd/server/func/TestImageSyncScheduler.java b/server/src/test/java/io/wdd/server/func/TestImageSyncScheduler.java index 398f2df..2e87b28 100644 --- a/server/src/test/java/io/wdd/server/func/TestImageSyncScheduler.java +++ b/server/src/test/java/io/wdd/server/func/TestImageSyncScheduler.java @@ -42,7 +42,7 @@ public class TestImageSyncScheduler { )); ArrayList ImageFullNameList = new ArrayList<>(List.of( - "harbor.cdcyy.com.cn/cmii/cmii-uav-mission:5.3.0-cqly-032802" + "harbor.cdcyy.com.cn/cmii/cmii-uav-industrial-portfolio:5.4.0-cqly-032802" )); Boolean downloadAndCompressOnly = true; @@ -52,7 +52,6 @@ public class TestImageSyncScheduler { String innerWorkerAgentName = "Chengdu-amd64-65-lapwdd"; //wdd // String innerWorkerAgentName = "Chengdu-amd64-71-3571gd"; //prod - // 之下不要修改 除非你知道自己在干什么! // start diff --git a/server/src/test/java/io/wdd/server/func/TestSingleCommand.java b/server/src/test/java/io/wdd/server/func/TestSingleCommand.java new file mode 100644 index 0000000..6022308 --- /dev/null +++ b/server/src/test/java/io/wdd/server/func/TestSingleCommand.java @@ -0,0 +1,39 @@ +package io.wdd.server.func; + +import io.wdd.func.auto.beans.BaseFunctionEnum; +import io.wdd.func.auto.service.FuncService; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +import java.util.ArrayList; +import java.util.List; + +@SpringBootTest +public class TestSingleCommand { + + @Autowired + FuncService funcService; + + @Test + public void toSingleServer() { + + String targetAgentTopicName = "Chengdu-amd64-71-3571gd"; + BaseFunctionEnum baseFunctionEnum = BaseFunctionEnum.INSTALL_DOCKER; + + + ArrayList funcArgs = new ArrayList<>(); + funcArgs.add(baseFunctionEnum.getFuncName()); + funcArgs.add("10.250.0.100"); + funcArgs.add("123"); + funcArgs.add(""); + funcArgs.add(""); + funcArgs.add(""); + List stringList = funcService.callBaseFuncService(targetAgentTopicName, baseFunctionEnum.getFuncName(), funcArgs); + + + System.out.println("stringList = " + stringList); + + + } +}