432 lines
13 KiB
Go
432 lines
13 KiB
Go
package a_init
|
||
|
||
import (
|
||
"encoding/json"
|
||
"fmt"
|
||
"github.com/spf13/viper"
|
||
"gopkg.in/yaml.v3"
|
||
"io/ioutil"
|
||
"math/rand"
|
||
"reflect"
|
||
"regexp"
|
||
"strings"
|
||
"time"
|
||
"wdd.io/agent-common/logger"
|
||
"wdd.io/agent-go/a_agent"
|
||
"wdd.io/agent-go/a_executor"
|
||
"wdd.io/agent-go/a_status"
|
||
"wdd.io/agent-go/g"
|
||
"wdd.io/agent-go/rabbitmq"
|
||
)
|
||
|
||
var initOmType = g.InitOmType
|
||
var P = g.G.P
|
||
|
||
var log = logger.Log
|
||
|
||
func INIT(octopusAgentConfigFileName string, agentServerInfoConf string) chan bool {
|
||
|
||
// 获取系统的环境变量
|
||
agentServerInfo := parseAgentServerInfo(agentServerInfoConf)
|
||
// todo totally get from a_status module
|
||
|
||
// 初始化Nacos的连接配置
|
||
agentConfig := parseOctopusAgentConf(octopusAgentConfigFileName)
|
||
a_agent.AgentConfig = agentConfig
|
||
|
||
// re-get agentInfo from status module
|
||
agentInfo := a_status.ReportAgentInfo()
|
||
refreshAgentInfoByStatusInfo(agentInfo, agentServerInfo)
|
||
|
||
// build operator cache
|
||
buildAgentOsOperator(agentInfo, agentServerInfo)
|
||
|
||
// 缓存此内容
|
||
a_agent.AgentServerInfoCache = agentServerInfo
|
||
|
||
// build for octopus tcp connect info struct
|
||
rabbitTCPConnectInfo := buildOctopusTCPConnect(agentConfig)
|
||
|
||
initToServerQueue, initFromServerQueue := buildOctopusInitQueue(rabbitTCPConnectInfo)
|
||
defer initToServerQueue.Close()
|
||
defer initFromServerQueue.Close()
|
||
|
||
// 建立连接
|
||
initToServerQueue.Connect()
|
||
|
||
// 建立连接
|
||
initFromServerQueue.Connect()
|
||
|
||
buildAndSendInitMsgToServer(agentServerInfo, initToServerQueue)
|
||
|
||
// rabbit queue handle message from server
|
||
initFromServerQueue.Handle()
|
||
|
||
// receive from server
|
||
for g.G.AgentHasRegister == false {
|
||
select {
|
||
case initFromServerMsg := <-initFromServerQueue.ReceiveChan.InitRChan:
|
||
if handleInitMsgFromServer(initFromServerMsg, initToServerQueue, agentServerInfo) {
|
||
log.InfoF("[INIT] - agent has registered !")
|
||
// 手动关闭 注册队列的连接
|
||
shutdownRegisterQueueConnection(initFromServerQueue, initToServerQueue)
|
||
break
|
||
}
|
||
default:
|
||
log.Debug("agent init not received from server ! start to waiting !")
|
||
time.Sleep(time.Millisecond * 500)
|
||
}
|
||
}
|
||
|
||
// 建立 运行时 RabbitMQ连接
|
||
runtimeConnectorQueue := buildAndStartBusinessRuntimeQueue(a_agent.AgentServerInfoCache.TopicName)
|
||
|
||
// 激活子模块
|
||
activatedOctopusAgentModules()
|
||
|
||
return runtimeConnectorQueue
|
||
}
|
||
|
||
func buildOctopusInitQueue(rabbitTCPConnectInfo *rabbitmq.RabbitTCPConnectInfo) (*rabbitmq.RabbitQueue, *rabbitmq.RabbitQueue) {
|
||
|
||
initToServerProp := &rabbitmq.ConnectProperty{
|
||
ExchangeName: a_agent.AgentConfig.GetString("octopus.message.init_exchange"),
|
||
QueueName: a_agent.AgentConfig.GetString("octopus.message.init_to_server"),
|
||
ExchangeType: g.QueueDirect,
|
||
TopicKey: a_agent.AgentConfig.GetString("octopus.message.init_to_server_key"),
|
||
}
|
||
|
||
initFromServerProp := &rabbitmq.ConnectProperty{
|
||
ExchangeName: a_agent.AgentConfig.GetString("octopus.message.init_exchange"),
|
||
QueueName: a_agent.AgentConfig.GetString("octopus.message.init_from_server"),
|
||
ExchangeType: g.QueueDirect,
|
||
TopicKey: a_agent.AgentConfig.GetString("octopus.message.init_from_server_key"),
|
||
}
|
||
|
||
// 建立RabbitMQ的连接
|
||
initToServerQueue := &rabbitmq.RabbitQueue{
|
||
RabbitProp: initToServerProp,
|
||
RabbitConnectInfo: rabbitTCPConnectInfo,
|
||
}
|
||
|
||
initFromServerQueue := &rabbitmq.RabbitQueue{
|
||
RabbitProp: initFromServerProp,
|
||
RabbitConnectInfo: rabbitTCPConnectInfo,
|
||
}
|
||
|
||
return initToServerQueue, initFromServerQueue
|
||
}
|
||
|
||
func buildAndStartBusinessRuntimeQueue(agentTopicName string) chan bool {
|
||
|
||
// 建立 业务消息 接收队列
|
||
// agentTopicName为名称的队列
|
||
agentConfig := a_agent.AgentConfig
|
||
|
||
octopusExchangeName := agentConfig.GetString("octopus.message.octopus_exchange")
|
||
|
||
octopusConnectProp := &rabbitmq.ConnectProperty{
|
||
ExchangeName: octopusExchangeName,
|
||
QueueName: agentTopicName,
|
||
ExchangeType: g.QueueTopic,
|
||
TopicKey: agentTopicName + "*",
|
||
}
|
||
|
||
octopusMsgQueue := &rabbitmq.RabbitQueue{
|
||
RabbitProp: octopusConnectProp,
|
||
}
|
||
octopusMsgQueue.Connect()
|
||
|
||
// 建立 业务消息 返回队列
|
||
// 统一为 OctopusToServer
|
||
octopusToServerQueueName := agentConfig.GetString("octopus.message.octopus_to_server")
|
||
|
||
octopusToServerProp := &rabbitmq.ConnectProperty{
|
||
ExchangeName: octopusExchangeName,
|
||
QueueName: octopusToServerQueueName,
|
||
ExchangeType: g.QueueTopic,
|
||
TopicKey: octopusToServerQueueName,
|
||
}
|
||
|
||
octopusToServerQueue := &rabbitmq.RabbitQueue{
|
||
RabbitProp: octopusToServerProp,
|
||
}
|
||
|
||
// 开启运行时消息返回队列
|
||
octopusToServerQueue.Connect()
|
||
|
||
log.InfoF("Octopus Message Business Runtime Queue is established ! => %v", octopusToServerQueue)
|
||
|
||
// 开始处理Runtime的OM消息
|
||
businessForeverChan := octopusMsgQueue.Handle()
|
||
|
||
// cache it
|
||
rabbitmq.BusinessRuntimeQueue = octopusMsgQueue
|
||
rabbitmq.OctopusToServerQueue = octopusToServerQueue
|
||
|
||
return businessForeverChan
|
||
}
|
||
|
||
// activatedOctopusAgentModules 激活Octopus Agent的所有子模块
|
||
func activatedOctopusAgentModules() {
|
||
|
||
// Agent
|
||
_ = P.Submit(func() {
|
||
a_agent.Activate()
|
||
})
|
||
// Executor
|
||
_ = P.Submit(func() {
|
||
a_executor.Activate()
|
||
})
|
||
// Status
|
||
_ = P.Submit(func() {
|
||
a_status.Activate()
|
||
})
|
||
}
|
||
|
||
func buildAndSendInitMsgToServer(agentServerInfo *a_agent.AgentServerInfo, initToServerQueue *rabbitmq.RabbitQueue) {
|
||
// 组装OctopusMessage
|
||
var octopusMsg *rabbitmq.OctopusMessage
|
||
octopusMsg = octopusMsg.Build(
|
||
initOmType,
|
||
agentServerInfo,
|
||
)
|
||
|
||
// 发送OM至MQ中
|
||
_ = P.Submit(
|
||
func() {
|
||
agentRegisterCount := 0
|
||
for g.G.AgentHasRegister == false {
|
||
if agentRegisterCount > g.AgentRegisterFailedCount {
|
||
log.ErrorF("Agent Register Failed after %d times tryings !", g.AgentRegisterFailedCount)
|
||
return
|
||
}
|
||
|
||
log.InfoF("[INIT] - Send init message to server! ==> %s", octopusMsg)
|
||
|
||
//如果agent存活 而Server不存活 那么需要持续不断的向Server中发送消息
|
||
initToServerQueue.SendOMsg(octopusMsg)
|
||
agentRegisterCount += 1
|
||
|
||
// 休眠
|
||
time.Sleep(5 * time.Minute)
|
||
}
|
||
})
|
||
}
|
||
|
||
func buildOctopusTCPConnect(agentConfig *viper.Viper) *rabbitmq.RabbitTCPConnectInfo {
|
||
|
||
host := agentConfig.GetString("spring.rabbitmq.host")
|
||
port := agentConfig.GetString("spring.rabbitmq.port")
|
||
username := agentConfig.GetString("spring.rabbitmq.username")
|
||
password := agentConfig.GetString("spring.rabbitmq.password")
|
||
virtualHost := agentConfig.GetString("spring.rabbitmq.virtual-host")
|
||
|
||
//todo
|
||
return &rabbitmq.RabbitTCPConnectInfo{
|
||
UserName: username,
|
||
Password: password,
|
||
Host: host,
|
||
Port: port,
|
||
VirtualHost: virtualHost,
|
||
}
|
||
}
|
||
|
||
func refreshAgentInfoByStatusInfo(agentInfo *a_status.AgentInfo, agentServerInfo *a_agent.AgentServerInfo) {
|
||
|
||
// host info
|
||
agentServerInfo.Platform = agentInfo.HostInfo.Platform
|
||
agentServerInfo.PlatformFamily = agentInfo.HostInfo.PlatformFamily
|
||
agentServerInfo.PlatformVersion = agentInfo.HostInfo.PlatformVersion
|
||
agentServerInfo.KernelVersion = agentInfo.HostInfo.KernelVersion
|
||
agentServerInfo.KernelArch = agentInfo.HostInfo.KernelArch
|
||
|
||
// network part
|
||
refreshAgentNetworkInfo(agentInfo, agentServerInfo)
|
||
|
||
log.DebugF("[refreshAgentInfoByStatusInfo] - ok !")
|
||
}
|
||
|
||
func refreshAgentNetworkInfo(agentInfo *a_status.AgentInfo, agentServerInfo *a_agent.AgentServerInfo) {
|
||
|
||
// 测试网卡名称
|
||
//testCases := []string{"ens33", "eno1", "enp0s3", "enp1s2", "eth0", "enp2s5", "enx1234567890ab", "ens1234567890ab", "enp1234567890ab", "enp1234567890ab", "enp1", "lo","","docker0", "virbr0", "veth0",}
|
||
//for _, tc := range testCases {
|
||
// fmt.Printf("Network interface '%s' is %s\n", tc, fmt.Sprintf("%v", isNetworkInterface(tc)))
|
||
//}
|
||
|
||
// inner ip v4 v6
|
||
for _, networkInfo := range agentInfo.NetworkInfo {
|
||
if isNetworkInterface(networkInfo.Name) {
|
||
log.InfoF("refreshAgentNetworkInfo - network interface is %v", networkInfo)
|
||
if networkInfo.InternalIPv4 == nil || len(networkInfo.InternalIPv4) == 0 {
|
||
continue
|
||
}
|
||
// 通配到对应的网卡
|
||
s := networkInfo.InternalIPv4[0]
|
||
if strings.Contains(s, "/") {
|
||
s = strings.Split(s, "/")[0]
|
||
}
|
||
agentServerInfo.ServerIPInV4 = s
|
||
|
||
if networkInfo.InternalIPv6 == nil || len(networkInfo.InternalIPv6) == 0 {
|
||
continue
|
||
}
|
||
s2 := networkInfo.InternalIPv6[0]
|
||
if strings.Contains(s2, "/") {
|
||
s2 = strings.Split(s2, "/")[0]
|
||
}
|
||
agentServerInfo.ServerIPInV6 = s2
|
||
|
||
if agentServerInfo.ServerIPInV4 != "" {
|
||
break
|
||
}
|
||
}
|
||
}
|
||
|
||
}
|
||
|
||
// networkInterfaceNameRe matches the physical interface names accepted by
// isNetworkInterface. It is compiled once instead of on every call.
var networkInterfaceNameRe = regexp.MustCompile(`^(?:(?:ens|eno|eth|enp)[0-9]+|enp[0-9]+s[0-9]+)$`)

