From 80198930c4d340ca1c566b8483ad0c6d69025cba Mon Sep 17 00:00:00 2001 From: IceDerce Date: Sun, 25 Jun 2023 11:31:27 +0800 Subject: [PATCH] [Excution] - base function start - 3 --- agent-go/AgentInitialization.go | 4 ++-- agent-go/rabbitmq/OMsgConnector.go | 9 +++++---- agent-go/{ => rabbitmq}/OctopusMessage.go | 9 ++++----- 3 files changed, 11 insertions(+), 11 deletions(-) rename agent-go/{ => rabbitmq}/OctopusMessage.go (93%) diff --git a/agent-go/AgentInitialization.go b/agent-go/AgentInitialization.go index 40c82e6..b92e3a0 100644 --- a/agent-go/AgentInitialization.go +++ b/agent-go/AgentInitialization.go @@ -47,7 +47,7 @@ func INIT() *register.AgentServerInfo { initToServerQueue.Connect() // 组装OctopusMessage - var octopusMsg *OctopusMessage + var octopusMsg *rabbitmq.OctopusMessage octopusMsg = octopusMsg.Build( omType, agentServerInfo, @@ -103,7 +103,7 @@ func handleInitMsgFromServer(initFromServerQueue *rabbitmq.RabbitQueue, initToSe log.Debug(fmt.Sprintf("message received from server is %s", string(delivery.Body))) - var initOctopusMsg *OctopusMessage + var initOctopusMsg *rabbitmq.OctopusMessage err := json.Unmarshal(delivery.Body, &initOctopusMsg) if err != nil { log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ", diff --git a/agent-go/rabbitmq/OMsgConnector.go b/agent-go/rabbitmq/OMsgConnector.go index cd72e29..89bf16d 100644 --- a/agent-go/rabbitmq/OMsgConnector.go +++ b/agent-go/rabbitmq/OMsgConnector.go @@ -1,7 +1,6 @@ package rabbitmq import ( - "agent-go" "agent-go/g" "encoding/json" "fmt" @@ -9,6 +8,8 @@ import ( var OctopusToServerQueue = &RabbitQueue{} +var P = g.G.P + func BuildOMsgRuntimeConnectorQueue(agentTopicName string) { // 建立 业务消息 接收队列 @@ -51,12 +52,12 @@ func BuildOMsgRuntimeConnectorQueue(agentTopicName string) { deliveries := octopusMsgQueue.Read(true) forever := make(chan bool) - main.P.Submit( + P.Submit( func() { // 死循环,处理Octopus Message for delivery := range deliveries { - var om *main.OctopusMessage + var om *OctopusMessage err := json.Unmarshal(delivery.Body, &om) if err != nil { log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body)) @@ -65,7 +66,7 @@ func BuildOMsgRuntimeConnectorQueue(agentTopicName string) { } // 策略模式 处理消息 - main.P.Submit(func() { + P.Submit(func() { om.Handle() }) } diff --git a/agent-go/OctopusMessage.go b/agent-go/rabbitmq/OctopusMessage.go similarity index 93% rename from agent-go/OctopusMessage.go rename to agent-go/rabbitmq/OctopusMessage.go index 045fa60..51a2f67 100644 --- a/agent-go/OctopusMessage.go +++ b/agent-go/rabbitmq/OctopusMessage.go @@ -1,9 +1,8 @@ -package main +package rabbitmq import ( "agent-go/executor" "agent-go/g" - "agent-go/rabbitmq" "agent-go/status" "agent-go/utils" "encoding/json" @@ -22,7 +21,7 @@ type OctopusMsgHandler interface { } type OctopusMsgSender interface { - Send(rabbitQueue *rabbitmq.RabbitQueue, msg []byte) + Send(rabbitQueue *RabbitQueue, msg []byte) } type OctopusMsgBuilder interface { @@ -43,7 +42,7 @@ func (om *OctopusMessage) Handle() { doHandleOctopusMessage(om) } -func (om *OctopusMessage) Send(rabbitQueue *rabbitmq.RabbitQueue, msg []byte) { +func (om *OctopusMessage) Send(rabbitQueue *RabbitQueue, msg []byte) { rabbitQueue.Send(msg) } @@ -147,7 +146,7 @@ func statusOMHandler(octopusMessage *OctopusMessage) { octopusMessage.Result = statusRes // 发送回去 statusOctopusReplayMessage, _ := json.Marshal(octopusMessage) - rabbitmq.OctopusToServerQueue.Send(statusOctopusReplayMessage) + OctopusToServerQueue.Send(statusOctopusReplayMessage) // 输出日志 log.InfoF("接收到查询Agent状态的请求,结果为 => %s", statusRes)