[ 项目 ] 优化业务队列等待逻辑

This commit is contained in:
zeaslity
2023-07-06 15:49:14 +08:00
parent 129f388cfa
commit f5a3db2f56
5 changed files with 18 additions and 14 deletions

View File

@@ -3,7 +3,8 @@
factoryName="Go Application" nameIsGenerated="true"> factoryName="Go Application" nameIsGenerated="true">
<module name="ProjectOctopus"/> <module name="ProjectOctopus"/>
<working_directory value="$PROJECT_DIR$/agent-go"/> <working_directory value="$PROJECT_DIR$/agent-go"/>
<parameters value="-version=dev"/> <parameters
value="-version=dev -agentServerInfoConf=C:\Users\wdd\IdeaProjects\ProjectOctopus\agent-go\octopus-agent-dev.yaml"/>
<kind value="PACKAGE"/> <kind value="PACKAGE"/>
<package value="agent-go"/> <package value="agent-go"/>
<directory value="$PROJECT_DIR$"/> <directory value="$PROJECT_DIR$"/>

View File

@@ -15,13 +15,16 @@ import (
var omType = g.InitOmType var omType = g.InitOmType
var P = g.G.P var P = g.G.P
var AgentServerInfoCache = &register.AgentServerInfo{} //var AgentServerInfoCache = &register.AgentServerInfo{}
func INIT(agentServerInfoConf string) *register.AgentServerInfo { func INIT(agentServerInfoConf string) chan bool {
// 获取系统的环境变量 // 获取系统的环境变量
agentServerInfo := parseAgentServerInfo(agentServerInfoConf) agentServerInfo := parseAgentServerInfo(agentServerInfoConf)
// 缓存此内容
//AgentServerInfoCache = agentServerInfo
agentConfig := g.G.AgentConfig agentConfig := g.G.AgentConfig
initToServerProp := &rabbitmq.ConnectProperty{ initToServerProp := &rabbitmq.ConnectProperty{
@@ -73,7 +76,6 @@ func INIT(agentServerInfoConf string) *register.AgentServerInfo {
time.Sleep(10 * time.Minute) time.Sleep(10 * time.Minute)
} }
}) })
// 监听初始化连接中的信息 // 监听初始化连接中的信息
@@ -86,13 +88,13 @@ func INIT(agentServerInfoConf string) *register.AgentServerInfo {
initFromServerQueue.Connect() initFromServerQueue.Connect()
// 建立运行时RabbitMQ连接 // 建立运行时RabbitMQ连接
handleInitMsgFromServer(initFromServerQueue, initToServerQueue, agentServerInfo) businessForeverChan := handleInitMsgFromServer(initFromServerQueue, initToServerQueue, agentServerInfo)
return agentServerInfo return businessForeverChan
} }
// handleInitMsgFromServer 处理从Server接收的 注册信息 // handleInitMsgFromServer 处理从Server接收的 注册信息
func handleInitMsgFromServer(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue, agentServerInfo *register.AgentServerInfo) { func handleInitMsgFromServer(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue, agentServerInfo *register.AgentServerInfo) chan bool {
initOctopusMessageDeliveries := initFromServerQueue.Read(false) initOctopusMessageDeliveries := initFromServerQueue.Read(false)
@@ -151,8 +153,9 @@ func handleInitMsgFromServer(initFromServerQueue *rabbitmq.RabbitQueue, initToSe
} }
// 建立 运行时 RabbitMQ连接 // 建立 运行时 RabbitMQ连接
rabbitmq.BuildOMsgRuntimeConnectorQueue(findRealAgentTopicName) runtimeConnectorQueue := rabbitmq.BuildOMsgRuntimeConnectorQueue(findRealAgentTopicName)
return runtimeConnectorQueue
} }
// shutdownRegisterQueueConnection 关闭初始化连接的两个队列 // shutdownRegisterQueueConnection 关闭初始化连接的两个队列

View File

@@ -3,7 +3,6 @@
go 1.18 go 1.18
require ( require (
github.com/nacos-group/nacos-sdk-go/v2 v2.2.0
github.com/panjf2000/ants/v2 v2.7.2 github.com/panjf2000/ants/v2 v2.7.2
github.com/shirou/gopsutil/v3 v3.23.3 github.com/shirou/gopsutil/v3 v3.23.3
github.com/spf13/viper v1.15.0 github.com/spf13/viper v1.15.0

View File

@@ -27,6 +27,8 @@ func main() {
g.G.AgentConfig = register.ParseConfiguration(filename) g.G.AgentConfig = register.ParseConfiguration(filename)
// 执行初始化之策工作 // 执行初始化之策工作
AgentServerInfoCache = INIT(agentServerInfoConf) businessForeverChan := INIT(agentServerInfoConf)
// 永远等待 runtime的队列消息
<-businessForeverChan
} }

View File

@@ -10,7 +10,7 @@ var OctopusToServerQueue = &RabbitQueue{}
var P = g.G.P var P = g.G.P
func BuildOMsgRuntimeConnectorQueue(agentTopicName string) { func BuildOMsgRuntimeConnectorQueue(agentTopicName string) chan bool {
// 建立 业务消息 接收队列 // 建立 业务消息 接收队列
// agentTopicName为名称的队列 // agentTopicName为名称的队列
@@ -51,7 +51,7 @@ func BuildOMsgRuntimeConnectorQueue(agentTopicName string) {
log.InfoF("Octopus Message Business Runtime Queue is established ! => %v", OctopusToServerQueue) log.InfoF("Octopus Message Business Runtime Queue is established ! => %v", OctopusToServerQueue)
deliveries := octopusMsgQueue.Read(true) deliveries := octopusMsgQueue.Read(true)
forever := make(chan bool) businessForeverChan := make(chan bool)
P.Submit( P.Submit(
func() { func() {
// 死循环处理Octopus Message // 死循环处理Octopus Message
@@ -72,7 +72,6 @@ func BuildOMsgRuntimeConnectorQueue(agentTopicName string) {
} }
}) })
// wait forever return businessForeverChan
<-forever
} }