[Excution] - base function start - 3
This commit is contained in:
188
agent-go/AgentInitialization.go
Normal file
188
agent-go/AgentInitialization.go
Normal file
@@ -0,0 +1,188 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"agent-go/g"
|
||||
"agent-go/rabbitmq"
|
||||
"agent-go/register"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"gopkg.in/yaml.v3"
|
||||
"io/ioutil"
|
||||
"time"
|
||||
)
|
||||
|
||||
var omType = g.InitOmType
|
||||
var P = g.G.P
|
||||
|
||||
var AgentServerInfoCache = ®ister.AgentServerInfo{}
|
||||
|
||||
func INIT() *register.AgentServerInfo {
|
||||
|
||||
// 获取系统的环境变量
|
||||
agentServerInfo := parseAgentServerInfo()
|
||||
|
||||
agentConfig := g.G.AgentConfig
|
||||
|
||||
initToServerProp := &rabbitmq.ConnectProperty{
|
||||
ExchangeName: agentConfig.GetString("octopus.message.init_exchange"),
|
||||
QueueName: agentConfig.GetString("octopus.message.init_to_server"),
|
||||
ExchangeType: g.QueueDirect,
|
||||
TopicKey: agentConfig.GetString("octopus.message.init_to_server_key"),
|
||||
}
|
||||
|
||||
initFromServerProp := &rabbitmq.ConnectProperty{
|
||||
ExchangeName: agentConfig.GetString("octopus.message.init_exchange"),
|
||||
QueueName: agentConfig.GetString("octopus.message.init_from_server"),
|
||||
ExchangeType: g.QueueDirect,
|
||||
TopicKey: agentConfig.GetString("octopus.message.init_from_server_key"),
|
||||
}
|
||||
|
||||
// 建立RabbitMQ的连接
|
||||
initToServerQueue := &rabbitmq.RabbitQueue{
|
||||
RabbitProp: initToServerProp,
|
||||
}
|
||||
defer initToServerQueue.Close()
|
||||
|
||||
// 建立连接
|
||||
initToServerQueue.Connect()
|
||||
|
||||
// 组装OctopusMessage
|
||||
var octopusMsg *OctopusMessage
|
||||
octopusMsg = octopusMsg.Build(
|
||||
omType,
|
||||
agentServerInfo,
|
||||
)
|
||||
msgBytes, err := json.Marshal(octopusMsg)
|
||||
if err != nil {
|
||||
log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %v", octopusMsg))
|
||||
}
|
||||
|
||||
// 发送OM至MQ中
|
||||
P.Submit(
|
||||
func() {
|
||||
for g.G.AgentHasRegister == false {
|
||||
|
||||
log.Debug(fmt.Sprintf("Send init message to server! ==> %s", string(msgBytes)))
|
||||
|
||||
//如果agent存活 而Server不存活 那么需要持续不断的向Server中发送消息
|
||||
initToServerQueue.Send(
|
||||
msgBytes,
|
||||
)
|
||||
// 休眠
|
||||
time.Sleep(10 * time.Minute)
|
||||
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
// 监听初始化连接中的信息
|
||||
initFromServerQueue := &rabbitmq.RabbitQueue{
|
||||
RabbitProp: initFromServerProp,
|
||||
}
|
||||
defer initFromServerQueue.Close()
|
||||
|
||||
// 建立连接
|
||||
initFromServerQueue.Connect()
|
||||
|
||||
// 建立运行时RabbitMQ连接
|
||||
handleInitMsgFromServer(initFromServerQueue, initToServerQueue, agentServerInfo)
|
||||
|
||||
return agentServerInfo
|
||||
}
|
||||
|
||||
// handleInitMsgFromServer 处理从Server接收的 注册信息
|
||||
func handleInitMsgFromServer(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue, agentServerInfo *register.AgentServerInfo) {
|
||||
|
||||
initOctopusMessageDeliveries := initFromServerQueue.Read(false)
|
||||
|
||||
// 2023年6月19日 修复注册信息一直没有完全消费的问题
|
||||
findRealAgentTopicName := ""
|
||||
|
||||
// 同步很多抢占注册的情况
|
||||
for delivery := range initOctopusMessageDeliveries {
|
||||
|
||||
log.Debug(fmt.Sprintf("message received from server is %s", string(delivery.Body)))
|
||||
|
||||
var initOctopusMsg *OctopusMessage
|
||||
err := json.Unmarshal(delivery.Body, &initOctopusMsg)
|
||||
if err != nil {
|
||||
log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ",
|
||||
string(delivery.Body)))
|
||||
}
|
||||
|
||||
var serverInfo register.AgentServerInfo
|
||||
|
||||
s, _ := initOctopusMsg.Content.(string)
|
||||
cc := json.Unmarshal([]byte(s), &serverInfo)
|
||||
if cc != nil {
|
||||
log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", cc))
|
||||
}
|
||||
serverName := serverInfo.ServerName
|
||||
|
||||
// 处理OM信息
|
||||
if initOctopusMsg != nil && initOctopusMsg.Type == g.InitOmType && serverName == agentServerInfo.ServerName {
|
||||
// 是本机的注册回复信息
|
||||
log.InfoF("OctopusMessage INIT from server is this agent !")
|
||||
|
||||
// 手动确认信息
|
||||
delivery.Ack(false)
|
||||
|
||||
// 修改系统参数
|
||||
g.G.AgentHasRegister = true
|
||||
|
||||
// 保存真实的AgentTopicName
|
||||
findRealAgentTopicName = serverInfo.TopicName
|
||||
|
||||
// 手动关闭 注册队列的连接
|
||||
shutdownRegisterQueueConnection(initFromServerQueue, initToServerQueue)
|
||||
|
||||
} else {
|
||||
// 不是自身的 注册回复信息 -- 拒绝 2023年6月19日 此处存在错误! 会死循环Nack 导致异常
|
||||
log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v, ==>%s", initOctopusMsg, delivery.Body))
|
||||
delivery.Ack(false)
|
||||
|
||||
// 需要休眠等待不再获取相应的信息
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// 建立 运行时 RabbitMQ连接
|
||||
rabbitmq.BuildOMsgRuntimeConnectorQueue(findRealAgentTopicName)
|
||||
|
||||
}
|
||||
|
||||
// shutdownRegisterQueueConnection 关闭初始化连接的两个队列
|
||||
func shutdownRegisterQueueConnection(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue) {
|
||||
|
||||
initFromServerQueue.Close()
|
||||
initToServerQueue.Close()
|
||||
|
||||
log.InfoF("Pretend to Shutdown register queue connection !")
|
||||
}
|
||||
|
||||
func parseAgentServerInfo() *register.AgentServerInfo {
|
||||
|
||||
// 约定文件地址为 /etc/environment.d/octopus-agent.conf
|
||||
// 目前使用
|
||||
var agentServerInfo *register.AgentServerInfo
|
||||
//yamlFile, err := ioutil.ReadFile("C:\\Users\\wdd\\IdeaProjects\\ProjectOctopus\\agent-go\\server-env.yaml")
|
||||
yamlFile, err := ioutil.ReadFile("server-env.yaml")
|
||||
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("failed to read YAML file: %v", err))
|
||||
}
|
||||
|
||||
err = yaml.Unmarshal(yamlFile, &agentServerInfo)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("failed to unmarshal YAML: %v", err))
|
||||
}
|
||||
|
||||
jsonFormat, err := json.Marshal(agentServerInfo)
|
||||
if err != nil {
|
||||
log.Error(fmt.Sprintf("agent server info convert error ! agentserverinfo is %v", agentServerInfo))
|
||||
panic(err)
|
||||
}
|
||||
log.Info(fmt.Sprintf("agent server info is %v", string(jsonFormat)))
|
||||
|
||||
return agentServerInfo
|
||||
}
|
||||
Reference in New Issue
Block a user