[agent-wdd] 完成Excecutor和Operator部分,完成base tools部分

This commit is contained in:
zeaslity
2025-02-14 17:17:55 +08:00
parent dabf63f10f
commit ce0395ae66
12 changed files with 380 additions and 23 deletions

204
agent-wdd/op/Excutor.go Normal file
View File

@@ -0,0 +1,204 @@
package op
import (
"agent-wdd/log"
"bufio"
"bytes"
"fmt"
"os/exec"
"strings"
)
// PipeLineCommandExecutor 执行管道命令,返回执行结果和合并后的输出内容
// pipeLineCommand: 管道命令组,如 [][]string{{"ps", "aux"}, {"grep", "nginx"}, {"wc", "-l"}}
// 返回值:
//
// bool - 所有命令是否全部执行成功
// []string - 合并后的输出内容中间命令的stderr + 最后一个命令的stdout/stderr
func PipeLineCommandExecutor(pipeLineCommand [][]string) (ok bool, resultLog []string) {
if len(pipeLineCommand) == 0 {
return false, nil
}
// 预检所有子命令
for _, cmd := range pipeLineCommand {
if len(cmd) == 0 {
return false, nil
}
}
// 创建命令组
cmds := make([]*exec.Cmd, len(pipeLineCommand))
for i, args := range pipeLineCommand {
cmds[i] = exec.Command(args[0], args[1:]...)
}
// 建立管道连接
for i := 0; i < len(cmds)-1; i++ {
stdoutPipe, err := cmds[i].StdoutPipe()
if err != nil {
return false, []string{err.Error()}
}
cmds[i+1].Stdin = stdoutPipe
}
// 准备输出捕获
stderrBuffers := make([]bytes.Buffer, len(cmds)) // 所有命令的stderr
var lastStdout bytes.Buffer // 最后一个命令的stdout
// 绑定输出
for i, cmd := range cmds {
cmd.Stderr = &stderrBuffers[i] // 每个命令单独捕获stderr
}
cmds[len(cmds)-1].Stdout = &lastStdout // 仅捕获最后一个命令的stdout
// 启动所有命令
started := make([]*exec.Cmd, 0, len(cmds))
defer func() {
// 异常时清理已启动进程
for _, cmd := range started {
if cmd.Process != nil {
cmd.Process.Kill()
}
}
}()
for _, cmd := range cmds {
if err := cmd.Start(); err != nil {
return false, []string{err.Error()}
}
started = append(started, cmd)
}
// 等待所有命令完成
success := true
for _, cmd := range cmds {
if err := cmd.Wait(); err != nil {
success = false
}
}
// 合并输出内容
output := make([]string, 0)
// 合并中间命令的stderr按命令顺序
for i := 0; i < len(cmds)-1; i++ {
scanner := bufio.NewScanner(&stderrBuffers[i])
for scanner.Scan() {
output = append(output, scanner.Text())
}
}
// 合并最后一个命令的输出stdout在前 + stderr在后
scanner := bufio.NewScanner(&lastStdout)
for scanner.Scan() {
output = append(output, scanner.Text())
}
scanner = bufio.NewScanner(&stderrBuffers[len(cmds)-1])
for scanner.Scan() {
output = append(output, scanner.Text())
}
return success, output
}
// SingleLineCommandExecutor 执行单行shell命令返回执行结果和输出内容
// singleLineCommand: 命令及参数,如 []string{"ls", "-l"}
// 返回值:
//
// bool - 命令是否执行成功true为成功false为失败
// []string - 合并后的标准输出和标准错误内容(按行分割)
func SingleLineCommandExecutor(singleLineCommand []string) (ok bool, resultLog []string) {
log.Info("[Excutor] - start => %v", singleLineCommand)
if len(singleLineCommand) == 0 {
return false, nil
}
// 创建命令实例
cmd := exec.Command(singleLineCommand[0], singleLineCommand[1:]...)
// 创建输出缓冲区
var stdoutBuf, stderrBuf bytes.Buffer
cmd.Stdout = &stdoutBuf
cmd.Stderr = &stderrBuf
// 执行命令并获取错误信息
err := cmd.Run()
// 合并输出结果
output := mergeOutput(&stdoutBuf, &stderrBuf)
// 判断执行结果
return err == nil, output
}
// mergeOutput 合并标准输出和标准错误,按行分割
//func mergeOutput(stdoutBuf, stderrBuf *bytes.Buffer) []string {
// var output []string
//
// // 处理标准输出
// scanner := bufio.NewScanner(stdoutBuf)
// for scanner.Scan() {
// output = append(output, scanner.Text())
// }
//
// // 处理标准错误
// scanner = bufio.NewScanner(stderrBuf)
// for scanner.Scan() {
// output = append(output, scanner.Text())
// }
//
// return output
//}
// 若追求极致性能,可优化为流式合并
func mergeOutput(stdout, stderr *bytes.Buffer) []string {
combined := make([]string, 0, bytes.Count(stdout.Bytes(), []byte{'\n'})+bytes.Count(stderr.Bytes(), []byte{'\n'}))
combined = append(combined, strings.Split(stdout.String(), "\n")...)
combined = append(combined, strings.Split(stderr.String(), "\n")...)
return combined
}
func main() {
// 成功案例
success, output := SingleLineCommandExecutor([]string{"ls", "-l"})
fmt.Println("执行成功:", success)
fmt.Println("输出内容:", output)
// 失败案例(命令存在但参数错误)
success, output = SingleLineCommandExecutor([]string{"ls", "/nonexistent"})
fmt.Println("执行成功:", success)
fmt.Println("输出内容:", output)
// 失败案例(命令不存在)
success, output = SingleLineCommandExecutor([]string{"invalid_command"})
fmt.Println("执行成功:", success)
fmt.Println("输出内容:", output)
// 成功案例(三阶管道)
success, output = PipeLineCommandExecutor([][]string{
{"ps", "aux"},
{"grep", "nginx"},
{"wc", "-l"},
})
fmt.Println("执行结果:", success)
fmt.Println("合并输出:", output)
// 失败案例(命令不存在)
success, output = PipeLineCommandExecutor([][]string{
{"invalid_cmd"},
{"wc", "-l"},
})
fmt.Println("执行结果:", success)
fmt.Println("合并输出:", output)
// 失败案例(参数错误)
success, output = PipeLineCommandExecutor([][]string{
{"ls", "/nonexistent"},
{"grep", "test"},
})
fmt.Println("执行结果:", success)
fmt.Println("合并输出:", output)
}