[agent-go] 优化代码,完成连通性测试
This commit is contained in:
@@ -20,7 +20,7 @@ const (
|
|||||||
InitOmType = "INIT"
|
InitOmType = "INIT"
|
||||||
)
|
)
|
||||||
|
|
||||||
var pool, _ = ants.NewPool(100, ants.WithNonblocking(true), ants.WithLogger(logger2.Log))
|
var pool, _ = ants.NewPool(100, ants.WithNonblocking(false), ants.WithLogger(logger2.Log), ants.WithMaxBlockingTasks(30), ants.WithDisablePurge(true))
|
||||||
|
|
||||||
var G = NewGlobal(
|
var G = NewGlobal(
|
||||||
pool,
|
pool,
|
||||||
|
|||||||
@@ -10,8 +10,8 @@ spring:
|
|||||||
config-retry-time: 3000
|
config-retry-time: 3000
|
||||||
file-extension: yaml
|
file-extension: yaml
|
||||||
max-retry: 3
|
max-retry: 3
|
||||||
server-addr: "150.230.198.103:21060"
|
# server-addr: "150.230.198.103:21060"
|
||||||
# server-addr: "42.192.52.227:21060"
|
server-addr: "42.192.52.227:21060"
|
||||||
timeout: 5000
|
timeout: 5000
|
||||||
config-long-poll-timeout: 5000
|
config-long-poll-timeout: 5000
|
||||||
extension-configs:
|
extension-configs:
|
||||||
|
|||||||
@@ -28,26 +28,6 @@ func BuildOMsgRuntimeConnectorQueue(agentTopicName string) {
|
|||||||
}
|
}
|
||||||
octopusMsgQueue.Connect()
|
octopusMsgQueue.Connect()
|
||||||
|
|
||||||
deliveries := octopusMsgQueue.Read(true)
|
|
||||||
|
|
||||||
// 死循环,处理Octopus Message
|
|
||||||
P.Submit(
|
|
||||||
func() {
|
|
||||||
for delivery := range deliveries {
|
|
||||||
|
|
||||||
var om *OctopusMessage
|
|
||||||
err := json.Unmarshal(delivery.Body, &om)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body))
|
|
||||||
// 保存到某处
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// 策略模式 处理消息
|
|
||||||
om.Handle()
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// 建立 业务消息 返回队列
|
// 建立 业务消息 返回队列
|
||||||
// 统一为 OctopusToServer
|
// 统一为 OctopusToServer
|
||||||
|
|
||||||
@@ -67,6 +47,29 @@ func BuildOMsgRuntimeConnectorQueue(agentTopicName string) {
|
|||||||
// 开启运行时消息返回队列
|
// 开启运行时消息返回队列
|
||||||
OctopusToServerQueue.Connect()
|
OctopusToServerQueue.Connect()
|
||||||
|
|
||||||
log.InfoF("Octopus Message Replay Queue is established ! => %v", OctopusToServerQueue)
|
log.InfoF("Octopus Message Business Runtime Queue is established ! => %v", OctopusToServerQueue)
|
||||||
|
|
||||||
|
deliveries := octopusMsgQueue.Read(true)
|
||||||
|
forever := make(chan bool)
|
||||||
|
P.Submit(
|
||||||
|
func() {
|
||||||
|
// 死循环,处理Octopus Message
|
||||||
|
for delivery := range deliveries {
|
||||||
|
|
||||||
|
var om *OctopusMessage
|
||||||
|
err := json.Unmarshal(delivery.Body, &om)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body))
|
||||||
|
// 保存到某处
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// 策略模式 处理消息
|
||||||
|
om.Handle()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// wait forever
|
||||||
|
<-forever
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -57,18 +57,18 @@ func INIT() *AgentServerInfo {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %v", octopusMsg))
|
log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %v", octopusMsg))
|
||||||
}
|
}
|
||||||
log.Debug(fmt.Sprintf("Prepare to send init message to server! ==> %s", string(msgBytes)))
|
|
||||||
|
|
||||||
// 发送OM至MQ中
|
// 发送OM至MQ中
|
||||||
P.Submit(
|
P.Submit(
|
||||||
func() {
|
func() {
|
||||||
for g.G.AgentHasRegister == false {
|
for g.G.AgentHasRegister == false {
|
||||||
|
|
||||||
|
log.Debug(fmt.Sprintf("Send init message to server! ==> %s", string(msgBytes)))
|
||||||
|
|
||||||
//如果agent存活 而Server不存活 那么需要持续不断的向Server中发送消息
|
//如果agent存活 而Server不存活 那么需要持续不断的向Server中发送消息
|
||||||
initToServerQueue.Send(
|
initToServerQueue.Send(
|
||||||
msgBytes,
|
msgBytes,
|
||||||
)
|
)
|
||||||
|
|
||||||
// 休眠
|
// 休眠
|
||||||
time.Sleep(10 * time.Minute)
|
time.Sleep(10 * time.Minute)
|
||||||
|
|
||||||
@@ -96,66 +96,57 @@ func handleInitMsgFromServer(initFromServerQueue *rabbitmq.RabbitQueue, initToSe
|
|||||||
|
|
||||||
initOctopusMessageDeliveries := initFromServerQueue.Read(false)
|
initOctopusMessageDeliveries := initFromServerQueue.Read(false)
|
||||||
|
|
||||||
forever := make(chan bool)
|
// 同步很多抢占注册的情况
|
||||||
|
for delivery := range initOctopusMessageDeliveries {
|
||||||
|
|
||||||
// use the ant goroutine pool
|
log.Debug(fmt.Sprintf("message received from server is %s", string(delivery.Body)))
|
||||||
P.Submit(
|
|
||||||
func() {
|
|
||||||
|
|
||||||
// 同步很多抢占注册的情况
|
var initOctopusMsg *rabbitmq.OctopusMessage
|
||||||
for delivery := range initOctopusMessageDeliveries {
|
err := json.Unmarshal(delivery.Body, &initOctopusMsg)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ",
|
||||||
|
string(delivery.Body)))
|
||||||
|
}
|
||||||
|
|
||||||
log.Debug(fmt.Sprintf("message received from server is %s", string(delivery.Body)))
|
var serverInfo AgentServerInfo
|
||||||
|
|
||||||
var initOctopusMsg *rabbitmq.OctopusMessage
|
s, _ := initOctopusMsg.Content.(string)
|
||||||
err := json.Unmarshal(delivery.Body, &initOctopusMsg)
|
cc := json.Unmarshal([]byte(s), &serverInfo)
|
||||||
if err != nil {
|
if cc != nil {
|
||||||
log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ",
|
log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", cc))
|
||||||
string(delivery.Body)))
|
}
|
||||||
}
|
serverName := serverInfo.ServerName
|
||||||
|
|
||||||
var serverInfo AgentServerInfo
|
// 处理OM信息
|
||||||
|
if initOctopusMsg != nil && initOctopusMsg.Type == g.InitOmType && serverName == agentServerInfo.ServerName {
|
||||||
|
// 是本机的注册回复信息
|
||||||
|
log.InfoF("OctopusMessage INIT from server is this agent !")
|
||||||
|
|
||||||
s, _ := initOctopusMsg.Content.(string)
|
// 手动确认信息
|
||||||
cc := json.Unmarshal([]byte(s), &serverInfo)
|
delivery.Ack(false)
|
||||||
if cc != nil {
|
|
||||||
log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", cc))
|
|
||||||
}
|
|
||||||
serverName := serverInfo.ServerName
|
|
||||||
|
|
||||||
// 处理OM信息
|
// 修改系统参数
|
||||||
if initOctopusMsg != nil && initOctopusMsg.Type == g.InitOmType && serverName == agentServerInfo.ServerName {
|
g.G.AgentHasRegister = true
|
||||||
// 是本机的注册回复信息
|
|
||||||
|
|
||||||
// 建立 运行时 RabbitMQ连接
|
// 建立 运行时 RabbitMQ连接
|
||||||
agentTopicName := initOctopusMsg.Result.(string)
|
agentTopicName := initOctopusMsg.Result.(string)
|
||||||
rabbitmq.BuildOMsgRuntimeConnectorQueue(agentTopicName)
|
rabbitmq.BuildOMsgRuntimeConnectorQueue(agentTopicName)
|
||||||
|
|
||||||
// 手动确认信息
|
// 手动关闭 注册队列的连接
|
||||||
delivery.Ack(false)
|
shutdownRegisterQueueConnection(initFromServerQueue, initToServerQueue)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// 手动关闭 注册队列的连接
|
// 不是自身的 注册回复信息 -- 拒绝
|
||||||
shutdownRegisterQueueConnection(initFromServerQueue, initToServerQueue)
|
log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v, ==>%s", initOctopusMsg, delivery.Body))
|
||||||
|
delivery.Nack(false, true)
|
||||||
return
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// 不是自身的 注册回复信息 -- 拒绝
|
|
||||||
log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v, ==>%s", initOctopusMsg, delivery.Body))
|
|
||||||
delivery.Nack(false, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
// wait forever
|
|
||||||
<-forever
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// shutdownRegisterQueueConnection 关闭初始化连接的两个队列
|
// shutdownRegisterQueueConnection 关闭初始化连接的两个队列
|
||||||
func shutdownRegisterQueueConnection(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue) {
|
func shutdownRegisterQueueConnection(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue) {
|
||||||
|
log.InfoF("Shutdown register queue connection !")
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseAgentServerInfo() *AgentServerInfo {
|
func parseAgentServerInfo() *AgentServerInfo {
|
||||||
|
|||||||
BIN
agent-go/tmp/nacos_config_export_20230330143045.zip
Normal file
BIN
agent-go/tmp/nacos_config_export_20230330143045.zip
Normal file
Binary file not shown.
@@ -78,8 +78,6 @@ services:
|
|||||||
- MYSQL_SERVICE_PASSWORD=Superwmm.23
|
- MYSQL_SERVICE_PASSWORD=Superwmm.23
|
||||||
depends_on:
|
depends_on:
|
||||||
- mysql
|
- mysql
|
||||||
volumes:
|
|
||||||
- 'rabbitmq_data:/bitnami/rabbitmq/mnesia'
|
|
||||||
|
|
||||||
# redis-replica:
|
# redis-replica:
|
||||||
# image: redis-image
|
# image: redis-image
|
||||||
|
|||||||
Reference in New Issue
Block a user