[agent-go] 初步完成Executor部分的代码

This commit is contained in:
zeaslity
2023-03-24 15:09:22 +08:00
parent fe8a4a03fc
commit 1af25d3992
9 changed files with 328 additions and 31 deletions

View File

@@ -0,0 +1,121 @@
package executor
import (
"agent-go/g"
"agent-go/rabbitmq"
"bufio"
"bytes"
"fmt"
"os/exec"
)
var log = g.G.LOG
func Execute(om *rabbitmq.OctopusMessage, em *rabbitmq.ExecutionMessage) ([]string, error) {
var resultLog []string
var err error
if em.PipeLineCommand != nil && len(em.PipeLineCommand) != 0 {
// 管道命令
resultLog, err = PipeLineCommandExecutor(em.PipeLineCommand)
} else if em.MultiLineCommand != nil && len(em.MultiLineCommand) != 0 {
// 多行命令
resultLog, err = MultiLineCommandExecutor(em.MultiLineCommand)
} else {
// 单行命令
resultLog, err = SingleLineCommandExecutor(em.SingleLineCommand)
}
// 处理执行日志
// 是否需要返回处理日志,现在默认返回
if em.NeedResultReplay {
// 需要返回处理结果
}
log.Info(fmt.Sprintf("Executor Result: %s", resultLog))
return resultLog, err
}
func PipeLineCommandExecutor(pipeLineCommand [][]string) ([]string, error) {
var cmds []*exec.Cmd
// 创建每个命令对象,并将前一个命令的标准输出连接到当前命令的标准输入
for i, partOfCommand := range pipeLineCommand {
cmd := exec.Command(partOfCommand[0], partOfCommand[1:]...)
if i > 0 {
prevCmd := cmds[i-1]
out, err := prevCmd.StdoutPipe()
if err != nil {
return nil, err
}
cmd.Stdin = out
}
cmds = append(cmds, cmd)
}
// 执行最后一个命令,并获取其输出
lastCmd := cmds[len(cmds)-1]
var out bytes.Buffer
lastCmd.Stdout = &out
lastCmd.Stderr = &out
err := lastCmd.Run()
scanner := bufio.NewScanner(&out)
var result []string
for scanner.Scan() {
result = append(result, scanner.Text())
}
if err != nil {
return nil, err
}
return result, nil
}
func MultiLineCommandExecutor(multiLineCommandExecutor [][]string) ([]string, error) {
var res []string
for _, singleLineCommand := range multiLineCommandExecutor {
singleLogs, err := SingleLineCommandExecutor(singleLineCommand)
res := append(res, singleLogs...)
if err != nil {
return res, err
}
}
return res, nil
}
// SingleLineCommandExecutor 执行单行命令
func SingleLineCommandExecutor(singleLineCommand []string) ([]string, error) {
cmd := exec.Command(singleLineCommand[0], singleLineCommand[1:]...)
var out bytes.Buffer
cmd.Stdout = &out
cmd.Stderr = &out
err := cmd.Run()
scanner := bufio.NewScanner(&out)
var result []string
for scanner.Scan() {
result = append(result, scanner.Text())
}
if err != nil {
return nil, err
}
return result, nil
}

View File

