package a_init

import (
	"encoding/json"
	"fmt"
	"math/rand"
	"os"
	"reflect"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/spf13/viper"
	"gopkg.in/yaml.v3"

	"wdd.io/agent-common/logger"
	"wdd.io/agent-common/utils"
	"wdd.io/agent-go/a_agent"
	"wdd.io/agent-go/a_executor"
	"wdd.io/agent-go/a_status"
	"wdd.io/agent-go/g"
	"wdd.io/agent-go/rabbitmq"
)

// AgentServerInfoLocalFilePath is the local file that, when it exists and is
// non-empty, is parsed by INIT and merged into the auto-detected agent info.
const AgentServerInfoLocalFilePath = "/usr/local/etc/octopus-agent/octopus-agent.conf"

// Package-level shortcuts to shared globals.
var initOmType = g.InitOmType
var P = g.G.P
var log = logger.Log

// networkInterfacePattern is compiled once at package scope. It accepts
//   - ens<N>, eno<N>, eth<N>, enp<N>  (e.g. ens33, eno1, eth0)
//   - enp<N>s<M>                      (e.g. enp0s3, enp10s25)
//
// Both alternatives are anchored at the start and the end of the name.
var networkInterfacePattern = regexp.MustCompile(`^(?:(?:ens|eno|eth|enp)[0-9]+|enp[0-9]+s[0-9]+)$`)

// INIT bootstraps the agent:
//  1. parses the agent configuration and stores it in a_agent.AgentConfig,
//  2. collects host information via a_status and fills an AgentServerInfo,
//  3. merges the optional local file AgentServerInfoLocalFilePath into it,
//  4. builds the OS operator and caches the AgentServerInfo,
//  5. registers with the server through the two init queues and waits
//     (polling every 500ms) until g.G.AgentHasRegister becomes true,
//  6. starts the business runtime queues and activates the sub modules.
//
// It returns the channel produced by buildAndStartBusinessRuntimeQueue.
func INIT(octopusAgentConfigFileName string) chan bool {
	// the structure that is going to be reported to the server
	agentServerInfo := &a_agent.AgentServerInfo{}

	// load the agent configuration (contains the RabbitMQ connection settings)
	agentConfig := parseOctopusAgentConf(octopusAgentConfigFileName)
	a_agent.AgentConfig = agentConfig

	// use the a_status module to detect the runtime environment of this host
	agentInfo := a_status.ReportAgentInfo()
	refreshAgentInfoByStatusInfo(agentInfo, agentServerInfo)

	if utils.FileExistAndNotNull(AgentServerInfoLocalFilePath) {
		// read the manually maintained server info
		agentServerInfoFromLocalFile := parseAgentServerInfo(AgentServerInfoLocalFilePath)

		// merge it; per the original comment, manually provided values
		// override the auto-detected ones
		utils.CopySameFields(agentServerInfoFromLocalFile, agentServerInfo)
	}

	// build operator cache
	buildAgentOsOperator(agentInfo, agentServerInfo)

	// cache the final agent server info
	a_agent.AgentServerInfoCache = agentServerInfo

	// build the octopus tcp connect info and the two registration queues
	rabbitTCPConnectInfo := buildOctopusTCPConnect(agentConfig)
	initToServerQueue, initFromServerQueue := buildOctopusInitQueue(rabbitTCPConnectInfo)
	defer initToServerQueue.Close()
	defer initFromServerQueue.Close()

	// establish both connections
	initToServerQueue.Connect()
	initFromServerQueue.Connect()

	// start (re-)sending the init message in the background
	buildAndSendInitMsgToServer(agentServerInfo, initToServerQueue)

	// start consuming messages from the server
	initFromServerQueue.Handle()

	// Wait for the registration reply. The loop ends through its condition:
	// handleInitMsgFromServer sets g.G.AgentHasRegister on success. (A `break`
	// inside the select would only leave the select, so none is used.)
	for !g.G.AgentHasRegister {
		select {
		case initFromServerMsg := <-initFromServerQueue.ReceiveChan.InitRChan:
			if handleInitMsgFromServer(initFromServerMsg, initToServerQueue, agentServerInfo) {
				log.InfoF("[INIT] - agent has registered !")
				// manually shut down the registration queues
				shutdownRegisterQueueConnection(initFromServerQueue, initToServerQueue)
			}
		default:
			log.Debug("agent init not received from server ! start to waiting !")
			time.Sleep(time.Millisecond * 500)
		}
	}

	// establish the runtime RabbitMQ connections
	runtimeConnectorQueue := buildAndStartBusinessRuntimeQueue(a_agent.AgentServerInfoCache.TopicName)

	// activate the sub modules
	activatedOctopusAgentModules()

	return runtimeConnectorQueue
}

