package a_executor import ( "bufio" "bytes" "encoding/json" "fmt" "os/exec" "strconv" "strings" "time" "wdd.io/agent-common/logger" "wdd.io/agent-common/utils" "wdd.io/agent-go/rabbitmq" ) type ExecutionMessage struct { NeedResultReplay bool `json:"needResultReplay"` DurationTask bool `json:"durationTask,default:false"` ExecutionType string `json:"executionType"` FuncContent []string `json:"funcContent"` SingleLineCommand []string `json:"singleLineCommand"` MultiLineCommand [][]string `json:"multiLineCommand"` PipeLineCommand [][]string `json:"pipeLineCommand"` ResultKey string `json:"resultKey"` } var log = logger.Log var AgentOsOperatorCache = &AgentOsOperator{} func Activate() { log.Info("Module [ EXECUTOR ] activated !") // 死循环获取channel中的数据 for { if octopusMessage, ok := <-rabbitmq.BusinessRuntimeQueue.ReceiveChan.ExecutorRChan; ok { // 处理数据 // 转换类型 executionMsgString := octopusMessage.Content.(string) //解析 ExecutionMessage var executionMessage *ExecutionMessage err := json.Unmarshal([]byte(executionMsgString), &executionMessage) if err != nil { log.Error(fmt.Sprintf("execution message convert to json is wrong! msg is => %s", executionMsgString)) return } // 执行命令 ok, resultLog := Execute(executionMessage) if ok { octopusMessage.ResultCode = "200" } else { octopusMessage.ResultCode = "300" } // 返回结果 if executionMessage.NeedResultReplay { // send back the result log octopusMessage.Result = resultLog } // 返回时间 octopusMessage.ACTime = utils.ParseDateTimeTime() // 返回结果 octopusMessage.SendToOctopusServer() } else { // channel已关闭,跳出循环 log.ErrorF("business queue [ EXECUTOR ] receive chan has closed !") break } } time.Sleep(time.Second * 5) } func Execute(em *ExecutionMessage) (bool, []string) { var resultLog []string var err error ok := true executionContent := em.ExecutionType + " == " + strings.Join(em.FuncContent, " - ") log.DebugF("em message is => %#v", em) if strings.HasPrefix(em.ExecutionType, "BASE") { // base function if len(em.FuncContent) > 1 { ok, resultLog = AgentOsOperatorCache.Exec(em.FuncContent[0], em.FuncContent[1:]...) } else { ok, resultLog = AgentOsOperatorCache.Exec(em.FuncContent[0]) } } else if strings.HasPrefix(em.ExecutionType, "APP") { // app function if len(em.FuncContent) > 1 { ok, resultLog = AgentOsOperatorCache.Deploy(em.FuncContent[0], em.FuncContent[1:]...) } else { ok, resultLog = AgentOsOperatorCache.Deploy(em.FuncContent[0]) } } else if strings.HasPrefix(em.ExecutionType, "HARBOR") { // harbor function if em.FuncContent == nil || len(em.FuncContent) <= 1 { ok = false resultLog = []string{ "[Harbor Execute] - functions args is wrong!", } } // Harbor Execute ok, resultLog = HarborOperatorCache.Exec(em.FuncContent[0], em.FuncContent[1:]...) } else if strings.HasPrefix(em.ExecutionType, "IMAGE") { // image function if em.FuncContent == nil || len(em.FuncContent) <= 1 { ok = false resultLog = []string{ "[Harbor Execute] - functions args is wrong!", } } // Harbor Execute ok, resultLog = AgentOsOperatorCache.Sync(em.FuncContent[0], em.FuncContent[1:]...) } else { // deprecated // shell command if em.PipeLineCommand != nil && len(em.PipeLineCommand) != 0 { // 管道命令 resultLog, err = PipeLineCommandExecutor(em.PipeLineCommand) executionContent = fmt.Sprintf("%v", em.PipeLineCommand) } else if em.MultiLineCommand != nil && len(em.MultiLineCommand) != 0 { // 多行命令 resultLog, err = MultiLineCommandExecutor(em.MultiLineCommand) executionContent = fmt.Sprintf("%v", em.MultiLineCommand) } else { // 单行命令 resultLog, err = FormatAllCommandExecutor(em.SingleLineCommand) executionContent = fmt.Sprintf("%v", em.SingleLineCommand) } } // 归一化错误和日志 if err != nil { resultLog = append(resultLog, " 命令执行错误如下 ", err.Error()) } resultLog = append(resultLog, fmt.Sprintf("命令 %v 执行结果为 %s", executionContent, strconv.FormatBool(ok))) // debug log.DebugF("%v", resultLog) return ok, resultLog } func PipeLineCommandExecutor(pipeLineCommand [][]string) ([]string, error) { var resultSlice []string var output []byte var err error tmp := make([]string, len(pipeLineCommand)) for index, pipe := range pipeLineCommand { tmp[index] = strings.Join(pipe, " ") } pipelineCommandString := strings.Join(tmp, " | ") resultSlice = append(resultSlice, fmt.Sprintf(" ========= 命令为 ====> %s", pipelineCommandString)) for _, pipeCommand := range pipeLineCommand { if len(pipeCommand) == 0 { continue } command := exec.Command(pipeCommand[0], pipeCommand[1:]...) if len(output) > 0 { command.Stdin = bytes.NewBuffer(output) } output, err = command.CombinedOutput() if err != nil { log.ErrorF("Pipeline Command Command Error => %v", err.Error()) // 收集错误的信息 resultSlice = append(resultSlice, "↓↓↓ 命令 错误 如下 ↓↓↓", string(output)) return resultSlice, err } } // 正常的输出 resultSlice = append(resultSlice, "↓↓↓ 命令 输出 如下 ↓↓↓", string(output)) return resultSlice, err } func MultiLineCommandExecutor(multiLineCommandExecutor [][]string) ([]string, error) { var res []string for _, singleLineCommand := range multiLineCommandExecutor { singleLogs, err := FormatAllCommandExecutor(singleLineCommand) res = append(res, singleLogs...) if err != nil { log.Error(fmt.Sprintf("Execution error ! command is %v, error is %v", singleLineCommand, err)) return res, err } } return res, nil } // SingleLineCommandExecutor 执行单行命令 func SingleLineCommandExecutor(singleLineCommand []string) ([]string, error) { cmd := exec.Command(singleLineCommand[0], singleLineCommand[1:]...) var out bytes.Buffer cmd.Stdout = &out cmd.Stderr = &out err := cmd.Run() scanner := bufio.NewScanner(&out) var result []string for scanner.Scan() { result = append(result, scanner.Text()) } if err != nil { return nil, err } return result, nil }