Files
ProjectOctopus/agent-go/a_init/AgentInitialization.go
2024-04-29 17:30:15 +08:00

620 lines
18 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package a_init
import (
	"encoding/json"
	"fmt"
	"io"
	"math/rand"
	"os"
	"reflect"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/spf13/viper"
	"gopkg.in/yaml.v3"
	"wdd.io/agent-common/logger"
	"wdd.io/agent-common/utils"
	"wdd.io/agent-go/a_agent"
	"wdd.io/agent-go/a_executor"
	"wdd.io/agent-go/a_status"
	"wdd.io/agent-go/g"
	"wdd.io/agent-go/rabbitmq"
)
// AgentServerInfoLocalFilePath is the path of the optional, manually written
// agent server info file. INIT merges its content over the auto-detected
// values when the file exists and is not empty.
const AgentServerInfoLocalFilePath = "/usr/local/etc/octopus-agent/octopus-agent.conf"

// Package-level shortcuts to values owned by the g and logger packages.
var (
	// initOmType is the OctopusMessage type used when building the init
	// (registration) message.
	initOmType = g.InitOmType

	// P is taken from g.G.P; this file uses it to Submit background tasks.
	P = g.G.P

	// log is the logger used throughout this package.
	log = logger.Log
)
// INIT bootstraps the agent and registers it with the server.
//
// It loads the agent configuration from octopusAgentConfigFileName, collects
// the host information, optionally overrides it with the values found in
// AgentServerInfoLocalFilePath, sends the init message to the server and waits
// until the registration reply for this agent has been handled. Afterwards it
// opens the runtime (business) queues and activates the sub modules.
//
// The returned channel is the one produced by the runtime queue's Handle call.
func INIT(octopusAgentConfigFileName string) chan bool {
	// Holder for everything this agent reports about its host.
	agentServerInfo := &a_agent.AgentServerInfo{}

	// Load the agent configuration (it carries the RabbitMQ connection
	// settings) and cache it for the other modules.
	agentConfig := parseOctopusAgentConf(octopusAgentConfigFileName)
	a_agent.AgentConfig = agentConfig

	// Let the a_status module detect the runtime environment.
	agentInfo := a_status.ReportAgentInfo()
	refreshAgentInfoByStatusInfo(agentInfo, agentServerInfo)

	// When a local server info file exists, manually written values override
	// the detected ones.
	if utils.FileExistAndNotNull(AgentServerInfoLocalFilePath) {
		agentServerInfoFromLocalFile := parseAgentServerInfo(AgentServerInfoLocalFilePath)
		utils.CopySameFields(agentServerInfoFromLocalFile, agentServerInfo)
	}

	// build operator cache
	buildAgentOsOperator(agentInfo, agentServerInfo)

	// Cache the final server info.
	a_agent.AgentServerInfoCache = agentServerInfo

	// build for octopus tcp connect info struct
	rabbitTCPConnectInfo := buildOctopusTCPConnect(agentConfig)

	initToServerQueue, initFromServerQueue := buildOctopusInitQueue(rabbitTCPConnectInfo)
	defer initToServerQueue.Close()
	defer initFromServerQueue.Close()

	// Open both init queues.
	initToServerQueue.Connect()
	initFromServerQueue.Connect()

	buildAndSendInitMsgToServer(agentServerInfo, initToServerQueue)

	// rabbit queue handle message from server
	initFromServerQueue.Handle()

	// Poll for the registration reply. A bare break inside select would only
	// leave the select statement, so the loop is labeled and left explicitly.
waitForRegistration:
	for !g.G.AgentHasRegister {
		select {
		case initFromServerMsg := <-initFromServerQueue.ReceiveChan.InitRChan:
			if handleInitMsgFromServer(initFromServerMsg, initToServerQueue, agentServerInfo) {
				log.InfoF("[INIT] - agent has registered !")
				// Stop consuming on the registration queues.
				shutdownRegisterQueueConnection(initFromServerQueue, initToServerQueue)
				break waitForRegistration
			}
		default:
			log.Debug("agent init not received from server ! start to waiting !")
			time.Sleep(time.Millisecond * 500)
		}
	}

	// Open the runtime RabbitMQ queues.
	runtimeConnectorQueue := buildAndStartBusinessRuntimeQueue(a_agent.AgentServerInfoCache.AgentTopicName)

	// Activate the sub modules.
	activatedOctopusAgentModules()

	return runtimeConnectorQueue
}
// buildOctopusInitQueue creates the two (not yet connected) queues used during
// registration. The first result publishes to the server, the second receives
// from it. Both use the init exchange from the agent configuration with a
// direct exchange type and share rabbitTCPConnectInfo.
func buildOctopusInitQueue(rabbitTCPConnectInfo *rabbitmq.RabbitTCPConnectInfo) (*rabbitmq.RabbitQueue, *rabbitmq.RabbitQueue) {
	cfg := a_agent.AgentConfig

	// newInitQueue assembles one init queue from the config keys holding its
	// queue name and its routing key.
	newInitQueue := func(queueNameKey, topicKeyKey string) *rabbitmq.RabbitQueue {
		return &rabbitmq.RabbitQueue{
			RabbitProp: &rabbitmq.ConnectProperty{
				ExchangeName: cfg.GetString("octopus.message.init_exchange"),
				QueueName:    cfg.GetString(queueNameKey),
				ExchangeType: g.QueueDirect,
				TopicKey:     cfg.GetString(topicKeyKey),
			},
			RabbitConnectInfo: rabbitTCPConnectInfo,
		}
	}

	toServer := newInitQueue("octopus.message.init_to_server", "octopus.message.init_to_server_key")
	fromServer := newInitQueue("octopus.message.init_from_server", "octopus.message.init_from_server_key")

	return toServer, fromServer
}
// buildAndStartBusinessRuntimeQueue connects the two runtime queues and starts
// handling business messages.
//
// The inbound queue is named agentTopicName and bound with the routing key
// agentTopicName + "*". The outbound queue uses the name configured under
// octopus.message.octopus_to_server. Both queues are stored in the rabbitmq
// package variables. The returned channel is the result of Handle on the
// inbound queue.
func buildAndStartBusinessRuntimeQueue(agentTopicName string) chan bool {
	cfg := a_agent.AgentConfig
	exchange := cfg.GetString("octopus.message.octopus_exchange")

	// Inbound: business messages addressed to this agent.
	inbound := &rabbitmq.RabbitQueue{
		RabbitProp: &rabbitmq.ConnectProperty{
			ExchangeName: exchange,
			QueueName:    agentTopicName,
			ExchangeType: g.QueueTopic,
			TopicKey:     agentTopicName + "*",
		},
	}
	inbound.Connect()

	// Outbound: replies of every agent go to the shared to-server queue.
	replyQueueName := cfg.GetString("octopus.message.octopus_to_server")
	outbound := &rabbitmq.RabbitQueue{
		RabbitProp: &rabbitmq.ConnectProperty{
			ExchangeName: exchange,
			QueueName:    replyQueueName,
			ExchangeType: g.QueueTopic,
			TopicKey:     replyQueueName,
		},
	}
	outbound.Connect()

	log.InfoF("Octopus Message Business Runtime Queue is established ! => %v", outbound)

	// Start handling runtime OctopusMessages.
	foreverChan := inbound.Handle()

	// Publish both queues through the rabbitmq package variables.
	rabbitmq.BusinessRuntimeQueue = inbound
	rabbitmq.OctopusToServerQueue = outbound

	return foreverChan
}
// activatedOctopusAgentModules 激活Octopus Agent的所有子模块
func activatedOctopusAgentModules() {
// Agent
_ = P.Submit(func() {
a_agent.Activate()
})
// Executor
_ = P.Submit(func() {
a_executor.Activate()
})
// Status
_ = P.Submit(func() {
a_status.Activate()
})
}
// buildAndSendInitMsgToServer wraps agentServerInfo in an init OctopusMessage
// and submits a background task that publishes it on initToServerQueue every
// five minutes until g.G.AgentHasRegister becomes true. The task gives up once
// more than g.AgentRegisterFailedCount messages have been sent.
func buildAndSendInitMsgToServer(agentServerInfo *a_agent.AgentServerInfo, initToServerQueue *rabbitmq.RabbitQueue) {
	// Build is invoked on a nil *OctopusMessage, exactly as the message was
	// assembled before.
	var blank *rabbitmq.OctopusMessage
	octopusMsg := blank.Build(initOmType, agentServerInfo)

	resend := func() {
		for sent := 0; !g.G.AgentHasRegister; sent++ {
			if sent > g.AgentRegisterFailedCount {
				log.ErrorF("Agent Register Failed after %d times tryings !", g.AgentRegisterFailedCount)
				return
			}

			log.InfoF("[INIT] - Send init message to server! ==> %s", octopusMsg)

			// The server may be down while the agent is alive, so keep
			// publishing until a reply has been handled.
			initToServerQueue.SendOMsg(octopusMsg)

			// Pause before the next attempt.
			time.Sleep(5 * time.Minute)
		}
	}

	// Publish from a background task.
	_ = P.Submit(resend)
}
// buildOctopusTCPConnect copies the spring.rabbitmq.* settings (host, port,
// username, password, virtual-host) from agentConfig into a new
// RabbitTCPConnectInfo.
func buildOctopusTCPConnect(agentConfig *viper.Viper) *rabbitmq.RabbitTCPConnectInfo {
	//todo
	connectInfo := &rabbitmq.RabbitTCPConnectInfo{}
	connectInfo.Host = agentConfig.GetString("spring.rabbitmq.host")
	connectInfo.Port = agentConfig.GetString("spring.rabbitmq.port")
	connectInfo.UserName = agentConfig.GetString("spring.rabbitmq.username")
	connectInfo.Password = agentConfig.GetString("spring.rabbitmq.password")
	connectInfo.VirtualHost = agentConfig.GetString("spring.rabbitmq.virtual-host")
	return connectInfo
}
// refreshAgentInfoByStatusInfo fills agentServerInfo from the status snapshot
// in agentInfo: CPU, OS/host, memory, the disk with the largest total size,
// network addresses, the uniform server name, the machine id and the result
// of the disk I/O test.
func refreshAgentInfoByStatusInfo(agentInfo *a_status.AgentInfo, agentServerInfo *a_agent.AgentServerInfo) {
	// CPU: core count, plus the model name of the first reported CPU if any.
	cpu := agentInfo.CPUInfo
	agentServerInfo.CPUCore = strconv.FormatInt(int64(cpu.NumCores), 10)
	if len(cpu.CPUInfo) != 0 {
		agentServerInfo.CPUBrand = cpu.CPUInfo[0].ModelName
	}

	// OS / host.
	host := agentInfo.HostInfo
	agentServerInfo.Platform = host.Platform
	agentServerInfo.PlatformFamily = host.PlatformFamily
	agentServerInfo.PlatformVersion = host.PlatformVersion
	agentServerInfo.KernelVersion = host.KernelVersion
	agentServerInfo.KernelArch = host.KernelArch
	agentServerInfo.OSKernelInfo = host.KernelVersion
	agentServerInfo.OSInfo = host.PlatformFamily + " " + host.Platform + " " + host.PlatformVersion
	agentServerInfo.Virtualization = host.VirtualizationSystem + " " + host.VirtualizationRole

	// Memory.
	mem := agentInfo.MemoryInfo
	agentServerInfo.MemoryTotal = utils.ByteSizeToString(mem.TotalMemory)
	agentServerInfo.SwapTotal = utils.ByteSizeToString(mem.SwapTotal)

	// Disk: report the entry with the largest total size; the first one wins
	// on ties.
	var largestTotal, largestUsed uint64
	for _, disk := range agentInfo.DiskInfo {
		if disk.Total <= largestTotal {
			continue
		}
		largestTotal, largestUsed = disk.Total, disk.Used
	}
	agentServerInfo.DiskTotal = utils.ByteSizeToString(largestTotal)
	agentServerInfo.DiskUsage = utils.ByteSizeToString(largestUsed)

	// Network: must run before the server name is derived, which reads the
	// city and the internal IPv4 address.
	refreshAgentNetworkInfo(agentInfo, agentServerInfo)

	// Host identity.
	agentServerInfo.ServerName = uniformAgentServerName(agentInfo, agentServerInfo)
	agentServerInfo.MachineID = host.HostID

	// Disk I/O test.
	agentServerInfo.IoSpeed = testDiskIO()

	log.DebugF("[refreshAgentInfoByStatusInfo] - ok !")
}
// uniformAgentServerName returns the server name of this agent in the form
// "<City>-<arch>-<suffix>", e.g. "Shanghai-amd64-01".
//
// A hostname that already consists of three "-" separated parts whose middle
// part equals the machine type of the kernel architecture is returned
// unchanged. Otherwise the name is built from agentServerInfo.City (title
// cased, spaces removed), the machine type and a suffix: the last octet of
// agentServerInfo.ServerIPInV4 when that is a dotted quad, else a random
// zero-padded number between 001 and 999.
//
// On the second path agentServerInfo.City and agentServerInfo.CPUArch are
// overwritten with the normalized values.
func uniformAgentServerName(agentInfo *a_status.AgentInfo, agentServerInfo *a_agent.AgentServerInfo) string {
	hostname := agentInfo.HostInfo.Hostname

	// Resolve the machine type once; it is needed on both paths.
	arch := getMachineType(agentInfo.HostInfo.KernelArch)

	// Already in the standard form: the second part is amd64, arm64, ...
	if parts := strings.Split(hostname, "-"); len(parts) == 3 && parts[1] == arch {
		return hostname
	}

	// Not a standard agent name: normalize the city first.
	city := strings.ReplaceAll(strings.Title(agentServerInfo.City), " ", "")
	agentServerInfo.City = city

	// linux host architecture
	agentServerInfo.CPUArch = arch

	var numS string
	// Only use the address when it really has four parts; indexing a
	// malformed value would panic.
	if octets := strings.Split(agentServerInfo.ServerIPInV4, "."); len(octets) == 4 && octets[3] != "" {
		numS = octets[3]
	} else {
		// Seeding is kept so that toolchains before Go 1.20 do not produce
		// the same number on every start.
		rand.Seed(time.Now().UnixNano())
		// Random number between 1 and 999, zero padded to three digits.
		numS = fmt.Sprintf("%03d", rand.Intn(999)+1)
	}

	return city + "-" + arch + "-" + numS
}
// testDiskIO writes a temporary file, reads it back and returns the measured
// read throughput formatted as "<value> MB/s". It returns an empty string when
// the file cannot be created, written, rewound or read.
//
// The file is removed before returning.
// NOTE(review): the data is read right after being written, so the operating
// system may serve it from its cache; the number is probably an upper bound
// rather than raw device speed. Confirm whether that is acceptable.
func testDiskIO() string {
	const (
		mib        = 1024 * 1024
		chunkSize  = 10 * mib // 10 MiB per write
		chunkCount = 10       // 100 MiB in total
	)

	log.InfoF("testDiskIO - start !")

	// Create a temporary file to test disk I/O.
	f, err := os.CreateTemp("", "test_disk_io")
	if err != nil {
		log.ErrorF("testDiskIO - create temp file failed => %v", err)
		return ""
	}
	defer func() {
		// Best-effort cleanup; there is nothing useful to do on failure.
		_ = f.Close()
		_ = os.Remove(f.Name())
	}()

	// Write the test data.
	data := make([]byte, chunkSize)
	for i := 0; i < chunkCount; i++ {
		if _, err = f.Write(data); err != nil {
			log.ErrorF("testDiskIO - write failed => %v", err)
			return ""
		}
	}

	// The offset sits at the end of the file after writing. Without rewinding
	// the first Read would report EOF and nothing would be measured.
	if _, err = f.Seek(0, io.SeekStart); err != nil {
		log.ErrorF("testDiskIO - seek failed => %v", err)
		return ""
	}

	// Read the whole file back, counting what was actually read.
	startTime := time.Now()
	var bytesRead int64
	for {
		n, readErr := f.Read(data)
		bytesRead += int64(n)
		if readErr == io.EOF {
			break
		}
		if readErr != nil {
			log.ErrorF("testDiskIO - read failed => %v", readErr)
			return ""
		}
	}
	elapsedSeconds := time.Since(startTime).Seconds()

	// Guard against a division by zero on very coarse clocks.
	if elapsedSeconds <= 0 || bytesRead == 0 {
		log.ErrorF("testDiskIO - nothing measured, read %d bytes in %f seconds", bytesRead, elapsedSeconds)
		return ""
	}

	// Throughput in MiB per second, based on the bytes actually read.
	speed := float64(bytesRead) / mib / elapsedSeconds
	sprintf := fmt.Sprintf("%.2f MB/s", speed)

	log.InfoF("testDiskIO - end io speed are => %s", sprintf)

	return sprintf
}
// getMachineType maps a kernel architecture string (for example "x86_64" or
// "aarch64") to the machine type name used in agent names (for example
// "amd64" or "arm64"). For an unknown architecture it prints an error line to
// standard output and returns an empty string.
func getMachineType(arch string) string {
	// Suffix rules are checked first: anything ending in "386" or "amd64".
	if strings.HasSuffix(arch, "386") {
		return "ia32"
	}
	if strings.HasSuffix(arch, "amd64") {
		return "amd64"
	}

	// Exact names.
	switch arch {
	case "x86_64":
		return "amd64"
	case "armv5tel", "armv6l", "armv7", "armv7l":
		return "arm32"
	case "armv8", "aarch64":
		return "arm64"
	case "mips":
		return "mips32"
	case "mipsle":
		return "mips32le"
	case "mips64", "mips64le", "ppc64", "ppc64le", "riscv64", "s390x":
		// These names are used unchanged.
		return arch
	}

	fmt.Println("error: The architecture is not supported.")
	return ""
}
// refreshAgentNetworkInfo fills the network related fields of agentServerInfo.
//
// The public IP and its location data are copied when a public IP was
// obtained. The internal addresses come from the interfaces in agentInfo whose
// name is accepted by isNetworkInterface: the first IPv4 and IPv6 address of
// such an interface are stored without their "/prefix" part. The scan stops at
// the first interface that provides both an IPv6 address and a non-empty IPv4
// address; an interface with IPv4 only sets ServerIPInV4 and the scan goes on.
func refreshAgentNetworkInfo(agentInfo *a_status.AgentInfo, agentServerInfo *a_agent.AgentServerInfo) {
	// withoutPrefixLen cuts everything from the first "/" on.
	withoutPrefixLen := func(addr string) string {
		if slash := strings.Index(addr, "/"); slash >= 0 {
			return addr[:slash]
		}
		return addr
	}

	// Public network information of this agent.
	publicNetworkInfo := a_status.PublicNetworkInfo{}
	publicNetworkInfo.GetPublicNetworkInfo()

	marshal, _ := json.Marshal(publicNetworkInfo)
	log.InfoF("refreshAgentNetworkInfo - public network info is %s", marshal)

	if publicNetworkInfo.IP != "" {
		agentServerInfo.ServerIPPbV4 = publicNetworkInfo.IP
		agentServerInfo.Country = publicNetworkInfo.Country
		agentServerInfo.Province = publicNetworkInfo.Region
		agentServerInfo.City = publicNetworkInfo.City
		agentServerInfo.Location = publicNetworkInfo.Loc
		agentServerInfo.Organization = publicNetworkInfo.Org
		agentServerInfo.TimeZone = publicNetworkInfo.Timezone
	}

	// inner ip v4 v6
	for _, nic := range agentInfo.NetworkInfo {
		if !isNetworkInterface(nic.Name) {
			continue
		}
		log.InfoF("refreshAgentNetworkInfo - network interface is %v", nic)

		if len(nic.InternalIPv4) == 0 {
			continue
		}
		agentServerInfo.ServerIPInV4 = withoutPrefixLen(nic.InternalIPv4[0])

		if len(nic.InternalIPv6) == 0 {
			continue
		}
		agentServerInfo.ServerIPInV6 = withoutPrefixLen(nic.InternalIPv6[0])

		if agentServerInfo.ServerIPInV4 != "" {
			break
		}
	}
}
// networkInterfaceNameRe matches the interface names accepted by
// isNetworkInterface. It is compiled once instead of on every call.
//
// Accepted forms:
//   - ens, eno, eth or enp followed by digits only (ens33, eno1, eth0)
//   - enp<bus>s<slot> with an optional f<function> suffix (enp0s3, enp0s25,
//     enp3s0f1)
var networkInterfaceNameRe = regexp.MustCompile(`^(?:(?:ens|eno|eth|enp)[0-9]+|enp[0-9]+s[0-9]+(?:f[0-9]+)?)$`)

