From ce0395ae669e9ac9dcf6e6201890bcea04fa3007 Mon Sep 17 00:00:00 2001 From: zeaslity Date: Fri, 14 Feb 2025 17:17:55 +0800 Subject: [PATCH] =?UTF-8?q?[agent-wdd]=20=E5=AE=8C=E6=88=90Excecutor?= =?UTF-8?q?=E5=92=8COperator=E9=83=A8=E5=88=86=EF=BC=8C=E5=AE=8C=E6=88=90b?= =?UTF-8?q?ase=20tools=E9=83=A8=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../d_app/TemplateIngressConfigMap.go | 4 +- .../a_executor/beans/DockerDaemonConfig.go | 6 +- agent-go/a_init/AgentInitialization.go | 2 +- agent-go/rabbitmq/RabbitMsgQueue.go | 2 +- agent-operator/image/ImageOperator.go | 2 +- agent-wdd/cmd/Base.go | 47 +++- agent-wdd/config/Config.go | 16 +- agent-wdd/config/OS.go | 7 + agent-wdd/one-build-and-run.ps1 | 2 +- agent-wdd/op/Excutor.go | 204 ++++++++++++++++++ agent-wdd/op/Operator.go | 110 ++++++++++ agent-wdd/utils/ObjectUtils.go | 1 + 12 files changed, 380 insertions(+), 23 deletions(-) create mode 100644 agent-wdd/op/Excutor.go create mode 100644 agent-wdd/op/Operator.go create mode 100644 agent-wdd/utils/ObjectUtils.go diff --git a/agent-deploy/d_app/TemplateIngressConfigMap.go b/agent-deploy/d_app/TemplateIngressConfigMap.go index f4a73a7..c0ffe17 100755 --- a/agent-deploy/d_app/TemplateIngressConfigMap.go +++ b/agent-deploy/d_app/TemplateIngressConfigMap.go @@ -164,8 +164,8 @@ metadata: nginx.ingress.kubernetes.io/enable-cors: "true" nginx.ingress.kubernetes.io/rewrite-target: /$1 nginx.ingress.kubernetes.io/configuration-snippet: | - proxy_set_header Upgrade $http_upgrade; - proxy_set_header Connection "Upgrade"; + proxy_set_header upgradePrefix $http_upgrade; + proxy_set_header Connection "upgradePrefix"; spec: rules: - host: fake-domain.{{ .Namespace }}.io diff --git a/agent-go/a_executor/beans/DockerDaemonConfig.go b/agent-go/a_executor/beans/DockerDaemonConfig.go index ae778d9..a1ac546 100755 --- a/agent-go/a_executor/beans/DockerDaemonConfig.go +++ b/agent-go/a_executor/beans/DockerDaemonConfig.go @@ -43,7 +43,7 @@ LimitNOFILE=infinity TasksMax=infinity OOMScoreAdjust=-999 -[Install] +[installPrefix] WantedBy=multi-user.target ` @@ -57,7 +57,7 @@ SocketMode=0660 SocketUser=root SocketGroup=docker -[Install] +[installPrefix] WantedBy=sockets.target ` @@ -107,7 +107,7 @@ Delegate=yes KillMode=process OOMScoreAdjust=-500 -[Install] +[installPrefix] WantedBy=multi-user.target ` diff --git a/agent-go/a_init/AgentInitialization.go b/agent-go/a_init/AgentInitialization.go index 861c627..5599842 100755 --- a/agent-go/a_init/AgentInitialization.go +++ b/agent-go/a_init/AgentInitialization.go @@ -553,7 +553,7 @@ func shutdownRegisterQueueConnection(initFromServerQueue *rabbitmq.RabbitQueue, initFromServerQueue.ConsumeOK.Store(false) initToServerQueue.ConsumeOK.Store(false) - log.InfoF("Octopus Agent Init Queue has disconnected!") + log.InfoF("Octopus Agent initCommand Queue has disconnected!") } func parseAgentServerInfo(agentServerInfoConf string) *a_agent.AgentServerInfo { diff --git a/agent-go/rabbitmq/RabbitMsgQueue.go b/agent-go/rabbitmq/RabbitMsgQueue.go index ae60fd3..d614f7c 100755 --- a/agent-go/rabbitmq/RabbitMsgQueue.go +++ b/agent-go/rabbitmq/RabbitMsgQueue.go @@ -161,7 +161,7 @@ func (r *RabbitQueue) Connect() { // build for receive chan rabbitRCha := &RabbitReceiveChan{} - if strings.HasPrefix(r.RabbitProp.QueueName, "Init") { + if strings.HasPrefix(r.RabbitProp.QueueName, "initCommand") { // init queue rabbitRCha.InitRChan = make(chan *OctopusMessage) } else { diff --git a/agent-operator/image/ImageOperator.go b/agent-operator/image/ImageOperator.go index 59efdeb..099a3da 100644 --- a/agent-operator/image/ImageOperator.go +++ b/agent-operator/image/ImageOperator.go @@ -521,7 +521,7 @@ func SaveImageListToGzipFile(imageFullNames []string, folderPathPrefix string, o //log.InfoF("[SaveImagesToGzipFile] - start saving images to [%s]", gzipFileFullPath) // //// 删除旧的Gzip文件 - //if err := os.Remove(gzipFileFullPath); err != nil && !os.IsNotExist(err) { + //if err := os.removePrefix(gzipFileFullPath); err != nil && !os.IsNotExist(err) { // log.ErrorF("[SaveImagesToGzipFile] - failed to remove old gzip file: %s", err) // return false, "", errorGzipImageList //} diff --git a/agent-wdd/cmd/Base.go b/agent-wdd/cmd/Base.go index 5cfbee7..0378c6f 100644 --- a/agent-wdd/cmd/Base.go +++ b/agent-wdd/cmd/Base.go @@ -1,10 +1,22 @@ package cmd import ( + "agent-wdd/config" + "agent-wdd/log" + "agent-wdd/op" "fmt" "github.com/spf13/cobra" ) +var ( + ubuntuCommonTools = []string{ + "iputils-ping", "net-tools", "dnsutils", "lsof", "curl", "wget", "mtr-tiny", "vim", "htop", "lrzsz", + } + centosCommonTools = []string{ + "deltarpm", "net-tools", "iputils", "bind-utils", "lsof", "curl", "wget", "vim", "mtr", "htop", + } +) + // 添加base子命令 func addBaseSubcommands(cmd *cobra.Command) { // 1.1 docker @@ -23,9 +35,37 @@ func addBaseSubcommands(cmd *cobra.Command) { // 其他base子命令... + // 通用工具安装 + commonToolsInstall := &cobra.Command{ + Use: "tools", + Short: "通用工具安装 利用本机的yum,apt等从网络安装常用的软件", + Run: func(cmd *cobra.Command, args []string) { + log.Info("Common tool installation!") + + // Whether It can connect to internet + if config.CanConnectInternet() <= 1 { + log.Error("服务器无法连接互联网,无法执行tools") + return + } + + // package install + // only support ubuntu(debian) centos(debian openEuler) + packOperator := op.AgentPackOperator + packOperator.PackageInit() + + os := config.ConfigCache.Agent.OS + if os.IsUbuntuType { + packOperator.Install(ubuntuCommonTools) + } else { + packOperator.Install(centosCommonTools) + } + }, + } + cmd.AddCommand( dockerCmd, dockerComposeCmd, + commonToolsInstall, // 其他命令... ) } @@ -64,10 +104,3 @@ func addDockerSubcommands(cmd *cobra.Command) { func addDockerComposeSubcommands(cmd *cobra.Command) { } - -// addToolsSubcommands 利用本机的yum,apt等从网络安装常用的软件 -func addToolsSubcommands(cmd *cobra.Command) { - // 检测本机使用的包安装方式为apt还是yum - // 检查本机 - // -} diff --git a/agent-wdd/config/Config.go b/agent-wdd/config/Config.go index da83942..9982f43 100644 --- a/agent-wdd/config/Config.go +++ b/agent-wdd/config/Config.go @@ -33,13 +33,15 @@ type Agent struct { } type OS struct { - Hostname string `yaml:"hostname"` - OsName string `yaml:"os_name"` - OsFamily string `yaml:"os_family"` - OsVersion string `yaml:"os_version"` - OsType string `yaml:"os_type"` - Kernel string `yaml:"kernel"` - Arch string `yaml:"arch"` + Hostname string `yaml:"hostname"` + OsName string `yaml:"os_name"` + OsFamily string `yaml:"os_family"` + OsVersion string `yaml:"os_version"` + OsType string `yaml:"os_type"` + Kernel string `yaml:"kernel"` + Arch string `yaml:"arch"` + IsUbuntuType bool `yaml:"is_ubuntu_type",comment:"是否是ubuntu类型的操作系统"` + PackInit bool `yaml:"pack_init",comment:"是否初始化,ubuntu需要"` } type Network struct { diff --git a/agent-wdd/config/OS.go b/agent-wdd/config/OS.go index 275a4ea..4ae87b9 100644 --- a/agent-wdd/config/OS.go +++ b/agent-wdd/config/OS.go @@ -77,6 +77,13 @@ func (o *OS) Gather() { o.Kernel = strings.TrimSpace(string(out)) } + // 检查包管理的方式 + c := exec.Command("command", "-v", "apt") + _, err = c.Output() + if err == nil { + o.IsUbuntuType = true + } + // 获取系统架构 o.Arch = runtime.GOARCH } diff --git a/agent-wdd/one-build-and-run.ps1 b/agent-wdd/one-build-and-run.ps1 index d11e0be..1e52cb1 100644 --- a/agent-wdd/one-build-and-run.ps1 +++ b/agent-wdd/one-build-and-run.ps1 @@ -16,7 +16,7 @@ try { Write-Host "4. Exec the command ..." -ForegroundColor Blue Write-Host "" Write-Host "" - ssh root@192.168.35.71 "chmod +x agent-wdd_linux_amd64 && ./agent-wdd_linux_amd64 info os" + ssh root@192.168.35.71 "chmod +x agent-wdd_linux_amd64 && ./agent-wdd_linux_amd64 base tools" Write-Host "" Write-Host "" Write-Host "5. Cheak Info Result ..." -ForegroundColor Blue diff --git a/agent-wdd/op/Excutor.go b/agent-wdd/op/Excutor.go new file mode 100644 index 0000000..0826df8 --- /dev/null +++ b/agent-wdd/op/Excutor.go @@ -0,0 +1,204 @@ +package op + +import ( + "agent-wdd/log" + "bufio" + "bytes" + "fmt" + "os/exec" + "strings" +) + +// PipeLineCommandExecutor 执行管道命令,返回执行结果和合并后的输出内容 +// pipeLineCommand: 管道命令组,如 [][]string{{"ps", "aux"}, {"grep", "nginx"}, {"wc", "-l"}} +// 返回值: +// +// bool - 所有命令是否全部执行成功 +// []string - 合并后的输出内容(中间命令的stderr + 最后一个命令的stdout/stderr) +func PipeLineCommandExecutor(pipeLineCommand [][]string) (ok bool, resultLog []string) { + if len(pipeLineCommand) == 0 { + return false, nil + } + + // 预检所有子命令 + for _, cmd := range pipeLineCommand { + if len(cmd) == 0 { + return false, nil + } + } + + // 创建命令组 + cmds := make([]*exec.Cmd, len(pipeLineCommand)) + for i, args := range pipeLineCommand { + cmds[i] = exec.Command(args[0], args[1:]...) + } + + // 建立管道连接 + for i := 0; i < len(cmds)-1; i++ { + stdoutPipe, err := cmds[i].StdoutPipe() + if err != nil { + return false, []string{err.Error()} + } + cmds[i+1].Stdin = stdoutPipe + } + + // 准备输出捕获 + stderrBuffers := make([]bytes.Buffer, len(cmds)) // 所有命令的stderr + var lastStdout bytes.Buffer // 最后一个命令的stdout + + // 绑定输出 + for i, cmd := range cmds { + cmd.Stderr = &stderrBuffers[i] // 每个命令单独捕获stderr + } + cmds[len(cmds)-1].Stdout = &lastStdout // 仅捕获最后一个命令的stdout + + // 启动所有命令 + started := make([]*exec.Cmd, 0, len(cmds)) + defer func() { + // 异常时清理已启动进程 + for _, cmd := range started { + if cmd.Process != nil { + cmd.Process.Kill() + } + } + }() + + for _, cmd := range cmds { + if err := cmd.Start(); err != nil { + return false, []string{err.Error()} + } + started = append(started, cmd) + } + + // 等待所有命令完成 + success := true + for _, cmd := range cmds { + if err := cmd.Wait(); err != nil { + success = false + } + } + + // 合并输出内容 + output := make([]string, 0) + + // 合并中间命令的stderr(按命令顺序) + for i := 0; i < len(cmds)-1; i++ { + scanner := bufio.NewScanner(&stderrBuffers[i]) + for scanner.Scan() { + output = append(output, scanner.Text()) + } + } + + // 合并最后一个命令的输出(stdout在前 + stderr在后) + scanner := bufio.NewScanner(&lastStdout) + for scanner.Scan() { + output = append(output, scanner.Text()) + } + scanner = bufio.NewScanner(&stderrBuffers[len(cmds)-1]) + for scanner.Scan() { + output = append(output, scanner.Text()) + } + + return success, output +} + +// SingleLineCommandExecutor 执行单行shell命令,返回执行结果和输出内容 +// singleLineCommand: 命令及参数,如 []string{"ls", "-l"} +// 返回值: +// +// bool - 命令是否执行成功(true为成功,false为失败) +// []string - 合并后的标准输出和标准错误内容(按行分割) +func SingleLineCommandExecutor(singleLineCommand []string) (ok bool, resultLog []string) { + + log.Info("[Excutor] - start => %v", singleLineCommand) + + if len(singleLineCommand) == 0 { + return false, nil + } + + // 创建命令实例 + cmd := exec.Command(singleLineCommand[0], singleLineCommand[1:]...) + + // 创建输出缓冲区 + var stdoutBuf, stderrBuf bytes.Buffer + cmd.Stdout = &stdoutBuf + cmd.Stderr = &stderrBuf + + // 执行命令并获取错误信息 + err := cmd.Run() + + // 合并输出结果 + output := mergeOutput(&stdoutBuf, &stderrBuf) + + // 判断执行结果 + return err == nil, output +} + +// mergeOutput 合并标准输出和标准错误,按行分割 +//func mergeOutput(stdoutBuf, stderrBuf *bytes.Buffer) []string { +// var output []string +// +// // 处理标准输出 +// scanner := bufio.NewScanner(stdoutBuf) +// for scanner.Scan() { +// output = append(output, scanner.Text()) +// } +// +// // 处理标准错误 +// scanner = bufio.NewScanner(stderrBuf) +// for scanner.Scan() { +// output = append(output, scanner.Text()) +// } +// +// return output +//} + +// 若追求极致性能,可优化为流式合并 +func mergeOutput(stdout, stderr *bytes.Buffer) []string { + combined := make([]string, 0, bytes.Count(stdout.Bytes(), []byte{'\n'})+bytes.Count(stderr.Bytes(), []byte{'\n'})) + combined = append(combined, strings.Split(stdout.String(), "\n")...) + combined = append(combined, strings.Split(stderr.String(), "\n")...) + return combined +} + +func main() { + // 成功案例 + success, output := SingleLineCommandExecutor([]string{"ls", "-l"}) + fmt.Println("执行成功:", success) + fmt.Println("输出内容:", output) + + // 失败案例(命令存在但参数错误) + success, output = SingleLineCommandExecutor([]string{"ls", "/nonexistent"}) + fmt.Println("执行成功:", success) + fmt.Println("输出内容:", output) + + // 失败案例(命令不存在) + success, output = SingleLineCommandExecutor([]string{"invalid_command"}) + fmt.Println("执行成功:", success) + fmt.Println("输出内容:", output) + + // 成功案例(三阶管道) + success, output = PipeLineCommandExecutor([][]string{ + {"ps", "aux"}, + {"grep", "nginx"}, + {"wc", "-l"}, + }) + fmt.Println("执行结果:", success) + fmt.Println("合并输出:", output) + + // 失败案例(命令不存在) + success, output = PipeLineCommandExecutor([][]string{ + {"invalid_cmd"}, + {"wc", "-l"}, + }) + fmt.Println("执行结果:", success) + fmt.Println("合并输出:", output) + + // 失败案例(参数错误) + success, output = PipeLineCommandExecutor([][]string{ + {"ls", "/nonexistent"}, + {"grep", "test"}, + }) + fmt.Println("执行结果:", success) + fmt.Println("合并输出:", output) +} diff --git a/agent-wdd/op/Operator.go b/agent-wdd/op/Operator.go new file mode 100644 index 0000000..937f27b --- /dev/null +++ b/agent-wdd/op/Operator.go @@ -0,0 +1,110 @@ +package op + +import ( + "agent-wdd/config" + "agent-wdd/log" + "agent-wdd/utils" + "strings" +) + +type PackageOperator struct { + installPrefix []string `json:"install" yaml:"install"` // 安装前缀 + removePrefix []string `json:"remove" yaml:"remove"` // 移除前缀 + upgradePrefix []string `json:"upgrade" yaml:"upgrade"` // 升级前缀 + initCommand []string +} + +var ( + AgentPackOperator = &PackageOperator{} + aptPackageOperator = &PackageOperator{ + installPrefix: []string{ + "apt-get", "install", "--allow-downgrades", "-y", + }, + removePrefix: []string{ + "apt", "remove", "-y", + }, + upgradePrefix: []string{}, + initCommand: []string{ + "apt-get", "update", + }, + } + yumPackageOperator = &PackageOperator{ + installPrefix: []string{ + "yum", "install", "-y", + }, + removePrefix: []string{ + "yum", "remove", "-y", + }, + upgradePrefix: []string{}, + } +) + +func (op *PackageOperator) Install(tools []string) { + // 判定本机的包管理Operator + generatePackageOperator() + + // install seperately + for _, tool := range tools { + ok, result := SingleLineCommandExecutor(append(AgentPackOperator.installPrefix, tool)) + if !ok { + log.Error("[install] failed! => %s", tool) + utils.BeautifulPrint(result) + } + } +} + +func (op *PackageOperator) PackageInit() { + log.Info("PackageInit !") + + // 判定本机的包管理Operator + generatePackageOperator() + + // package init + os := config.ConfigCache.Agent.OS + osFamily := strings.ToLower(os.OsFamily) + + if strings.Contains(osFamily, "ubuntu") || strings.Contains(osFamily, "debian") { + ok, resultLog := SingleLineCommandExecutor(aptPackageOperator.initCommand) + if !ok { + log.Error("APT init failed! please check !") + utils.BeautifulPrint(resultLog) + } + } + +} + +func (op *PackageOperator) Remove(tools []string) { + // 判定本机的包管理Operator + generatePackageOperator() + + // install seperately + for _, tool := range tools { + ok, result := SingleLineCommandExecutor(append(AgentPackOperator.removePrefix, tool)) + if !ok { + log.Error("[remove] failed! => %s", tool) + utils.BeautifulPrint(result) + } + } + +} + +func generatePackageOperator() { + + // cache return + if AgentPackOperator.initCommand != nil { + return + } + // 检查本机是否存在Os的信息 + os := config.ConfigCache.Agent.OS + if os.Hostname == "" { + os.Gather() + + os.SaveConfig() + } + + if os.IsUbuntuType { + AgentPackOperator = aptPackageOperator + } else { + AgentPackOperator = yumPackageOperator + } +} diff --git a/agent-wdd/utils/ObjectUtils.go b/agent-wdd/utils/ObjectUtils.go new file mode 100644 index 0000000..d4b585b --- /dev/null +++ b/agent-wdd/utils/ObjectUtils.go @@ -0,0 +1 @@ +package utils