230 lines
6.2 KiB
Go
230 lines
6.2 KiB
Go
package a_executor
|
||
|
||
import (
|
||
"bufio"
|
||
"bytes"
|
||
"encoding/json"
|
||
"fmt"
|
||
"os/exec"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
"wdd.io/agent-common/logger"
|
||
"wdd.io/agent-common/utils"
|
||
"wdd.io/agent-go/rabbitmq"
|
||
)
|
||
|
||
type ExecutionMessage struct {
|
||
NeedResultReplay bool `json:"needResultReplay"`
|
||
DurationTask bool `json:"durationTask,default:false"`
|
||
ExecutionType string `json:"executionType"`
|
||
FuncContent []string `json:"funcContent"`
|
||
SingleLineCommand []string `json:"singleLineCommand"`
|
||
MultiLineCommand [][]string `json:"multiLineCommand"`
|
||
PipeLineCommand [][]string `json:"pipeLineCommand"`
|
||
ResultKey string `json:"resultKey"`
|
||
}
|
||
|
||
var log = logger.Log
|
||
|
||
// AgentOsOperatorCache global agent operator cache
|
||
var AgentOsOperatorCache = &AgentOsOperator{}
|
||
|
||
func Activate() {
|
||
log.Info("Module [ EXECUTOR ] activated !")
|
||
|
||
// 死循环获取channel中的数据
|
||
for {
|
||
if octopusMessage, ok := <-rabbitmq.BusinessRuntimeQueue.ReceiveChan.ExecutorRChan; ok {
|
||
// 处理数据
|
||
// 转换类型
|
||
executionMsgString := octopusMessage.Content.(string)
|
||
|
||
//解析 ExecutionMessage
|
||
var executionMessage *ExecutionMessage
|
||
err := json.Unmarshal([]byte(executionMsgString), &executionMessage)
|
||
if err != nil {
|
||
log.Error(fmt.Sprintf("execution message convert to json is wrong! msg is => %s", executionMsgString))
|
||
return
|
||
}
|
||
|
||
// 执行命令
|
||
ok, resultLog := Execute(executionMessage)
|
||
if ok {
|
||
octopusMessage.ResultCode = "200"
|
||
} else {
|
||
octopusMessage.ResultCode = "300"
|
||
}
|
||
|
||
// 返回结果
|
||
if executionMessage.NeedResultReplay {
|
||
// send back the result log
|
||
octopusMessage.Result = resultLog
|
||
}
|
||
// 返回时间
|
||
octopusMessage.ACTime = utils.ParseDateTimeTime()
|
||
|
||
// 返回结果
|
||
octopusMessage.SendToOctopusServer()
|
||
} else {
|
||
// channel已关闭,跳出循环
|
||
log.ErrorF("business queue [ EXECUTOR ] receive chan has closed !")
|
||
break
|
||
}
|
||
}
|
||
time.Sleep(time.Second * 5)
|
||
}
|
||
|
||
func Execute(em *ExecutionMessage) (bool, []string) {
|
||
|
||
var resultLog []string
|
||
var err error
|
||
ok := true
|
||
executionContent := em.ExecutionType + " == " + strings.Join(em.FuncContent, " - ")
|
||
|
||
log.DebugF("em message is => %#v", em)
|
||
|
||
if strings.HasPrefix(em.ExecutionType, "BASE") {
|
||
// base function
|
||
if len(em.FuncContent) > 1 {
|
||
ok, resultLog = AgentOsOperatorCache.Exec(em.FuncContent[0], em.FuncContent[1:]...)
|
||
} else {
|
||
ok, resultLog = AgentOsOperatorCache.Exec(em.FuncContent[0])
|
||
}
|
||
|
||
} else if strings.HasPrefix(em.ExecutionType, "APP") {
|
||
// app function
|
||
if len(em.FuncContent) > 1 {
|
||
ok, resultLog = AgentOsOperatorCache.Deploy(em.FuncContent[0], em.FuncContent[1:]...)
|
||
} else {
|
||
ok, resultLog = AgentOsOperatorCache.Deploy(em.FuncContent[0])
|
||
}
|
||
|
||
} else if strings.HasPrefix(em.ExecutionType, "HARBOR") {
|
||
// harbor function
|
||
if em.FuncContent == nil || len(em.FuncContent) <= 1 {
|
||
ok = false
|
||
resultLog = []string{
|
||
"[Harbor Execute] - functions args is wrong!",
|
||
}
|
||
}
|
||
// Harbor Execute
|
||
ok, resultLog = HarborOperatorCache.Exec(em.FuncContent[0], em.FuncContent[1:]...)
|
||
|
||
} else if strings.HasPrefix(em.ExecutionType, "IMAGE") {
|
||
// image function
|
||
if em.FuncContent == nil || len(em.FuncContent) <= 1 {
|
||
ok = false
|
||
resultLog = []string{
|
||
"[Harbor Execute] - functions args is wrong!",
|
||
}
|
||
}
|
||
// Harbor Execute
|
||
ok, resultLog = AgentOsOperatorCache.Sync(em.FuncContent[0], em.FuncContent[1:]...)
|
||
} else {
|
||
// deprecated
|
||
// shell command
|
||
if em.PipeLineCommand != nil && len(em.PipeLineCommand) != 0 {
|
||
// 管道命令
|
||
resultLog, err = PipeLineCommandExecutor(em.PipeLineCommand)
|
||
executionContent = fmt.Sprintf("%v", em.PipeLineCommand)
|
||
} else if em.MultiLineCommand != nil && len(em.MultiLineCommand) != 0 {
|
||
// 多行命令
|
||
resultLog, err = MultiLineCommandExecutor(em.MultiLineCommand)
|
||
executionContent = fmt.Sprintf("%v", em.MultiLineCommand)
|
||
} else {
|
||
// 单行命令
|
||
resultLog, err = FormatAllCommandExecutor(em.SingleLineCommand)
|
||
executionContent = fmt.Sprintf("%v", em.SingleLineCommand)
|
||
}
|
||
|
||
}
|
||
|
||
// 归一化错误和日志
|
||
if err != nil {
|
||
resultLog = append(resultLog, " 命令执行错误如下 ", err.Error())
|
||
}
|
||
resultLog = append(resultLog, fmt.Sprintf("命令 %v 执行结果为 %s", executionContent, strconv.FormatBool(ok)))
|
||
// debug
|
||
log.DebugF("%v", resultLog)
|
||
|
||
return ok, resultLog
|
||
}
|
||
|
||
func PipeLineCommandExecutor(pipeLineCommand [][]string) ([]string, error) {
|
||
|
||
var resultSlice []string
|
||
var output []byte
|
||
var err error
|
||
|
||
tmp := make([]string, len(pipeLineCommand))
|
||
for index, pipe := range pipeLineCommand {
|
||
tmp[index] = strings.Join(pipe, " ")
|
||
}
|
||
pipelineCommandString := strings.Join(tmp, " | ")
|
||
|
||
resultSlice = append(resultSlice, fmt.Sprintf(" ========= 命令为 ====> %s", pipelineCommandString))
|
||
|
||
for _, pipeCommand := range pipeLineCommand {
|
||
if len(pipeCommand) == 0 {
|
||
continue
|
||
}
|
||
|
||
command := exec.Command(pipeCommand[0], pipeCommand[1:]...)
|
||
if len(output) > 0 {
|
||
command.Stdin = bytes.NewBuffer(output)
|
||
}
|
||
|
||
output, err = command.CombinedOutput()
|
||
if err != nil {
|
||
log.ErrorF("Pipeline Command Command Error => %v", err.Error())
|
||
|
||
// 收集错误的信息
|
||
resultSlice = append(resultSlice, "↓↓↓ 命令 错误 如下 ↓↓↓", string(output))
|
||
return resultSlice, err
|
||
}
|
||
}
|
||
|
||
// 正常的输出
|
||
resultSlice = append(resultSlice, "↓↓↓ 命令 输出 如下 ↓↓↓", string(output))
|
||
return resultSlice, err
|
||
}
|
||
|
||
func MultiLineCommandExecutor(multiLineCommandExecutor [][]string) ([]string, error) {
|
||
|
||
var res []string
|
||
for _, singleLineCommand := range multiLineCommandExecutor {
|
||
singleLogs, err := FormatAllCommandExecutor(singleLineCommand)
|
||
res = append(res, singleLogs...)
|
||
if err != nil {
|
||
log.Error(fmt.Sprintf("Execution error ! command is %v, error is %v", singleLineCommand, err))
|
||
return res, err
|
||
}
|
||
}
|
||
|
||
return res, nil
|
||
}
|
||
|
||
// SingleLineCommandExecutor 执行单行命令
|
||
func SingleLineCommandExecutor(singleLineCommand []string) ([]string, error) {
|
||
|
||
cmd := exec.Command(singleLineCommand[0], singleLineCommand[1:]...)
|
||
var out bytes.Buffer
|
||
cmd.Stdout = &out
|
||
cmd.Stderr = &out
|
||
|
||
err := cmd.Run()
|
||
|
||
scanner := bufio.NewScanner(&out)
|
||
var result []string
|
||
for scanner.Scan() {
|
||
result = append(result, scanner.Text())
|
||
}
|
||
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return result, nil
|
||
}
|