[agent-go] 调整代码结构

This commit is contained in:
zeaslity
2023-03-30 14:26:11 +08:00
parent b01eb57ee5
commit 124143c6c6
17 changed files with 622 additions and 594 deletions

View File

@@ -1,11 +0,0 @@
package main
/*type RabbitSendWriter interface {
Send(conn *RabbitMQConn, connProp *ConnectProperty, message []byte)
Read(conn *RabbitMQConn, connProp *ConnectProperty, autoAck bool) <-chan amqp.Delivery
}
*/

View File

@@ -1,50 +0,0 @@
package config
import (
"agent-go/utils"
"encoding/json"
"fmt"
"time"
)
type OctopusMessage struct {
UUID string `json:"uuid"`
InitTime time.Time `json:"init_time" format:"2023-03-21 16:38:30"`
Type string `json:"type"`
Content interface{} `json:"content"`
Result interface{} `json:"result"`
ACTime time.Time `json:"ac_time" format:"2023-03-21 16:38:30"`
}
type ExecutionMessage struct {
NeedResultReplay bool `json:"needResultReplay"`
DurationTask bool `json:"durationTask,default:false"`
Type string `json:"type"`
SingleLineCommand []string `json:"singleLineCommand"`
MultiLineCommand [][]string `json:"multiLineCommand"`
PipeLineCommand [][]string `json:"pipeLineCommand"`
ResultKey string `json:"resultKey"`
}
// BuildOctopusMsg 生成OctopusMessage
func (m *OctopusMessage) BuildOctopusMsg(omType string, content interface{}) *OctopusMessage {
// 当前时间
curTimeString := utils.CurTimeString()
// must write to string format, otherwise it's very hard to deserialize
bytes, err := json.Marshal(content)
if err != nil {
fmt.Sprintf("OctopusMessage Build Error ! %v", err)
}
return &OctopusMessage{
UUID: curTimeString,
InitTime: time.Now(),
Type: omType,
Content: string(bytes),
Result: nil,
ACTime: time.Time{},
}
}

View File

@@ -1,31 +1,43 @@
package executor package executor
import ( import (
"agent-go/config" logger2 "agent-go/logger"
"agent-go/g"
"bufio" "bufio"
"bytes" "bytes"
"fmt" "fmt"
"os/exec" "os/exec"
"time"
) )
var log = g.G.LOG type ExecutionMessage struct {
NeedResultReplay bool `json:"needResultReplay"`
DurationTask bool `json:"durationTask,default:false"`
Type string `json:"type"`
SingleLineCommand []string `json:"singleLineCommand"`
MultiLineCommand [][]string `json:"multiLineCommand"`
PipeLineCommand [][]string `json:"pipeLineCommand"`
ResultKey string `json:"resultKey"`
}
func Execute(om *config.OctopusMessage, em *config.ExecutionMessage) ([]string, error) { var log = logger2.Log
func Execute(em *ExecutionMessage) ([]string, error) {
var resultLog []string var resultLog []string
var err error var err error
var realCommand [][]string
if em.PipeLineCommand != nil && len(em.PipeLineCommand) != 0 { if em.PipeLineCommand != nil && len(em.PipeLineCommand) != 0 {
// 管道命令 // 管道命令
resultLog, err = PipeLineCommandExecutor(em.PipeLineCommand) resultLog, err = PipeLineCommandExecutor(em.PipeLineCommand)
realCommand = em.PipeLineCommand
} else if em.MultiLineCommand != nil && len(em.MultiLineCommand) != 0 { } else if em.MultiLineCommand != nil && len(em.MultiLineCommand) != 0 {
// 多行命令 // 多行命令
resultLog, err = MultiLineCommandExecutor(em.MultiLineCommand) resultLog, err = MultiLineCommandExecutor(em.MultiLineCommand)
realCommand = em.MultiLineCommand
} else { } else {
// 单行命令 // 单行命令
resultLog, err = SingleLineCommandExecutor(em.SingleLineCommand) resultLog, err = SingleLineCommandExecutor(em.SingleLineCommand)
realCommand = [][]string{em.SingleLineCommand}
} }
// 归一化错误和日志 // 归一化错误和日志
@@ -33,15 +45,9 @@ func Execute(om *config.OctopusMessage, em *config.ExecutionMessage) ([]string,
resultLog = append(resultLog, fmt.Sprintf("Error: %s", err.Error())) resultLog = append(resultLog, fmt.Sprintf("Error: %s", err.Error()))
} }
// 处理执行日志 commandResult := fmt.Sprintf("Excution Comand are=> %v, Executor Result: %v", realCommand, resultLog)
// 是否需要返回处理日志,现在默认返回
if em.NeedResultReplay {
// 需要返回处理结果
om.ACTime = time.Now()
om.Result = resultLog
}
log.Info(fmt.Sprintf("Executor Result: %s", resultLog)) log.Info(commandResult)
return resultLog, err return resultLog, err
} }