// buildOctopusInitQueue creates (without connecting) the two direct-exchange
// queues used for registration. Exchange, queue and key names are read from
// a_agent.AgentConfig. The first result is the agent->server queue, the second
// the server->agent queue; both share rabbitTCPConnectInfo.
func buildOctopusInitQueue(rabbitTCPConnectInfo *rabbitmq.RabbitTCPConnectInfo) (*rabbitmq.RabbitQueue, *rabbitmq.RabbitQueue) {
	initToServerProp := &rabbitmq.ConnectProperty{
		ExchangeName: a_agent.AgentConfig.GetString("octopus.message.init_exchange"),
		QueueName:    a_agent.AgentConfig.GetString("octopus.message.init_to_server"),
		ExchangeType: g.QueueDirect,
		TopicKey:     a_agent.AgentConfig.GetString("octopus.message.init_to_server_key"),
	}

	initFromServerProp := &rabbitmq.ConnectProperty{
		ExchangeName: a_agent.AgentConfig.GetString("octopus.message.init_exchange"),
		QueueName:    a_agent.AgentConfig.GetString("octopus.message.init_from_server"),
		ExchangeType: g.QueueDirect,
		TopicKey:     a_agent.AgentConfig.GetString("octopus.message.init_from_server_key"),
	}

	// the RabbitMQ queue objects; the caller is responsible for Connect/Close
	initToServerQueue := &rabbitmq.RabbitQueue{
		RabbitProp:        initToServerProp,
		RabbitConnectInfo: rabbitTCPConnectInfo,
	}

	initFromServerQueue := &rabbitmq.RabbitQueue{
		RabbitProp:        initFromServerProp,
		RabbitConnectInfo: rabbitTCPConnectInfo,
	}

	return initToServerQueue, initFromServerQueue
}

// buildAndStartBusinessRuntimeQueue connects the two runtime queues:
//   - the receive queue, named agentTopicName and bound with key agentTopicName+"*",
//   - the reply queue, named by config key "octopus.message.octopus_to_server".
//
// Both are stored in the rabbitmq package globals. The returned channel is the
// one returned by Handle() on the receive queue.
//
// NOTE(review): unlike the init queues, no RabbitConnectInfo is set on these
// queues; presumably Connect falls back to a shared connection — confirm.
func buildAndStartBusinessRuntimeQueue(agentTopicName string) chan bool {
	// business message receive queue, named after agentTopicName
	agentConfig := a_agent.AgentConfig

	octopusExchangeName := agentConfig.GetString("octopus.message.octopus_exchange")

	octopusConnectProp := &rabbitmq.ConnectProperty{
		ExchangeName: octopusExchangeName,
		QueueName:    agentTopicName,
		ExchangeType: g.QueueTopic,
		TopicKey:     agentTopicName + "*",
	}

	octopusMsgQueue := &rabbitmq.RabbitQueue{
		RabbitProp: octopusConnectProp,
	}
	octopusMsgQueue.Connect()

	// business message reply queue, uniformly the OctopusToServer queue
	octopusToServerQueueName := agentConfig.GetString("octopus.message.octopus_to_server")

	octopusToServerProp := &rabbitmq.ConnectProperty{
		ExchangeName: octopusExchangeName,
		QueueName:    octopusToServerQueueName,
		ExchangeType: g.QueueTopic,
		TopicKey:     octopusToServerQueueName,
	}

	octopusToServerQueue := &rabbitmq.RabbitQueue{
		RabbitProp: octopusToServerProp,
	}

	// open the runtime reply queue
	octopusToServerQueue.Connect()

	log.InfoF("Octopus Message Business Runtime Queue is established ! => %v", octopusToServerQueue)

	// start handling runtime OctopusMessages
	businessForeverChan := octopusMsgQueue.Handle()

	// cache both queues
	rabbitmq.BusinessRuntimeQueue = octopusMsgQueue
	rabbitmq.OctopusToServerQueue = octopusToServerQueue

	return businessForeverChan
}

