[Excution] - base function start - 3
This commit is contained in:
@@ -47,7 +47,7 @@ func INIT() *register.AgentServerInfo {
|
||||
initToServerQueue.Connect()
|
||||
|
||||
// 组装OctopusMessage
|
||||
var octopusMsg *OctopusMessage
|
||||
var octopusMsg *rabbitmq.OctopusMessage
|
||||
octopusMsg = octopusMsg.Build(
|
||||
omType,
|
||||
agentServerInfo,
|
||||
@@ -103,7 +103,7 @@ func handleInitMsgFromServer(initFromServerQueue *rabbitmq.RabbitQueue, initToSe
|
||||
|
||||
log.Debug(fmt.Sprintf("message received from server is %s", string(delivery.Body)))
|
||||
|
||||
var initOctopusMsg *OctopusMessage
|
||||
var initOctopusMsg *rabbitmq.OctopusMessage
|
||||
err := json.Unmarshal(delivery.Body, &initOctopusMsg)
|
||||
if err != nil {
|
||||
log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ",
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package rabbitmq
|
||||
|
||||
import (
|
||||
"agent-go"
|
||||
"agent-go/g"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@@ -9,6 +8,8 @@ import (
|
||||
|
||||
var OctopusToServerQueue = &RabbitQueue{}
|
||||
|
||||
var P = g.G.P
|
||||
|
||||
func BuildOMsgRuntimeConnectorQueue(agentTopicName string) {
|
||||
|
||||
// 建立 业务消息 接收队列
|
||||
@@ -51,12 +52,12 @@ func BuildOMsgRuntimeConnectorQueue(agentTopicName string) {
|
||||
|
||||
deliveries := octopusMsgQueue.Read(true)
|
||||
forever := make(chan bool)
|
||||
main.P.Submit(
|
||||
P.Submit(
|
||||
func() {
|
||||
// 死循环,处理Octopus Message
|
||||
for delivery := range deliveries {
|
||||
|
||||
var om *main.OctopusMessage
|
||||
var om *OctopusMessage
|
||||
err := json.Unmarshal(delivery.Body, &om)
|
||||
if err != nil {
|
||||
log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body))
|
||||
@@ -65,7 +66,7 @@ func BuildOMsgRuntimeConnectorQueue(agentTopicName string) {
|
||||
}
|
||||
|
||||
// 策略模式 处理消息
|
||||
main.P.Submit(func() {
|
||||
P.Submit(func() {
|
||||
om.Handle()
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
package main
|
||||
package rabbitmq
|
||||
|
||||
import (
|
||||
"agent-go/executor"
|
||||
"agent-go/g"
|
||||
"agent-go/rabbitmq"
|
||||
"agent-go/status"
|
||||
"agent-go/utils"
|
||||
"encoding/json"
|
||||
@@ -22,7 +21,7 @@ type OctopusMsgHandler interface {
|
||||
}
|
||||
|
||||
type OctopusMsgSender interface {
|
||||
Send(rabbitQueue *rabbitmq.RabbitQueue, msg []byte)
|
||||
Send(rabbitQueue *RabbitQueue, msg []byte)
|
||||
}
|
||||
|
||||
type OctopusMsgBuilder interface {
|
||||
@@ -43,7 +42,7 @@ func (om *OctopusMessage) Handle() {
|
||||
doHandleOctopusMessage(om)
|
||||
}
|
||||
|
||||
func (om *OctopusMessage) Send(rabbitQueue *rabbitmq.RabbitQueue, msg []byte) {
|
||||
func (om *OctopusMessage) Send(rabbitQueue *RabbitQueue, msg []byte) {
|
||||
rabbitQueue.Send(msg)
|
||||
}
|
||||
|
||||
@@ -147,7 +146,7 @@ func statusOMHandler(octopusMessage *OctopusMessage) {
|
||||
octopusMessage.Result = statusRes
|
||||
// 发送回去
|
||||
statusOctopusReplayMessage, _ := json.Marshal(octopusMessage)
|
||||
rabbitmq.OctopusToServerQueue.Send(statusOctopusReplayMessage)
|
||||
OctopusToServerQueue.Send(statusOctopusReplayMessage)
|
||||
|
||||
// 输出日志
|
||||
log.InfoF("接收到查询Agent状态的请求,结果为 => %s", statusRes)
|
||||
Reference in New Issue
Block a user