View File

@@ -1 +0,0 @@
package g

View File

@@ -1,16 +1,15 @@
package g package g
import ( import (
"agent-go/config" logger2 "agent-go/logger"
"github.com/panjf2000/ants/v2" "github.com/panjf2000/ants/v2"
"github.com/spf13/viper" "github.com/spf13/viper"
) )
type Global struct { type Global struct {
LOG *Logger AgentHasRegister bool
NacosConfig *viper.Viper NacosConfig *viper.Viper
AgentServerInfo *config.AgentServerInfo P *ants.Pool
P *ants.Pool
} }
const ( const (
@@ -21,21 +20,17 @@ const (
InitOmType = "INIT" InitOmType = "INIT"
) )
var logger, _ = NewLogger() var pool, _ = ants.NewPool(100, ants.WithNonblocking(true), ants.WithLogger(logger2.Log))
var pool, _ = ants.NewPool(100, ants.WithNonblocking(true), ants.WithLogger(logger))
var G = NewGlobal( var G = NewGlobal(
logger,
pool, pool,
) )
// NewGlobal NewGlobal构造函数返回一个新的Global实例其中包含指定的Logger。 // NewGlobal NewGlobal构造函数返回一个新的Global实例其中包含指定的Logger。
func NewGlobal(logger *Logger, pool *ants.Pool) *Global { func NewGlobal(pool *ants.Pool) *Global {
return &Global{ return &Global{
LOG: logger, AgentHasRegister: false,
NacosConfig: nil, NacosConfig: nil,
AgentServerInfo: nil, P: pool,
P: pool,
} }
} }

View File

@@ -1,4 +1,4 @@
package g package logger
import ( import (
"fmt" "fmt"
@@ -11,6 +11,8 @@ type Logger struct {
*zap.Logger *zap.Logger
} }
var Log, _ = NewLogger()
// NewLogger creates a new Logger instance. // NewLogger creates a new Logger instance.
func NewLogger() (*Logger, error) { func NewLogger() (*Logger, error) {
config := zap.Config{ config := zap.Config{

View File

@@ -2,12 +2,13 @@ package main
import ( import (
"agent-go/g" "agent-go/g"
logger2 "agent-go/logger"
"agent-go/register" "agent-go/register"
"flag" "flag"
"fmt" "fmt"
) )
var log = g.G.LOG var log = logger2.Log
func main() { func main() {
@@ -20,9 +21,9 @@ func main() {
println(filename) println(filename)
// 初始化Nacos的连接配置 // 初始化Nacos的连接配置
g.G.NacosConfig = g.InitNacos(filename) g.G.NacosConfig = register.InitNacos(filename)
// 执行初始化之策工作 // 执行初始化之策工作
g.G.AgentServerInfo = register.INIT() register.AgentServerInfoCache = register.INIT()
} }

View File

@@ -1,63 +0,0 @@
package rabbitmq
import (
"fmt"
"github.com/nacos-group/nacos-sdk-go/v2/common/logger"
"github.com/streadway/amqp"
)
// RabbitMQConn is a struct that holds the connection and channel objects
type RabbitMQConn struct {
Connection *amqp.Connection
Channel *amqp.Channel
}
type ConnectProperty struct {
ExchangeName string
QueueName string
ExchangeType string
TopicKey string
}
// Send 向RabbitMQ中发送消息
func Send(conn *RabbitMQConn, connProp *ConnectProperty, message []byte) {
// 往哪里发
channel := conn.Channel
// 发送
err := channel.Publish(
connProp.ExchangeName,
connProp.TopicKey,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: message,
},
)
if err != nil {
logger.Error(fmt.Sprintf("Failed to publish a message: %v", err))
}
}
func Read(conn *RabbitMQConn, connProp *ConnectProperty, autoAck bool) <-chan amqp.Delivery {
// 拿到特定的Channel
channel := conn.Channel
// 开始读取队列中的全部消息
msgs, err := channel.Consume(
connProp.QueueName, // 队列名称
"", // 消费者名称
autoAck, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // arguments
)
if err != nil {
log.Error(fmt.Sprintf("Failed to register a consumer: %v", err))
}
return msgs
}

View File

@@ -0,0 +1,72 @@
package rabbitmq
import (
"agent-go/g"
"encoding/json"
"fmt"
)
var OctopusToServerQueue = &RabbitQueue{}
func BuildOMsgRuntimeConnectorQueue(agentTopicName string) {
// 建立 业务消息 接收队列
// agentTopicName为名称的队列
nacosConfig := g.G.NacosConfig
octopusExchangeName := nacosConfig.GetString("octopus.message.octopus_exchange")
octopusConnectProp := &ConnectProperty{
ExchangeName: octopusExchangeName,
QueueName: agentTopicName,
ExchangeType: g.QueueTopic,
TopicKey: agentTopicName + "*",
}
octopusMsgQueue := &RabbitQueue{
RabbitProp: octopusConnectProp,
}
octopusMsgQueue.Connect()
deliveries := octopusMsgQueue.Read(true)
// 死循环处理Octopus Message
P.Submit(
func() {
for delivery := range deliveries {
var om *OctopusMessage
err := json.Unmarshal(delivery.Body, &om)
if err != nil {
log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body))
// 保存到某处
continue
}
// 策略模式 处理消息
om.Handle()
}
})
// 建立 业务消息 返回队列
// 统一为 OctopusToServer
octopusToServerQueueName := nacosConfig.GetString("octopus.message.octopus_to_server")
octopusToServerProp := &ConnectProperty{
ExchangeName: octopusExchangeName,
QueueName: octopusToServerQueueName,
ExchangeType: g.QueueTopic,
TopicKey: octopusToServerQueueName,
}
OctopusToServerQueue = &RabbitQueue{
RabbitProp: octopusToServerProp,
}
// 开启运行时消息返回队列
OctopusToServerQueue.Connect()
log.InfoF("Octopus Message Replay Queue is established ! => %v", OctopusToServerQueue)
}

View File

@@ -0,0 +1,113 @@
package rabbitmq
import (
"agent-go/executor"
"agent-go/g"
"agent-go/utils"
"encoding/json"
"fmt"
"time"
)
var P = g.G.P
type IOctopusMessage interface {
OctopusMsgHandler
OctopusMsgSender
OctopusMsgBuilder
}
type OctopusMsgHandler interface {
Handle(octopusMessage *OctopusMessage)
}
type OctopusMsgSender interface {
Send(rabbitQueue *RabbitQueue, msg []byte)
}
type OctopusMsgBuilder interface {
Build(omType string, content interface{}) *OctopusMessage
}
type OctopusMessage struct {
UUID string `json:"uuid"`
InitTime time.Time `json:"init_time" format:"2023-03-21 16:38:30"`
Type string `json:"type"`
Content interface{} `json:"content"`
Result interface{} `json:"result"`
ACTime time.Time `json:"ac_time" format:"2023-03-21 16:38:30"`
}
func (om *OctopusMessage) Handle() {
// 实际执行 OM handle进程
doHandleOctopusMessage(om)
}
func (om *OctopusMessage) Send(rabbitQueue *RabbitQueue, msg []byte) {
rabbitQueue.Send(msg)
}
func (om *OctopusMessage) Build(omType string, content interface{}) *OctopusMessage {
// 当前时间
curTimeString := utils.CurTimeString()
// must write to string format, otherwise it's very hard to deserialize
bytes, err := json.Marshal(content)
if err != nil {
fmt.Sprintf("OctopusMessage Build Error ! %v", err)
}
return &OctopusMessage{
UUID: curTimeString,
InitTime: time.Now(),
Type: omType,
Content: string(bytes),
Result: nil,
ACTime: time.Time{},
}
}
func doHandleOctopusMessage(octopusMessage *OctopusMessage) {
switch octopusMessage.Type {
case g.InitOmType:
go func() {}()
case g.ExecOmType:
P.Submit(func() {
executorOMHandler(octopusMessage)
})
case g.StatusOmType:
P.Submit(func() {
statusOMHandler(octopusMessage)
})
default:
P.Submit(func() {
blackHoleOMHandler(octopusMessage)
})
}
}
func executorOMHandler(octopusMessage *OctopusMessage) {
executionMsgString := octopusMessage.Content.(string)
var executionMessage *executor.ExecutionMessage
err := json.Unmarshal([]byte(executionMsgString), &executionMessage)
if err != nil {
log.Error(fmt.Sprintf("execution message convert to json is wrong! msg is => %s", executionMsgString))
return
}
// 交给后端的实际处理器处理, 再次策略
executor.Execute(executionMessage)
}
func statusOMHandler(octopusMessage *OctopusMessage) {
}
func blackHoleOMHandler(octopusMessage *OctopusMessage) {
log.Error(fmt.Sprintf("octopusMessage type wrong! msg is => %v", octopusMessage))
}

View File

@@ -1,106 +0,0 @@
package rabbitmq
import (
"agent-go/config"
"agent-go/executor"
"agent-go/g"
"encoding/json"
"fmt"
)
var P = g.G.P
func HandleOMsg(initOMsgFromServer *config.OctopusMessage) {
agentTopicName := initOMsgFromServer.Result.(string)
OctopusExchange := g.G.NacosConfig.GetString("octopus.message.octopus_exchange")
octopusConnectProp := &ConnectProperty{
ExchangeName: OctopusExchange,
QueueName: agentTopicName,
ExchangeType: g.QueueTopic,
TopicKey: agentTopicName + "*",
}
octopusConn, err := NewRabbitMQConn(octopusConnectProp)
if err != nil {
log.Error(fmt.Sprintf("Octopus Message Queue create Error ! => %v", octopusConnectProp))
panic(err)
}
// 开始接收消息
channel := octopusConn.Channel
deliveries, err := channel.Consume(
agentTopicName,
agentTopicName,
true,
false,
false,
false,
nil,
)
if err != nil {
return
}
// 死循环处理Ocotpus Message
for delivery := range deliveries {
var om *config.OctopusMessage
err := json.Unmarshal(delivery.Body, &om)
if err != nil {
log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body))
// 保存到某处
continue
}
// 策略模式 处理消息
doHandleOctopusMessage(om)
}
}
func doHandleOctopusMessage(octopusMessage *config.OctopusMessage) {
switch octopusMessage.Type {
case g.InitOmType:
go func() {}()
case g.ExecOmType:
P.Submit(func() {
executorOMHandler(octopusMessage)
})
case g.StatusOmType:
P.Submit(func() {
statusOMHandler(octopusMessage)
})
default:
P.Submit(func() {
blackHoleOMHandler(octopusMessage)
})
}
}
func executorOMHandler(octopusMessage *config.OctopusMessage) {
executionMsgString := octopusMessage.Content.(string)
var executionMessage *config.ExecutionMessage
err := json.Unmarshal([]byte(executionMsgString), &executionMessage)
if err != nil {
log.Error(fmt.Sprintf("execution message convert to json is wrong! msg is => %s", executionMsgString))
return
}
// 交给后端的实际处理器处理, 再次策略
executor.Execute(octopusMessage, executionMessage)
}
func statusOMHandler(octopusMessage *config.OctopusMessage) {
}
func blackHoleOMHandler(octopusMessage *config.OctopusMessage) {
log.Error(fmt.Sprintf("octopusMessage type wrong! msg is => %v", octopusMessage))
}

View File

@@ -1,154 +0,0 @@
package rabbitmq
import (
"agent-go/g"
"fmt"
"github.com/streadway/amqp"
"strings"
"sync"
)
var log = g.G.LOG
// 定义全局唯一的 Singleton 实例
var instance *amqp.Connection
// 用 sync.Once 变量确保初始化函数只会被调用一次
var once sync.Once
// 初始化 Singleton 实例的函数
func createInstance() {
// 在这里进行 Singleton 的初始化操作
// 获取RabbitMQ的连接地址
rabbitMQEndpointFromG := parseRabbitMQEndpointFromG()
// 创建全局唯一连接 RabbitMQ连接
connection, err := amqp.Dial(rabbitMQEndpointFromG)
if err != nil {
log.Error(fmt.Sprintf("failed to connect to RabbitMQ: %v", err))
}
instance = connection
}
// GetInstance 获取全局唯一的 Singleton 实例的函数
func GetInstance() *amqp.Connection {
// 使用 sync.Once 确保 createInstance 只会被调用一次
once.Do(createInstance)
return instance
}
// NewRabbitMQConn creates a new RabbitMQ connection object
func NewRabbitMQConn(property *ConnectProperty) (*RabbitMQConn, error) {
// 获取RabbitMQ的连接
conn := GetInstance()
// 获取RabbitMQ的连接地址
//rabbitMQEndpointFromG := parseRabbitMQEndpointFromG()
//conn, err := amqp.Dial(rabbitMQEndpointFromG)
//if err != nil {
// log.Error(fmt.Sprintf("failed to connect to RabbitMQ: %v", err))
//}
ch, err := conn.Channel()
if err != nil {
return nil, fmt.Errorf("failed to create RabbitMQ channel: %w", err)
}
if err = ch.ExchangeDeclare(
property.ExchangeName, // name of the exchange
property.ExchangeType, // type of the exchange
false, // durable
false, // delete when complete
false, // internal
false, // noWait
nil, // arguments
); err != nil {
return nil, fmt.Errorf("failed to declare RabbitMQ exchange: %w", err)
}
_, err = ch.QueueDeclare(
property.QueueName, // name of the queue
false, // durable
false, // delete when unused
false, // exclusive
false, // noWait
nil, // arguments
)
if err != nil {
return nil, fmt.Errorf("failed to declare RabbitMQ queue: %w", err)
}
if err = ch.QueueBind(
property.QueueName, // name of the queue
property.TopicKey, // routing key - all topics
property.ExchangeName, // name of the exchange
false, // noWait
nil, // arguments
); err != nil {
return nil, fmt.Errorf("failed to bind RabbitMQ queue: %w", err)
}
return &RabbitMQConn{Connection: conn, Channel: ch}, nil
}
// parseRabbitMQEndpoint 根据全局变量NacosConfig解析出RabbitMQ的连接地址
func parseRabbitMQEndpointFromG() string {
nacosConfig := g.G.NacosConfig
var res strings.Builder
host := nacosConfig.GetString("spring.rabbitmq.host")
port := nacosConfig.GetString("spring.rabbitmq.port")
username := nacosConfig.GetString("spring.rabbitmq.username")
password := nacosConfig.GetString("spring.rabbitmq.password")
virtualHost := nacosConfig.GetString("spring.rabbitmq.virtual-host")
// amqp://{username}:{password}@{hostname}:{port}/{virtual_host}
res.WriteString("amqp://")
res.WriteString(username)
res.WriteString(":")
res.WriteString(password)
res.WriteString("@")
res.WriteString(host)
res.WriteString(":")
res.WriteString(port)
res.WriteString("/")
res.WriteString(virtualHost)
s := res.String()
log.Debug(fmt.Sprintf("generate RabbitMQ endpoint is %s", s))
return s
}
func CloseChannel(conn *RabbitMQConn) error {
var err error
if conn.Channel != nil {
if err = conn.Channel.Close(); err != nil {
log.Error(fmt.Sprintf("Failed to close RabbitMQ channel: %v", err))
}
}
return err
}
// CloseRabbitMQAll closes the RabbitMQ connection and channel
func (r *RabbitMQConn) CloseRabbitMQAll() error {
var err error
if r.Channel != nil {
if err = r.Channel.Close(); err != nil {
log.Error(fmt.Sprintf("Failed to close RabbitMQ channel: %v", err))
}
}
if r.Connection != nil {
if err = r.Connection.Close(); err != nil {
log.Error(fmt.Sprintf("Failed to close RabbitMQ connection: %v", err))
}
}
return err
}

View File

@@ -0,0 +1,214 @@
package rabbitmq
import (
"agent-go/g"
logger2 "agent-go/logger"
"fmt"
"github.com/streadway/amqp"
"strings"
"sync"
)
type RabbitMQ interface {
RabbitSendWriter
RabbitConnectCloser
}
type RabbitSendWriter interface {
Send(message []byte)
Read(autoAck bool) <-chan amqp.Delivery
}
type RabbitConnectCloser interface {
Connect()
Close() error
}
type RabbitQueue struct {
RabbitConn *RabbitMQConn
RabbitProp *ConnectProperty
}
// RabbitMQConn is a struct that holds the connection and channel objects
type RabbitMQConn struct {
Connection *amqp.Connection
Channel *amqp.Channel
}
type ConnectProperty struct {
ExchangeName string
QueueName string
ExchangeType string
TopicKey string
}
var log = logger2.Log
// 定义全局唯一的 Singleton 实例
var instance *amqp.Connection
// 用 sync.Once 变量确保初始化函数只会被调用一次
var once sync.Once
// 初始化 Singleton 实例的函数
func createInstance() {
// 在这里进行 Singleton 的初始化操作
// 获取RabbitMQ的连接地址
rabbitMQEndpointFromG := parseRabbitMQEndpointFromG()
// 创建全局唯一连接 RabbitMQ连接
connection, err := amqp.Dial(rabbitMQEndpointFromG)
if err != nil {
log.Error(fmt.Sprintf("failed to connect to RabbitMQ: %v", err))
}
instance = connection
}
// GetInstance 获取全局唯一的 Singleton 实例的函数
func GetInstance() *amqp.Connection {
// 使用 sync.Once 确保 createInstance 只会被调用一次
once.Do(createInstance)
return instance
}
// Connect creates a new RabbitMQ connection object
func (r *RabbitQueue) Connect() {
// 获取RabbitMQ的连接
conn := GetInstance()
ch, err := conn.Channel()
if err != nil {
log.Error(fmt.Sprintf("failed to create RabbitMQ channel: %w", err))
}
if err = ch.ExchangeDeclare(
r.RabbitProp.ExchangeName, // name of the exchange
r.RabbitProp.ExchangeType, // type of the exchange
false, // durable
false, // delete when complete
false, // internal
false, // noWait
nil, // arguments
); err != nil {
log.Error(fmt.Sprintf("failed to declare exchange !: %w", err))
}
_, err = ch.QueueDeclare(
r.RabbitProp.QueueName, // name of the queue
false, // durable
false, // delete when unused
false, // exclusive
false, // noWait
nil, // arguments
)
if err != nil {
log.Error(fmt.Sprintf("failed to declare RabbitMQ queue: %w", err))
}
if err = ch.QueueBind(
r.RabbitProp.QueueName, // name of the queue
r.RabbitProp.TopicKey, // routing key - all topics
r.RabbitProp.ExchangeName, // name of the exchange
false, // noWait
nil, // arguments
); err != nil {
log.Error(fmt.Sprintf("failed to bind RabbitMQ queue: %w", err))
}
r.RabbitConn = &RabbitMQConn{
Connection: conn,
Channel: ch,
}
}
func (r *RabbitQueue) Close() error {
var err error
if r.RabbitConn.Channel != nil {
if err = r.RabbitConn.Channel.Close(); err != nil {
log.Error(fmt.Sprintf("Failed to close RabbitMQ channel: %v", err))
}
}
return err
}
// Send 向RabbitMQ中发送消息
func (r *RabbitQueue) Send(message []byte) {
// 往哪里发
channel := r.RabbitConn.Channel
// 发送
err := channel.Publish(
r.RabbitProp.ExchangeName,
r.RabbitProp.TopicKey,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: message,
},
)
if err != nil {
log.Error(fmt.Sprintf("Failed to publish a message: %v", err))
}
}
func (r *RabbitQueue) Read(autoAck bool) <-chan amqp.Delivery {
// 拿到特定的Channel
channel := r.RabbitConn.Channel
// 开始读取队列中的全部消息
msgs, err := channel.Consume(
r.RabbitProp.QueueName, // 队列名称
"", // 消费者名称
autoAck, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // arguments
)
if err != nil {
log.Error(fmt.Sprintf("Failed to register a consumer: %v", err))
}
return msgs
}
// parseRabbitMQEndpoint 根据全局变量NacosConfig解析出RabbitMQ的连接地址
func parseRabbitMQEndpointFromG() string {
nacosConfig := g.G.NacosConfig
var res strings.Builder
host := nacosConfig.GetString("spring.rabbitmq.host")
port := nacosConfig.GetString("spring.rabbitmq.port")
username := nacosConfig.GetString("spring.rabbitmq.username")
password := nacosConfig.GetString("spring.rabbitmq.password")
virtualHost := nacosConfig.GetString("spring.rabbitmq.virtual-host")
// amqp://{username}:{password}@{hostname}:{port}/{virtual_host}
res.WriteString("amqp://")
res.WriteString(username)
res.WriteString(":")
res.WriteString(password)
res.WriteString("@")
res.WriteString(host)
res.WriteString(":")
res.WriteString(port)
res.WriteString("/")
res.WriteString(virtualHost)
s := res.String()
log.Debug(fmt.Sprintf("generate RabbitMQ endpoint is %s", s))
return s
}

View File

@@ -0,0 +1,186 @@
package register
import (
"agent-go/g"
logger2 "agent-go/logger"
"agent-go/rabbitmq"
"encoding/json"
"fmt"
"gopkg.in/yaml.v3"
"io/ioutil"
"time"
)
var omType = g.InitOmType
var log = logger2.Log
var P = g.G.P
var AgentServerInfoCache = &AgentServerInfo{}
func INIT() *AgentServerInfo {
// 获取系统的环境变量
agentServerInfo := parseAgentServerInfo()
nacosConfig := g.G.NacosConfig
initToServerProp := &rabbitmq.ConnectProperty{
ExchangeName: nacosConfig.GetString("octopus.message.init_exchange"),
QueueName: nacosConfig.GetString("octopus.message.init_to_server"),
ExchangeType: g.QueueDirect,
TopicKey: nacosConfig.GetString("octopus.message.init_to_server_key"),
}
initFromServerProp := &rabbitmq.ConnectProperty{
ExchangeName: nacosConfig.GetString("octopus.message.init_exchange"),
QueueName: nacosConfig.GetString("octopus.message.init_from_server"),
ExchangeType: g.QueueDirect,
TopicKey: nacosConfig.GetString("octopus.message.init_from_server_key"),
}
// 建立RabbitMQ的连接
initToServerQueue := &rabbitmq.RabbitQueue{
RabbitProp: initToServerProp,
}
defer initToServerQueue.Close()
// 建立连接
initToServerQueue.Connect()
// 组装OctopusMessage
var octopusMsg *rabbitmq.OctopusMessage
octopusMsg = octopusMsg.Build(
omType,
agentServerInfo,
)
msgBytes, err := json.Marshal(octopusMsg)
if err != nil {
log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %v", octopusMsg))
}
log.Debug(fmt.Sprintf("Prepare to send init message to server! ==> %s", string(msgBytes)))
// 发送OM至MQ中
P.Submit(
func() {
for g.G.AgentHasRegister == false {
//如果agent存活 而Server不存活 那么需要持续不断的向Server中发送消息
initToServerQueue.Send(
msgBytes,
)
// 休眠
time.Sleep(10 * time.Minute)
}
})
// 监听初始化连接中的信息
initFromServerQueue := &rabbitmq.RabbitQueue{
RabbitProp: initFromServerProp,
}
defer initFromServerQueue.Close()
// 建立连接
initFromServerQueue.Connect()
// 建立运行时RabbitMQ连接
handleInitMsgFromServer(initFromServerQueue, initToServerQueue, agentServerInfo)
return agentServerInfo
}
// handleInitMsgFromServer 处理从Server接收的 注册信息
func handleInitMsgFromServer(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue, agentServerInfo *AgentServerInfo) {
initOctopusMessageDeliveries := initFromServerQueue.Read(false)
forever := make(chan bool)
// use the ant goroutine pool
P.Submit(
func() {
// 同步很多抢占注册的情况
for delivery := range initOctopusMessageDeliveries {
log.Debug(fmt.Sprintf("message received from server is %s", string(delivery.Body)))
var initOctopusMsg *rabbitmq.OctopusMessage
err := json.Unmarshal(delivery.Body, &initOctopusMsg)
if err != nil {
log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ",
string(delivery.Body)))
}
var serverInfo AgentServerInfo
s, _ := initOctopusMsg.Content.(string)
cc := json.Unmarshal([]byte(s), &serverInfo)
if cc != nil {
log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", cc))
}
serverName := serverInfo.ServerName
// 处理OM信息
if initOctopusMsg != nil && initOctopusMsg.Type == g.InitOmType && serverName == agentServerInfo.ServerName {
// 是本机的注册回复信息
// 建立 运行时 RabbitMQ连接
agentTopicName := initOctopusMsg.Result.(string)
rabbitmq.BuildOMsgRuntimeConnectorQueue(agentTopicName)
// 手动确认信息
delivery.Ack(false)
// 手动关闭 注册队列的连接
shutdownRegisterQueueConnection(initFromServerQueue, initToServerQueue)
return
}
// 不是自身的 注册回复信息 -- 拒绝
log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v, ==>%s", initOctopusMsg, delivery.Body))
delivery.Nack(false, true)
}
},
)
// wait forever
<-forever
}
// shutdownRegisterQueueConnection 关闭初始化连接的两个队列
func shutdownRegisterQueueConnection(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue) {
}
func parseAgentServerInfo() *AgentServerInfo {
// 约定文件地址为 /etc/environment.d/octopus-agent.conf
// 目前使用
var agentServerInfo *AgentServerInfo
//yamlFile, err := ioutil.ReadFile("C:\\Users\\wdd\\IdeaProjects\\ProjectOctopus\\agent-go\\server-env.yaml")
yamlFile, err := ioutil.ReadFile("server-env.yaml")
if err != nil {
panic(fmt.Errorf("failed to read YAML file: %v", err))
}
err = yaml.Unmarshal(yamlFile, &agentServerInfo)
if err != nil {
panic(fmt.Errorf("failed to unmarshal YAML: %v", err))
}
jsonFormat, err := json.Marshal(agentServerInfo)
if err != nil {
log.Error(fmt.Sprintf("agent server info convert error ! agentserverinfo is %v", agentServerInfo))
panic(err)
}
log.Info(fmt.Sprintf("agent server info is %v", string(jsonFormat)))
return agentServerInfo
}