// activatedOctopusAgentModules activates all sub modules of the Octopus Agent
// (agent, executor, status), each submitted as its own task to the pool P.
func activatedOctopusAgentModules() {
	// The result of Submit is discarded, exactly as before; its type is not
	// visible in this file. NOTE(review): consider logging a failed submit.

	// Agent
	_ = P.Submit(func() {
		a_agent.Activate()
	})

	// Executor
	_ = P.Submit(func() {
		a_executor.Activate()
	})

	// Status
	_ = P.Submit(func() {
		a_status.Activate()
	})
}

// buildAndSendInitMsgToServer builds the init OctopusMessage carrying
// agentServerInfo and submits a background task that publishes it to
// initToServerQueue every 5 minutes until g.G.AgentHasRegister is true or the
// attempt counter exceeds g.AgentRegisterFailedCount.
func buildAndSendInitMsgToServer(agentServerInfo *a_agent.AgentServerInfo, initToServerQueue *rabbitmq.RabbitQueue) {
	// assemble the OctopusMessage; Build is invoked on a nil receiver exactly
	// as in the original code
	var octopusMsg *rabbitmq.OctopusMessage
	octopusMsg = octopusMsg.Build(
		initOmType,
		agentServerInfo,
	)

	// publish the message to MQ in the background
	_ = P.Submit(
		func() {
			agentRegisterCount := 0
			for !g.G.AgentHasRegister {
				// NOTE(review): with ">" the message is sent
				// AgentRegisterFailedCount+1 times before giving up.
				if agentRegisterCount > g.AgentRegisterFailedCount {
					log.ErrorF("Agent Register Failed after %d times tryings !", g.AgentRegisterFailedCount)
					return
				}

				log.InfoF("[INIT] - Send init message to server! ==> %s", octopusMsg)

				// when the agent is alive but the server is not, the message
				// has to be re-sent continuously
				initToServerQueue.SendOMsg(octopusMsg)
				agentRegisterCount++

				// wait before the next attempt
				time.Sleep(5 * time.Minute)
			}
		})
}

// buildOctopusTCPConnect reads the RabbitMQ connection settings
// (spring.rabbitmq.*) from agentConfig and returns them as a RabbitTCPConnectInfo.
func buildOctopusTCPConnect(agentConfig *viper.Viper) *rabbitmq.RabbitTCPConnectInfo {
	host := agentConfig.GetString("spring.rabbitmq.host")
	port := agentConfig.GetString("spring.rabbitmq.port")
	username := agentConfig.GetString("spring.rabbitmq.username")
	password := agentConfig.GetString("spring.rabbitmq.password")
	virtualHost := agentConfig.GetString("spring.rabbitmq.virtual-host")

	// TODO: left open by the original author (no further detail given)
	return &rabbitmq.RabbitTCPConnectInfo{
		UserName:    username,
		Password:    password,
		Host:        host,
		Port:        port,
		VirtualHost: virtualHost,
	}
}

