package register import ( "agent-go/config" "agent-go/g" "agent-go/rabbitmq" "encoding/json" "fmt" "gopkg.in/yaml.v3" "io/ioutil" ) var omType = g.InitOmType var log = g.G.LOG func INIT() *config.AgentServerInfo { // 获取系统的环境变量 agentServerInfo := parseAgentServerInfo() nacosConfig := g.G.NacosConfig initToServerProp := &rabbitmq.ConnectProperty{ ExchangeName: nacosConfig.GetString("octopus.message.init_exchange"), QueueName: nacosConfig.GetString("octopus.message.init_to_server"), ExchangeType: g.QueueDirect, TopicKey: nacosConfig.GetString("octopus.message.init_to_server_key"), } initFromServerProp := &rabbitmq.ConnectProperty{ ExchangeName: nacosConfig.GetString("octopus.message.init_exchange"), QueueName: nacosConfig.GetString("octopus.message.init_from_server"), ExchangeType: g.QueueDirect, TopicKey: nacosConfig.GetString("octopus.message.init_from_server_key"), } // 建立RabbitMQ的连接 // defer 关闭初始化连接 initToServer, err := rabbitmq.NewRabbitMQConn( initToServerProp, ) if err != nil { log.Error("init to server queue established error!") panic(err) } //defer rabbitmq.CloseChannel(initToServer) initFromServer, err := rabbitmq.NewRabbitMQConn( initFromServerProp, ) if err != nil { log.Error("init from server queue established error!") panic(err) } //defer rabbitmq.CloseChannel(initFromServer) // 组装OctopusMessage var octopusMsg *config.OctopusMessage octopusMsg = octopusMsg.BuildOctopusMsg( omType, agentServerInfo, ) msgBytes, err := json.Marshal(octopusMsg) if err != nil { log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %v", octopusMsg)) } // 发送OM至MQ中O rabbitmq.Send( initToServer, initToServerProp, msgBytes, ) // 监听初始化连接中的信息 // 建立运行时RabbitMQ连接 handleInitMsgFromServer(initFromServer, initFromServerProp, initToServer, initToServerProp) return agentServerInfo } // handleInitMsgFromServer 处理从Server接收的注册信息 func handleInitMsgFromServer(initFromServer *rabbitmq.RabbitMQConn, initFromServerProp *rabbitmq.ConnectProperty, initToServer *rabbitmq.RabbitMQConn, initToServerProp *rabbitmq.ConnectProperty) { deliveries := rabbitmq.Read(initFromServer, initFromServerProp, false) // 同步很多抢占注册的情况 for delivery := range deliveries { var om *config.OctopusMessage err := json.Unmarshal(delivery.Body, &om) if err != nil { log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ", string(delivery.Body))) } // 处理OM信息 if om.UUID == g.G.AgentServerInfo.AgentTopicName { // 是本机的注册回复信息 // 建立运行时RabbitMQ连接 rabbitmq.HandleOMsg(om) // 手动确认信息 delivery.Ack(false) // 手动关闭 注册队列的连接 shutdownRegisterQueueConnection(initFromServer, initFromServerProp, initToServer, initToServerProp) return } // 不是自身的 注册回复信息 -- 拒绝 log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v", om)) delivery.Nack(false, true) } } // shutdownRegisterQueueConnection 关闭初始化连接的两个队列 func shutdownRegisterQueueConnection(initFromServer *rabbitmq.RabbitMQConn, initFromServerProp *rabbitmq.ConnectProperty, initToServer *rabbitmq.RabbitMQConn, initToServerProp *rabbitmq.ConnectProperty) { } func parseAgentServerInfo() *config.AgentServerInfo { // 约定文件地址为 /etc/environment.d/octopus-agent.conf // 目前使用 var agentServerInfo *config.AgentServerInfo yamlFile, err := ioutil.ReadFile("C:\\Users\\wdd\\IdeaProjects\\ProjectOctopus\\agent-go\\server-env.yaml") if err != nil { panic(fmt.Errorf("failed to read YAML file: %v", err)) } err = yaml.Unmarshal(yamlFile, &agentServerInfo) if err != nil { panic(fmt.Errorf("failed to unmarshal YAML: %v", err)) } jsonFormat, err := json.Marshal(agentServerInfo) if err != nil { log.Error(fmt.Sprintf("agent server info convert error ! agentserverinfo is %v", agentServerInfo)) panic(err) } log.Info(fmt.Sprintf("agent server info is %v", string(jsonFormat))) return agentServerInfo }