Files
ProjectOctopus/agent-go/register/AgentIntitilization.go
2023-03-28 13:59:16 +08:00

176 lines
4.9 KiB
Go

package register
import (
"agent-go/config"
"agent-go/g"
"agent-go/rabbitmq"
"encoding/json"
"fmt"
"gopkg.in/yaml.v3"
"io/ioutil"
)
var omType = g.InitOmType
var log = g.G.LOG
func INIT() *config.AgentServerInfo {
// 获取系统的环境变量
agentServerInfo := parseAgentServerInfo()
nacosConfig := g.G.NacosConfig
initToServerProp := &rabbitmq.ConnectProperty{
ExchangeName: nacosConfig.GetString("octopus.message.init_exchange"),
QueueName: nacosConfig.GetString("octopus.message.init_to_server"),
ExchangeType: g.QueueDirect,
TopicKey: nacosConfig.GetString("octopus.message.init_to_server_key"),
}
initFromServerProp := &rabbitmq.ConnectProperty{
ExchangeName: nacosConfig.GetString("octopus.message.init_exchange"),
QueueName: nacosConfig.GetString("octopus.message.init_from_server"),
ExchangeType: g.QueueDirect,
TopicKey: nacosConfig.GetString("octopus.message.init_from_server_key"),
}
// 建立RabbitMQ的连接
// defer 关闭初始化连接
initToServer, err := rabbitmq.NewRabbitMQConn(
initToServerProp,
)
if err != nil {
log.Error("init to server queue established error!")
panic(err)
}
//defer rabbitmq.CloseChannel(initToServer)
//defer rabbitmq.CloseChannel(initFromServer)
// 组装OctopusMessage
var octopusMsg *config.OctopusMessage
octopusMsg = octopusMsg.BuildOctopusMsg(
omType,
agentServerInfo,
)
msgBytes, err := json.Marshal(octopusMsg)
if err != nil {
log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %v", octopusMsg))
}
log.Debug(fmt.Sprintf("Prepare to send init message to server! ==> %s", string(msgBytes)))
// 发送OM至MQ中
rabbitmq.Send(
initToServer,
initToServerProp,
msgBytes,
)
// 监听初始化连接中的信息
initFromServer, err := rabbitmq.NewRabbitMQConn(
initFromServerProp,
)
if err != nil {
log.Error("init from server queue established error!")
panic(err)
}
// 建立运行时RabbitMQ连接
handleInitMsgFromServer(initFromServer, initFromServerProp, initToServer, initToServerProp, agentServerInfo)
return agentServerInfo
}
// handleInitMsgFromServer 处理从Server接收的注册信息
func handleInitMsgFromServer(initFromServer *rabbitmq.RabbitMQConn, initFromServerProp *rabbitmq.ConnectProperty, initToServer *rabbitmq.RabbitMQConn, initToServerProp *rabbitmq.ConnectProperty, agentServerInfo *config.AgentServerInfo) {
deliveries := rabbitmq.Read(initFromServer, initFromServerProp, false)
forever := make(chan bool)
go func() {
// 同步很多抢占注册的情况
for delivery := range deliveries {
log.Debug(fmt.Sprintf("message received from server is %s", string(delivery.Body)))
var om *config.OctopusMessage
err := json.Unmarshal(delivery.Body, &om)
if err != nil {
log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ",
string(delivery.Body)))
}
var serverInfo config.AgentServerInfo
s, _ := om.Content.(string)
cc := json.Unmarshal([]byte(s), &serverInfo)
if cc != nil {
log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", cc))
}
serverName := serverInfo.ServerName
// 处理OM信息
if om != nil && om.Type == g.InitOmType && serverName == agentServerInfo.ServerName {
// 是本机的注册回复信息
// 建立运行时RabbitMQ连接
// change to async
go rabbitmq.HandleOMsg(om)
// 手动确认信息
delivery.Ack(false)
// 手动关闭 注册队列的连接
shutdownRegisterQueueConnection(initFromServer, initFromServerProp, initToServer, initToServerProp)
return
}
// 不是自身的 注册回复信息 -- 拒绝
log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v, ==>%s", om, delivery.Body))
delivery.Nack(false, true)
}
}()
// wait forever
<-forever
}
// shutdownRegisterQueueConnection 关闭初始化连接的两个队列
func shutdownRegisterQueueConnection(initFromServer *rabbitmq.RabbitMQConn, initFromServerProp *rabbitmq.ConnectProperty, initToServer *rabbitmq.RabbitMQConn, initToServerProp *rabbitmq.ConnectProperty) {
}
func parseAgentServerInfo() *config.AgentServerInfo {
// 约定文件地址为 /etc/environment.d/octopus-agent.conf
// 目前使用
var agentServerInfo *config.AgentServerInfo
//yamlFile, err := ioutil.ReadFile("C:\\Users\\wdd\\IdeaProjects\\ProjectOctopus\\agent-go\\server-env.yaml")
yamlFile, err := ioutil.ReadFile("server-env.yaml")
if err != nil {
panic(fmt.Errorf("failed to read YAML file: %v", err))
}
err = yaml.Unmarshal(yamlFile, &agentServerInfo)
if err != nil {
panic(fmt.Errorf("failed to unmarshal YAML: %v", err))
}
jsonFormat, err := json.Marshal(agentServerInfo)
if err != nil {
log.Error(fmt.Sprintf("agent server info convert error ! agentserverinfo is %v", agentServerInfo))
panic(err)
}
log.Info(fmt.Sprintf("agent server info is %v", string(jsonFormat)))
return agentServerInfo
}