From 1b88e5d87157284c62609ac98113a65d7a9c85e6 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Fri, 29 Mar 2024 15:37:06 +0800 Subject: [PATCH] [Cmii][ImageSync] - reformat agent-go - 2 --- agent-go/a_agent/AgentHandler.go | 4 +++- agent-go/a_executor/CommandExecutor.go | 2 ++ agent-go/a_init/AgentInitialization.go | 9 ++------- agent-go/a_status/Status.go | 4 +++- agent-go/rabbitmq/OctopusMessageHandler.go | 2 -- agent-go/rabbitmq/RabbitMsgQueue.go | 4 ++++ 6 files changed, 14 insertions(+), 11 deletions(-) diff --git a/agent-go/a_agent/AgentHandler.go b/agent-go/a_agent/AgentHandler.go index e0600ba..617f505 100644 --- a/agent-go/a_agent/AgentHandler.go +++ b/agent-go/a_agent/AgentHandler.go @@ -1,6 +1,7 @@ package a_agent import ( + "time" "wdd.io/agent-common/logger" "wdd.io/agent-go/rabbitmq" ) @@ -11,7 +12,7 @@ func Activate() { log.Info("Module [ AGENT ] activated !") for { - if octopusMessage, ok := <-rabbitmq.BusinessRuntimeQueue.ReceiveChan.ExecutorRChan; ok { + if octopusMessage, ok := <-rabbitmq.BusinessRuntimeQueue.ReceiveChan.AgentRChan; ok { // 处理数 // 输出日志 @@ -21,5 +22,6 @@ func Activate() { log.ErrorF("business queue [ AGENT ] receive chan has closed !") break } + time.Sleep(time.Second * 20) } } diff --git a/agent-go/a_executor/CommandExecutor.go b/agent-go/a_executor/CommandExecutor.go index 8308029..c03a853 100644 --- a/agent-go/a_executor/CommandExecutor.go +++ b/agent-go/a_executor/CommandExecutor.go @@ -8,6 +8,7 @@ import ( "os/exec" "strconv" "strings" + "time" "wdd.io/agent-common/logger" "wdd.io/agent-common/utils" "wdd.io/agent-go/rabbitmq" @@ -70,6 +71,7 @@ func Activate() { break } } + time.Sleep(time.Second * 5) } func Execute(em *ExecutionMessage) (bool, []string) { diff --git a/agent-go/a_init/AgentInitialization.go b/agent-go/a_init/AgentInitialization.go index 8302a8a..5ed73a3 100644 --- a/agent-go/a_init/AgentInitialization.go +++ b/agent-go/a_init/AgentInitialization.go @@ -96,9 +96,6 @@ func INIT(octopusAgentConfigFileName string, agentServerInfoConf string) chan bo } } - //<-initForeverHandle - //close(initFromServerQueue.ReceiveChan.InitRChan) - // 建立 运行时 RabbitMQ连接 runtimeConnectorQueue := buildAndStartBusinessRuntimeQueue(a_agent.AgentServerInfoCache.TopicName) @@ -153,6 +150,7 @@ func buildAndStartBusinessRuntimeQueue(agentTopicName string) chan bool { // cache it rabbitmq.BusinessRuntimeQueue = octopusMsgQueue + rabbitmq.OctopusToServerQueue = octopusToServerQueue return businessForeverChan } @@ -161,12 +159,9 @@ func buildAndStartBusinessRuntimeQueue(agentTopicName string) chan bool { func activatedOctopusAgentModules() { // Agent - err := P.Submit(func() { + _ = P.Submit(func() { a_agent.Activate() }) - if err != nil { - return - } // Executor _ = P.Submit(func() { a_executor.Activate() diff --git a/agent-go/a_status/Status.go b/agent-go/a_status/Status.go index 36f0786..5ebb461 100644 --- a/agent-go/a_status/Status.go +++ b/agent-go/a_status/Status.go @@ -48,7 +48,7 @@ func Activate() { // 死循环获取channel中的数据 for { - if octopusMessage, ok := <-rabbitmq.BusinessRuntimeQueue.ReceiveChan.ExecutorRChan; ok { + if octopusMessage, ok := <-rabbitmq.BusinessRuntimeQueue.ReceiveChan.StatusRChan; ok { // 处理数据 statusMsgString := octopusMessage.Content.(string) @@ -90,6 +90,8 @@ func Activate() { log.ErrorF("business queue [ STATUS ] receive chan has closed !") break } + + time.Sleep(time.Second * 20) } } diff --git a/agent-go/rabbitmq/OctopusMessageHandler.go b/agent-go/rabbitmq/OctopusMessageHandler.go index 2a22051..4c88075 100644 --- a/agent-go/rabbitmq/OctopusMessageHandler.go +++ b/agent-go/rabbitmq/OctopusMessageHandler.go @@ -5,8 +5,6 @@ import ( "wdd.io/agent-go/g" ) -var OctopusToServerQueue = &RabbitQueue{} - var P = g.G.P type OctopusMsgHandler interface { diff --git a/agent-go/rabbitmq/RabbitMsgQueue.go b/agent-go/rabbitmq/RabbitMsgQueue.go index 5ebd499..5b51170 100644 --- a/agent-go/rabbitmq/RabbitMsgQueue.go +++ b/agent-go/rabbitmq/RabbitMsgQueue.go @@ -9,8 +9,12 @@ import ( "wdd.io/agent-common/logger" ) +// BusinessRuntimeQueue octopus message from server var BusinessRuntimeQueue = &RabbitQueue{} +// OctopusToServerQueue octopus message to server +var OctopusToServerQueue = &RabbitQueue{} + type RabbitMQ interface { RabbitSendWriter RabbitConnectCloser