View File

@@ -1,175 +0,0 @@
package register
import (
"agent-go/config"
"agent-go/g"
"agent-go/rabbitmq"
"encoding/json"
"fmt"
"gopkg.in/yaml.v3"
"io/ioutil"
)
var omType = g.InitOmType
var log = g.G.LOG
func INIT() *config.AgentServerInfo {
// 获取系统的环境变量
agentServerInfo := parseAgentServerInfo()
nacosConfig := g.G.NacosConfig
initToServerProp := &rabbitmq.ConnectProperty{
ExchangeName: nacosConfig.GetString("octopus.message.init_exchange"),
QueueName: nacosConfig.GetString("octopus.message.init_to_server"),
ExchangeType: g.QueueDirect,
TopicKey: nacosConfig.GetString("octopus.message.init_to_server_key"),
}
initFromServerProp := &rabbitmq.ConnectProperty{
ExchangeName: nacosConfig.GetString("octopus.message.init_exchange"),
QueueName: nacosConfig.GetString("octopus.message.init_from_server"),
ExchangeType: g.QueueDirect,
TopicKey: nacosConfig.GetString("octopus.message.init_from_server_key"),
}
// 建立RabbitMQ的连接
// defer 关闭初始化连接
initToServer, err := rabbitmq.NewRabbitMQConn(
initToServerProp,
)
if err != nil {
log.Error("init to server queue established error!")
panic(err)
}
//defer rabbitmq.CloseChannel(initToServer)
//defer rabbitmq.CloseChannel(initFromServer)
// 组装OctopusMessage
var octopusMsg *config.OctopusMessage
octopusMsg = octopusMsg.BuildOctopusMsg(
omType,
agentServerInfo,
)
msgBytes, err := json.Marshal(octopusMsg)
if err != nil {
log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %v", octopusMsg))
}
log.Debug(fmt.Sprintf("Prepare to send init message to server! ==> %s", string(msgBytes)))
// 发送OM至MQ中
rabbitmq.Send(
initToServer,
initToServerProp,
msgBytes,
)
// 监听初始化连接中的信息
initFromServer, err := rabbitmq.NewRabbitMQConn(
initFromServerProp,
)
if err != nil {
log.Error("init from server queue established error!")
panic(err)
}
// 建立运行时RabbitMQ连接
handleInitMsgFromServer(initFromServer, initFromServerProp, initToServer, initToServerProp, agentServerInfo)
return agentServerInfo
}
// handleInitMsgFromServer 处理从Server接收的注册信息
func handleInitMsgFromServer(initFromServer *rabbitmq.RabbitMQConn, initFromServerProp *rabbitmq.ConnectProperty, initToServer *rabbitmq.RabbitMQConn, initToServerProp *rabbitmq.ConnectProperty, agentServerInfo *config.AgentServerInfo) {
deliveries := rabbitmq.Read(initFromServer, initFromServerProp, false)
forever := make(chan bool)
go func() {
// 同步很多抢占注册的情况
for delivery := range deliveries {
log.Debug(fmt.Sprintf("message received from server is %s", string(delivery.Body)))
var om *config.OctopusMessage
err := json.Unmarshal(delivery.Body, &om)
if err != nil {
log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ",
string(delivery.Body)))
}
var serverInfo config.AgentServerInfo
s, _ := om.Content.(string)
cc := json.Unmarshal([]byte(s), &serverInfo)
if cc != nil {
log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", cc))
}
serverName := serverInfo.ServerName
// 处理OM信息
if om != nil && om.Type == g.InitOmType && serverName == agentServerInfo.ServerName {
// 是本机的注册回复信息
// 建立运行时RabbitMQ连接
// change to async
go rabbitmq.HandleOMsg(om)
// 手动确认信息
delivery.Ack(false)
// 手动关闭 注册队列的连接
shutdownRegisterQueueConnection(initFromServer, initFromServerProp, initToServer, initToServerProp)
return
}
// 不是自身的 注册回复信息 -- 拒绝
log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v, ==>%s", om, delivery.Body))
delivery.Nack(false, true)
}
}()
// wait forever
<-forever
}
// shutdownRegisterQueueConnection 关闭初始化连接的两个队列
func shutdownRegisterQueueConnection(initFromServer *rabbitmq.RabbitMQConn, initFromServerProp *rabbitmq.ConnectProperty, initToServer *rabbitmq.RabbitMQConn, initToServerProp *rabbitmq.ConnectProperty) {
}
func parseAgentServerInfo() *config.AgentServerInfo {
// 约定文件地址为 /etc/environment.d/octopus-agent.conf
// 目前使用
var agentServerInfo *config.AgentServerInfo
//yamlFile, err := ioutil.ReadFile("C:\\Users\\wdd\\IdeaProjects\\ProjectOctopus\\agent-go\\server-env.yaml")
yamlFile, err := ioutil.ReadFile("server-env.yaml")
if err != nil {
panic(fmt.Errorf("failed to read YAML file: %v", err))
}
err = yaml.Unmarshal(yamlFile, &agentServerInfo)
if err != nil {
panic(fmt.Errorf("failed to unmarshal YAML: %v", err))
}
jsonFormat, err := json.Marshal(agentServerInfo)
if err != nil {
log.Error(fmt.Sprintf("agent server info convert error ! agentserverinfo is %v", agentServerInfo))
panic(err)
}
log.Info(fmt.Sprintf("agent server info is %v", string(jsonFormat)))
return agentServerInfo
}

View File

@@ -1,4 +1,4 @@
package config package register
type AgentServerInfo struct { type AgentServerInfo struct {
ServerName string `json:"serverName" yaml:"serverName"` ServerName string `json:"serverName" yaml:"serverName"`

View File

@@ -1,4 +1,4 @@
package g package register
import ( import (
"bytes" "bytes"
@@ -13,7 +13,6 @@ import (
"strings" "strings"
) )
var log = G.LOG
var group = "" var group = ""
func InitNacos(configFileName string) *viper.Viper { func InitNacos(configFileName string) *viper.Viper {