// refreshAgentInfoByStatusInfo copies the host, CPU, OS, memory, disk and
// network facts collected in agentInfo into agentServerInfo. Numeric values
// are stored as base-10 strings.
func refreshAgentInfoByStatusInfo(agentInfo *a_status.AgentInfo, agentServerInfo *a_agent.AgentServerInfo) {
	// host info
	agentServerInfo.ServerName = agentInfo.HostInfo.Hostname
	agentServerInfo.MachineID = agentInfo.HostInfo.HostID

	// cpu part: the brand is the JSON form of the first CPU entry
	agentServerInfo.CPUCore = strconv.FormatInt(int64(agentInfo.CPUInfo.NumCores), 10)
	if len(agentInfo.CPUInfo.CPUInfo) > 0 {
		marshal, err := json.Marshal(agentInfo.CPUInfo.CPUInfo[0])
		if err != nil {
			log.WarnF("[refreshAgentInfoByStatusInfo] - marshal cpu info failed => %v", err)
		} else {
			agentServerInfo.CPUBrand = string(marshal)
		}
	}

	// os info
	agentServerInfo.OSInfo = agentInfo.HostInfo.PlatformFamily + agentInfo.HostInfo.Platform + agentInfo.HostInfo.PlatformVersion
	agentServerInfo.OSKernelInfo = agentInfo.HostInfo.KernelVersion
	agentServerInfo.Virtualization = agentInfo.HostInfo.VirtualizationSystem + agentInfo.HostInfo.VirtualizationRole
	agentServerInfo.Platform = agentInfo.HostInfo.Platform
	agentServerInfo.PlatformFamily = agentInfo.HostInfo.PlatformFamily
	agentServerInfo.PlatformVersion = agentInfo.HostInfo.PlatformVersion
	agentServerInfo.KernelVersion = agentInfo.HostInfo.KernelVersion
	agentServerInfo.KernelArch = agentInfo.HostInfo.KernelArch

	// memory part
	agentServerInfo.MemoryTotal = strconv.FormatInt(int64(agentInfo.MemoryInfo.TotalMemory), 10)
	agentServerInfo.SwapTotal = strconv.FormatInt(int64(agentInfo.MemoryInfo.SwapTotal), 10)

	// disk part: report total/used of the single largest disk entry
	info := agentInfo.DiskInfo
	diskTotal := uint64(0)
	diskUsage := uint64(0)
	for _, diskInfo := range info {
		if diskInfo.Total > diskTotal {
			diskTotal = diskInfo.Total
			diskUsage = diskInfo.Used
		}
	}
	agentServerInfo.DiskTotal = strconv.FormatUint(diskTotal, 10)
	agentServerInfo.DiskUsage = strconv.FormatUint(diskUsage, 10)

	// network part
	refreshAgentNetworkInfo(agentInfo, agentServerInfo)

	log.DebugF("[refreshAgentInfoByStatusInfo] - ok !")
}

// refreshAgentNetworkInfo fills ServerIPInV4 / ServerIPInV6 from the network
// interfaces whose name is accepted by isNetworkInterface. Any "/prefix"
// suffix of an address is removed.
//
// Selection order (unchanged from the original implementation): interfaces
// without an IPv4 address are skipped; an interface with IPv4 but no IPv6 sets
// ServerIPInV4 and the scan continues, so a later interface may overwrite it;
// the scan stops at the first interface that provides both families.
// NOTE(review): confirm that "last IPv4-only interface wins" is intended.
func refreshAgentNetworkInfo(agentInfo *a_status.AgentInfo, agentServerInfo *a_agent.AgentServerInfo) {
	// interface names the original author used to test the matcher:
	// "ens33", "eno1", "enp0s3", "enp1s2", "eth0", "enp2s5", "enx1234567890ab",
	// "ens1234567890ab", "enp1234567890ab", "enp1", "lo", "", "docker0",
	// "virbr0", "veth0"

	// inner ip v4 v6
	for _, networkInfo := range agentInfo.NetworkInfo {
		if !isNetworkInterface(networkInfo.Name) {
			continue
		}
		log.InfoF("refreshAgentNetworkInfo - network interface is %v", networkInfo)

		if len(networkInfo.InternalIPv4) == 0 {
			continue
		}
		// a matching interface with an IPv4 address
		agentServerInfo.ServerIPInV4 = trimCIDRSuffix(networkInfo.InternalIPv4[0])

		if len(networkInfo.InternalIPv6) == 0 {
			continue
		}
		agentServerInfo.ServerIPInV6 = trimCIDRSuffix(networkInfo.InternalIPv6[0])

		if agentServerInfo.ServerIPInV4 != "" {
			break
		}
	}
}

