package a_init import ( "encoding/json" "fmt" "github.com/spf13/viper" "gopkg.in/yaml.v3" "io/ioutil" "reflect" "strings" "time" "wdd.io/agent-common/logger" "wdd.io/agent-go/a_agent" "wdd.io/agent-go/a_executor" "wdd.io/agent-go/a_status" "wdd.io/agent-go/g" "wdd.io/agent-go/rabbitmq" ) var initOmType = g.InitOmType var P = g.G.P var log = logger.Log func INIT(octopusAgentConfigFileName string, agentServerInfoConf string) chan bool { // 获取系统的环境变量 agentServerInfo := parseAgentServerInfo(agentServerInfoConf) // todo totally get from a_status module // re-get agentInfo from status module agentInfo := a_status.ReportAgentInfo() refreshAgentInfoByStatusInfo(agentInfo, agentServerInfo) // build operator cache BuildAgentOsOperator(agentInfo, agentServerInfo) // 缓存此内容 a_agent.AgentServerInfoCache = agentServerInfo // 初始化Nacos的连接配置 agentConfig := ParseConfiguration(octopusAgentConfigFileName) a_agent.AgentConfig = agentConfig // build for octopus tcp connect info struct rabbitTCPConnectInfo := BuildOctopusTCPConnect(agentConfig) initToServerProp := &rabbitmq.ConnectProperty{ ExchangeName: agentConfig.GetString("octopus.message.init_exchange"), QueueName: agentConfig.GetString("octopus.message.init_to_server"), ExchangeType: g.QueueDirect, TopicKey: agentConfig.GetString("octopus.message.init_to_server_key"), } initFromServerProp := &rabbitmq.ConnectProperty{ ExchangeName: agentConfig.GetString("octopus.message.init_exchange"), QueueName: agentConfig.GetString("octopus.message.init_from_server"), ExchangeType: g.QueueDirect, TopicKey: agentConfig.GetString("octopus.message.init_from_server_key"), } // 建立RabbitMQ的连接 initToServerQueue := &rabbitmq.RabbitQueue{ RabbitProp: initToServerProp, RabbitConnectInfo: rabbitTCPConnectInfo, } //defer initToServerQueue.Close() // 建立连接 initToServerQueue.Connect() // 监听初始化连接中的信息 initFromServerQueue := &rabbitmq.RabbitQueue{ RabbitProp: initFromServerProp, RabbitConnectInfo: rabbitTCPConnectInfo, } //defer initFromServerQueue.Close() // 建立连接 initFromServerQueue.Connect() initFromServerQueue.Handle() buildAndSendInitMsgToServer(agentServerInfo, initToServerQueue) // receive from server for g.G.AgentHasRegister == false { select { case initFromServerMsg := <-initFromServerQueue.ReceiveChan.InitRChan: if handleInitMsgFromServer(initFromServerMsg, initToServerQueue, agentServerInfo) { break } default: log.Debug("agent init not received from server ! start to waiting !") time.Sleep(time.Second * 10) } } //<-initForeverHandle //close(initFromServerQueue.ReceiveChan.InitRChan) // 建立 运行时 RabbitMQ连接 runtimeConnectorQueue := buildAndStartBusinessRuntimeQueue(a_agent.AgentServerInfoCache.TopicName) // 激活子模块 activatedOctopusAgentModules() return runtimeConnectorQueue } func buildAndStartBusinessRuntimeQueue(agentTopicName string) chan bool { // 建立 业务消息 接收队列 // agentTopicName为名称的队列 agentConfig := a_agent.AgentConfig octopusExchangeName := agentConfig.GetString("octopus.message.octopus_exchange") octopusConnectProp := &rabbitmq.ConnectProperty{ ExchangeName: octopusExchangeName, QueueName: agentTopicName, ExchangeType: g.QueueTopic, TopicKey: agentTopicName + "*", } octopusMsgQueue := &rabbitmq.RabbitQueue{ RabbitProp: octopusConnectProp, } octopusMsgQueue.Connect() // 建立 业务消息 返回队列 // 统一为 OctopusToServer octopusToServerQueueName := agentConfig.GetString("octopus.message.octopus_to_server") octopusToServerProp := &rabbitmq.ConnectProperty{ ExchangeName: octopusExchangeName, QueueName: octopusToServerQueueName, ExchangeType: g.QueueTopic, TopicKey: octopusToServerQueueName, } octopusToServerQueue := &rabbitmq.RabbitQueue{ RabbitProp: octopusToServerProp, } // 开启运行时消息返回队列 octopusToServerQueue.Connect() log.InfoF("Octopus Message Business Runtime Queue is established ! => %v", octopusToServerQueue) // 开始处理Runtime的OM消息 businessForeverChan := octopusMsgQueue.Handle() // cache it rabbitmq.BusinessRuntimeQueue = octopusMsgQueue return businessForeverChan } // activatedOctopusAgentModules 激活Octopus Agent的所有子模块 func activatedOctopusAgentModules() { // Agent err := P.Submit(func() { a_agent.Activate() }) if err != nil { return } // Executor _ = P.Submit(func() { a_executor.Activate() }) // Status _ = P.Submit(func() { a_status.Activate() }) } func buildAndSendInitMsgToServer(agentServerInfo *a_agent.AgentServerInfo, initToServerQueue *rabbitmq.RabbitQueue) { // 组装OctopusMessage var octopusMsg *rabbitmq.OctopusMessage octopusMsg = octopusMsg.Build( initOmType, agentServerInfo, ) msgBytes, err := json.Marshal(octopusMsg) if err != nil { log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %v", octopusMsg)) } // 发送OM至MQ中 P.Submit( func() { for g.G.AgentHasRegister == false { log.Debug(fmt.Sprintf("Send init message to server! ==> %s", string(msgBytes))) //如果agent存活 而Server不存活 那么需要持续不断的向Server中发送消息 initToServerQueue.Send( msgBytes, ) // 休眠 time.Sleep(10 * time.Minute) } }) } func BuildOctopusTCPConnect(agentConfig *viper.Viper) *rabbitmq.RabbitTCPConnectInfo { host := agentConfig.GetString("spring.rabbitmq.host") port := agentConfig.GetString("spring.rabbitmq.port") username := agentConfig.GetString("spring.rabbitmq.username") password := agentConfig.GetString("spring.rabbitmq.password") virtualHost := agentConfig.GetString("spring.rabbitmq.virtual-host") //todo return &rabbitmq.RabbitTCPConnectInfo{ UserName: username, Password: password, Host: host, Port: port, VirtualHost: virtualHost, } } func refreshAgentInfoByStatusInfo(agentInfo *a_status.AgentInfo, agentServerInfo *a_agent.AgentServerInfo) { agentServerInfo.Platform = agentInfo.HostInfo.Platform agentServerInfo.PlatformFamily = agentInfo.HostInfo.PlatformFamily agentServerInfo.PlatformVersion = agentInfo.HostInfo.PlatformVersion agentServerInfo.KernelVersion = agentInfo.HostInfo.KernelVersion agentServerInfo.KernelArch = agentInfo.HostInfo.KernelArch log.DebugF("[refreshAgentInfoByStatusInfo] - ok !") } // handleInitMsgFromServer 处理从Server接收的 注册信息 func handleInitMsgFromServer(initFromServerMsg *rabbitmq.OctopusMessage, initToServerQueue *rabbitmq.RabbitQueue, agentServerInfo *a_agent.AgentServerInfo) bool { //initOctopusMessageDeliveries := initFromServerQueue.Read(false) log.DebugF("message received from server is %s", initFromServerMsg) var serverInfo a_agent.AgentServerInfo s, er := initFromServerMsg.Content.(string) if !er { log.ErrorF("convet to string error! => %v", er) } cc := json.Unmarshal([]byte(s), &serverInfo) if cc != nil { log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", cc)) } serverName := serverInfo.ServerName // 处理OM信息 if strings.HasPrefix(initFromServerMsg.OctopusMessageType, g.InitOmType) && strings.HasPrefix(serverName, agentServerInfo.ServerName) { // 是本机的注册回复信息 log.InfoF("OctopusMessage INIT from server is this agent !") // 手动确认信息 //delivery.Ack(false) // 修改系统参数 g.G.AgentHasRegister = true // 保存真实的AgentTopicName a_agent.AgentServerInfoCache.TopicName = serverInfo.TopicName // 手动关闭 注册队列的连接 //shutdownRegisterQueueConnection(initFromServerQueue, initToServerQueue) } else { // 不是自身的 注册回复信息 -- 拒绝 2023年6月19日 此处存在错误! 会死循环Nack 导致异常 log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v, ", &initFromServerMsg)) // 需要休眠等待不再获取相应的信息 P.Submit(func() { time.Sleep(time.Second * 5) initToServerQueue.SendOMsg(initFromServerMsg) }) } return g.G.AgentHasRegister } // shutdownRegisterQueueConnection 关闭初始化连接的两个队列 func shutdownRegisterQueueConnection(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue) { initFromServerQueue.Close() initToServerQueue.Close() log.InfoF("Pretend to Shutdown register queue connection !") } func parseAgentServerInfo(agentServerInfoConf string) *a_agent.AgentServerInfo { // 约定文件地址为 /octopus-agent/octopus-agent.conf var agentServerInfo *a_agent.AgentServerInfo yamlFile, err := ioutil.ReadFile(agentServerInfoConf) if err != nil { panic(fmt.Errorf("failed to read YAML file: %v", err)) } err = yaml.Unmarshal(yamlFile, &agentServerInfo) if err != nil { panic(fmt.Errorf("failed to unmarshal YAML: %v", err)) } // uniform agent server info UniformAgentServerInfo(agentServerInfo) jsonFormat, err := json.Marshal(&agentServerInfo) if err != nil { log.Error(fmt.Sprintf("agent server info convert error ! agentserverinfo is %v", agentServerInfo)) panic(err) } log.Info(fmt.Sprintf("agent server info is %v", string(jsonFormat))) return agentServerInfo } // UniformAgentServerInfo uniform deal with ip func UniformAgentServerInfo(agentServerInfo *a_agent.AgentServerInfo) { // reflect to iterator all field log.Info("[Initialization] - UniformAgentServerInfo !") value := reflect.ValueOf(agentServerInfo).Elem() for i := 0; i < value.NumField(); i++ { field := value.Field(i) if field.Kind() == reflect.String && field.CanSet() { field.SetString(strings.TrimSpace(field.String())) } } log.Debug("[Initialization] - uniform ip address !") if strings.Contains(agentServerInfo.ServerIPInV4, "/") { agentServerInfo.ServerIPInV4 = strings.Split(agentServerInfo.ServerIPInV4, "/")[0] } if strings.Contains(agentServerInfo.ServerIPInV6, "/") { agentServerInfo.ServerIPInV6 = strings.Split(agentServerInfo.ServerIPInV6, "/")[0] } } func BuildAgentOsOperator(agentInfo *a_status.AgentInfo, agentServerInfo *a_agent.AgentServerInfo) { // 2023年8月4日 pass through some key information ossOfflinePrefix := "http://bastion.io" if a_agent.AgentConfig != nil { ossOfflinePrefix := a_agent.AgentConfig.GetString("octopus.agent.executor.ossOfflinePrefix") if !strings.HasSuffix(ossOfflinePrefix, "/") { ossOfflinePrefix += "/" } } // call the init exec function agentOsOperator := a_executor.BuildAgentOsOperator(agentInfo, ossOfflinePrefix) // assign the agentServerInfo agentOsOperator.AgentServerInfo = agentServerInfo // debug marshal, _ := json.Marshal(agentOsOperator) log.DebugF("[Agent INIT] cached agent operator is %s", marshal) }