@@ -1,10 +1,14 @@
package g
import "github.com/spf13/viper"
import (
"agent-go/register"
"github.com/spf13/viper"
)
type Global struct {
LOG *Logger
NacosConfig *viper.Viper
LOG *Logger
NacosConfig *viper.Viper
AgentServerInfo *register.AgentServerInfo
}
const (

View File

@@ -0,0 +1,46 @@
package rabbitmq
import (
"agent-go/g"
"fmt"
"github.com/streadway/amqp"
)
// Send 向RabbitMQ中发送消息
func Send(conn *RabbitMQConn, connProp *ConnectProperty, message []byte) {
// 往哪里发
channel := conn.Channel
// 发送
channel.Publish(
connProp.ExchangeName,
connProp.TopicKey,
false,
true,
amqp.Publishing{
ContentType: "text/plain",
Body: message,
},
)
}
func Read(conn *RabbitMQConn, connProp *ConnectProperty, autoAck bool) <-chan amqp.Delivery {
// 拿到特定的Channel
channel := conn.Channel
// 开始读取队列中的全部消息
msgs, err := channel.Consume(
connProp.QueueName, // 队列名称
g.G.AgentServerInfo.AgentTopicName, // 消费者名称
autoAck, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // arguments
)
if err != nil {
log.Error(fmt.Sprintf("Failed to register a consumer: %v", err))
}
return msgs
}

View File

@@ -1,20 +0,0 @@
package rabbitmq
import "github.com/streadway/amqp"
// Send 向RabbitMQ中发送消息
func Send(conn *RabbitMQConn, connProp *ConnectProperty, message []byte) {
// 往哪里发
channel := conn.Channel
// 发送
channel.Publish(
connProp.ExchangeName,
connProp.TopicKey,
false,
true,
amqp.Publishing{
ContentType: "text/plain",
Body: message,
},
)
}

View File

@@ -14,6 +14,16 @@ type OctopusMessage struct {
ACTime time.Time `json:"ac_time" format:"2023-03-21 16:38:30"`
}
type ExecutionMessage struct {
NeedResultReplay bool `json:"needResultReplay"`
DurationTask bool `json:"durationTask,default:false"`
Type string `json:"type"`
SingleLineCommand []string `json:"singleLineCommand"`
MultiLineCommand [][]string `json:"multiLineCommand"`
PipeLineCommand [][]string `json:"pipeLineCommand"`
ResultKey string `json:"resultKey"`
}
// BuildOctopusMsg 生成OctopusMessage
func (m *OctopusMessage) BuildOctopusMsg(omType string, content interface{}) *OctopusMessage {

View File

@@ -0,0 +1,96 @@
package rabbitmq
import (
"agent-go/executor"
"agent-go/g"
"encoding/json"
"fmt"
)
func HandleOMsg(initOMsgFromServer *OctopusMessage) {
agentTopicName := initOMsgFromServer.UUID
OctopusExchange := g.G.NacosConfig.GetString("octopus.message.octopus_exchange")
octopusConnectProp := &ConnectProperty{
ExchangeName: OctopusExchange,
QueueName: agentTopicName,
ExchangeType: g.QueueTopic,
TopicKey: agentTopicName + "*",
}
octopusConn, err := NewRabbitMQConn(octopusConnectProp)
if err != nil {
log.Error(fmt.Sprintf("Octopus Message Queue create Error ! => %v", octopusConnectProp))
panic(err)
}
// 开始接收消息
channel := octopusConn.Channel
deliveries, err := channel.Consume(
agentTopicName,
agentTopicName,
true,
false,
false,
false,
nil,
)
if err != nil {
return
}
// 死循环处理Ocotpus Message
for delivery := range deliveries {
var om *OctopusMessage
err := json.Unmarshal(delivery.Body, &om)
if err != nil {
log.Error("Octopus Message Parse Error !")
// 保存到某处
continue
}
// 策略模式 处理消息
doHandleOctopusMessage(om)
}
}
func doHandleOctopusMessage(octopusMessage *OctopusMessage) {
switch octopusMessage.Type {
case g.InitOmType:
go func() {}()
case g.ExecOmType:
go executorOMHandler(octopusMessage)
case g.StatusOmType:
go statusOMHandler(octopusMessage)
default:
go blackHoleOMHandler(octopusMessage)
}
}
func executorOMHandler(octopusMessage *OctopusMessage) {
executionMsgString := octopusMessage.Content.(string)
var executionMessage *ExecutionMessage
err := json.Unmarshal([]byte(executionMsgString), &executionMessage)
if err != nil {
return
}
// 交给后端的实际处理器处理, 再次策略
executor.Execute(octopusMessage, executionMessage)
}
func statusOMHandler(octopusMessage *OctopusMessage) {
}
func blackHoleOMHandler(octopusMessage *OctopusMessage) {
log.Error(fmt.Sprintf("octopusMessage type wrong! msg is => %v", octopusMessage))
}

View File

@@ -11,11 +11,12 @@ import (
var log = g.G.LOG
var omType = g.InitOmType
var agentServerInfo = g.G.AgentServerInfo
func INIT() {
// 获取系统的环境变量
agentServerInfo := parseAgentServerInfo()
g.G.AgentServerInfo = parseAgentServerInfo()
nacosConfig := g.G.NacosConfig
@@ -72,19 +73,58 @@ func INIT() {
// 监听初始化连接中的信息
// 建立运行时RabbitMQ连接
handleInitMsgFromServer()
handleInitMsgFromServer(initFromServer, initFromServerProp, initToServer, initToServerProp)
}
func handleInitMsgFromServer() {
// handleInitMsgFromServer 处理从Server接收的注册信息
func handleInitMsgFromServer(initFromServer *rabbitmq.RabbitMQConn, initFromServerProp *rabbitmq.ConnectProperty, initToServer *rabbitmq.RabbitMQConn, initToServerProp *rabbitmq.ConnectProperty) {
deliveries := rabbitmq.Read(initFromServer, initFromServerProp, false)
// 同步很多抢占注册的情况
for delivery := range deliveries {
var om *rabbitmq.OctopusMessage
err := json.Unmarshal(delivery.Body, &om)
if err != nil {
log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ",
string(delivery.Body)))
}
// 处理OM信息
if om.UUID == g.G.AgentServerInfo.AgentTopicName {
// 是本机的注册回复信息
// 建立运行时RabbitMQ连接
rabbitmq.HandleOMsg(om)
// 手动确认信息
delivery.Ack(false)
// 手动关闭 注册队列的连接
shutdownRegisterQueueConnection(initFromServer, initFromServerProp, initToServer, initToServerProp)
return
}
// 不是自身的 注册回复信息 -- 拒绝
log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v", om))
delivery.Nack(false, true)
}
}
func parseAgentServerInfo() AgentServerInfo {
// shutdownRegisterQueueConnection 关闭初始化连接的两个队列
func shutdownRegisterQueueConnection(initFromServer *rabbitmq.RabbitMQConn, initFromServerProp *rabbitmq.ConnectProperty, initToServer *rabbitmq.RabbitMQConn, initToServerProp *rabbitmq.ConnectProperty) {
}
func parseAgentServerInfo() *AgentServerInfo {
// 约定文件地址为 /etc/environment.d/octopus-agent.conf
// 目前使用
var agentServerInfo AgentServerInfo
var agentServerInfo *AgentServerInfo
yamlFile, err := ioutil.ReadFile("C:\\Users\\wdd\\IdeaProjects\\ProjectOctopus\\agent-go\\server-env.yaml")
if err != nil {
panic(fmt.Errorf("failed to read YAML file: %v", err))
@@ -97,7 +137,8 @@ func parseAgentServerInfo() AgentServerInfo {
jsonFormat, err := json.Marshal(agentServerInfo)
if err != nil {
return AgentServerInfo{}
log.Error(fmt.Sprintf("agent server info convert error ! agentserverinfo is %v", agentServerInfo))
panic(err)
}
log.Info(fmt.Sprintf("agent server info is %v", string(jsonFormat)))