diff --git a/agent-go/g/global.go b/agent-go/g/global.go index 4cf4360..c5f7247 100644 --- a/agent-go/g/global.go +++ b/agent-go/g/global.go @@ -20,7 +20,7 @@ const ( InitOmType = "INIT" ) -var pool, _ = ants.NewPool(100, ants.WithNonblocking(true), ants.WithLogger(logger2.Log)) +var pool, _ = ants.NewPool(100, ants.WithNonblocking(false), ants.WithLogger(logger2.Log), ants.WithMaxBlockingTasks(30), ants.WithDisablePurge(true)) var G = NewGlobal( pool, diff --git a/agent-go/octopus-agent-dev.yaml b/agent-go/octopus-agent-dev.yaml index 87c7d61..ac474f7 100644 --- a/agent-go/octopus-agent-dev.yaml +++ b/agent-go/octopus-agent-dev.yaml @@ -10,8 +10,8 @@ spring: config-retry-time: 3000 file-extension: yaml max-retry: 3 - server-addr: "150.230.198.103:21060" -# server-addr: "42.192.52.227:21060" + # server-addr: "150.230.198.103:21060" + server-addr: "42.192.52.227:21060" timeout: 5000 config-long-poll-timeout: 5000 extension-configs: diff --git a/agent-go/rabbitmq/OMsgConnector.go b/agent-go/rabbitmq/OMsgConnector.go index ec016ff..2be7108 100644 --- a/agent-go/rabbitmq/OMsgConnector.go +++ b/agent-go/rabbitmq/OMsgConnector.go @@ -28,26 +28,6 @@ func BuildOMsgRuntimeConnectorQueue(agentTopicName string) { } octopusMsgQueue.Connect() - deliveries := octopusMsgQueue.Read(true) - - // 死循环,处理Octopus Message - P.Submit( - func() { - for delivery := range deliveries { - - var om *OctopusMessage - err := json.Unmarshal(delivery.Body, &om) - if err != nil { - log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body)) - // 保存到某处 - continue - } - - // 策略模式 处理消息 - om.Handle() - } - }) - // 建立 业务消息 返回队列 // 统一为 OctopusToServer @@ -67,6 +47,29 @@ func BuildOMsgRuntimeConnectorQueue(agentTopicName string) { // 开启运行时消息返回队列 OctopusToServerQueue.Connect() - log.InfoF("Octopus Message Replay Queue is established ! => %v", OctopusToServerQueue) + log.InfoF("Octopus Message Business Runtime Queue is established ! => %v", OctopusToServerQueue) + + deliveries := octopusMsgQueue.Read(true) + forever := make(chan bool) + P.Submit( + func() { + // 死循环,处理Octopus Message + for delivery := range deliveries { + + var om *OctopusMessage + err := json.Unmarshal(delivery.Body, &om) + if err != nil { + log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %s", delivery.Body)) + // 保存到某处 + continue + } + + // 策略模式 处理消息 + om.Handle() + } + }) + + // wait forever + <-forever } diff --git a/agent-go/register/AgentInitialization.go b/agent-go/register/AgentInitialization.go index fac6152..436537a 100644 --- a/agent-go/register/AgentInitialization.go +++ b/agent-go/register/AgentInitialization.go @@ -57,18 +57,18 @@ func INIT() *AgentServerInfo { if err != nil { log.Error(fmt.Sprintf("octopus message convert to json is wrong! msg is => %v", octopusMsg)) } - log.Debug(fmt.Sprintf("Prepare to send init message to server! ==> %s", string(msgBytes))) // 发送OM至MQ中 P.Submit( func() { for g.G.AgentHasRegister == false { + log.Debug(fmt.Sprintf("Send init message to server! ==> %s", string(msgBytes))) + //如果agent存活 而Server不存活 那么需要持续不断的向Server中发送消息 initToServerQueue.Send( msgBytes, ) - // 休眠 time.Sleep(10 * time.Minute) @@ -96,66 +96,57 @@ func handleInitMsgFromServer(initFromServerQueue *rabbitmq.RabbitQueue, initToSe initOctopusMessageDeliveries := initFromServerQueue.Read(false) - forever := make(chan bool) + // 同步很多抢占注册的情况 + for delivery := range initOctopusMessageDeliveries { - // use the ant goroutine pool - P.Submit( - func() { + log.Debug(fmt.Sprintf("message received from server is %s", string(delivery.Body))) - // 同步很多抢占注册的情况 - for delivery := range initOctopusMessageDeliveries { + var initOctopusMsg *rabbitmq.OctopusMessage + err := json.Unmarshal(delivery.Body, &initOctopusMsg) + if err != nil { + log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ", + string(delivery.Body))) + } - log.Debug(fmt.Sprintf("message received from server is %s", string(delivery.Body))) + var serverInfo AgentServerInfo - var initOctopusMsg *rabbitmq.OctopusMessage - err := json.Unmarshal(delivery.Body, &initOctopusMsg) - if err != nil { - log.Error(fmt.Sprintf("parse init message from server wroong, message is => %s ", - string(delivery.Body))) - } + s, _ := initOctopusMsg.Content.(string) + cc := json.Unmarshal([]byte(s), &serverInfo) + if cc != nil { + log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", cc)) + } + serverName := serverInfo.ServerName - var serverInfo AgentServerInfo + // 处理OM信息 + if initOctopusMsg != nil && initOctopusMsg.Type == g.InitOmType && serverName == agentServerInfo.ServerName { + // 是本机的注册回复信息 + log.InfoF("OctopusMessage INIT from server is this agent !") - s, _ := initOctopusMsg.Content.(string) - cc := json.Unmarshal([]byte(s), &serverInfo) - if cc != nil { - log.Error(fmt.Sprintf("parse init message from server wroong, message is => %v ", cc)) - } - serverName := serverInfo.ServerName + // 手动确认信息 + delivery.Ack(false) - // 处理OM信息 - if initOctopusMsg != nil && initOctopusMsg.Type == g.InitOmType && serverName == agentServerInfo.ServerName { - // 是本机的注册回复信息 + // 修改系统参数 + g.G.AgentHasRegister = true - // 建立 运行时 RabbitMQ连接 - agentTopicName := initOctopusMsg.Result.(string) - rabbitmq.BuildOMsgRuntimeConnectorQueue(agentTopicName) + // 建立 运行时 RabbitMQ连接 + agentTopicName := initOctopusMsg.Result.(string) + rabbitmq.BuildOMsgRuntimeConnectorQueue(agentTopicName) - // 手动确认信息 - delivery.Ack(false) + // 手动关闭 注册队列的连接 + shutdownRegisterQueueConnection(initFromServerQueue, initToServerQueue) + return + } - // 手动关闭 注册队列的连接 - shutdownRegisterQueueConnection(initFromServerQueue, initToServerQueue) - - return - } - - // 不是自身的 注册回复信息 -- 拒绝 - log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v, ==>%s", initOctopusMsg, delivery.Body)) - delivery.Nack(false, true) - } - - }, - ) - - // wait forever - <-forever + // 不是自身的 注册回复信息 -- 拒绝 + log.Warn(fmt.Sprintf("OctopusMessage INIT from server not this agent ! => %v, ==>%s", initOctopusMsg, delivery.Body)) + delivery.Nack(false, true) + } } // shutdownRegisterQueueConnection 关闭初始化连接的两个队列 func shutdownRegisterQueueConnection(initFromServerQueue *rabbitmq.RabbitQueue, initToServerQueue *rabbitmq.RabbitQueue) { - + log.InfoF("Shutdown register queue connection !") } func parseAgentServerInfo() *AgentServerInfo { diff --git a/agent-go/tmp/nacos_config_export_20230330143045.zip b/agent-go/tmp/nacos_config_export_20230330143045.zip new file mode 100644 index 0000000..2c33e55 Binary files /dev/null and b/agent-go/tmp/nacos_config_export_20230330143045.zip differ diff --git a/source/src/main/java/io/wdd/source/octopus/run-enviroment-compose.yaml b/source/src/main/java/io/wdd/source/octopus/run-enviroment-compose.yaml index e1f33b9..107a94e 100644 --- a/source/src/main/java/io/wdd/source/octopus/run-enviroment-compose.yaml +++ b/source/src/main/java/io/wdd/source/octopus/run-enviroment-compose.yaml @@ -78,8 +78,6 @@ services: - MYSQL_SERVICE_PASSWORD=Superwmm.23 depends_on: - mysql - volumes: - - 'rabbitmq_data:/bitnami/rabbitmq/mnesia' # redis-replica: # image: redis-image