# Exchange-Hub模块开发提示词 > 本文档为大模型提供rmdc-exchange-hub模块的开发上下文。 --- ## 模块概述 **rmdc-exchange-hub** 是RMDC平台的消息网关模块,负责: 1. **MQTT消息中继**: 连接内网业务模块与外网Watchdog 2. **指令生命周期管理**: 追踪指令从下发到执行完成的全过程 3. **项目在线状态管理**: 维护各项目的连接状态 4. **同步/异步指令支持**: 支持两种指令执行模式 --- ## 核心概念 ### 消息方向 | 方向 | 描述 | Topic | |------|------|-------| | **上行** | Watchdog → Exchange-Hub | `wdd/RDMC/command/up`, `wdd/RDMC/message/up` | | **下行** | Exchange-Hub → Watchdog | `wdd/RDMC/command/down/{project_id}`, `wdd/RDMC/message/down/{project_id}` | ### 消息类型 | 类型 | 描述 | |------|------| | **Command** | 指令消息,需要执行的操作 | | **Message** | 数据消息,执行结果/心跳/监控等 | --- ## 技术栈 | 组件 | 技术 | |------|------| | MQTT客户端 | Eclipse Paho | | Web框架 | Gin | | ORM | GORM | | 数据库 | PostgreSQL | --- ## 项目结构 ``` rmdc-exchange-hub/ ├── internal/ │ ├── config/ │ │ └── config.go │ ├── dao/ │ │ ├── command_tracker_dao.go # 指令追踪 │ │ ├── command_result_dao.go # 执行结果 │ │ └── project_status_dao.go # 项目状态 │ ├── handler/ │ │ ├── router.go │ │ ├── command_handler.go │ │ └── status_handler.go │ ├── model/ │ │ ├── dto/ │ │ │ └── exchange_dto.go │ │ └── entity/ │ │ ├── command_tracker.go │ │ ├── command_result.go │ │ └── project_status.go │ └── service/ │ ├── mqtt_service.go # MQTT客户端 │ ├── message_router.go # 消息路由 │ ├── command_service.go # 指令管理 │ ├── sync_command_manager.go # 同步指令 │ └── state_manager.go # 状态管理 └── pkg/ └── mqtt/ └── client.go ``` --- ## 核心数据结构 ### 统一消息格式 ```go // BaseMessage 基础消息结构 type BaseMessage struct { MessageID string `json:"message_id"` // UUID Type string `json:"type"` // "command" or "message" ProjectID string `json:"project_id"` Timestamp int64 `json:"timestamp"` // 毫秒时间戳 Version string `json:"version"` } // CommandMessage 指令消息 type CommandMessage struct { BaseMessage CommandType string `json:"command_type"` // register, k8s_exec, host_exec... Payload interface{} `json:"payload"` Signature string `json:"signature"` // HMAC-SHA256签名 } // DataMessage 数据消息 type DataMessage struct { BaseMessage DataType string `json:"data_type"` // exec_result, heartbeat, monitor... Payload interface{} `json:"payload"` Encrypted bool `json:"encrypted"` } ``` ### 指令追踪实体 ```go // CommandTracker 指令追踪器 type CommandTracker struct { ID int64 `gorm:"primaryKey"` CommandID string `gorm:"uniqueIndex;size:100"` ProjectID string `gorm:"index;size:100"` CommandType string `gorm:"size:50"` Module string `gorm:"size:50"` // 来源模块 OperatorID int64 // 操作人 Status string `gorm:"size:20"` // pending, sent, acked, completed, failed, timeout SentAt time.Time // 发送时间 AckedAt *time.Time // 确认时间 StartedAt *time.Time // 开始执行 CompletedAt *time.Time // 完成时间 TimeoutAt time.Time // 超时时间 IsSync bool // 是否同步指令 CreatedAt time.Time UpdatedAt time.Time } // CommandResult 指令执行结果 type CommandResult struct { ID int64 `gorm:"primaryKey"` CommandID string `gorm:"index;size:100"` Status string `gorm:"size:20"` // success, failure, timeout ExitCode int Output string `gorm:"type:text"` Error string `gorm:"type:text"` StartTime int64 EndTime int64 Duration int64 ReceivedAt time.Time CreatedAt time.Time } ``` --- ## MQTT Topic设计 ### 上行Topic(Watchdog → Exchange-Hub) ``` wdd/RDMC/command/up # 指令上行(注册、授权请求) wdd/RDMC/message/up # 数据上行(心跳、执行结果、监控) ``` ### 下行Topic(Exchange-Hub → Watchdog) ``` wdd/RDMC/command/down/{project_id} # 指令下行 wdd/RDMC/message/down/{project_id} # 数据下行 ``` --- ## 关键业务流程 ### 指令下发流程 ```go func (s *CommandService) SendCommand(ctx context.Context, projectID string, cmdType string, payload interface{}) (*CommandTracker, error) { // 1. 构造消息 msg := &common.CommandMessage{ BaseMessage: common.BaseMessage{ MessageID: uuid.New().String(), Type: "command", ProjectID: projectID, Timestamp: time.Now().UnixMilli(), Version: "1.0", }, CommandType: cmdType, Payload: payload, } // 2. 签名 msg.Signature = s.sign(msg) // 3. 记录追踪 tracker := &entity.CommandTracker{ CommandID: msg.MessageID, ProjectID: projectID, CommandType: cmdType, Status: "pending", SentAt: time.Now(), TimeoutAt: time.Now().Add(30 * time.Second), } s.trackerDAO.Create(ctx, tracker) // 4. 发布到MQTT topic := fmt.Sprintf("wdd/RDMC/command/down/%s", projectID) s.mqttService.Publish(topic, msg) // 5. 更新状态 tracker.Status = "sent" s.trackerDAO.Update(ctx, tracker) return tracker, nil } ``` ### 同步指令实现 ```go // SyncCommandManager 同步指令管理器 type SyncCommandManager struct { waitChannels sync.Map // map[commandID]chan *ExecResult } func (m *SyncCommandManager) SendAndWait(ctx context.Context, projectID string, cmd *CommandMessage, timeout time.Duration) (*ExecResult, error) { // 1. 创建等待通道 waitChan := make(chan *ExecResult, 1) m.waitChannels.Store(cmd.MessageID, waitChan) defer m.waitChannels.Delete(cmd.MessageID) // 2. 发送指令 if err := m.mqttService.PublishCommand(projectID, cmd); err != nil { return nil, err } // 3. 等待结果或超时 select { case result := <-waitChan: return result, nil case <-time.After(timeout): return nil, errors.New("command execution timeout") case <-ctx.Done(): return nil, ctx.Err() } } // OnResult 接收结果回调 func (m *SyncCommandManager) OnResult(result *ExecResult) { if ch, ok := m.waitChannels.Load(result.CommandID); ok { ch.(chan *ExecResult) <- result } } ``` ### 消息路由处理 ```go func (r *MessageRouter) handleUpMessage(msg *common.DataMessage) { switch msg.DataType { case "exec_result": r.handleExecResult(msg) case "heartbeat": r.handleHeartbeat(msg) case "monitor": r.handleMonitor(msg) case "log_result": r.handleLogResult(msg) case "alert": r.handleAlert(msg) case "register_complete": r.handleRegisterComplete(msg) } } func (r *MessageRouter) handleExecResult(msg *common.DataMessage) { result := msg.Payload.(*ExecResult) // 1. 更新指令追踪状态 r.trackerService.Complete(result.CommandID, result.Status) // 2. 保存执行结果 r.resultDAO.Create(result) // 3. 通知同步等待者 r.syncManager.OnResult(result) // 4. 推送给业务模块(如需要) // ... } ``` --- ## 项目状态管理 ### 状态机 ``` offline → connecting → verifying → online → disconnecting → offline ↑ └── heartbeat刷新 ``` ### 心跳策略 | 参数 | 默认值 | 说明 | |------|--------|------| | 心跳间隔 | 5秒 | Watchdog发送心跳的频率 | | 超时时间 | 30秒 | 无心跳多久判定为离线 | --- ## API接口 | 接口 | 方法 | 说明 | |------|------|------| | `/api/exchange-hub/command/send` | POST | 发送异步指令 | | `/api/exchange-hub/command/sync` | POST | 发送同步指令(等待结果) | | `/api/exchange-hub/command/:id` | GET | 查询指令状态 | | `/api/exchange-hub/project/:id/status` | GET | 查询项目在线状态 | | `/api/exchange-hub/projects/online` | GET | 获取所有在线项目 | --- ## 常见开发任务 ### 1. 添加新的指令类型 1. 在`common`包定义Payload结构 2. 在Watchdog端添加Handler处理 3. 在业务模块调用`CommandService.SendCommand` ### 2. 添加新的数据类型 1. 在`MessageRouter.handleUpMessage`添加case分支 2. 实现对应的处理函数 3. 如需持久化,添加Entity和DAO ### 3. 修改消息格式 1. 修改`common`包的消息结构 2. 同步修改Watchdog端的解析逻辑 3. 考虑版本兼容性 --- ## 相关文档 | 文档 | 内容 | |------|------| | `1-rmdc-exchange-hub-DDS.md` | 详细设计 | | `4-rmdc-exchange-hub-architecture.md` | 架构图 | | `5-rmdc-exchange-hub-command-time.md` | 指令耗时计算 |