diff --git a/agent-go/a_agent/AgentHandler.go b/agent-go/a_agent/AgentHandler.go index 2a76295..e0600ba 100644 --- a/agent-go/a_agent/AgentHandler.go +++ b/agent-go/a_agent/AgentHandler.go @@ -1,9 +1,25 @@ package a_agent -import "wdd.io/agent-common/logger" +import ( + "wdd.io/agent-common/logger" + "wdd.io/agent-go/rabbitmq" +) var log = logger.Log func Activate() { log.Info("Module [ AGENT ] activated !") + + for { + if octopusMessage, ok := <-rabbitmq.BusinessRuntimeQueue.ReceiveChan.ExecutorRChan; ok { + // 处理数 + + // 输出日志 + log.InfoF("接收到查询Agent状态的请求,结果为 => %s", octopusMessage) + } else { + // channel已关闭,跳出循环 + log.ErrorF("business queue [ AGENT ] receive chan has closed !") + break + } + } } diff --git a/agent-go/a_executor/CommandExecutor.go b/agent-go/a_executor/CommandExecutor.go index 2041fc2..8308029 100644 --- a/agent-go/a_executor/CommandExecutor.go +++ b/agent-go/a_executor/CommandExecutor.go @@ -3,11 +3,14 @@ package a_executor import ( "bufio" "bytes" + "encoding/json" "fmt" "os/exec" "strconv" "strings" "wdd.io/agent-common/logger" + "wdd.io/agent-common/utils" + "wdd.io/agent-go/rabbitmq" ) type ExecutionMessage struct { @@ -28,35 +31,45 @@ var AgentOsOperatorCache = &AgentOsOperator{} func Activate() { log.Info("Module [ EXECUTOR ] activated !") - //// 转换类型 - //executionMsgString := octopusMessage.Content.(string) - // - //// 解析 ExecutionMessage - //var executionMessage *a_executor.ExecutionMessage - //err := json.Unmarshal([]byte(executionMsgString), &executionMessage) - //if err != nil { - // log.Error(fmt.Sprintf("execution message convert to json is wrong! msg is => %s", executionMsgString)) - // return - //} - // - //// 执行命令 - //ok, resultLog := a_executor.Execute(executionMessage) - //if ok { - // octopusMessage.ResultCode = "200" - //} else { - // octopusMessage.ResultCode = "300" - //} - // - //// 返回结果 - //if executionMessage.NeedResultReplay { - // // send back the result log - // octopusMessage.Result = resultLog - //} - //// 返回时间 - //octopusMessage.ACTime = utils.ParseDateTimeTime() - // - //// 返回结果 - //octopusMessage.SendToOctopusServer() + // 死循环获取channel中的数据 + for { + if octopusMessage, ok := <-rabbitmq.BusinessRuntimeQueue.ReceiveChan.ExecutorRChan; ok { + // 处理数据 + // 转换类型 + executionMsgString := octopusMessage.Content.(string) + + //解析 ExecutionMessage + var executionMessage *ExecutionMessage + err := json.Unmarshal([]byte(executionMsgString), &executionMessage) + if err != nil { + log.Error(fmt.Sprintf("execution message convert to json is wrong! msg is => %s", executionMsgString)) + return + } + + // 执行命令 + ok, resultLog := Execute(executionMessage) + if ok { + octopusMessage.ResultCode = "200" + } else { + octopusMessage.ResultCode = "300" + } + + // 返回结果 + if executionMessage.NeedResultReplay { + // send back the result log + octopusMessage.Result = resultLog + } + // 返回时间 + octopusMessage.ACTime = utils.ParseDateTimeTime() + + // 返回结果 + octopusMessage.SendToOctopusServer() + } else { + // channel已关闭,跳出循环 + log.ErrorF("business queue [ EXECUTOR ] receive chan has closed !") + break + } + } } func Execute(em *ExecutionMessage) (bool, []string) { diff --git a/agent-go/a_init/AgentInitialization.go b/agent-go/a_init/AgentInitialization.go index 3cabe3e..8302a8a 100644 --- a/agent-go/a_init/AgentInitialization.go +++ b/agent-go/a_init/AgentInitialization.go @@ -22,8 +22,6 @@ var P = g.G.P var log = logger.Log -//var AgentServerInfoCache = ®ister.AgentServerInfo{} - func INIT(octopusAgentConfigFileName string, agentServerInfoConf string) chan bool { // 获取系统的环境变量 @@ -153,6 +151,9 @@ func buildAndStartBusinessRuntimeQueue(agentTopicName string) chan bool { // 开始处理Runtime的OM消息 businessForeverChan := octopusMsgQueue.Handle() + // cache it + rabbitmq.BusinessRuntimeQueue = octopusMsgQueue + return businessForeverChan } @@ -160,11 +161,20 @@ func buildAndStartBusinessRuntimeQueue(agentTopicName string) chan bool { func activatedOctopusAgentModules() { // Agent - a_agent.Activate() + err := P.Submit(func() { + a_agent.Activate() + }) + if err != nil { + return + } // Executor - a_executor.Activate() + _ = P.Submit(func() { + a_executor.Activate() + }) // Status - a_status.Activate() + _ = P.Submit(func() { + a_status.Activate() + }) } func buildAndSendInitMsgToServer(agentServerInfo *a_agent.AgentServerInfo, initToServerQueue *rabbitmq.RabbitQueue) { diff --git a/agent-go/a_status/Status.go b/agent-go/a_status/Status.go index ccb2afc..36f0786 100644 --- a/agent-go/a_status/Status.go +++ b/agent-go/a_status/Status.go @@ -1,10 +1,14 @@ package a_status import ( + "encoding/json" "fmt" + "strings" "time" "wdd.io/agent-common/logger" + "wdd.io/agent-common/utils" "wdd.io/agent-go/g" + "wdd.io/agent-go/rabbitmq" ) var log = logger.Log @@ -41,46 +45,53 @@ type AgentInfo struct { func Activate() { log.Info("Module [ STATUS ] activated !") - //v, ok := (octopusMessage.Content).(string) - //if !ok { - // log.ErrorF("convert to string is wrong %s", v) - //} - // - //statusMsgString := octopusMessage.Content.(string) - // - //var statusMessage *a_status.StatusMessage - //err := json.Unmarshal([]byte(statusMsgString), &statusMessage) - //if err != nil { - // fmt.Println(err.Error()) - // log.Error(fmt.Sprintf("status message convert to json is wrong! msg is => %s", octopusMessage)) - // return - //} - // - //// OMessageStatusTypeEnum - //var statusRes string - //if strings.HasPrefix(statusMessage.StatusType, "PING") { - // // ping info - // statusRes = a_status.Ping() - //} else if strings.HasPrefix(statusMessage.StatusType, "METRIC") { - // // metric info - // agentStatusString, _ := json.Marshal(a_status.ReportAgentMetric()) - // statusRes = string(agentStatusString) - //} else if strings.HasPrefix(statusMessage.StatusType, "INFO") { - // log.InfoF("[statusOMHandler] - call for agent info !") - //} else { - // log.WarnF("[statusOMHandler] - error octopus status message type of %s", statusMessage.StatusType) - //} - // - //// 返回消息 - //// 组装消息 - //octopusMessage.ACTime = utils.ParseDateTimeTime() - //octopusMessage.Result = statusRes - //// 发送回去 - //statusOctopusReplayMessage, _ := json.Marshal(octopusMessage) - //OctopusToServerQueue.Send(statusOctopusReplayMessage) - // - //// 输出日志 - //log.InfoF("接收到查询Agent状态的请求,结果为 => %s", statusRes) + + // 死循环获取channel中的数据 + for { + if octopusMessage, ok := <-rabbitmq.BusinessRuntimeQueue.ReceiveChan.ExecutorRChan; ok { + // 处理数据 + + statusMsgString := octopusMessage.Content.(string) + + var statusMessage *StatusMessage + err := json.Unmarshal([]byte(statusMsgString), &statusMessage) + if err != nil { + fmt.Println(err.Error()) + log.Error(fmt.Sprintf("status message convert to json is wrong! msg is => %s", octopusMessage)) + return + } + + // OMessageStatusTypeEnum + var statusRes string + if strings.HasPrefix(statusMessage.StatusType, "PING") { + // ping info + statusRes = Ping() + } else if strings.HasPrefix(statusMessage.StatusType, "METRIC") { + // metric info + agentStatusString, _ := json.Marshal(ReportAgentMetric()) + statusRes = string(agentStatusString) + } else if strings.HasPrefix(statusMessage.StatusType, "INFO") { + log.InfoF("[statusOMHandler] - call for agent info !") + } else { + log.WarnF("[statusOMHandler] - error octopus status message type of %s", statusMessage.StatusType) + } + + // 返回消息 + // 组装消息 + octopusMessage.ACTime = utils.ParseDateTimeTime() + octopusMessage.Result = statusRes + // 发送回去 + octopusMessage.SendToOctopusServer() + + // 输出日志 + log.InfoF("接收到查询Agent状态的请求,结果为 => %s", statusRes) + } else { + // channel已关闭,跳出循环 + log.ErrorF("business queue [ STATUS ] receive chan has closed !") + break + } + } + } func Ping() string { diff --git a/agent-go/rabbitmq/RabbitMsgQueue.go b/agent-go/rabbitmq/RabbitMsgQueue.go index 5075564..5ebd499 100644 --- a/agent-go/rabbitmq/RabbitMsgQueue.go +++ b/agent-go/rabbitmq/RabbitMsgQueue.go @@ -9,6 +9,8 @@ import ( "wdd.io/agent-common/logger" ) +var BusinessRuntimeQueue = &RabbitQueue{} + type RabbitMQ interface { RabbitSendWriter RabbitConnectCloser diff --git a/server/src/test/java/io/wdd/server/func/TestImageSyncScheduler.java b/server/src/test/java/io/wdd/server/func/TestImageSyncScheduler.java index 2e87b28..d761fe3 100644 --- a/server/src/test/java/io/wdd/server/func/TestImageSyncScheduler.java +++ b/server/src/test/java/io/wdd/server/func/TestImageSyncScheduler.java @@ -42,7 +42,7 @@ public class TestImageSyncScheduler { )); ArrayList ImageFullNameList = new ArrayList<>(List.of( - "harbor.cdcyy.com.cn/cmii/cmii-uav-industrial-portfolio:5.4.0-cqly-032802" + "harbor.cdcyy.com.cn/cmii/cmii-uav-industrial-portfolio:5.4.0-cqly-032901" )); Boolean downloadAndCompressOnly = true;