package main import ( "encoding/json" "fmt" "gopkg.in/yaml.v3" "io/ioutil" "reflect" "strings" "time" "wdd.io/agent-go/executor" "wdd.io/agent-go/g" "wdd.io/agent-go/rabbitmq" "wdd.io/agent-go/register" "wdd.io/agent-go/status" ) var omType = g.InitOmType var P = g.G.P //var AgentServerInfoCache = ®ister.AgentServerInfo{} func INIT(agentServerInfoConf string) chan bool { // 获取系统的环境变量 agentServerInfo := parseAgentServerInfo(agentServerInfoConf) // re-get agentInfo from status module agentInfo := status.ReportAgentInfo() refreshAgentInfoByStatusInfo(agentInfo, agentServerInfo) // build operator cache BuildAgentOsOperator(agentInfo, agentServerInfo) // 缓存此内容 //AgentServerInfoCache = agentServerInfo agentConfig := g.G.AgentConfig initToServerProp := &rabbitmq.ConnectProperty{ ExchangeName: agentConfig.GetString("octopus.message.init_exchange"), QueueName: agentConfig.GetString("octopus.message.init_to_server"), ExchangeType: g.QueueDirect, TopicKey: agentConfig.GetString("octopus.message.init_to_server_key"), } initFromServerProp := &rabbitmq.ConnectProperty{ ExchangeName: agentConfig.GetString("octopus.message.init_exchange"), QueueName: agentConfig.GetString("octopus.message.init_from_server"), ExchangeType: g.QueueDirect, TopicKey: agentConfig.GetString("octopus.message.init_from_server_key"), } // 建立RabbitMQ的连接 initToServerQueue := &rabbitmq.RabbitQueue{ RabbitProp: initToServerProp, } //defer initToServerQueue.Close() // 建立连接 initToServerQueue.Connect() // 组装OctopusMessage var octopusMsg *rabbitmq.OctopusMessage octopusMsg = octopusMsg.Build( omType, agentServerInfo, ) msgBytes, err := json.Marshal(octopusMsg) if err != nil { log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %v", octopusMsg)) } // 发送OM至MQ中 P.Submit( func() { for g.G.AgentHasRegister == false { log.Debug(fmt.Sprintf("Send init message to server! ==> %s", string(msgBytes))) //如果agent存活 而Server不存活 那么需要持续不断的向Server中发送消息 initToServerQueue.Send( msgBytes, ) // 休眠 time.Sleep(10 * time.Minute) } }) // 监听初始化连接中的信息 initFromServerQueue := &rabbitmq.RabbitQueue{ RabbitProp: initFromServerProp, } //defer initFromServerQueue.Close() // 建立连接 initFromServerQueue.Connect() // 建立运行时RabbitMQ连接 businessForeverChan := handleInitMsgFromServer(initFromServerQueue, initToServerQueue, agentServerInfo) return businessForeverChan } func refreshAgentInfoByStatusInfo(agentInfo *status.AgentInfo, agentServerInfo *register.AgentServerInfo) { agentServerInfo.Platform = agentInfo.HostInfo.Platform agentServerInfo.PlatformFamily = agentInfo.HostInfo.PlatformFamily agentServerInfo.PlatformVersion = agentInfo.HostInfo.PlatformVersion agentServerInfo.KernelVersion = agentInfo.HostInfo.KernelVersion agentServerInfo.KernelArch = agentInfo.HostInfo.KernelArch log.DebugF("[refreshAgentInfoByStatusInfo] - ok !") } // handleInitMsgFromServer 处理从Server接收的 注册信息 func handleInitMsgFromServer(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue, agentServerInfo *register.AgentServerInfo) chan bool { initOctopusMessageDeliveries := initFromServerQueue.Read(false) // 2023年6月19日 修复注册信息一直没有完全消费的问题 findRealAgentTopicName := "" // 同步很多抢占注册的情况 for delivery := range initOctopusMessageDeliveries { log.Debug(fmt.Sprintf("message received from server is %s", string(delivery.Body))) var initOctopusMsg *rabbitmq.OctopusMessage err := json.Unmarshal(delivery.Body, &initOctopusMsg) if err != nil { log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ", string(delivery.Body))) } var serverInfo register.AgentServerInfo s, er := initOctopusMsg.Content.(string) if !er { log.ErrorF("convet to string error! => %v", er) } cc := json.Unmarshal([]byte(s), &serverInfo) if cc != nil { log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", cc)) } serverName := serverInfo.ServerName // 处理OM信息 if initOctopusMsg != nil && strings.HasPrefix(initOctopusMsg.OctopusMessageType, g.InitOmType) && strings.HasPrefix(serverName, agentServerInfo.ServerName) { // 是本机的注册回复信息 log.InfoF("OctopusMessage INIT from server is this agent !") // 手动确认信息 delivery.Ack(false) // 修改系统参数 g.G.AgentHasRegister = true // 保存真实的AgentTopicName findRealAgentTopicName = serverInfo.TopicName // 手动关闭 注册队列的连接 shutdownRegisterQueueConnection(initFromServerQueue, initToServerQueue) } else { // 不是自身的 注册回复信息 -- 拒绝 2023年6月19日 此处存在错误! 会死循环Nack 导致异常 log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v, ", initOctopusMsg)) delivery.Ack(false) // 需要休眠等待不再获取相应的信息 time.Sleep(5 * time.Second) } } // 建立 运行时 RabbitMQ连接 runtimeConnectorQueue := rabbitmq.BuildOMsgRuntimeConnectorQueue(findRealAgentTopicName) return runtimeConnectorQueue } // shutdownRegisterQueueConnection 关闭初始化连接的两个队列 func shutdownRegisterQueueConnection(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue) { initFromServerQueue.Close() initToServerQueue.Close() log.InfoF("Pretend to Shutdown register queue connection !") } func parseAgentServerInfo(agentServerInfoConf string) *register.AgentServerInfo { // 约定文件地址为 /octopus-agent/octopus-agent.conf var agentServerInfo *register.AgentServerInfo yamlFile, err := ioutil.ReadFile(agentServerInfoConf) if err != nil { panic(fmt.Errorf("failed to read YAML file: %v", err)) } err = yaml.Unmarshal(yamlFile, &agentServerInfo) if err != nil { panic(fmt.Errorf("failed to unmarshal YAML: %v", err)) } // uniform agent server info UniformAgentServerInfo(agentServerInfo) jsonFormat, err := json.Marshal(&agentServerInfo) if err != nil { log.Error(fmt.Sprintf("agent server info convert error ! agentserverinfo is %v", agentServerInfo)) panic(err) } log.Info(fmt.Sprintf("agent server info is %v", string(jsonFormat))) return agentServerInfo } // UniformAgentServerInfo uniform deal with ip func UniformAgentServerInfo(agentServerInfo *register.AgentServerInfo) { // reflect to iterator all field log.Info("[Initialization] - UniformAgentServerInfo !") value := reflect.ValueOf(agentServerInfo).Elem() for i := 0; i < value.NumField(); i++ { field := value.Field(i) if field.Kind() == reflect.String && field.CanSet() { field.SetString(strings.TrimSpace(field.String())) } } log.Debug("[Initialization] - uniform ip address !") if strings.Contains(agentServerInfo.ServerIPInV4, "/") { agentServerInfo.ServerIPInV4 = strings.Split(agentServerInfo.ServerIPInV4, "/")[0] } if strings.Contains(agentServerInfo.ServerIPInV6, "/") { agentServerInfo.ServerIPInV6 = strings.Split(agentServerInfo.ServerIPInV6, "/")[0] } } func BuildAgentOsOperator(agentInfo *status.AgentInfo, agentServerInfo *register.AgentServerInfo) { // 2023年8月4日 pass through some key information ossOfflinePrefix := "http://bastion.io" if g.G.AgentConfig != nil { ossOfflinePrefix := g.G.AgentConfig.GetString("octopus.agent.executor.ossOfflinePrefix") if !strings.HasSuffix(ossOfflinePrefix, "/") { ossOfflinePrefix += "/" } } // call the init exec function agentOsOperator := executor.BuildAgentOsOperator(agentInfo, ossOfflinePrefix) // assign the agentServerInfo agentOsOperator.AgentServerInfo = agentServerInfo // debug marshal, _ := json.Marshal(agentOsOperator) log.DebugF("[Agent INIT] cached agent operator is %s", marshal) }