From f5a3db2f5606bf983621e51e53534f85b688f020 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Thu, 6 Jul 2023 15:49:14 +0800 Subject: [PATCH] =?UTF-8?q?[=20=E9=A1=B9=E7=9B=AE=20]=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E4=B8=9A=E5=8A=A1=E9=98=9F=E5=88=97=E7=AD=89=E5=BE=85=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .run/go build agent-go.run.xml | 3 ++- agent-go/AgentInitialization.go | 17 ++++++++++------- agent-go/go.mod | 1 - agent-go/main.go | 4 +++- agent-go/rabbitmq/OMsgConnector.go | 7 +++---- 5 files changed, 18 insertions(+), 14 deletions(-) diff --git a/.run/go build agent-go.run.xml b/.run/go build agent-go.run.xml index 335f740..c29c0f4 100644 --- a/.run/go build agent-go.run.xml +++ b/.run/go build agent-go.run.xml @@ -3,7 +3,8 @@ factoryName="Go Application" nameIsGenerated="true"> - + diff --git a/agent-go/AgentInitialization.go b/agent-go/AgentInitialization.go index d4c8d13..1407fb7 100644 --- a/agent-go/AgentInitialization.go +++ b/agent-go/AgentInitialization.go @@ -15,13 +15,16 @@ import ( var omType = g.InitOmType var P = g.G.P -var AgentServerInfoCache = ®ister.AgentServerInfo{} +//var AgentServerInfoCache = ®ister.AgentServerInfo{} -func INIT(agentServerInfoConf string) *register.AgentServerInfo { +func INIT(agentServerInfoConf string) chan bool { // 获取系统的环境变量 agentServerInfo := parseAgentServerInfo(agentServerInfoConf) + // 缓存此内容 + //AgentServerInfoCache = agentServerInfo + agentConfig := g.G.AgentConfig initToServerProp := &rabbitmq.ConnectProperty{ @@ -73,7 +76,6 @@ func INIT(agentServerInfoConf string) *register.AgentServerInfo { time.Sleep(10 * time.Minute) } - }) // 监听初始化连接中的信息 @@ -86,13 +88,13 @@ func INIT(agentServerInfoConf string) *register.AgentServerInfo { initFromServerQueue.Connect() // 建立运行时RabbitMQ连接 - handleInitMsgFromServer(initFromServerQueue, initToServerQueue, agentServerInfo) + businessForeverChan := handleInitMsgFromServer(initFromServerQueue, initToServerQueue, agentServerInfo) - return agentServerInfo + return businessForeverChan } // handleInitMsgFromServer 处理从Server接收的 注册信息 -func handleInitMsgFromServer(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue, agentServerInfo *register.AgentServerInfo) { +func handleInitMsgFromServer(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue, agentServerInfo *register.AgentServerInfo) chan bool { initOctopusMessageDeliveries := initFromServerQueue.Read(false) @@ -151,8 +153,9 @@ func handleInitMsgFromServer(initFromServerQueue *rabbitmq.RabbitQueue, initToSe } // 建立 运行时 RabbitMQ连接 - rabbitmq.BuildOMsgRuntimeConnectorQueue(findRealAgentTopicName) + runtimeConnectorQueue := rabbitmq.BuildOMsgRuntimeConnectorQueue(findRealAgentTopicName) + return runtimeConnectorQueue } // shutdownRegisterQueueConnection 关闭初始化连接的两个队列 diff --git a/agent-go/go.mod b/agent-go/go.mod index 61aedec..4a830fc 100644 --- a/agent-go/go.mod +++ b/agent-go/go.mod @@ -3,7 +3,6 @@ go 1.18 require ( - github.com/nacos-group/nacos-sdk-go/v2 v2.2.0 github.com/panjf2000/ants/v2 v2.7.2 github.com/shirou/gopsutil/v3 v3.23.3 github.com/spf13/viper v1.15.0 diff --git a/agent-go/main.go b/agent-go/main.go index f74b36d..85c3ba2 100644 --- a/agent-go/main.go +++ b/agent-go/main.go @@ -27,6 +27,8 @@ func main() { g.G.AgentConfig = register.ParseConfiguration(filename) // 执行初始化之策工作 - AgentServerInfoCache = INIT(agentServerInfoConf) + businessForeverChan := INIT(agentServerInfoConf) + // 永远等待 runtime的队列消息 + <-businessForeverChan } diff --git a/agent-go/rabbitmq/OMsgConnector.go b/agent-go/rabbitmq/OMsgConnector.go index 89bf16d..3385169 100644 --- a/agent-go/rabbitmq/OMsgConnector.go +++ b/agent-go/rabbitmq/OMsgConnector.go @@ -10,7 +10,7 @@ var OctopusToServerQueue = &RabbitQueue{} var P = g.G.P -func BuildOMsgRuntimeConnectorQueue(agentTopicName string) { +func BuildOMsgRuntimeConnectorQueue(agentTopicName string) chan bool { // 建立 业务消息 接收队列 // agentTopicName为名称的队列 @@ -51,7 +51,7 @@ func BuildOMsgRuntimeConnectorQueue(agentTopicName string) { log.InfoF("Octopus Message Business Runtime Queue is established ! => %v", OctopusToServerQueue) deliveries := octopusMsgQueue.Read(true) - forever := make(chan bool) + businessForeverChan := make(chan bool) P.Submit( func() { // 死循环,处理Octopus Message @@ -72,7 +72,6 @@ func BuildOMsgRuntimeConnectorQueue(agentTopicName string) { } }) - // wait forever - <-forever + return businessForeverChan }