Files
ProjectOctopus/agent-go/AgentInitialization.go
2024-03-08 17:23:41 +08:00

265 lines
7.9 KiB
Go

package main
import (
"encoding/json"
"fmt"
"gopkg.in/yaml.v3"
"io/ioutil"
"reflect"
"strings"
"time"
"wdd.io/agent-go/executor"
"wdd.io/agent-go/g"
"wdd.io/agent-go/rabbitmq"
"wdd.io/agent-go/register"
"wdd.io/agent-go/status"
)
// Package-level shortcuts into the shared g package, resolved once when this
// package is initialized.
var (
	// omType is the OctopusMessage type INIT uses when building the
	// registration message.
	omType = g.InitOmType

	// P is taken from the global context; INIT submits its background
	// re-send task to it (presumably a worker pool — confirm in package g).
	P = g.G.P
)
//var AgentServerInfoCache = &register.AgentServerInfo{}
// INIT registers this agent with the server over RabbitMQ and returns the
// channel produced by handleInitMsgFromServer.
//
// In order, it:
//  1. parses the agent server info from the file at agentServerInfoConf,
//  2. overlays the host facts reported by the status module,
//  3. builds the agent OS operator,
//  4. connects the init "to server" queue and submits a task to P that
//     re-sends the registration message every 10 minutes until
//     g.G.AgentHasRegister becomes true,
//  5. connects the init "from server" queue and hands both queues to
//     handleInitMsgFromServer.
func INIT(agentServerInfoConf string) chan bool {
	// Load the agent's server description from the config file.
	serverInfo := parseAgentServerInfo(agentServerInfoConf)

	// Re-read the agent info from the status module and copy it over.
	hostReport := status.ReportAgentInfo()
	refreshAgentInfoByStatusInfo(hostReport, serverInfo)

	// Build the operator cache.
	BuildAgentOsOperator(hostReport, serverInfo)

	cfg := g.G.AgentConfig

	// Both init queues hang off the same exchange and differ only in queue
	// name and routing key.
	toServerProp := &rabbitmq.ConnectProperty{
		ExchangeName: cfg.GetString("octopus.message.init_exchange"),
		QueueName:    cfg.GetString("octopus.message.init_to_server"),
		ExchangeType: g.QueueDirect,
		TopicKey:     cfg.GetString("octopus.message.init_to_server_key"),
	}
	fromServerProp := &rabbitmq.ConnectProperty{
		ExchangeName: cfg.GetString("octopus.message.init_exchange"),
		QueueName:    cfg.GetString("octopus.message.init_from_server"),
		ExchangeType: g.QueueDirect,
		TopicKey:     cfg.GetString("octopus.message.init_from_server_key"),
	}

	// Open the outgoing (agent -> server) init queue.
	toServerQueue := &rabbitmq.RabbitQueue{RabbitProp: toServerProp}
	toServerQueue.Connect()

	// Assemble the registration OctopusMessage. Build is invoked on a nil
	// pointer here, so it presumably does not dereference its receiver.
	var registerMsg *rabbitmq.OctopusMessage
	registerMsg = registerMsg.Build(omType, serverInfo)

	payload, err := json.Marshal(registerMsg)
	if err != nil {
		log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %v", registerMsg))
	}

	// While the agent is alive but the server has not acknowledged it, keep
	// re-sending the registration message.
	resendUntilRegistered := func() {
		for !g.G.AgentHasRegister {
			log.Debug(fmt.Sprintf("Send init message to server! ==> %s", string(payload)))
			toServerQueue.Send(payload)

			// Wait before the next attempt.
			time.Sleep(10 * time.Minute)
		}
	}
	P.Submit(resendUntilRegistered)

	// Open the incoming (server -> agent) init queue.
	fromServerQueue := &rabbitmq.RabbitQueue{RabbitProp: fromServerProp}
	fromServerQueue.Connect()

	// Process the server's replies; this also sets up the runtime connection.
	return handleInitMsgFromServer(fromServerQueue, toServerQueue, serverInfo)
}
// refreshAgentInfoByStatusInfo copies the platform and kernel fields of
// agentInfo.HostInfo into agentServerInfo, overwriting whatever was there.
func refreshAgentInfoByStatusInfo(agentInfo *status.AgentInfo, agentServerInfo *register.AgentServerInfo) {
	host := agentInfo.HostInfo

	// Operating system identification.
	agentServerInfo.Platform = host.Platform
	agentServerInfo.PlatformFamily = host.PlatformFamily
	agentServerInfo.PlatformVersion = host.PlatformVersion

	// Kernel identification.
	agentServerInfo.KernelVersion = host.KernelVersion
	agentServerInfo.KernelArch = host.KernelArch

	log.DebugF("[refreshAgentInfoByStatusInfo] - ok !")
}
// handleInitMsgFromServer consumes the registration replies the server
// publishes on the init queue and, once the deliveries channel is drained and
// closed, builds the runtime RabbitMQ connection for this agent.
//
// A delivery is treated as the reply for this agent when its
// OctopusMessageType starts with g.InitOmType and the ServerName carried in
// its content starts with agentServerInfo.ServerName. On a match the delivery
// is acknowledged, g.G.AgentHasRegister is set, the topic name sent by the
// server is remembered and both init queues are closed. Every other delivery,
// including one that cannot be decoded, is acknowledged and followed by a
// short sleep.
//
// The returned channel is whatever rabbitmq.BuildOMsgRuntimeConnectorQueue
// returns for the remembered topic name (empty if no reply matched).
func handleInitMsgFromServer(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue, agentServerInfo *register.AgentServerInfo) chan bool {
	initOctopusMessageDeliveries := initFromServerQueue.Read(false)

	// 2023-06-19: fixes registration messages never being fully consumed.
	findRealAgentTopicName := ""

	// Many agents may be competing for registration on the same queue.
	for delivery := range initOctopusMessageDeliveries {
		log.Debug(fmt.Sprintf("message received from server is %s", string(delivery.Body)))

		var initOctopusMsg *rabbitmq.OctopusMessage
		if err := json.Unmarshal(delivery.Body, &initOctopusMsg); err != nil {
			log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ",
				string(delivery.Body)))
		}

		// Only look at the content when the envelope was decoded: a failed
		// Unmarshal (or a JSON "null" body) leaves initOctopusMsg nil, and
		// dereferencing it would panic the agent.
		var serverInfo register.AgentServerInfo
		if initOctopusMsg != nil {
			content, ok := initOctopusMsg.Content.(string)
			if !ok {
				// Log the offending content rather than the always-false flag.
				log.ErrorF("convet to string error! => %v", initOctopusMsg.Content)
			}
			if err := json.Unmarshal([]byte(content), &serverInfo); err != nil {
				log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", err))
			}
		}

		isForThisAgent := initOctopusMsg != nil &&
			strings.HasPrefix(initOctopusMsg.OctopusMessageType, g.InitOmType) &&
			strings.HasPrefix(serverInfo.ServerName, agentServerInfo.ServerName)

		if !isForThisAgent {
			// Not our registration reply. It is acked rather than rejected:
			// 2023-06-19, rejecting here caused an endless Nack loop.
			log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v, ", initOctopusMsg))
			delivery.Ack(false)

			// Back off so the same kind of message is not picked up again at once.
			time.Sleep(5 * time.Second)
			continue
		}

		// This is the registration reply for this machine.
		log.InfoF("OctopusMessage INIT from server is this agent !")

		// Acknowledge manually.
		delivery.Ack(false)

		// Flip the global flag that the re-send loop in INIT checks.
		g.G.AgentHasRegister = true

		// Remember the real agent topic name assigned by the server.
		findRealAgentTopicName = serverInfo.TopicName

		// Close the registration queues by hand.
		shutdownRegisterQueueConnection(initFromServerQueue, initToServerQueue)
	}

	// Establish the runtime RabbitMQ connection.
	return rabbitmq.BuildOMsgRuntimeConnectorQueue(findRealAgentTopicName)
}
// shutdownRegisterQueueConnection closes the two queues used during
// registration: first the "from server" queue, then the "to server" queue.
func shutdownRegisterQueueConnection(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue) {
	registerQueues := []*rabbitmq.RabbitQueue{initFromServerQueue, initToServerQueue}
	for _, queue := range registerQueues {
		queue.Close()
	}

	log.InfoF("Pretend to Shutdown register queue connection !")
}
// parseAgentServerInfo loads the agent's server description from the YAML
// file at agentServerInfoConf, normalizes it with UniformAgentServerInfo and
// logs the result as JSON.
//
// By convention the file lives at /octopus-agent/octopus-agent.conf.
//
// It panics when the file cannot be read, the YAML cannot be decoded, the
// file holds no document, or the result cannot be marshalled to JSON.
func parseAgentServerInfo(agentServerInfoConf string) *register.AgentServerInfo {
	yamlFile, err := ioutil.ReadFile(agentServerInfoConf)
	if err != nil {
		panic(fmt.Errorf("failed to read YAML file: %w", err))
	}

	var agentServerInfo *register.AgentServerInfo
	if err = yaml.Unmarshal(yamlFile, &agentServerInfo); err != nil {
		panic(fmt.Errorf("failed to unmarshal YAML: %w", err))
	}

	// An empty or "null" document decodes without error but leaves the
	// pointer nil. Fail here with a clear message instead of letting the
	// reflection in UniformAgentServerInfo panic on a zero Value.
	if agentServerInfo == nil {
		panic(fmt.Errorf("agent server info file %s contains no YAML document", agentServerInfoConf))
	}

	// Uniform the agent server info (trim strings, strip IP masks).
	UniformAgentServerInfo(agentServerInfo)

	jsonFormat, err := json.Marshal(agentServerInfo)
	if err != nil {
		log.Error(fmt.Sprintf("agent server info convert error ! agentserverinfo is %v", agentServerInfo))
		panic(err)
	}
	log.Info(fmt.Sprintf("agent server info is %v", string(jsonFormat)))

	return agentServerInfo
}
// UniformAgentServerInfo normalizes agentServerInfo in place: every settable
// string field is stripped of surrounding whitespace, and ServerIPInV4 /
// ServerIPInV6 are cut at their first "/" so only the part before it remains.
func UniformAgentServerInfo(agentServerInfo *register.AgentServerInfo) {
	log.Info("[Initialization] - UniformAgentServerInfo !")

	// Walk all struct fields via reflection and trim the string ones.
	structValue := reflect.ValueOf(agentServerInfo).Elem()
	for idx, total := 0, structValue.NumField(); idx < total; idx++ {
		f := structValue.Field(idx)
		if f.Kind() != reflect.String || !f.CanSet() {
			continue
		}
		f.SetString(strings.TrimSpace(f.String()))
	}

	log.Debug("[Initialization] - uniform ip address !")

	// beforeSlash returns addr up to (not including) its first "/", or addr
	// unchanged when it contains none.
	beforeSlash := func(addr string) string {
		if cut := strings.Index(addr, "/"); cut >= 0 {
			return addr[:cut]
		}
		return addr
	}
	agentServerInfo.ServerIPInV4 = beforeSlash(agentServerInfo.ServerIPInV4)
	agentServerInfo.ServerIPInV6 = beforeSlash(agentServerInfo.ServerIPInV6)
}
// BuildAgentOsOperator builds the executor's agent OS operator from agentInfo
// and attaches agentServerInfo to it.
//
// The offline OSS prefix handed to the executor defaults to
// "http://bastion.io". When an agent config is loaded and
// "octopus.agent.executor.ossOfflinePrefix" is set to a non-empty value, that
// value is used instead, with a trailing "/" appended if it is missing.
func BuildAgentOsOperator(agentInfo *status.AgentInfo, agentServerInfo *register.AgentServerInfo) {
	// 2023-08-04: pass through some key information.
	ossOfflinePrefix := "http://bastion.io"
	if g.G.AgentConfig != nil {
		// Assign to the outer variable. Declaring a new one with ":=" here
		// would shadow it and silently discard the configured prefix.
		if configured := g.G.AgentConfig.GetString("octopus.agent.executor.ossOfflinePrefix"); configured != "" {
			ossOfflinePrefix = configured
			if !strings.HasSuffix(ossOfflinePrefix, "/") {
				ossOfflinePrefix += "/"
			}
		}
	}

	// Call the executor's init function.
	agentOsOperator := executor.BuildAgentOsOperator(agentInfo, ossOfflinePrefix)

	// Attach the agent server info.
	agentOsOperator.AgentServerInfo = agentServerInfo

	// Debug output only; a marshalling failure must not abort initialization.
	marshal, err := json.Marshal(agentOsOperator)
	if err != nil {
		log.ErrorF("[Agent INIT] agent operator convert to json error => %v", err)
		return
	}
	log.DebugF("[Agent INIT] cached agent operator is %s", marshal)
}