From 1d32d7dd04b1f4d3fdc71f8b5b820318721298f1 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Fri, 29 Mar 2024 14:46:11 +0800 Subject: [PATCH] [Cmii][ImageSync] - reformat agent-go - 2 --- agent-go/a_init/AgentInitialization.go | 24 +++++----- agent-go/{ => a_init}/BastionInitializaion.go | 7 ++- agent-go/main.go | 2 +- agent-go/octopus-agent-dev.yaml | 2 +- agent-go/rabbitmq/OctopusMessageHandler.go | 2 +- agent-go/rabbitmq/RabbitMsgQueue.go | 44 ++++++++++--------- 6 files changed, 42 insertions(+), 39 deletions(-) rename agent-go/{ => a_init}/BastionInitializaion.go (88%) diff --git a/agent-go/a_init/AgentInitialization.go b/agent-go/a_init/AgentInitialization.go index b9d767f..3cabe3e 100644 --- a/agent-go/a_init/AgentInitialization.go +++ b/agent-go/a_init/AgentInitialization.go @@ -73,31 +73,33 @@ func INIT(octopusAgentConfigFileName string, agentServerInfoConf string) chan bo // 监听初始化连接中的信息 initFromServerQueue := &rabbitmq.RabbitQueue{ - RabbitProp: initFromServerProp, + RabbitProp: initFromServerProp, + RabbitConnectInfo: rabbitTCPConnectInfo, } //defer initFromServerQueue.Close() // 建立连接 initFromServerQueue.Connect() - initForeverHandle := initFromServerQueue.Handle() + initFromServerQueue.Handle() buildAndSendInitMsgToServer(agentServerInfo, initToServerQueue) // receive from server for g.G.AgentHasRegister == false { select { - case <-initFromServerQueue.ReceiveChan.InitRChan: - initFromServerMsg := <-initFromServerQueue.ReceiveChan.InitRChan - handleInitMsgFromServer(initFromServerMsg, initToServerQueue, agentServerInfo) + case initFromServerMsg := <-initFromServerQueue.ReceiveChan.InitRChan: + if handleInitMsgFromServer(initFromServerMsg, initToServerQueue, agentServerInfo) { + break + } default: - //log.Debug("") + log.Debug("agent init not received from server ! start to waiting !") time.Sleep(time.Second * 10) } } - <-initForeverHandle - close(initFromServerQueue.ReceiveChan.InitRChan) + //<-initForeverHandle + //close(initFromServerQueue.ReceiveChan.InitRChan) // 建立 运行时 RabbitMQ连接 runtimeConnectorQueue := buildAndStartBusinessRuntimeQueue(a_agent.AgentServerInfoCache.TopicName) @@ -194,7 +196,7 @@ func buildAndSendInitMsgToServer(agentServerInfo *a_agent.AgentServerInfo, initT }) } -func BuildOctopusTCPConnect(agentConfig *viper.Viper) rabbitmq.RabbitTCPConnectInfo { +func BuildOctopusTCPConnect(agentConfig *viper.Viper) *rabbitmq.RabbitTCPConnectInfo { host := agentConfig.GetString("spring.rabbitmq.host") port := agentConfig.GetString("spring.rabbitmq.port") username := agentConfig.GetString("spring.rabbitmq.username") @@ -202,7 +204,7 @@ func BuildOctopusTCPConnect(agentConfig *viper.Viper) rabbitmq.RabbitTCPConnectI virtualHost := agentConfig.GetString("spring.rabbitmq.virtual-host") //todo - return rabbitmq.RabbitTCPConnectInfo{ + return &rabbitmq.RabbitTCPConnectInfo{ UserName: username, Password: password, Host: host, @@ -227,7 +229,7 @@ func handleInitMsgFromServer(initFromServerMsg *rabbitmq.OctopusMessage, initToS //initOctopusMessageDeliveries := initFromServerQueue.Read(false) - log.Debug(fmt.Sprintf("message received from server is %s", &initFromServerMsg)) + log.DebugF("message received from server is %s", initFromServerMsg) var serverInfo a_agent.AgentServerInfo diff --git a/agent-go/BastionInitializaion.go b/agent-go/a_init/BastionInitializaion.go similarity index 88% rename from agent-go/BastionInitializaion.go rename to agent-go/a_init/BastionInitializaion.go index 69e9fad..66c2990 100644 --- a/agent-go/BastionInitializaion.go +++ b/agent-go/a_init/BastionInitializaion.go @@ -1,9 +1,8 @@ -package main +package a_init import ( "wdd.io/agent-go/a_agent" "wdd.io/agent-go/a_executor" - "wdd.io/agent-go/a_init" "wdd.io/agent-go/a_status" ) @@ -42,8 +41,8 @@ func BastionModeInit() { // re-get agentInfo from status module agentInfo := a_status.ReportAgentInfo() - a_init.refreshAgentInfoByStatusInfo(agentInfo, agentServerInfo) - a_init.BuildAgentOsOperator(agentInfo, agentServerInfo) + refreshAgentInfoByStatusInfo(agentInfo, agentServerInfo) + BuildAgentOsOperator(agentInfo, agentServerInfo) // install docker agentOsOperator := a_executor.AgentOsOperatorCache diff --git a/agent-go/main.go b/agent-go/main.go index 498fd7b..dfcfd2c 100644 --- a/agent-go/main.go +++ b/agent-go/main.go @@ -21,7 +21,7 @@ func main() { flag.Parse() if mode == "bastion" { - BastionModeInit() + a_init.BastionModeInit() return } diff --git a/agent-go/octopus-agent-dev.yaml b/agent-go/octopus-agent-dev.yaml index 1b683bc..0a9af19 100644 --- a/agent-go/octopus-agent-dev.yaml +++ b/agent-go/octopus-agent-dev.yaml @@ -52,7 +52,7 @@ spring: allow-bean-definition-overriding: true rabbitmq: # host: 42.192.52.227 - host: 192.168.35.71 + host: 10.250.0.100 port: 20672 username: boge password: boge8tingH diff --git a/agent-go/rabbitmq/OctopusMessageHandler.go b/agent-go/rabbitmq/OctopusMessageHandler.go index 816612f..2a22051 100644 --- a/agent-go/rabbitmq/OctopusMessageHandler.go +++ b/agent-go/rabbitmq/OctopusMessageHandler.go @@ -15,7 +15,7 @@ type OctopusMsgHandler interface { func (om *OctopusMessage) HandleMsg(rChan *RabbitReceiveChan) { // 实际执行 OM handle进程 - log.Debug("接收到OctopusMessage, 开始处理!") + log.DebugF("接收到OctopusMessage => %s, 开始处理!", om.OctopusMessageType) doHandleOctopusMessage(om, rChan) } diff --git a/agent-go/rabbitmq/RabbitMsgQueue.go b/agent-go/rabbitmq/RabbitMsgQueue.go index 82e8c72..5075564 100644 --- a/agent-go/rabbitmq/RabbitMsgQueue.go +++ b/agent-go/rabbitmq/RabbitMsgQueue.go @@ -37,7 +37,7 @@ type RabbitQueue struct { // 连接属性 RabbitProp *ConnectProperty // 底层连接tcp信息 - RabbitConnectInfo RabbitTCPConnectInfo + RabbitConnectInfo *RabbitTCPConnectInfo // 返回消息队列 ReceiveChan *RabbitReceiveChan } @@ -79,7 +79,7 @@ var instance *amqp.Connection var once sync.Once // 初始化 Singleton 实例的函数 -func createInstance(rabbitConnectInfo RabbitTCPConnectInfo) func() { +func createInstance(rabbitConnectInfo *RabbitTCPConnectInfo) { // 在这里进行 Singleton 的初始化操作 // 获取RabbitMQ的连接地址 @@ -92,15 +92,16 @@ func createInstance(rabbitConnectInfo RabbitTCPConnectInfo) func() { } instance = connection - return nil } // GetInstance 获取全局唯一的 Singleton 实例的函数 -func GetInstance(rabbitConnectInfo RabbitTCPConnectInfo) *amqp.Connection { +func GetInstance(rabbitConnectInfo *RabbitTCPConnectInfo) *amqp.Connection { // 使用 sync.Once 确保 createInstance 只会被调用一次 // todo 理解 - once.Do(createInstance(rabbitConnectInfo)) + once.Do(func() { + createInstance(rabbitConnectInfo) + }) return instance } @@ -185,24 +186,25 @@ func (r *RabbitQueue) Close() error { func (r *RabbitQueue) Handle() chan bool { deliveries := r.Read(true) - forverHandle := make(chan bool) + foreverHandle := make(chan bool) // 死循环,处理Octopus Message - for delivery := range deliveries { + P.Submit(func() { + for delivery := range deliveries { - var om *OctopusMessage - err := json.Unmarshal(delivery.Body, &om) - if err != nil { - log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body)) - // 保存到某处 - continue + var om *OctopusMessage + err := json.Unmarshal(delivery.Body, &om) + if err != nil { + log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body)) + // 保存到某处 + continue + } + // 策略模式 处理消息 + P.Submit(func() { + om.HandleMsg(r.ReceiveChan) + }) } - // 策略模式 处理消息 - P.Submit(func() { - om.HandleMsg(r.ReceiveChan) - }) - } - - return forverHandle + }) + return foreverHandle } // Send 向RabbitMQ中发送消息 @@ -257,7 +259,7 @@ func (r *RabbitQueue) Read(autoAck bool) <-chan amqp.Delivery { return msgs } -func parseRabbitMQEndpoint(rabbitConnectInfo RabbitTCPConnectInfo) string { +func parseRabbitMQEndpoint(rabbitConnectInfo *RabbitTCPConnectInfo) string { var res strings.Builder