246 lines
7.1 KiB
Go
246 lines
7.1 KiB
Go
package main
|
|
|
|
import (
|
|
"agent-go/executor"
|
|
"agent-go/g"
|
|
"agent-go/rabbitmq"
|
|
"agent-go/register"
|
|
"encoding/json"
|
|
"fmt"
|
|
"gopkg.in/yaml.v3"
|
|
"io/ioutil"
|
|
"reflect"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
var omType = g.InitOmType
|
|
var P = g.G.P
|
|
|
|
//var AgentServerInfoCache = &register.AgentServerInfo{}
|
|
|
|
func INIT(agentServerInfoConf string) chan bool {
|
|
|
|
// 获取系统的环境变量
|
|
agentServerInfo := parseAgentServerInfo(agentServerInfoConf)
|
|
|
|
// 缓存此内容
|
|
//AgentServerInfoCache = agentServerInfo
|
|
|
|
agentConfig := g.G.AgentConfig
|
|
|
|
initToServerProp := &rabbitmq.ConnectProperty{
|
|
ExchangeName: agentConfig.GetString("octopus.message.init_exchange"),
|
|
QueueName: agentConfig.GetString("octopus.message.init_to_server"),
|
|
ExchangeType: g.QueueDirect,
|
|
TopicKey: agentConfig.GetString("octopus.message.init_to_server_key"),
|
|
}
|
|
|
|
initFromServerProp := &rabbitmq.ConnectProperty{
|
|
ExchangeName: agentConfig.GetString("octopus.message.init_exchange"),
|
|
QueueName: agentConfig.GetString("octopus.message.init_from_server"),
|
|
ExchangeType: g.QueueDirect,
|
|
TopicKey: agentConfig.GetString("octopus.message.init_from_server_key"),
|
|
}
|
|
|
|
// 建立RabbitMQ的连接
|
|
initToServerQueue := &rabbitmq.RabbitQueue{
|
|
RabbitProp: initToServerProp,
|
|
}
|
|
//defer initToServerQueue.Close()
|
|
|
|
// 建立连接
|
|
initToServerQueue.Connect()
|
|
|
|
// 组装OctopusMessage
|
|
var octopusMsg *rabbitmq.OctopusMessage
|
|
octopusMsg = octopusMsg.Build(
|
|
omType,
|
|
agentServerInfo,
|
|
)
|
|
msgBytes, err := json.Marshal(octopusMsg)
|
|
if err != nil {
|
|
log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %v", octopusMsg))
|
|
}
|
|
|
|
// 发送OM至MQ中
|
|
P.Submit(
|
|
func() {
|
|
for g.G.AgentHasRegister == false {
|
|
|
|
log.Debug(fmt.Sprintf("Send init message to server! ==> %s", string(msgBytes)))
|
|
|
|
//如果agent存活 而Server不存活 那么需要持续不断的向Server中发送消息
|
|
initToServerQueue.Send(
|
|
msgBytes,
|
|
)
|
|
// 休眠
|
|
time.Sleep(10 * time.Minute)
|
|
|
|
}
|
|
})
|
|
|
|
// 监听初始化连接中的信息
|
|
initFromServerQueue := &rabbitmq.RabbitQueue{
|
|
RabbitProp: initFromServerProp,
|
|
}
|
|
//defer initFromServerQueue.Close()
|
|
|
|
// 建立连接
|
|
initFromServerQueue.Connect()
|
|
|
|
// 建立运行时RabbitMQ连接
|
|
businessForeverChan := handleInitMsgFromServer(initFromServerQueue, initToServerQueue, agentServerInfo)
|
|
|
|
return businessForeverChan
|
|
}
|
|
|
|
// handleInitMsgFromServer 处理从Server接收的 注册信息
|
|
func handleInitMsgFromServer(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue, agentServerInfo *register.AgentServerInfo) chan bool {
|
|
|
|
initOctopusMessageDeliveries := initFromServerQueue.Read(false)
|
|
|
|
// 2023年6月19日 修复注册信息一直没有完全消费的问题
|
|
findRealAgentTopicName := ""
|
|
|
|
// 同步很多抢占注册的情况
|
|
for delivery := range initOctopusMessageDeliveries {
|
|
|
|
log.Debug(fmt.Sprintf("message received from server is %s", string(delivery.Body)))
|
|
|
|
var initOctopusMsg *rabbitmq.OctopusMessage
|
|
err := json.Unmarshal(delivery.Body, &initOctopusMsg)
|
|
if err != nil {
|
|
log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ",
|
|
string(delivery.Body)))
|
|
}
|
|
|
|
var serverInfo register.AgentServerInfo
|
|
|
|
s, er := initOctopusMsg.Content.(string)
|
|
if !er {
|
|
log.ErrorF("convet to string error! => %v", er)
|
|
}
|
|
cc := json.Unmarshal([]byte(s), &serverInfo)
|
|
if cc != nil {
|
|
log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", cc))
|
|
}
|
|
serverName := serverInfo.ServerName
|
|
|
|
// 处理OM信息
|
|
if initOctopusMsg != nil && strings.HasPrefix(initOctopusMsg.OctopusMessageType, g.InitOmType) && strings.HasPrefix(serverName, agentServerInfo.ServerName) {
|
|
// 是本机的注册回复信息
|
|
log.InfoF("OctopusMessage INIT from server is this agent !")
|
|
|
|
// 手动确认信息
|
|
delivery.Ack(false)
|
|
|
|
// 修改系统参数
|
|
g.G.AgentHasRegister = true
|
|
|
|
// 保存真实的AgentTopicName
|
|
findRealAgentTopicName = serverInfo.TopicName
|
|
|
|
// 手动关闭 注册队列的连接
|
|
shutdownRegisterQueueConnection(initFromServerQueue, initToServerQueue)
|
|
|
|
} else {
|
|
// 不是自身的 注册回复信息 -- 拒绝 2023年6月19日 此处存在错误! 会死循环Nack 导致异常
|
|
log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v, ", initOctopusMsg))
|
|
delivery.Ack(false)
|
|
|
|
// 需要休眠等待不再获取相应的信息
|
|
time.Sleep(5 * time.Second)
|
|
}
|
|
}
|
|
|
|
// 建立 运行时 RabbitMQ连接
|
|
runtimeConnectorQueue := rabbitmq.BuildOMsgRuntimeConnectorQueue(findRealAgentTopicName)
|
|
|
|
return runtimeConnectorQueue
|
|
}
|
|
|
|
// shutdownRegisterQueueConnection 关闭初始化连接的两个队列
|
|
func shutdownRegisterQueueConnection(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue) {
|
|
|
|
initFromServerQueue.Close()
|
|
initToServerQueue.Close()
|
|
|
|
log.InfoF("Pretend to Shutdown register queue connection !")
|
|
}
|
|
|
|
func parseAgentServerInfo(agentServerInfoConf string) *register.AgentServerInfo {
|
|
|
|
// 约定文件地址为 /octopus-agent/octopus-agent.conf
|
|
var agentServerInfo *register.AgentServerInfo
|
|
yamlFile, err := ioutil.ReadFile(agentServerInfoConf)
|
|
|
|
if err != nil {
|
|
panic(fmt.Errorf("failed to read YAML file: %v", err))
|
|
}
|
|
|
|
err = yaml.Unmarshal(yamlFile, &agentServerInfo)
|
|
if err != nil {
|
|
panic(fmt.Errorf("failed to unmarshal YAML: %v", err))
|
|
}
|
|
|
|
// uniform agent server info
|
|
UniformAgentServerInfo(agentServerInfo)
|
|
|
|
// build operator cache
|
|
BuildAgentOsOperator(agentServerInfo)
|
|
|
|
jsonFormat, err := json.Marshal(&agentServerInfo)
|
|
if err != nil {
|
|
log.Error(fmt.Sprintf("agent server info convert error ! agentserverinfo is %v", agentServerInfo))
|
|
panic(err)
|
|
}
|
|
log.Info(fmt.Sprintf("agent server info is %v", string(jsonFormat)))
|
|
|
|
return agentServerInfo
|
|
}
|
|
|
|
// UniformAgentServerInfo uniform deal with ip
|
|
func UniformAgentServerInfo(agentServerInfo *register.AgentServerInfo) {
|
|
|
|
// reflect to iterator all field
|
|
log.Info("[Initialization] - UniformAgentServerInfo !")
|
|
value := reflect.ValueOf(agentServerInfo).Elem()
|
|
|
|
for i := 0; i < value.NumField(); i++ {
|
|
field := value.Field(i)
|
|
if field.Kind() == reflect.String && field.CanSet() {
|
|
field.SetString(strings.TrimSpace(field.String()))
|
|
}
|
|
}
|
|
|
|
log.Debug("[Initialization] - uniform ip address !")
|
|
if strings.Contains(agentServerInfo.ServerIPInV4, "/") {
|
|
agentServerInfo.ServerIPInV4 = strings.Split(agentServerInfo.ServerIPInV4, "/")[0]
|
|
}
|
|
|
|
if strings.Contains(agentServerInfo.ServerIPInV6, "/") {
|
|
agentServerInfo.ServerIPInV6 = strings.Split(agentServerInfo.ServerIPInV6, "/")[0]
|
|
}
|
|
|
|
}
|
|
|
|
func BuildAgentOsOperator(agentServerInfo *register.AgentServerInfo) {
|
|
|
|
// 2023年8月4日 passthrough some key information
|
|
ossOfflinePrefix := g.G.AgentConfig.GetString("octopus.agent.executor.ossOfflinePrefix")
|
|
if !strings.HasSuffix(ossOfflinePrefix, "/") {
|
|
ossOfflinePrefix += "/"
|
|
}
|
|
|
|
// call the init exec function
|
|
agentOsOperator := executor.BuildAgentOsOperator(agentServerInfo.OSInfo, ossOfflinePrefix)
|
|
|
|
// assign the agentServerInfo
|
|
agentOsOperator.AgentServerInfo = agentServerInfo
|
|
|
|
// debug
|
|
marshal, _ := json.Marshal(agentOsOperator)
|
|
log.DebugF("[Agent INIT] cached agent operator is %s", marshal)
|
|
}
|