From 1af25d399232d8091a55cb61f298905e536ded09 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Fri, 24 Mar 2023 15:09:22 +0800 Subject: [PATCH] =?UTF-8?q?[agent-go]=20=E5=88=9D=E6=AD=A5=E5=AE=8C?= =?UTF-8?q?=E6=88=90Executor=E9=83=A8=E5=88=86=E7=9A=84=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent-go/executor/CommandExecutor.go | 121 ++++++++++++++++++ agent-go/g/global.go | 10 +- agent-go/rabbitmq/MessageReaderWriter.go | 46 +++++++ agent-go/rabbitmq/MessageSender.go | 20 --- agent-go/rabbitmq/OctopusMessage.go | 10 ++ agent-go/rabbitmq/OctopusMsgHandler.go | 96 ++++++++++++++ .../{RMQConnector.go => RabbitMQConnector.go} | 0 agent-go/register/AgentIntitilization.go | 53 +++++++- .../web/TestCommandExecutorController.java | 3 +- 9 files changed, 328 insertions(+), 31 deletions(-) create mode 100644 agent-go/executor/CommandExecutor.go create mode 100644 agent-go/rabbitmq/MessageReaderWriter.go delete mode 100644 agent-go/rabbitmq/MessageSender.go create mode 100644 agent-go/rabbitmq/OctopusMsgHandler.go rename agent-go/rabbitmq/{RMQConnector.go => RabbitMQConnector.go} (100%) diff --git a/agent-go/executor/CommandExecutor.go b/agent-go/executor/CommandExecutor.go new file mode 100644 index 0000000..f3957ca --- /dev/null +++ b/agent-go/executor/CommandExecutor.go @@ -0,0 +1,121 @@ +package executor + +import ( + "agent-go/g" + "agent-go/rabbitmq" + "bufio" + "bytes" + "fmt" + "os/exec" +) + +var log = g.G.LOG + +func Execute(om *rabbitmq.OctopusMessage, em *rabbitmq.ExecutionMessage) ([]string, error) { + + var resultLog []string + var err error + + if em.PipeLineCommand != nil && len(em.PipeLineCommand) != 0 { + // 管道命令 + resultLog, err = PipeLineCommandExecutor(em.PipeLineCommand) + } else if em.MultiLineCommand != nil && len(em.MultiLineCommand) != 0 { + // 多行命令 + resultLog, err = MultiLineCommandExecutor(em.MultiLineCommand) + } else { + // 单行命令 + resultLog, err = SingleLineCommandExecutor(em.SingleLineCommand) + } + + // 处理执行日志 + // 是否需要返回处理日志,现在默认返回 + if em.NeedResultReplay { + // 需要返回处理结果 + + } + + log.Info(fmt.Sprintf("Executor Result: %s", resultLog)) + + return resultLog, err +} + +func PipeLineCommandExecutor(pipeLineCommand [][]string) ([]string, error) { + + var cmds []*exec.Cmd + + // 创建每个命令对象,并将前一个命令的标准输出连接到当前命令的标准输入 + for i, partOfCommand := range pipeLineCommand { + cmd := exec.Command(partOfCommand[0], partOfCommand[1:]...) + if i > 0 { + prevCmd := cmds[i-1] + out, err := prevCmd.StdoutPipe() + if err != nil { + return nil, err + } + cmd.Stdin = out + } + cmds = append(cmds, cmd) + } + + // 执行最后一个命令,并获取其输出 + lastCmd := cmds[len(cmds)-1] + + var out bytes.Buffer + + lastCmd.Stdout = &out + lastCmd.Stderr = &out + err := lastCmd.Run() + + scanner := bufio.NewScanner(&out) + var result []string + for scanner.Scan() { + result = append(result, scanner.Text()) + } + + if err != nil { + return nil, err + } + + return result, nil +} + +func MultiLineCommandExecutor(multiLineCommandExecutor [][]string) ([]string, error) { + + var res []string + + for _, singleLineCommand := range multiLineCommandExecutor { + + singleLogs, err := SingleLineCommandExecutor(singleLineCommand) + res := append(res, singleLogs...) + + if err != nil { + return res, err + } + + } + + return res, nil +} + +// SingleLineCommandExecutor 执行单行命令 +func SingleLineCommandExecutor(singleLineCommand []string) ([]string, error) { + + cmd := exec.Command(singleLineCommand[0], singleLineCommand[1:]...) + var out bytes.Buffer + cmd.Stdout = &out + cmd.Stderr = &out + + err := cmd.Run() + + scanner := bufio.NewScanner(&out) + var result []string + for scanner.Scan() { + result = append(result, scanner.Text()) + } + + if err != nil { + return nil, err + } + + return result, nil +} diff --git a/agent-go/g/global.go b/agent-go/g/global.go index 278272b..cd0a01b 100644 --- a/agent-go/g/global.go +++ b/agent-go/g/global.go @@ -1,10 +1,14 @@ package g -import "github.com/spf13/viper" +import ( + "agent-go/register" + "github.com/spf13/viper" +) type Global struct { - LOG *Logger - NacosConfig *viper.Viper + LOG *Logger + NacosConfig *viper.Viper + AgentServerInfo *register.AgentServerInfo } const ( diff --git a/agent-go/rabbitmq/MessageReaderWriter.go b/agent-go/rabbitmq/MessageReaderWriter.go new file mode 100644 index 0000000..a6b6e5a --- /dev/null +++ b/agent-go/rabbitmq/MessageReaderWriter.go @@ -0,0 +1,46 @@ +package rabbitmq + +import ( + "agent-go/g" + "fmt" + "github.com/streadway/amqp" +) + +// Send 向RabbitMQ中发送消息 +func Send(conn *RabbitMQConn, connProp *ConnectProperty, message []byte) { + // 往哪里发 + channel := conn.Channel + // 发送 + channel.Publish( + connProp.ExchangeName, + connProp.TopicKey, + false, + true, + amqp.Publishing{ + ContentType: "text/plain", + Body: message, + }, + ) +} + +func Read(conn *RabbitMQConn, connProp *ConnectProperty, autoAck bool) <-chan amqp.Delivery { + + // 拿到特定的Channel + channel := conn.Channel + + // 开始读取队列中的全部消息 + msgs, err := channel.Consume( + connProp.QueueName, // 队列名称 + g.G.AgentServerInfo.AgentTopicName, // 消费者名称 + autoAck, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // arguments + ) + if err != nil { + log.Error(fmt.Sprintf("Failed to register a consumer: %v", err)) + } + + return msgs +} diff --git a/agent-go/rabbitmq/MessageSender.go b/agent-go/rabbitmq/MessageSender.go deleted file mode 100644 index 59c9f40..0000000 --- a/agent-go/rabbitmq/MessageSender.go +++ /dev/null @@ -1,20 +0,0 @@ -package rabbitmq - -import "github.com/streadway/amqp" - -// Send 向RabbitMQ中发送消息 -func Send(conn *RabbitMQConn, connProp *ConnectProperty, message []byte) { - // 往哪里发 - channel := conn.Channel - // 发送 - channel.Publish( - connProp.ExchangeName, - connProp.TopicKey, - false, - true, - amqp.Publishing{ - ContentType: "text/plain", - Body: message, - }, - ) -} diff --git a/agent-go/rabbitmq/OctopusMessage.go b/agent-go/rabbitmq/OctopusMessage.go index ae90dd9..d24f329 100644 --- a/agent-go/rabbitmq/OctopusMessage.go +++ b/agent-go/rabbitmq/OctopusMessage.go @@ -14,6 +14,16 @@ type OctopusMessage struct { ACTime time.Time `json:"ac_time" format:"2023-03-21 16:38:30"` } +type ExecutionMessage struct { + NeedResultReplay bool `json:"needResultReplay"` + DurationTask bool `json:"durationTask,default:false"` + Type string `json:"type"` + SingleLineCommand []string `json:"singleLineCommand"` + MultiLineCommand [][]string `json:"multiLineCommand"` + PipeLineCommand [][]string `json:"pipeLineCommand"` + ResultKey string `json:"resultKey"` +} + // BuildOctopusMsg 生成OctopusMessage func (m *OctopusMessage) BuildOctopusMsg(omType string, content interface{}) *OctopusMessage { diff --git a/agent-go/rabbitmq/OctopusMsgHandler.go b/agent-go/rabbitmq/OctopusMsgHandler.go new file mode 100644 index 0000000..13e67f6 --- /dev/null +++ b/agent-go/rabbitmq/OctopusMsgHandler.go @@ -0,0 +1,96 @@ +package rabbitmq + +import ( + "agent-go/executor" + "agent-go/g" + "encoding/json" + "fmt" +) + +func HandleOMsg(initOMsgFromServer *OctopusMessage) { + + agentTopicName := initOMsgFromServer.UUID + OctopusExchange := g.G.NacosConfig.GetString("octopus.message.octopus_exchange") + + octopusConnectProp := &ConnectProperty{ + ExchangeName: OctopusExchange, + QueueName: agentTopicName, + ExchangeType: g.QueueTopic, + TopicKey: agentTopicName + "*", + } + + octopusConn, err := NewRabbitMQConn(octopusConnectProp) + if err != nil { + log.Error(fmt.Sprintf("Octopus Message Queue create Error ! => %v", octopusConnectProp)) + panic(err) + } + + // 开始接收消息 + channel := octopusConn.Channel + deliveries, err := channel.Consume( + agentTopicName, + agentTopicName, + true, + false, + false, + false, + nil, + ) + if err != nil { + return + } + + // 死循环,处理Ocotpus Message + for delivery := range deliveries { + + var om *OctopusMessage + err := json.Unmarshal(delivery.Body, &om) + if err != nil { + log.Error("Octopus Message Parse Error !") + // 保存到某处 + continue + } + + // 策略模式 处理消息 + doHandleOctopusMessage(om) + } + +} + +func doHandleOctopusMessage(octopusMessage *OctopusMessage) { + + switch octopusMessage.Type { + case g.InitOmType: + go func() {}() + case g.ExecOmType: + go executorOMHandler(octopusMessage) + case g.StatusOmType: + go statusOMHandler(octopusMessage) + default: + go blackHoleOMHandler(octopusMessage) + } + +} + +func executorOMHandler(octopusMessage *OctopusMessage) { + + executionMsgString := octopusMessage.Content.(string) + + var executionMessage *ExecutionMessage + err := json.Unmarshal([]byte(executionMsgString), &executionMessage) + if err != nil { + return + } + + // 交给后端的实际处理器处理, 再次策略 + executor.Execute(octopusMessage, executionMessage) + +} + +func statusOMHandler(octopusMessage *OctopusMessage) { + +} + +func blackHoleOMHandler(octopusMessage *OctopusMessage) { + log.Error(fmt.Sprintf("octopusMessage type wrong! msg is => %v", octopusMessage)) +} diff --git a/agent-go/rabbitmq/RMQConnector.go b/agent-go/rabbitmq/RabbitMQConnector.go similarity index 100% rename from agent-go/rabbitmq/RMQConnector.go rename to agent-go/rabbitmq/RabbitMQConnector.go diff --git a/agent-go/register/AgentIntitilization.go b/agent-go/register/AgentIntitilization.go index 118c842..5419b44 100644 --- a/agent-go/register/AgentIntitilization.go +++ b/agent-go/register/AgentIntitilization.go @@ -11,11 +11,12 @@ import ( var log = g.G.LOG var omType = g.InitOmType +var agentServerInfo = g.G.AgentServerInfo func INIT() { // 获取系统的环境变量 - agentServerInfo := parseAgentServerInfo() + g.G.AgentServerInfo = parseAgentServerInfo() nacosConfig := g.G.NacosConfig @@ -72,19 +73,58 @@ func INIT() { // 监听初始化连接中的信息 // 建立运行时RabbitMQ连接 - handleInitMsgFromServer() + handleInitMsgFromServer(initFromServer, initFromServerProp, initToServer, initToServerProp) } -func handleInitMsgFromServer() { +// handleInitMsgFromServer 处理从Server接收的注册信息 +func handleInitMsgFromServer(initFromServer *rabbitmq.RabbitMQConn, initFromServerProp *rabbitmq.ConnectProperty, initToServer *rabbitmq.RabbitMQConn, initToServerProp *rabbitmq.ConnectProperty) { + + deliveries := rabbitmq.Read(initFromServer, initFromServerProp, false) + + // 同步很多抢占注册的情况 + for delivery := range deliveries { + + var om *rabbitmq.OctopusMessage + err := json.Unmarshal(delivery.Body, &om) + if err != nil { + log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ", + string(delivery.Body))) + } + + // 处理OM信息 + if om.UUID == g.G.AgentServerInfo.AgentTopicName { + // 是本机的注册回复信息 + + // 建立运行时RabbitMQ连接 + rabbitmq.HandleOMsg(om) + + // 手动确认信息 + delivery.Ack(false) + + // 手动关闭 注册队列的连接 + shutdownRegisterQueueConnection(initFromServer, initFromServerProp, initToServer, initToServerProp) + + return + } + + // 不是自身的 注册回复信息 -- 拒绝 + log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v", om)) + delivery.Nack(false, true) + } } -func parseAgentServerInfo() AgentServerInfo { +// shutdownRegisterQueueConnection 关闭初始化连接的两个队列 +func shutdownRegisterQueueConnection(initFromServer *rabbitmq.RabbitMQConn, initFromServerProp *rabbitmq.ConnectProperty, initToServer *rabbitmq.RabbitMQConn, initToServerProp *rabbitmq.ConnectProperty) { + +} + +func parseAgentServerInfo() *AgentServerInfo { // 约定文件地址为 /etc/environment.d/octopus-agent.conf // 目前使用 - var agentServerInfo AgentServerInfo + var agentServerInfo *AgentServerInfo yamlFile, err := ioutil.ReadFile("C:\\Users\\wdd\\IdeaProjects\\ProjectOctopus\\agent-go\\server-env.yaml") if err != nil { panic(fmt.Errorf("failed to read YAML file: %v", err)) @@ -97,7 +137,8 @@ func parseAgentServerInfo() AgentServerInfo { jsonFormat, err := json.Marshal(agentServerInfo) if err != nil { - return AgentServerInfo{} + log.Error(fmt.Sprintf("agent server info convert error ! agentserverinfo is %v", agentServerInfo)) + panic(err) } log.Info(fmt.Sprintf("agent server info is %v", string(jsonFormat))) diff --git a/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java b/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java index a16bdd7..f3755b8 100644 --- a/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java +++ b/agent/src/main/java/io/wdd/agent/executor/web/TestCommandExecutorController.java @@ -4,7 +4,6 @@ package io.wdd.agent.executor.web; import io.wdd.agent.executor.CommandExecutor; import io.wdd.agent.executor.FunctionExecutor; import io.wdd.common.beans.executor.ExecutionMessage; -import io.wdd.common.beans.rabbitmq.OctopusMessage; import io.wdd.common.beans.response.R; import org.springframework.web.bind.annotation.*; @@ -24,7 +23,7 @@ public class TestCommandExecutorController { @PostMapping("comand") public R testFor( - @RequestBody OctopusMessage octopusMessage + @RequestBody ExecutionMessage executionMessage ) { return R.ok("1");