// trimCIDRSuffix returns addr without everything from the first "/" on
// (e.g. "10.0.0.5/24" -> "10.0.0.5"); addr without "/" is returned unchanged.
func trimCIDRSuffix(addr string) string {
	if i := strings.Index(addr, "/"); i >= 0 {
		return addr[:i]
	}
	return addr
}

// isNetworkInterface reports whether networkInterface looks like a physical
// NIC name: ens/eno/eth/enp followed by digits, or the enp<N>s<M> form such as
// enp0s3. The whole name must match.
func isNetworkInterface(networkInterface string) bool {
	return networkInterfacePattern.MatchString(networkInterface)
}

// handleInitMsgFromServer handles a registration reply received from the server.
//
// The message content is expected to be a JSON string of an AgentServerInfo.
// When the message type has the init prefix and the returned ServerName starts
// with this agent's ServerName, the agent is marked as registered and the
// TopicName assigned by the server is stored in a_agent.AgentServerInfoCache.
// Otherwise the message is published again to initToServerQueue from a pool task.
//
// It returns the current value of g.G.AgentHasRegister.
func handleInitMsgFromServer(initFromServerMsg *rabbitmq.OctopusMessage, initToServerQueue *rabbitmq.RabbitQueue, agentServerInfo *a_agent.AgentServerInfo) bool {
	log.DebugF("message received from server is %s", initFromServerMsg)

	var serverInfo a_agent.AgentServerInfo

	// Parse failures are logged only; processing then continues with whatever
	// was decoded, exactly as before.
	s, ok := initFromServerMsg.Content.(string)
	if !ok {
		log.ErrorF("convert to string error! => %v", ok)
	}

	if err := json.Unmarshal([]byte(s), &serverInfo); err != nil {
		log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", err))
	}

	serverName := serverInfo.ServerName

	// process the OctopusMessage
	if strings.HasPrefix(initFromServerMsg.OctopusMessageType, g.InitOmType) &&
		strings.HasPrefix(serverName, agentServerInfo.ServerName) {
		// this is the registration reply for this host
		log.InfoF("OctopusMessage INIT from server is this agent !")

		// flip the global registration flag
		g.G.AgentHasRegister = true

		// keep the real AgentTopicName
		a_agent.AgentServerInfoCache.TopicName = serverInfo.TopicName
	} else {
		// Not the reply for this agent -- reject it.
		// 2023-06-19: there used to be a bug here, an endless Nack loop that
		// caused failures.
		// Log the message itself (not the address of the local pointer variable).
		log.WarnF("OctopusMessage INIT from server not this agent ! => %v ", initFromServerMsg)

		// back off so this agent does not immediately pick the message up again
		_ = P.Submit(func() {
			// a random integer between 1 and 5
			seconds := rand.Intn(5) + 1

			initToServerQueue.SendOMsg(initFromServerMsg)

			// "yielding" concurrency model: wait for other agents to register.
			// NOTE(review): the sleep runs after the send inside the pool task,
			// so it only keeps that task busy — confirm this is intended.
			time.Sleep(time.Duration(seconds) * time.Second)
		})
	}

	return g.G.AgentHasRegister
}

// shutdownRegisterQueueConnection marks both registration queues as no longer
// consuming by storing false into their ConsumeOK flags.
func shutdownRegisterQueueConnection(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue) {
	initFromServerQueue.ConsumeOK.Store(false)
	initToServerQueue.ConsumeOK.Store(false)

	log.InfoF("Octopus Agent Init Queue has disconnected!")
}

