[agent-go] 完成RabbitMQ连接部分的代码

This commit is contained in:
zeaslity
2023-03-21 17:08:22 +08:00
parent 1c57a631d9
commit fe8a4a03fc
19 changed files with 482 additions and 93 deletions

View File

@@ -2,78 +2,38 @@ package config
import (
"agent-go/g"
"bytes"
"fmt"
"github.com/nacos-group/nacos-sdk-go/v2/clients"
"github.com/nacos-group/nacos-sdk-go/v2/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
"github.com/spf13/viper"
"go.uber.org/zap"
"strconv"
"strings"
)
type OctopusAgentNacosConfig struct {
Spring Spring `json:"spring"`
Server Server `json:"server"`
}
type Server struct {
Port int64 `json:"port"`
}
type Spring struct {
Application Application `json:"application"`
Profiles Profiles `json:"profiles"`
Cloud Cloud `json:"cloud"`
}
type Application struct {
Name string `json:"name"`
}
type Cloud struct {
Nacos Nacos `json:"nacos"`
}
type Nacos struct {
Config Config `json:"config"`
}
type Config struct {
Group string `json:"group"`
ConfigRetryTime int64 `json:"config-retry-time"`
FileExtension string `json:"file-extension"`
MaxRetry int64 `json:"max-retry"`
ServerAddr string `json:"server-addr"`
Timeout uint64 `json:"timeout"`
ConfigLongPollTimeout int64 `json:"config-long-poll-timeout"`
ExtensionConfigs []ExtensionConfig `json:"extension-configs"`
}
type ExtensionConfig struct {
Group string `json:"group"`
DataID string `json:"data-id"`
}
type Profiles struct {
Active string `json:"active"`
}
var log = g.G.LOG
var group = ""
func InitNacos(configFileName string) {
v := parseAgentConfigFile(configFileName, nil)
group := v.GetString("spring.cloud.nacos.config.group")
group = v.GetString("spring.cloud.nacos.config.group")
configClient := startNacosConnection(v, group)
configClient := startNacosConnection(v)
allNacosConfig := getAllNacosConfig(v, group, configClient)
for _, nacosConfig := range allNacosConfig {
parseAgentConfigFile(nacosConfig, v)
for _, nacosConfigContent := range allNacosConfig {
log.Debug(fmt.Sprintf("nacos config conetent is %s", nacosConfigContent))
//parseNacosConfigContend(nacosConfigContent, v)
}
log.Info(fmt.Sprintf("%s config read result are %v", configFileName, v.AllSettings()))
}
func parseAgentConfigFile(configFileName string, v *viper.Viper) *viper.Viper {
@@ -94,19 +54,31 @@ func parseAgentConfigFile(configFileName string, v *viper.Viper) *viper.Viper {
panic(fmt.Errorf("fatal error config file: %s", err))
}
log.Info(fmt.Sprintf("%s config read result are %v", configFileName, v.AllSettings()))
return v
}
func startNacosConnection(v *viper.Viper, group string) config_client.IConfigClient {
func parseNacosConfigContend(configContent string, v *viper.Viper) *viper.Viper {
v.SetConfigType("yaml")
err := v.ReadConfig(bytes.NewBuffer([]byte(configContent)))
if err != nil {
log.Error("nacos config contend read error !", zap.Error(err))
}
return v
}
func startNacosConnection(v *viper.Viper) config_client.IConfigClient {
clientConfig := constant.ClientConfig{
NamespaceId: group,
NamespaceId: "public",
TimeoutMs: v.GetUint64("spring.cloud.nacos.config.timeout"),
NotLoadCacheAtStart: true,
LogDir: "/tmp/nacos/log",
CacheDir: "/tmp/nacos/cache",
AppendToStdout: true,
//UpdateCacheWhenEmpty: true,
//LogDir: "/tmp/nacos/log",
//CacheDir: "/tmp/nacos/cache",
Username: "nacos",
Password: "Superwmm.23",
}
serverAddr := v.GetString("spring.cloud.nacos.config.server-addr")
@@ -120,6 +92,7 @@ func startNacosConnection(v *viper.Viper, group string) config_client.IConfigCli
{
IpAddr: split[0],
Port: port,
GrpcPort: port + 1000,
},
}
@@ -142,39 +115,51 @@ func getAllNacosConfig(v *viper.Viper, group string, configClient config_client.
result := make([]string, 0)
// main nacos configs
mainNacosConfigFileName := v.GetString("spring.application.name") + v.GetString("spring.profiles.active") + v.GetString("spring.cloud.nacos.config.file-extension")
getConfig(mainNacosConfigFileName, group, configClient)
result = append(result, mainNacosConfigFileName)
mainNacosConfigFileName := v.GetString("spring.application.name") + "-" + v.GetString("spring.profiles.active") + "." + v.GetString("spring.cloud.nacos.config.file-extension")
log.Debug(fmt.Sprintf("main nacos config file name is %s", mainNacosConfigFileName))
configContent := getConfig(mainNacosConfigFileName, group, configClient)
result = append(result, configContent)
// additional nacos config
additionalNacosConfig := v.Get("spring.cloud.nacos.config.extension-configs")
// 增加断言判定map的类型
m, ok := additionalNacosConfig.(map[string]string)
m, ok := additionalNacosConfig.([]interface{})
if !ok {
fmt.Println("additionalNacosConfig is not a map")
fmt.Println("additionalNacosConfig is not a slice")
return nil
}
for additionalNacosConfigFileName, additionalNacosConfigGroup := range m {
getConfig(additionalNacosConfigFileName, additionalNacosConfigGroup, configClient)
result = append(result, additionalNacosConfigFileName)
for _, addConfigMap := range m {
real, _ := addConfigMap.(map[string]interface{})
for additionalNacosConfigFileName, additionalNacosConfigGroup := range real {
s := additionalNacosConfigGroup.(string)
configContent := getConfig(additionalNacosConfigFileName, s, configClient)
result = append(result, configContent)
}
}
return result
}
// getConfig 从Nacos中获取相应的
func getConfig(dataId string, group string, configClient config_client.IConfigClient) string {
content, err := configClient.GetConfig(vo.ConfigParam{
log.Debug(fmt.Sprintf("nacos config get method dataID is %s, group is %s", dataId, group))
content := ""
configClient.GetConfig(vo.ConfigParam{
DataId: dataId,
Group: group,
Content: content,
Type: "yaml",
})
if err != nil {
panic(err)
}
log.Debug(fmt.Sprintf("dataId %s , group %s, nacos config content is %s", dataId, group, content))
return content
}

1
agent-go/g/Nacos.go Normal file
View File

@@ -0,0 +1 @@
package g

16
agent-go/g/TimeUtils.go Normal file
View File

@@ -0,0 +1,16 @@
package g
import (
"time"
)
// CurTimeString 输出系统时间的格式为"2006-01-02 15:04:05"形式的时间字符串
func CurTimeString() string {
now := time.Now()
/*loc := time.FixedZone("UTC+8", 8*60*60) // 创建东八区时区对象
localTime := now.In(loc) // 转换为东八区时间*/
return now.Format("2006-01-02 15:04:05")
}

View File

@@ -1,9 +1,20 @@
package g
import "github.com/spf13/viper"
type Global struct {
LOG *Logger
NacosConfig *viper.Viper
}
const (
QueueDirect = "direct"
QueueTopic = "topic"
ExecOmType = "EXECUTOR"
StatusOmType = "STATUS"
InitOmType = "INIT"
)
var logger, _ = NewLogger()
var G = NewGlobal(

View File

@@ -14,7 +14,7 @@ type Logger struct {
func NewLogger() (*Logger, error) {
config := zap.Config{
Encoding: "json",
Level: zap.NewAtomicLevelAt(zap.InfoLevel),
Level: zap.NewAtomicLevelAt(zap.DebugLevel),
OutputPaths: []string{"stdout"}, // 输出到控制台
ErrorOutputPaths: []string{"stderr"},
EncoderConfig: zapcore.EncoderConfig{

View File

@@ -5,8 +5,8 @@ go 1.18
require (
github.com/nacos-group/nacos-sdk-go/v2 v2.2.0
github.com/spf13/viper v1.15.0
github.com/streadway/amqp v1.0.0
go.uber.org/zap v1.24.0
gopkg.in/yaml.v3 v3.0.1
)
require (
@@ -48,4 +48,5 @@ require (
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

View File

@@ -253,6 +253,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU=
github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=

View File

@@ -1,7 +1,7 @@
package main
import (
"agent-go/config"
"agent-go/register"
"flag"
"fmt"
)
@@ -14,7 +14,12 @@ func main() {
flag.Parse()
// 读取对应版本的配置文件
filename := fmt.Sprintf("octopus-agent-%s.yaml", version)
println(filename)
// 初始化Nacos的连接配置
config.InitNacos(filename)
//config.InitNacos(filename)
// 执行初始化之策工作
register.INIT()
}

View File

@@ -0,0 +1,20 @@
package rabbitmq
import "github.com/streadway/amqp"
// Send 向RabbitMQ中发送消息
func Send(conn *RabbitMQConn, connProp *ConnectProperty, message []byte) {
// 往哪里发
channel := conn.Channel
// 发送
channel.Publish(
connProp.ExchangeName,
connProp.TopicKey,
false,
true,
amqp.Publishing{
ContentType: "text/plain",
Body: message,
},
)
}

View File

@@ -0,0 +1,31 @@
package rabbitmq
import (
"agent-go/g"
"time"
)
type OctopusMessage struct {
UUID string `json:"uuid"`
InitTime time.Time `json:"init_time" format:"2023-03-21 16:38:30"`
Type string `json:"type"`
Content interface{} `json:"content"`
Result interface{} `json:"result"`
ACTime time.Time `json:"ac_time" format:"2023-03-21 16:38:30"`
}
// BuildOctopusMsg 生成OctopusMessage
func (m *OctopusMessage) BuildOctopusMsg(omType string, content interface{}) *OctopusMessage {
// 当前时间
curTimeString := g.CurTimeString()
return &OctopusMessage{
UUID: curTimeString,
InitTime: time.Now(),
Type: omType,
Content: content,
Result: nil,
ACTime: time.Time{},
}
}

View File

@@ -0,0 +1,160 @@
package rabbitmq
import (
"agent-go/g"
"fmt"
"github.com/streadway/amqp"
"strings"
"sync"
)
var log = g.G.LOG
// RabbitMQConn is a struct that holds the connection and channel objects
type RabbitMQConn struct {
Connection *amqp.Connection
Channel *amqp.Channel
}
type ConnectProperty struct {
ExchangeName string
QueueName string
ExchangeType string
TopicKey string
}
// 定义全局唯一的 Singleton 实例
var instance *amqp.Connection
// 用 sync.Once 变量确保初始化函数只会被调用一次
var once sync.Once
// 初始化 Singleton 实例的函数
func createInstance() {
// 在这里进行 Singleton 的初始化操作
// 获取RabbitMQ的连接地址
rabbitMQEndpointFromG := parseRabbitMQEndpointFromG()
// 创建全局唯一连接 RabbitMQ连接
connection, err := amqp.Dial(rabbitMQEndpointFromG)
if err != nil {
log.Error(fmt.Sprintf("failed to connect to RabbitMQ: %v", err))
}
instance = connection
}
// GetInstance 获取全局唯一的 Singleton 实例的函数
func GetInstance() *amqp.Connection {
// 使用 sync.Once 确保 createInstance 只会被调用一次
once.Do(createInstance)
return instance
}
// NewRabbitMQConn creates a new RabbitMQ connection object
func NewRabbitMQConn(property *ConnectProperty) (*RabbitMQConn, error) {
// 获取RabbitMQ的连接
conn := GetInstance()
ch, err := conn.Channel()
if err != nil {
return nil, fmt.Errorf("failed to create RabbitMQ channel: %w", err)
}
if err = ch.ExchangeDeclare(
property.ExchangeName, // name of the exchange
property.ExchangeType, // type of the exchange
true, // durable
false, // delete when complete
false, // internal
false, // noWait
nil, // arguments
); err != nil {
return nil, fmt.Errorf("failed to declare RabbitMQ exchange: %w", err)
}
_, err = ch.QueueDeclare(
property.QueueName, // name of the queue
true, // durable
false, // delete when unused
false, // exclusive
false, // noWait
nil, // arguments
)
if err != nil {
return nil, fmt.Errorf("failed to declare RabbitMQ queue: %w", err)
}
if err = ch.QueueBind(
property.QueueName, // name of the queue
property.TopicKey, // routing key - all topics
property.ExchangeName, // name of the exchange
false, // noWait
nil, // arguments
); err != nil {
return nil, fmt.Errorf("failed to bind RabbitMQ queue: %w", err)
}
return &RabbitMQConn{Connection: conn, Channel: ch}, nil
}
// parseRabbitMQEndpoint 根据全局变量NacosConfig解析出RabbitMQ的连接地址
func parseRabbitMQEndpointFromG() string {
nacosConfig := g.G.NacosConfig
var res strings.Builder
host := nacosConfig.GetString("spring.rabbitmq.host")
port := nacosConfig.GetString("spring.rabbitmq.port")
username := nacosConfig.GetString("spring.rabbitmq.username")
password := nacosConfig.GetString("spring.rabbitmq.password")
virtualHost := nacosConfig.GetString("spring.rabbitmq.virtual-host")
// amqp://{username}:{password}@{hostname}:{port}/{virtual_host}
res.WriteString("amqp://")
res.WriteString(username)
res.WriteString(":")
res.WriteString(password)
res.WriteString("@")
res.WriteString(host)
res.WriteString(":")
res.WriteString(port)
res.WriteString(virtualHost)
s := res.String()
log.Debug(fmt.Sprintf("generate RabbitMQ endpoint is %s", s))
return s
}
func CloseChannel(conn *RabbitMQConn) error {
var err error
if conn.Channel != nil {
if err = conn.Channel.Close(); err != nil {
log.Error(fmt.Sprintf("Failed to close RabbitMQ channel: %v", err))
}
}
return err
}
// CloseRabbitMQAll closes the RabbitMQ connection and channel
func (r *RabbitMQConn) CloseRabbitMQAll() error {
var err error
if r.Channel != nil {
if err = r.Channel.Close(); err != nil {
log.Error(fmt.Sprintf("Failed to close RabbitMQ channel: %v", err))
}
}
if r.Connection != nil {
if err = r.Connection.Close(); err != nil {
log.Error(fmt.Sprintf("Failed to close RabbitMQ connection: %v", err))
}
}
return err
}

View File

@@ -0,0 +1,105 @@
package register
import (
"agent-go/g"
"agent-go/rabbitmq"
"encoding/json"
"fmt"
"gopkg.in/yaml.v3"
"io/ioutil"
)
var log = g.G.LOG
var omType = g.InitOmType
func INIT() {
// 获取系统的环境变量
agentServerInfo := parseAgentServerInfo()
nacosConfig := g.G.NacosConfig
initToServerProp := &rabbitmq.ConnectProperty{
ExchangeName: nacosConfig.GetString("octopus.message.init_exchange"),
QueueName: nacosConfig.GetString("octopus.message.init_to_server"),
ExchangeType: g.QueueDirect,
TopicKey: nacosConfig.GetString("octopus.message.init_to_server_key"),
}
initFromServerProp := &rabbitmq.ConnectProperty{
ExchangeName: nacosConfig.GetString("octopus.message.init_exchange"),
QueueName: nacosConfig.GetString("octopus.message.init_from_server"),
ExchangeType: g.QueueDirect,
TopicKey: nacosConfig.GetString("octopus.message.init_from_server_key"),
}
// 建立RabbitMQ的连接
// defer 关闭初始化连接
initToServer, err := rabbitmq.NewRabbitMQConn(
initToServerProp,
)
if err != nil {
log.Error("init to server queue established error!")
panic(err)
}
defer rabbitmq.CloseChannel(initToServer)
initFromServer, err := rabbitmq.NewRabbitMQConn(
initFromServerProp,
)
if err != nil {
log.Error("init from server queue established error!")
panic(err)
}
defer rabbitmq.CloseChannel(initFromServer)
// 组装OctopusMessage
var octopusMsg *rabbitmq.OctopusMessage
octopusMsg = octopusMsg.BuildOctopusMsg(
omType,
agentServerInfo,
)
msgBytes, err := json.Marshal(octopusMsg)
if err != nil {
log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %v", octopusMsg))
}
// 发送OM至MQ中O
rabbitmq.Send(
initToServer,
initToServerProp,
msgBytes,
)
// 监听初始化连接中的信息
// 建立运行时RabbitMQ连接
handleInitMsgFromServer()
}
func handleInitMsgFromServer() {
}
func parseAgentServerInfo() AgentServerInfo {
// 约定文件地址为 /etc/environment.d/octopus-agent.conf
// 目前使用
var agentServerInfo AgentServerInfo
yamlFile, err := ioutil.ReadFile("C:\\Users\\wdd\\IdeaProjects\\ProjectOctopus\\agent-go\\server-env.yaml")
if err != nil {
panic(fmt.Errorf("failed to read YAML file: %v", err))
}
err = yaml.Unmarshal(yamlFile, &agentServerInfo)
if err != nil {
panic(fmt.Errorf("failed to unmarshal YAML: %v", err))
}
jsonFormat, err := json.Marshal(agentServerInfo)
if err != nil {
return AgentServerInfo{}
}
log.Info(fmt.Sprintf("agent server info is %v", string(jsonFormat)))
return agentServerInfo
}

View File

@@ -0,0 +1,26 @@
package register
type AgentServerInfo struct {
ServerName string `json:"serverName" yaml:"serverName"`
ServerIPPbV4 string `json:"serverIpPbV4" yaml:"serverIpPbV4"`
ServerIPInV4 string `json:"serverIpInV4" yaml:"serverIpInV4"`
ServerIPPbV6 string `json:"serverIpPbV6" yaml:"serverIpPbV6"`
ServerIPInV6 string `json:"serverIpInV6" yaml:"serverIpInV6"`
Location string `json:"location" yaml:"location"`
Provider string `json:"provider" yaml:"provider"`
ManagePort string `json:"managePort" yaml:"managePort"`
CPUCore string `json:"cpuCore" yaml:"cpuCore"`
CPUBrand string `json:"cpuBrand" yaml:"cpuBrand"`
OSInfo string `json:"osInfo" yaml:"osInfo"`
OSKernelInfo string `json:"osKernelInfo" yaml:"osKernelInfo"`
TCPControl string `json:"tcpControl" yaml:"tcpControl"`
Virtualization string `json:"virtualization" yaml:"virtualization"`
IoSpeed string `json:"ioSpeed" yaml:"ioSpeed"`
MemoryTotal string `json:"memoryTotal" yaml:"memoryTotal"`
DiskTotal string `json:"diskTotal" yaml:"diskTotal"`
DiskUsage string `json:"diskUsage" yaml:"diskUsage"`
Comment string `json:"comment" yaml:"comment"`
MachineID string `json:"machineId" yaml:"machineId"`
AgentVersion string `json:"agentVersion" yaml:"agentVersion"`
AgentTopicName string `json:"agentTopicName" yaml:"agentTopicName"`
}

22
agent-go/server-env.yaml Normal file
View File

@@ -0,0 +1,22 @@
serverName: "Chengdu-amd64-98"
serverIpPbV4: "183.220.149.17"
serverIpInV4: ""
serverIpPbV6: ""
serverIpInV6: ""
location: "Chengdu Sichuan CN"
provider: "AS139080 The Internet Data Center of Sichuan Mobile Communication Company Limited"
managePort: "22"
cpuCore: "12 @ 4299.998 MHz"
cpuBrand: "Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz"
memoryTotal: "7.6 GB"
diskTotal: "914.9 GB"
diskUsage: "12.3 GB"
archInfo: "x86_64 (64 Bit)"
osInfo: "Ubuntu 20.04.5 LTS"
osKernelInfo: "5.4.0-135-generic"
tcpControl: "cubic"
virtualization: "Dedicated"
ioSpeed: "150 MB/s"
machineId: ""
agentVersion: ""
agentTopicName: ""

View File

@@ -18,3 +18,5 @@ tcpControl="cubic"
virtualization="Dedicated"
ioSpeed=" MB/s"
machineId=""
agentVersion=""
agentTopicName=""

View File

@@ -4,15 +4,12 @@ package io.wdd.agent.executor.web;
import io.wdd.agent.executor.CommandExecutor;
import io.wdd.agent.executor.FunctionExecutor;
import io.wdd.common.beans.executor.ExecutionMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.response.R;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.List;
@RestController
@RequestMapping("testExecutor")
@@ -25,15 +22,13 @@ public class TestCommandExecutorController {
FunctionExecutor functionExecutor;
/*@PostMapping("comand")
@PostMapping("comand")
public R<String> testFor(
@RequestParam(value = "streamKey") String streamKey,
@RequestParam(value = "command") List<String> command
){
commandExecutor.execute(streamKey, command);
@RequestBody OctopusMessage octopusMessage
) {
return R.ok(streamKey);
}*/
return R.ok("1");
}
@PostMapping("linuxFile")

View File

@@ -6,6 +6,7 @@ import io.wdd.agent.initialization.bootup.CollectSystemInfo;
import io.wdd.agent.initialization.bootup.OctopusAgentInitService;
import io.wdd.common.beans.response.R;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@@ -23,7 +24,7 @@ public class SendServerInfoController {
OctopusAgentInitService octopusAgentInitService;
@PostMapping("sendAgentInfo")
public R<AgentServerInfo> send(){
public R<AgentServerInfo> send(@RequestBody AgentServerInfo info) {
AgentServerInfo agentServerInfo = collectSystemInfo.agentServerInfo;

View File

@@ -29,6 +29,11 @@ stream {
listen 21060;
proxy_pass 140.238.14.103:21060;
}
# nacos
server {
listen 22060;
proxy_pass 140.238.14.103:22060;
}
#rabbitmq

View File

@@ -68,6 +68,7 @@ services:
image: docker.io/nacos/nacos-server:v2.2.0-slim
ports:
- '21060:8848'
- '22060:9848'
environment:
- MODE=standalone
- MYSQL_SERVICE_HOST=mysql