// isNetworkInterface reports whether networkInterface looks like the name of
// an ethernet interface: ens/eno/eth/enp followed by a number, or the
// enp<bus>s<slot>[f<function>] form such as enp0s3 or enp1s2.
//
// The whole name has to match, so names that merely end in such a pattern
// (for example "vethenp0s3") are rejected, and bus/slot numbers may have more
// than one digit.
func isNetworkInterface(networkInterface string) bool {
	return networkInterfaceNameRe.MatchString(networkInterface)
}
// handleInitMsgFromServer processes one registration reply received from the
// server and returns the current value of g.G.AgentHasRegister.
//
// The message content is expected to be a JSON string describing an
// AgentServerInfo. The reply belongs to this agent when the message type
// starts with g.InitOmType and the server name in the reply starts with
// agentServerInfo.ServerName; in that case the agent is marked as registered
// and the topic name assigned by the server is stored in
// a_agent.AgentServerInfoCache. Any other message is published again on
// initToServerQueue from a background task.
//
// Decoding problems are only logged; the message then takes the "not this
// agent" path unless agentServerInfo.ServerName is empty.
func handleInitMsgFromServer(initFromServerMsg *rabbitmq.OctopusMessage, initToServerQueue *rabbitmq.RabbitQueue, agentServerInfo *a_agent.AgentServerInfo) bool {

	log.DebugF("message received from server is %s", initFromServerMsg)

	var serverInfo a_agent.AgentServerInfo

	content, ok := initFromServerMsg.Content.(string)
	if !ok {
		// Report the real type; the assertion result itself is always false here.
		log.ErrorF("convert to string error! => content has type %T", initFromServerMsg.Content)
	}

	if err := json.Unmarshal([]byte(content), &serverInfo); err != nil {
		log.ErrorF("parse init message from server wrong, error is => %v ", err)
	}

	serverName := serverInfo.ServerName

	// Handle the OctopusMessage.
	if strings.HasPrefix(initFromServerMsg.OctopusMessageType, g.InitOmType) && strings.HasPrefix(serverName, agentServerInfo.ServerName) {
		// Registration reply for this machine.
		log.InfoF("OctopusMessage INIT from server is this agent !")

		// Flip the global registration flag.
		g.G.AgentHasRegister = true

		// Store the real AgentTopicName assigned by the server.
		a_agent.AgentServerInfoCache.AgentTopicName = serverInfo.AgentTopicName
	} else {
		// Reply for another agent. Rejecting it used to cause an endless Nack
		// loop (2023-06-19), so it is published again instead.
		// Log the message itself: a pointer to the pointer would only print
		// an address.
		log.WarnF("OctopusMessage INIT from server not this agent ! => %v ", initFromServerMsg)

		_ = P.Submit(func() {
			// Random delay between 1 and 5 seconds.
			seconds := rand.Intn(5) + 1

			initToServerQueue.SendOMsg(initFromServerMsg)

			// Yield so that other agents can finish their registration.
			// NOTE(review): the sleep runs after the message was sent and
			// only delays this background task - confirm whether it was
			// meant to happen before SendOMsg.
			time.Sleep(time.Duration(seconds) * time.Second)
		})
	}

	return g.G.AgentHasRegister
}
// shutdownRegisterQueueConnection stores false in the ConsumeOK flag of both
// registration queues (from-server first, then to-server) and logs the
// disconnect.
func shutdownRegisterQueueConnection(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue) {
	for _, queue := range []*rabbitmq.RabbitQueue{initFromServerQueue, initToServerQueue} {
		queue.ConsumeOK.Store(false)
	}

	log.InfoF("Octopus Agent Init Queue has disconnected!")
}
// parseAgentServerInfo reads the YAML file agentServerInfoConf, decodes it
// into an AgentServerInfo, normalizes it with UniformAgentServerInfo and logs
// the result as JSON.
//
// The returned pointer is never nil: a file without any YAML content (for
// example only comments) yields an empty AgentServerInfo.
//
// It panics when the file cannot be read, when the YAML is invalid or when
// the result cannot be encoded as JSON.
func parseAgentServerInfo(agentServerInfoConf string) *a_agent.AgentServerInfo {
	// By convention the file is /octopus-agent/octopus-agent.conf.
	yamlFile, err := os.ReadFile(agentServerInfoConf)
	if err != nil {
		panic(fmt.Errorf("failed to read YAML file: %w", err))
	}

	// Decode into an allocated value. A nil pointer target would stay nil for
	// a document without content and break the reflection based
	// normalization below.
	agentServerInfo := &a_agent.AgentServerInfo{}
	if err = yaml.Unmarshal(yamlFile, agentServerInfo); err != nil {
		panic(fmt.Errorf("failed to unmarshal YAML: %w", err))
	}

	// uniform agent server info
	UniformAgentServerInfo(agentServerInfo)

	jsonFormat, err := json.Marshal(agentServerInfo)
	if err != nil {
		log.Error(fmt.Sprintf("agent server info convert error ! agentserverinfo is %v", agentServerInfo))
		panic(err)
	}
	log.Info(fmt.Sprintf("agent server info is %v", string(jsonFormat)))

	return agentServerInfo
}
// UniformAgentServerInfo normalizes agentServerInfo in place: every settable
// string field is trimmed of surrounding white space, and the internal IPv4
// and IPv6 addresses lose everything from the first "/" on.
func UniformAgentServerInfo(agentServerInfo *a_agent.AgentServerInfo) {
	log.Info("[Initialization] - UniformAgentServerInfo !")

	// Walk all fields by reflection and trim the string ones.
	fields := reflect.ValueOf(agentServerInfo).Elem()
	for idx, total := 0, fields.NumField(); idx < total; idx++ {
		f := fields.Field(idx)
		if f.Kind() != reflect.String || !f.CanSet() {
			continue
		}
		f.SetString(strings.TrimSpace(f.String()))
	}

	log.Debug("[Initialization] - uniform ip address !")

	// stripMask drops a trailing "/..." part; other values pass through.
	stripMask := func(ip string) string {
		if slash := strings.Index(ip, "/"); slash >= 0 {
			return ip[:slash]
		}
		return ip
	}
	agentServerInfo.ServerIPInV4 = stripMask(agentServerInfo.ServerIPInV4)
	agentServerInfo.ServerIPInV6 = stripMask(agentServerInfo.ServerIPInV6)
}
// buildAgentOsOperator builds the executor's agent OS operator from agentInfo
// and the OSS offline prefix, attaches agentServerInfo to it and logs it as
// JSON at debug level.
//
// The prefix is read from octopus.agent.executor.ossOfflinePrefix and gets a
// trailing "/" when it lacks one. Without a cached agent configuration the
// built-in default "http://bastion.io" is used as is and a warning is logged.
func buildAgentOsOperator(agentInfo *a_status.AgentInfo, agentServerInfo *a_agent.AgentServerInfo) {
	// 2023-08-04: pass through some key information.
	ossOfflinePrefix := "http://bastion.io"

	if cfg := a_agent.AgentConfig; cfg == nil {
		log.WarnF("buildAgentOsOperator - agent oss offline prefix is null !")
	} else {
		ossOfflinePrefix = cfg.GetString("octopus.agent.executor.ossOfflinePrefix")
		if !strings.HasSuffix(ossOfflinePrefix, "/") {
			ossOfflinePrefix += "/"
		}
	}

	// Call the executor's init function and hand over the server info.
	agentOsOperator := a_executor.BuildAgentOsOperator(agentInfo, ossOfflinePrefix)
	agentOsOperator.AgentServerInfo = agentServerInfo

	// debug
	marshal, _ := json.Marshal(agentOsOperator)
	log.DebugF("[Agent INIT] cached agent operator is %s", marshal)
}