[ Cmii ] [ Octopus ] - Image Function fix bugs

This commit is contained in:
zeaslity
2024-04-07 15:59:53 +08:00
parent 8e1304aac0
commit a9f25712eb
14 changed files with 492 additions and 510 deletions

View File

@@ -43,51 +43,29 @@ func INIT(octopusAgentConfigFileName string, agentServerInfoConf string) chan bo
a_agent.AgentServerInfoCache = agentServerInfo
// build for octopus tcp connect info struct
rabbitTCPConnectInfo := BuildOctopusTCPConnect(agentConfig)
rabbitTCPConnectInfo := buildOctopusTCPConnect(agentConfig)
initToServerProp := &rabbitmq.ConnectProperty{
ExchangeName: agentConfig.GetString("octopus.message.init_exchange"),
QueueName: agentConfig.GetString("octopus.message.init_to_server"),
ExchangeType: g.QueueDirect,
TopicKey: agentConfig.GetString("octopus.message.init_to_server_key"),
}
initFromServerProp := &rabbitmq.ConnectProperty{
ExchangeName: agentConfig.GetString("octopus.message.init_exchange"),
QueueName: agentConfig.GetString("octopus.message.init_from_server"),
ExchangeType: g.QueueDirect,
TopicKey: agentConfig.GetString("octopus.message.init_from_server_key"),
}
// 建立RabbitMQ的连接
initToServerQueue := &rabbitmq.RabbitQueue{
RabbitProp: initToServerProp,
RabbitConnectInfo: rabbitTCPConnectInfo,
}
//defer initToServerQueue.Close()
initToServerQueue, initFromServerQueue := buildOctopusInitQueue(rabbitTCPConnectInfo)
defer initToServerQueue.Close()
defer initFromServerQueue.Close()
// 建立连接
initToServerQueue.Connect()
// 监听初始化连接中的信息
initFromServerQueue := &rabbitmq.RabbitQueue{
RabbitProp: initFromServerProp,
RabbitConnectInfo: rabbitTCPConnectInfo,
}
//defer initFromServerQueue.Close()
// 建立连接
initFromServerQueue.Connect()
initFromServerQueue.Handle()
buildAndSendInitMsgToServer(agentServerInfo, initToServerQueue)
// rabbit queue handle message from server
initFromServerQueue.Handle()
// receive from server
for g.G.AgentHasRegister == false {
select {
case initFromServerMsg := <-initFromServerQueue.ReceiveChan.InitRChan:
if handleInitMsgFromServer(initFromServerMsg, initToServerQueue, agentServerInfo) {
log.InfoF("[INIT] - agent has registered !")
break
}
default:
@@ -105,6 +83,36 @@ func INIT(octopusAgentConfigFileName string, agentServerInfoConf string) chan bo
return runtimeConnectorQueue
}
func buildOctopusInitQueue(rabbitTCPConnectInfo *rabbitmq.RabbitTCPConnectInfo) (*rabbitmq.RabbitQueue, *rabbitmq.RabbitQueue) {
initToServerProp := &rabbitmq.ConnectProperty{
ExchangeName: a_agent.AgentConfig.GetString("octopus.message.init_exchange"),
QueueName: a_agent.AgentConfig.GetString("octopus.message.init_to_server"),
ExchangeType: g.QueueDirect,
TopicKey: a_agent.AgentConfig.GetString("octopus.message.init_to_server_key"),
}
initFromServerProp := &rabbitmq.ConnectProperty{
ExchangeName: a_agent.AgentConfig.GetString("octopus.message.init_exchange"),
QueueName: a_agent.AgentConfig.GetString("octopus.message.init_from_server"),
ExchangeType: g.QueueDirect,
TopicKey: a_agent.AgentConfig.GetString("octopus.message.init_from_server_key"),
}
// 建立RabbitMQ的连接
initToServerQueue := &rabbitmq.RabbitQueue{
RabbitProp: initToServerProp,
RabbitConnectInfo: rabbitTCPConnectInfo,
}
initFromServerQueue := &rabbitmq.RabbitQueue{
RabbitProp: initFromServerProp,
RabbitConnectInfo: rabbitTCPConnectInfo,
}
return initToServerQueue, initFromServerQueue
}
func buildAndStartBusinessRuntimeQueue(agentTopicName string) chan bool {
// 建立 业务消息 接收队列
@@ -183,19 +191,23 @@ func buildAndSendInitMsgToServer(agentServerInfo *a_agent.AgentServerInfo, initT
// 发送OM至MQ中
_ = P.Submit(
func() {
for g.G.AgentHasRegister == false {
agentRegisterCount := 0
for g.G.AgentHasRegister == false && agentRegisterCount < g.AgentRegisterFailedCount {
log.InfoF("Send init message to server! ==> %s", octopusMsg)
log.InfoF("[INIT] - Send init message to server! ==> %s", octopusMsg)
//如果agent存活 而Server不存活 那么需要持续不断的向Server中发送消息
initToServerQueue.SendOMsg(octopusMsg)
agentRegisterCount += 1
// 休眠
time.Sleep(5 * time.Minute)
}
})
}
func BuildOctopusTCPConnect(agentConfig *viper.Viper) *rabbitmq.RabbitTCPConnectInfo {
func buildOctopusTCPConnect(agentConfig *viper.Viper) *rabbitmq.RabbitTCPConnectInfo {
host := agentConfig.GetString("spring.rabbitmq.host")
port := agentConfig.GetString("spring.rabbitmq.port")
username := agentConfig.GetString("spring.rabbitmq.username")
@@ -226,15 +238,13 @@ func refreshAgentInfoByStatusInfo(agentInfo *a_status.AgentInfo, agentServerInfo
// handleInitMsgFromServer 处理从Server接收的 注册信息
func handleInitMsgFromServer(initFromServerMsg *rabbitmq.OctopusMessage, initToServerQueue *rabbitmq.RabbitQueue, agentServerInfo *a_agent.AgentServerInfo) bool {
//initOctopusMessageDeliveries := initFromServerQueue.Read(false)
log.DebugF("message received from server is %s", initFromServerMsg)
var serverInfo a_agent.AgentServerInfo
s, er := initFromServerMsg.Content.(string)
if !er {
log.ErrorF("convet to string error! => %v", er)
log.ErrorF("convert to string error! => %v", er)
}
cc := json.Unmarshal([]byte(s), &serverInfo)
if cc != nil {
@@ -247,9 +257,6 @@ func handleInitMsgFromServer(initFromServerMsg *rabbitmq.OctopusMessage, initToS
// 是本机的注册回复信息
log.InfoF("OctopusMessage INIT from server is this agent !")
// 手动确认信息
//delivery.Ack(false)
// 修改系统参数
g.G.AgentHasRegister = true
@@ -261,13 +268,16 @@ func handleInitMsgFromServer(initFromServerMsg *rabbitmq.OctopusMessage, initToS
} else {
// 不是自身的 注册回复信息 -- 拒绝 2023年6月19日 此处存在错误! 会死循环Nack 导致异常
log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v, ", &initFromServerMsg))
log.WarnF("OctopusMessage INIT from server not this agent ! => %v ", &initFromServerMsg)
// 需要休眠等待不再获取相应的信息
P.Submit(func() {
_ = P.Submit(func() {
time.Sleep(time.Second * 5)
initToServerQueue.SendOMsg(initFromServerMsg)
})
}
return g.G.AgentHasRegister