[Cmii][ImageSync] - reformat agent-go - 2

This commit is contained in:
zeaslity
2024-03-29 15:37:06 +08:00
parent 77c689abd7
commit 1b88e5d871
6 changed files with 14 additions and 11 deletions

View File

@@ -1,6 +1,7 @@
package a_agent package a_agent
import ( import (
"time"
"wdd.io/agent-common/logger" "wdd.io/agent-common/logger"
"wdd.io/agent-go/rabbitmq" "wdd.io/agent-go/rabbitmq"
) )
@@ -11,7 +12,7 @@ func Activate() {
log.Info("Module [ AGENT ] activated !") log.Info("Module [ AGENT ] activated !")
for { for {
if octopusMessage, ok := <-rabbitmq.BusinessRuntimeQueue.ReceiveChan.ExecutorRChan; ok { if octopusMessage, ok := <-rabbitmq.BusinessRuntimeQueue.ReceiveChan.AgentRChan; ok {
// 处理数 // 处理数
// 输出日志 // 输出日志
@@ -21,5 +22,6 @@ func Activate() {
log.ErrorF("business queue [ AGENT ] receive chan has closed !") log.ErrorF("business queue [ AGENT ] receive chan has closed !")
break break
} }
time.Sleep(time.Second * 20)
} }
} }

View File

@@ -8,6 +8,7 @@ import (
"os/exec" "os/exec"
"strconv" "strconv"
"strings" "strings"
"time"
"wdd.io/agent-common/logger" "wdd.io/agent-common/logger"
"wdd.io/agent-common/utils" "wdd.io/agent-common/utils"
"wdd.io/agent-go/rabbitmq" "wdd.io/agent-go/rabbitmq"
@@ -70,6 +71,7 @@ func Activate() {
break break
} }
} }
time.Sleep(time.Second * 5)
} }
func Execute(em *ExecutionMessage) (bool, []string) { func Execute(em *ExecutionMessage) (bool, []string) {

View File

@@ -96,9 +96,6 @@ func INIT(octopusAgentConfigFileName string, agentServerInfoConf string) chan bo
} }
} }
//<-initForeverHandle
//close(initFromServerQueue.ReceiveChan.InitRChan)
// 建立 运行时 RabbitMQ连接 // 建立 运行时 RabbitMQ连接
runtimeConnectorQueue := buildAndStartBusinessRuntimeQueue(a_agent.AgentServerInfoCache.TopicName) runtimeConnectorQueue := buildAndStartBusinessRuntimeQueue(a_agent.AgentServerInfoCache.TopicName)
@@ -153,6 +150,7 @@ func buildAndStartBusinessRuntimeQueue(agentTopicName string) chan bool {
// cache it // cache it
rabbitmq.BusinessRuntimeQueue = octopusMsgQueue rabbitmq.BusinessRuntimeQueue = octopusMsgQueue
rabbitmq.OctopusToServerQueue = octopusToServerQueue
return businessForeverChan return businessForeverChan
} }
@@ -161,12 +159,9 @@ func buildAndStartBusinessRuntimeQueue(agentTopicName string) chan bool {
func activatedOctopusAgentModules() { func activatedOctopusAgentModules() {
// Agent // Agent
err := P.Submit(func() { _ = P.Submit(func() {
a_agent.Activate() a_agent.Activate()
}) })
if err != nil {
return
}
// Executor // Executor
_ = P.Submit(func() { _ = P.Submit(func() {
a_executor.Activate() a_executor.Activate()

View File

@@ -48,7 +48,7 @@ func Activate() {
// 死循环获取channel中的数据 // 死循环获取channel中的数据
for { for {
if octopusMessage, ok := <-rabbitmq.BusinessRuntimeQueue.ReceiveChan.ExecutorRChan; ok { if octopusMessage, ok := <-rabbitmq.BusinessRuntimeQueue.ReceiveChan.StatusRChan; ok {
// 处理数据 // 处理数据
statusMsgString := octopusMessage.Content.(string) statusMsgString := octopusMessage.Content.(string)
@@ -90,6 +90,8 @@ func Activate() {
log.ErrorF("business queue [ STATUS ] receive chan has closed !") log.ErrorF("business queue [ STATUS ] receive chan has closed !")
break break
} }
time.Sleep(time.Second * 20)
} }
} }

View File

@@ -5,8 +5,6 @@ import (
"wdd.io/agent-go/g" "wdd.io/agent-go/g"
) )
var OctopusToServerQueue = &RabbitQueue{}
var P = g.G.P var P = g.G.P
type OctopusMsgHandler interface { type OctopusMsgHandler interface {

View File

@@ -9,8 +9,12 @@ import (
"wdd.io/agent-common/logger" "wdd.io/agent-common/logger"
) )
// BusinessRuntimeQueue octopus message from server
var BusinessRuntimeQueue = &RabbitQueue{} var BusinessRuntimeQueue = &RabbitQueue{}
// OctopusToServerQueue octopus message to server
var OctopusToServerQueue = &RabbitQueue{}
type RabbitMQ interface { type RabbitMQ interface {
RabbitSendWriter RabbitSendWriter
RabbitConnectCloser RabbitConnectCloser