Files
ProjectOctopus/agent-go/a_init/AgentInitialization.go
2024-04-08 15:19:15 +08:00

379 lines
11 KiB
Go

package a_init
import (
"encoding/json"
"fmt"
"github.com/spf13/viper"
"gopkg.in/yaml.v3"
"io/ioutil"
"math/rand"
"reflect"
"strings"
"time"
"wdd.io/agent-common/logger"
"wdd.io/agent-go/a_agent"
"wdd.io/agent-go/a_executor"
"wdd.io/agent-go/a_status"
"wdd.io/agent-go/g"
"wdd.io/agent-go/rabbitmq"
)
// Package-level shortcuts to values owned by other packages.
var (
	// initOmType is the OctopusMessage type tag (g.InitOmType) stamped on the
	// init message this package sends to the server.
	initOmType = g.InitOmType
	// P aliases g.G.P; this file only uses it through Submit to run tasks.
	P = g.G.P
	// log is the shared logger from agent-common.
	log = logger.Log
)
// INIT bootstraps the agent and registers it with the Octopus server.
//
// Steps, in order:
//  1. parse the agent server info file (agentServerInfoConf) and the agent
//     config file (octopusAgentConfigFileName); the latter is published as
//     a_agent.AgentConfig;
//  2. overlay the platform/kernel details reported by a_status, build the OS
//     operator and cache the server info in a_agent.AgentServerInfoCache;
//  3. connect both init queues, start sending the init message and poll the
//     reply channel until g.G.AgentHasRegister becomes true;
//  4. start the business runtime queues and activate the sub-modules.
//
// The returned channel is the one produced by
// buildAndStartBusinessRuntimeQueue.
func INIT(octopusAgentConfigFileName string, agentServerInfoConf string) chan bool {
	// Static description of this host, read from the server info file.
	serverInfo := parseAgentServerInfo(agentServerInfoConf)

	// todo totally get from a_status module
	// Load the agent configuration and expose it to the other modules.
	conf := parseOctopusAgentConf(octopusAgentConfigFileName)
	a_agent.AgentConfig = conf

	// Re-read the host details from the status module and merge them in.
	statusInfo := a_status.ReportAgentInfo()
	refreshAgentInfoByStatusInfo(statusInfo, serverInfo)

	// Build the operator cache, then cache the server info itself.
	buildAgentOsOperator(statusInfo, serverInfo)
	a_agent.AgentServerInfoCache = serverInfo

	// Both init queues share one TCP connect description.
	toServer, fromServer := buildOctopusInitQueue(buildOctopusTCPConnect(conf))
	defer toServer.Close()
	defer fromServer.Close()

	toServer.Connect()
	fromServer.Connect()

	buildAndSendInitMsgToServer(serverInfo, toServer)

	// Start consuming replies from the server.
	fromServer.Handle()

	// Poll for the registration reply; the loop ends once a handled reply
	// flips g.G.AgentHasRegister to true.
	for !g.G.AgentHasRegister {
		select {
		case reply := <-fromServer.ReceiveChan.InitRChan:
			if !handleInitMsgFromServer(reply, toServer, serverInfo) {
				continue
			}
			log.InfoF("[INIT] - agent has registered !")
			// Explicitly close the registration queue connections.
			shutdownRegisterQueueConnection(fromServer, toServer)
		default:
			log.Debug("agent init not received from server ! start to waiting !")
			time.Sleep(time.Millisecond * 500)
		}
	}

	// Establish the runtime RabbitMQ connections under the topic name that
	// was stored in the cache during registration.
	runtimeForever := buildAndStartBusinessRuntimeQueue(a_agent.AgentServerInfoCache.TopicName)

	// Activate the sub-modules.
	activatedOctopusAgentModules()

	return runtimeForever
}
// buildOctopusInitQueue creates the two registration queues, both bound to
// the direct exchange named by "octopus.message.init_exchange" and both using
// rabbitTCPConnectInfo. Neither queue is connected here.
//
// It returns (initToServerQueue, initFromServerQueue), in that order.
func buildOctopusInitQueue(rabbitTCPConnectInfo *rabbitmq.RabbitTCPConnectInfo) (*rabbitmq.RabbitQueue, *rabbitmq.RabbitQueue) {
	conf := a_agent.AgentConfig

	// newInitQueue assembles one queue from the config keys holding its queue
	// name and its routing key.
	newInitQueue := func(queueNameKey, topicKeyKey string) *rabbitmq.RabbitQueue {
		return &rabbitmq.RabbitQueue{
			RabbitProp: &rabbitmq.ConnectProperty{
				ExchangeName: conf.GetString("octopus.message.init_exchange"),
				QueueName:    conf.GetString(queueNameKey),
				ExchangeType: g.QueueDirect,
				TopicKey:     conf.GetString(topicKeyKey),
			},
			RabbitConnectInfo: rabbitTCPConnectInfo,
		}
	}

	toServer := newInitQueue("octopus.message.init_to_server", "octopus.message.init_to_server_key")
	fromServer := newInitQueue("octopus.message.init_from_server", "octopus.message.init_from_server_key")
	return toServer, fromServer
}
// buildAndStartBusinessRuntimeQueue connects the two runtime queues on the
// topic exchange named by "octopus.message.octopus_exchange":
//
//   - an inbound queue named agentTopicName, bound with key agentTopicName+"*";
//   - the outbound queue named by "octopus.message.octopus_to_server", which
//     is shared for all replies to the server.
//
// Both queues are stored in the rabbitmq package globals, and the channel
// returned by Handle on the inbound queue is handed back to the caller.
//
// NOTE(review): neither queue sets RabbitConnectInfo, unlike the init queues;
// presumably Connect obtains the connection details elsewhere — confirm.
func buildAndStartBusinessRuntimeQueue(agentTopicName string) chan bool {
	conf := a_agent.AgentConfig
	exchange := conf.GetString("octopus.message.octopus_exchange")

	// Inbound business messages addressed to this agent.
	inbound := &rabbitmq.RabbitQueue{
		RabbitProp: &rabbitmq.ConnectProperty{
			ExchangeName: exchange,
			QueueName:    agentTopicName,
			ExchangeType: g.QueueTopic,
			TopicKey:     agentTopicName + "*",
		},
	}
	inbound.Connect()

	// Outbound replies; every agent uses the same OctopusToServer queue.
	replyQueueName := conf.GetString("octopus.message.octopus_to_server")
	outbound := &rabbitmq.RabbitQueue{
		RabbitProp: &rabbitmq.ConnectProperty{
			ExchangeName: exchange,
			QueueName:    replyQueueName,
			ExchangeType: g.QueueTopic,
			TopicKey:     replyQueueName,
		},
	}
	outbound.Connect()

	log.InfoF("Octopus Message Business Runtime Queue is established ! => %v", outbound)

	// Start handling runtime OctopusMessages.
	forever := inbound.Handle()

	// Cache both queues for the rest of the agent.
	rabbitmq.BusinessRuntimeQueue = inbound
	rabbitmq.OctopusToServerQueue = outbound

	return forever
}
// activatedOctopusAgentModules 激活Octopus Agent的所有子模块
func activatedOctopusAgentModules() {
// Agent
_ = P.Submit(func() {
a_agent.Activate()
})
// Executor
_ = P.Submit(func() {
a_executor.Activate()
})
// Status
_ = P.Submit(func() {
a_status.Activate()
})
}
// buildAndSendInitMsgToServer builds the init OctopusMessage carrying
// agentServerInfo and submits a task to P that publishes it on
// initToServerQueue every five minutes until g.G.AgentHasRegister is true.
// The task gives up, with an error log, once the number of sends exceeds
// g.AgentRegisterFailedCount.
func buildAndSendInitMsgToServer(agentServerInfo *a_agent.AgentServerInfo, initToServerQueue *rabbitmq.RabbitQueue) {
	// Assemble the OctopusMessage. Build is called through a nil pointer, so
	// it is presumably written as a constructor that ignores its receiver.
	var initMsg *rabbitmq.OctopusMessage
	initMsg = initMsg.Build(initOmType, agentServerInfo)

	resend := func() {
		for attempts := 0; !g.G.AgentHasRegister; attempts++ {
			if attempts > g.AgentRegisterFailedCount {
				log.ErrorF("Agent Register Failed after %d times tryings !", g.AgentRegisterFailedCount)
				return
			}

			log.InfoF("[INIT] - Send init message to server! ==> %s", initMsg)

			// If the agent is alive but the server is not, the message has
			// to be sent again and again until the server answers.
			initToServerQueue.SendOMsg(initMsg)

			// Wait before the next attempt.
			time.Sleep(5 * time.Minute)
		}
	}

	// Publish asynchronously so the caller can go on to consume replies.
	_ = P.Submit(resend)
}
// buildOctopusTCPConnect collects the RabbitMQ connection settings stored
// under the "spring.rabbitmq.*" keys of agentConfig. Every value, the port
// included, is read as a string.
func buildOctopusTCPConnect(agentConfig *viper.Viper) *rabbitmq.RabbitTCPConnectInfo {
	//todo
	connectInfo := new(rabbitmq.RabbitTCPConnectInfo)

	connectInfo.Host = agentConfig.GetString("spring.rabbitmq.host")
	connectInfo.Port = agentConfig.GetString("spring.rabbitmq.port")
	connectInfo.UserName = agentConfig.GetString("spring.rabbitmq.username")
	connectInfo.Password = agentConfig.GetString("spring.rabbitmq.password")
	connectInfo.VirtualHost = agentConfig.GetString("spring.rabbitmq.virtual-host")

	return connectInfo
}
// refreshAgentInfoByStatusInfo overwrites the platform and kernel fields of
// agentServerInfo with the values found in agentInfo.HostInfo.
func refreshAgentInfoByStatusInfo(agentInfo *a_status.AgentInfo, agentServerInfo *a_agent.AgentServerInfo) {
	hostInfo := agentInfo.HostInfo

	// Operating system identification.
	agentServerInfo.Platform = hostInfo.Platform
	agentServerInfo.PlatformFamily = hostInfo.PlatformFamily
	agentServerInfo.PlatformVersion = hostInfo.PlatformVersion

	// Kernel identification.
	agentServerInfo.KernelVersion = hostInfo.KernelVersion
	agentServerInfo.KernelArch = hostInfo.KernelArch

	log.DebugF("[refreshAgentInfoByStatusInfo] - ok !")
}
// handleInitMsgFromServer processes one registration reply received from the
// server.
//
// The message content is expected to be a JSON string describing an
// AgentServerInfo. When the message type starts with g.InitOmType and the
// decoded ServerName starts with this agent's ServerName, the reply is taken
// as ours: g.G.AgentHasRegister is set and the TopicName assigned by the
// server is stored in a_agent.AgentServerInfoCache.
//
// Any other message (including one whose content cannot be decoded) is sent
// back through initToServerQueue from a task submitted to P.
//
// The return value is the current g.G.AgentHasRegister.
//
// NOTE(review): ownership is decided with strings.HasPrefix, so a reply for
// "node-10" also matches an agent named "node-1", and an empty local
// ServerName matches everything — confirm whether equality was intended.
func handleInitMsgFromServer(initFromServerMsg *rabbitmq.OctopusMessage, initToServerQueue *rabbitmq.RabbitQueue, agentServerInfo *a_agent.AgentServerInfo) bool {
	log.DebugF("message received from server is %s", initFromServerMsg)

	var serverInfo a_agent.AgentServerInfo

	content, ok := initFromServerMsg.Content.(string)
	if !ok {
		// Report the actual dynamic type; the assertion flag itself is always
		// false here and carries no information.
		log.ErrorF("convert to string error! => %T", initFromServerMsg.Content)
	} else if err := json.Unmarshal([]byte(content), &serverInfo); err != nil {
		log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", err))
	}
	// On either failure serverInfo stays zero-valued and the ownership check
	// below decides what happens to the message.

	isInitReply := strings.HasPrefix(initFromServerMsg.OctopusMessageType, g.InitOmType)
	isForThisAgent := strings.HasPrefix(serverInfo.ServerName, agentServerInfo.ServerName)

	if isInitReply && isForThisAgent {
		// This is the registration reply for this host.
		log.InfoF("OctopusMessage INIT from server is this agent !")

		// Flip the global registration flag.
		g.G.AgentHasRegister = true

		// Keep the real agent topic name assigned by the server.
		a_agent.AgentServerInfoCache.TopicName = serverInfo.TopicName
	} else {
		// Not our reply. 2023-06-19: rejecting it here was a mistake, it led
		// to an endless Nack loop, so the message is passed on instead.
		// Log the message itself; taking its address again would only print
		// the address of the local pointer variable.
		log.WarnF("OctopusMessage INIT from server not this agent ! => %v ", initFromServerMsg)

		_ = P.Submit(func() {
			// Random back-off between 1 and 5 seconds.
			seconds := rand.Intn(5) + 1

			initToServerQueue.SendOMsg(initFromServerMsg)

			// Yielding model for concurrent registration: hold this task for
			// a while so that other agents can finish registering.
			time.Sleep(time.Duration(seconds) * time.Second)
		})
	}

	return g.G.AgentHasRegister
}
// shutdownRegisterQueueConnection closes the two registration queues, the
// from-server queue first and the to-server queue second. The results of
// Close are ignored.
func shutdownRegisterQueueConnection(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue) {
	for _, queue := range []*rabbitmq.RabbitQueue{initFromServerQueue, initToServerQueue} {
		_ = queue.Close()
	}

	log.InfoF("Octopus Agent Init Queue has disconnected!")
}
// parseAgentServerInfo reads the YAML file at agentServerInfoConf, decodes it
// into an AgentServerInfo, normalizes it with UniformAgentServerInfo and logs
// the result as JSON.
//
// By convention the file lives at /octopus-agent/octopus-agent.conf.
//
// The agent cannot start without this information, so every failure panics:
// an unreadable file, invalid YAML, a document that decodes to nothing (empty
// file or a bare null), or a value that cannot be rendered as JSON.
func parseAgentServerInfo(agentServerInfoConf string) *a_agent.AgentServerInfo {
	yamlFile, err := ioutil.ReadFile(agentServerInfoConf)
	if err != nil {
		panic(fmt.Errorf("failed to read YAML file: %w", err))
	}

	var agentServerInfo *a_agent.AgentServerInfo
	if err = yaml.Unmarshal(yamlFile, &agentServerInfo); err != nil {
		panic(fmt.Errorf("failed to unmarshal YAML: %w", err))
	}

	// An empty or null document leaves the pointer nil without an error.
	// Stop here with a clear message instead of failing later inside the
	// reflection code of UniformAgentServerInfo.
	if agentServerInfo == nil {
		panic(fmt.Errorf("failed to unmarshal YAML: file %q contains no agent server info", agentServerInfoConf))
	}

	// uniform agent server info
	UniformAgentServerInfo(agentServerInfo)

	jsonFormat, err := json.Marshal(agentServerInfo)
	if err != nil {
		log.Error(fmt.Sprintf("agent server info convert error ! agentserverinfo is %v", agentServerInfo))
		panic(err)
	}
	log.Info(fmt.Sprintf("agent server info is %v", string(jsonFormat)))

	return agentServerInfo
}
// UniformAgentServerInfo normalizes agentServerInfo in place: every settable
// string field loses its surrounding whitespace, and ServerIPInV4 and
// ServerIPInV6 are cut at their first "/" (for instance a trailing prefix
// length), keeping only the part in front of it.
func UniformAgentServerInfo(agentServerInfo *a_agent.AgentServerInfo) {
	log.Info("[Initialization] - UniformAgentServerInfo !")

	// Walk all struct fields through reflection and trim the string ones.
	fields := reflect.ValueOf(agentServerInfo).Elem()
	for idx, total := 0, fields.NumField(); idx < total; idx++ {
		current := fields.Field(idx)
		if current.Kind() != reflect.String || !current.CanSet() {
			continue
		}
		current.SetString(strings.TrimSpace(current.String()))
	}

	log.Debug("[Initialization] - uniform ip address !")

	// beforeSlash returns addr up to, but not including, its first "/";
	// addresses without a slash come back unchanged.
	beforeSlash := func(addr string) string {
		if cut := strings.Index(addr, "/"); cut >= 0 {
			return addr[:cut]
		}
		return addr
	}

	agentServerInfo.ServerIPInV4 = beforeSlash(agentServerInfo.ServerIPInV4)
	agentServerInfo.ServerIPInV6 = beforeSlash(agentServerInfo.ServerIPInV6)
}
// buildAgentOsOperator resolves the OSS offline prefix, asks a_executor to
// build the agent OS operator from agentInfo and that prefix, attaches
// agentServerInfo to the operator and logs it at debug level.
//
// The prefix comes from "octopus.agent.executor.ossOfflinePrefix" in
// a_agent.AgentConfig. When the config is missing or the key is empty the
// built-in default "http://bastion.io" is used. In every case the prefix
// handed to a_executor ends with "/".
//
// The operator is not returned; presumably a_executor.BuildAgentOsOperator
// keeps a reference to it — confirm in that package.
func buildAgentOsOperator(agentInfo *a_status.AgentInfo, agentServerInfo *a_agent.AgentServerInfo) {
	// 2023-08-04: pass through some key information.
	ossOfflinePrefix := "http://bastion.io"

	if a_agent.AgentConfig == nil {
		log.WarnF("buildAgentOsOperator - agent oss offline prefix is null !")
	} else if configured := a_agent.AgentConfig.GetString("octopus.agent.executor.ossOfflinePrefix"); configured != "" {
		ossOfflinePrefix = configured
	} else {
		// An unset key reads as "", which must not wipe out the default and
		// leave a bare "/" as prefix.
		log.WarnF("buildAgentOsOperator - agent oss offline prefix is empty, use default %s", ossOfflinePrefix)
	}

	// Normalize once, whatever the origin of the value.
	if !strings.HasSuffix(ossOfflinePrefix, "/") {
		ossOfflinePrefix += "/"
	}

	// Call the executor's init function.
	agentOsOperator := a_executor.BuildAgentOsOperator(agentInfo, ossOfflinePrefix)

	// Attach the server info to the operator.
	agentOsOperator.AgentServerInfo = agentServerInfo

	// Debug output only; a marshal failure must not stop initialization.
	marshal, err := json.Marshal(agentOsOperator)
	if err != nil {
		log.WarnF("[Agent INIT] cached agent operator can not be marshaled => %v", err)
		return
	}
	log.DebugF("[Agent INIT] cached agent operator is %s", marshal)
}