// parseAgentServerInfo reads the YAML file agentServerInfoConf into an
// AgentServerInfo, normalizes it with UniformAgentServerInfo and logs it as JSON.
//
// The result is never nil: a file that decodes to nothing (for example one
// containing only comments) yields a zero-valued struct. It panics when the
// file cannot be read, is not valid YAML, or the result cannot be marshalled.
func parseAgentServerInfo(agentServerInfoConf string) *a_agent.AgentServerInfo {
	// by convention the file lives at /octopus-agent/octopus-agent.conf
	yamlFile, err := os.ReadFile(agentServerInfoConf)
	if err != nil {
		panic(fmt.Errorf("failed to read YAML file: %w", err))
	}

	// Decode into an allocated struct: decoding into a nil pointer leaves it
	// nil for an empty document, which made UniformAgentServerInfo panic.
	agentServerInfo := &a_agent.AgentServerInfo{}
	if err = yaml.Unmarshal(yamlFile, agentServerInfo); err != nil {
		panic(fmt.Errorf("failed to unmarshal YAML: %w", err))
	}

	// uniform agent server info
	UniformAgentServerInfo(agentServerInfo)

	jsonFormat, err := json.Marshal(agentServerInfo)
	if err != nil {
		log.Error(fmt.Sprintf("agent server info convert error ! agentserverinfo is %v", agentServerInfo))
		panic(err)
	}
	log.Info(fmt.Sprintf("agent server info is %v", string(jsonFormat)))

	return agentServerInfo
}

// UniformAgentServerInfo normalizes agentServerInfo in place: every settable
// string field is trimmed of surrounding whitespace, and any "/prefix" suffix
// is removed from ServerIPInV4 and ServerIPInV6. A nil argument is a no-op.
func UniformAgentServerInfo(agentServerInfo *a_agent.AgentServerInfo) {
	if agentServerInfo == nil {
		// reflect's Elem() on a nil pointer yields an invalid Value that panics
		return
	}

	// iterate over all fields through reflection
	log.Info("[Initialization] - UniformAgentServerInfo !")
	value := reflect.ValueOf(agentServerInfo).Elem()

	for i := 0; i < value.NumField(); i++ {
		field := value.Field(i)
		if field.Kind() == reflect.String && field.CanSet() {
			field.SetString(strings.TrimSpace(field.String()))
		}
	}

	log.Debug("[Initialization] - uniform ip address !")
	agentServerInfo.ServerIPInV4 = trimCIDRSuffix(agentServerInfo.ServerIPInV4)
	agentServerInfo.ServerIPInV6 = trimCIDRSuffix(agentServerInfo.ServerIPInV6)
}

// buildAgentOsOperator builds the agent OS operator through
// a_executor.BuildAgentOsOperator, attaches agentServerInfo to it and logs its
// JSON form at debug level.
//
// The OSS offline prefix is read from config key
// "octopus.agent.executor.ossOfflinePrefix" (a trailing "/" is appended when
// missing); "http://bastion.io" is used only when a_agent.AgentConfig is nil.
// NOTE(review): an unset config key therefore results in the prefix "/", and
// the built-in default has no trailing slash — confirm both are intended.
func buildAgentOsOperator(agentInfo *a_status.AgentInfo, agentServerInfo *a_agent.AgentServerInfo) {
	// 2023-08-04: pass through some key information
	ossOfflinePrefix := "http://bastion.io"
	if a_agent.AgentConfig != nil {
		ossOfflinePrefix = a_agent.AgentConfig.GetString("octopus.agent.executor.ossOfflinePrefix")
		if !strings.HasSuffix(ossOfflinePrefix, "/") {
			ossOfflinePrefix += "/"
		}
	} else {
		log.WarnF("buildAgentOsOperator - agent oss offline prefix is null !")
	}

	// call the init exec function
	agentOsOperator := a_executor.BuildAgentOsOperator(agentInfo, ossOfflinePrefix)

	// assign the agentServerInfo
	agentOsOperator.AgentServerInfo = agentServerInfo

	// debug output only; a marshal failure is reported instead of being dropped
	marshal, err := json.Marshal(agentOsOperator)
	if err != nil {
		log.WarnF("buildAgentOsOperator - marshal agent operator failed => %v", err)
		return
	}
	log.DebugF("[Agent INIT] cached agent operator is %s", marshal)
}