// isNetworkInterface reports whether networkInterface looks like a physical
// NIC name:
//
//   - "ens", "eno", "eth" or "enp" followed by one or more digits
//     (ens33, eno1, eth0, enp1), or
//   - "enp<bus>s<slot>" with one or more digits in each position
//     (enp0s3, enp10s20).
//
// The whole name must match. The previous pattern left the second alternative
// unanchored at the start and repeated the letters "p" and "s" instead of the
// digits, so names such as "xxenp1s2" or "enpp1ss2" were accepted while
// "enp10s20" was rejected.
func isNetworkInterface(networkInterface string) bool {
	return networkInterfaceNameRe.MatchString(networkInterface)
}
|
||
|
||
// handleInitMsgFromServer 处理从Server接收的 注册信息
|
||
func handleInitMsgFromServer(initFromServerMsg *rabbitmq.OctopusMessage, initToServerQueue *rabbitmq.RabbitQueue, agentServerInfo *a_agent.AgentServerInfo) bool {
|
||
|
||
log.DebugF("message received from server is %s", initFromServerMsg)
|
||
|
||
var serverInfo a_agent.AgentServerInfo
|
||
|
||
s, er := initFromServerMsg.Content.(string)
|
||
if !er {
|
||
log.ErrorF("convert to string error! => %v", er)
|
||
}
|
||
cc := json.Unmarshal([]byte(s), &serverInfo)
|
||
if cc != nil {
|
||
log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", cc))
|
||
}
|
||
serverName := serverInfo.ServerName
|
||
|
||
// 处理OM信息
|
||
if strings.HasPrefix(initFromServerMsg.OctopusMessageType, g.InitOmType) && strings.HasPrefix(serverName, agentServerInfo.ServerName) {
|
||
// 是本机的注册回复信息
|
||
log.InfoF("OctopusMessage INIT from server is this agent !")
|
||
|
||
// 修改系统参数
|
||
g.G.AgentHasRegister = true
|
||
|
||
// 保存真实的AgentTopicName
|
||
a_agent.AgentServerInfoCache.TopicName = serverInfo.TopicName
|
||
|
||
} else {
|
||
// 不是自身的 注册回复信息 -- 拒绝 2023年6月19日 此处存在错误! 会死循环Nack 导致异常
|
||
log.WarnF("OctopusMessage INIT from server not this agent ! => %v ", &initFromServerMsg)
|
||
|
||
// 需要休眠等待不再获取相应的信息
|
||
_ = P.Submit(func() {
|
||
|
||
// 生成一个1到5之间的随机整数
|
||
seconds := rand.Intn(5) + 1
|
||
|
||
initToServerQueue.SendOMsg(initFromServerMsg)
|
||
|
||
// 谦让型 多并发模型 等待其他Agent注册成功
|
||
time.Sleep(time.Duration(seconds) * time.Second)
|
||
})
|
||
|
||
}
|
||
|
||
return g.G.AgentHasRegister
|
||
}
|
||
|
||
// shutdownRegisterQueueConnection 关闭初始化连接的两个队列
|
||
func shutdownRegisterQueueConnection(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue) {
|
||
|
||
initFromServerQueue.ConsumeOK.Store(false)
|
||
initToServerQueue.ConsumeOK.Store(false)
|
||
|
||
log.InfoF("Octopus Agent Init Queue has disconnected!")
|
||
}
|
||
|
||
func parseAgentServerInfo(agentServerInfoConf string) *a_agent.AgentServerInfo {
|
||
|
||
// 约定文件地址为 /octopus-agent/octopus-agent.conf
|
||
var agentServerInfo *a_agent.AgentServerInfo
|
||
yamlFile, err := ioutil.ReadFile(agentServerInfoConf)
|
||
|
||
if err != nil {
|
||
panic(fmt.Errorf("failed to read YAML file: %v", err))
|
||
}
|
||
|
||
err = yaml.Unmarshal(yamlFile, &agentServerInfo)
|
||
if err != nil {
|
||
panic(fmt.Errorf("failed to unmarshal YAML: %v", err))
|
||
}
|
||
|
||
// uniform agent server info
|
||
UniformAgentServerInfo(agentServerInfo)
|
||
|
||
jsonFormat, err := json.Marshal(&agentServerInfo)
|
||
if err != nil {
|
||
log.Error(fmt.Sprintf("agent server info convert error ! agentserverinfo is %v", agentServerInfo))
|
||
panic(err)
|
||
}
|
||
log.Info(fmt.Sprintf("agent server info is %v", string(jsonFormat)))
|
||
|
||
return agentServerInfo
|
||
}
|
||
|
||
// UniformAgentServerInfo uniform deal with ip
|
||
func UniformAgentServerInfo(agentServerInfo *a_agent.AgentServerInfo) {
|
||
|
||
// reflect to iterator all field
|
||
log.Info("[Initialization] - UniformAgentServerInfo !")
|
||
value := reflect.ValueOf(agentServerInfo).Elem()
|
||
|
||
for i := 0; i < value.NumField(); i++ {
|
||
field := value.Field(i)
|
||
if field.Kind() == reflect.String && field.CanSet() {
|
||
field.SetString(strings.TrimSpace(field.String()))
|
||
}
|
||
}
|
||
|
||
log.Debug("[Initialization] - uniform ip address !")
|
||
if strings.Contains(agentServerInfo.ServerIPInV4, "/") {
|
||
agentServerInfo.ServerIPInV4 = strings.Split(agentServerInfo.ServerIPInV4, "/")[0]
|
||
}
|
||
|
||
if strings.Contains(agentServerInfo.ServerIPInV6, "/") {
|
||
agentServerInfo.ServerIPInV6 = strings.Split(agentServerInfo.ServerIPInV6, "/")[0]
|
||
}
|
||
|
||
}
|
||
|
||
func buildAgentOsOperator(agentInfo *a_status.AgentInfo, agentServerInfo *a_agent.AgentServerInfo) {
|
||
|
||
// 2023年8月4日 pass through some key information
|
||
ossOfflinePrefix := "http://bastion.io"
|
||
if a_agent.AgentConfig != nil {
|
||
ossOfflinePrefix = a_agent.AgentConfig.GetString("octopus.agent.executor.ossOfflinePrefix")
|
||
if !strings.HasSuffix(ossOfflinePrefix, "/") {
|
||
ossOfflinePrefix += "/"
|
||
}
|
||
} else {
|
||
log.WarnF("buildAgentOsOperator - agent oss offline prefix is null !")
|
||
}
|
||
|
||
// call the init exec function
|
||
agentOsOperator := a_executor.BuildAgentOsOperator(agentInfo, ossOfflinePrefix)
|
||
|
||
// assign the agentServerInfo
|
||
agentOsOperator.AgentServerInfo = agentServerInfo
|
||
|
||
// debug
|
||
marshal, _ := json.Marshal(agentOsOperator)
|
||
log.DebugF("[Agent INIT] cached agent operator is %s", marshal)
|
||
}
|