9.5 KiB
9.5 KiB
Exchange-Hub模块开发提示词
本文档为大模型提供rmdc-exchange-hub模块的开发上下文。
模块概述
rmdc-exchange-hub 是RMDC平台的消息网关模块,负责:
- MQTT消息中继: 连接内网业务模块与外网Watchdog
- 指令生命周期管理: 追踪指令从下发到执行完成的全过程
- 项目在线状态管理: 维护各项目的连接状态
- 同步/异步指令支持: 支持两种指令执行模式
核心概念
消息方向
| 方向 | 描述 | Topic |
|---|---|---|
| 上行 | Watchdog → Exchange-Hub | wdd/RDMC/command/up, wdd/RDMC/message/up |
| 下行 | Exchange-Hub → Watchdog | wdd/RDMC/command/down/{project_id}, wdd/RDMC/message/down/{project_id} |
消息类型
| 类型 | 描述 |
|---|---|
| Command | 指令消息,需要执行的操作 |
| Message | 数据消息,执行结果/心跳/监控等 |
技术栈
| 组件 | 技术 |
|---|---|
| MQTT客户端 | Eclipse Paho |
| Web框架 | Gin |
| ORM | GORM |
| 数据库 | PostgreSQL |
项目结构
rmdc-exchange-hub/
├── internal/
│ ├── config/
│ │ └── config.go
│ ├── dao/
│ │ ├── command_tracker_dao.go # 指令追踪
│ │ ├── command_result_dao.go # 执行结果
│ │ └── project_status_dao.go # 项目状态
│ ├── handler/
│ │ ├── router.go
│ │ ├── command_handler.go
│ │ └── status_handler.go
│ ├── model/
│ │ ├── dto/
│ │ │ └── exchange_dto.go
│ │ └── entity/
│ │ ├── command_tracker.go
│ │ ├── command_result.go
│ │ └── project_status.go
│ └── service/
│ ├── mqtt_service.go # MQTT客户端
│ ├── message_router.go # 消息路由
│ ├── command_service.go # 指令管理
│ ├── sync_command_manager.go # 同步指令
│ └── state_manager.go # 状态管理
└── pkg/
└── mqtt/
└── client.go
核心数据结构
统一消息格式
// BaseMessage 基础消息结构
type BaseMessage struct {
MessageID string `json:"message_id"` // UUID
Type string `json:"type"` // "command" or "message"
ProjectID string `json:"project_id"`
Timestamp int64 `json:"timestamp"` // 毫秒时间戳
Version string `json:"version"`
}
// CommandMessage 指令消息
type CommandMessage struct {
BaseMessage
CommandType string `json:"command_type"` // register, k8s_exec, host_exec...
Payload interface{} `json:"payload"`
Signature string `json:"signature"` // HMAC-SHA256签名
}
// DataMessage 数据消息
type DataMessage struct {
BaseMessage
DataType string `json:"data_type"` // exec_result, heartbeat, monitor...
Payload interface{} `json:"payload"`
Encrypted bool `json:"encrypted"`
}
指令追踪实体
// CommandTracker 指令追踪器
type CommandTracker struct {
ID int64 `gorm:"primaryKey"`
CommandID string `gorm:"uniqueIndex;size:100"`
ProjectID string `gorm:"index;size:100"`
CommandType string `gorm:"size:50"`
Module string `gorm:"size:50"` // 来源模块
OperatorID int64 // 操作人
Status string `gorm:"size:20"` // pending, sent, acked, completed, failed, timeout
SentAt time.Time // 发送时间
AckedAt *time.Time // 确认时间
StartedAt *time.Time // 开始执行
CompletedAt *time.Time // 完成时间
TimeoutAt time.Time // 超时时间
IsSync bool // 是否同步指令
CreatedAt time.Time
UpdatedAt time.Time
}
// CommandResult 指令执行结果
type CommandResult struct {
ID int64 `gorm:"primaryKey"`
CommandID string `gorm:"index;size:100"`
Status string `gorm:"size:20"` // success, failure, timeout
ExitCode int
Output string `gorm:"type:text"`
Error string `gorm:"type:text"`
StartTime int64
EndTime int64
Duration int64
ReceivedAt time.Time
CreatedAt time.Time
}
MQTT Topic设计
上行Topic(Watchdog → Exchange-Hub)
wdd/RDMC/command/up # 指令上行(注册、授权请求)
wdd/RDMC/message/up # 数据上行(心跳、执行结果、监控)
下行Topic(Exchange-Hub → Watchdog)
wdd/RDMC/command/down/{project_id} # 指令下行
wdd/RDMC/message/down/{project_id} # 数据下行
关键业务流程
指令下发流程
func (s *CommandService) SendCommand(ctx context.Context, projectID string, cmdType string, payload interface{}) (*CommandTracker, error) {
// 1. 构造消息
msg := &common.CommandMessage{
BaseMessage: common.BaseMessage{
MessageID: uuid.New().String(),
Type: "command",
ProjectID: projectID,
Timestamp: time.Now().UnixMilli(),
Version: "1.0",
},
CommandType: cmdType,
Payload: payload,
}
// 2. 签名
msg.Signature = s.sign(msg)
// 3. 记录追踪
tracker := &entity.CommandTracker{
CommandID: msg.MessageID,
ProjectID: projectID,
CommandType: cmdType,
Status: "pending",
SentAt: time.Now(),
TimeoutAt: time.Now().Add(30 * time.Second),
}
s.trackerDAO.Create(ctx, tracker)
// 4. 发布到MQTT
topic := fmt.Sprintf("wdd/RDMC/command/down/%s", projectID)
s.mqttService.Publish(topic, msg)
// 5. 更新状态
tracker.Status = "sent"
s.trackerDAO.Update(ctx, tracker)
return tracker, nil
}
同步指令实现
// SyncCommandManager 同步指令管理器
type SyncCommandManager struct {
waitChannels sync.Map // map[commandID]chan *ExecResult
}
func (m *SyncCommandManager) SendAndWait(ctx context.Context, projectID string, cmd *CommandMessage, timeout time.Duration) (*ExecResult, error) {
// 1. 创建等待通道
waitChan := make(chan *ExecResult, 1)
m.waitChannels.Store(cmd.MessageID, waitChan)
defer m.waitChannels.Delete(cmd.MessageID)
// 2. 发送指令
if err := m.mqttService.PublishCommand(projectID, cmd); err != nil {
return nil, err
}
// 3. 等待结果或超时
select {
case result := <-waitChan:
return result, nil
case <-time.After(timeout):
return nil, errors.New("command execution timeout")
case <-ctx.Done():
return nil, ctx.Err()
}
}
// OnResult 接收结果回调
func (m *SyncCommandManager) OnResult(result *ExecResult) {
if ch, ok := m.waitChannels.Load(result.CommandID); ok {
ch.(chan *ExecResult) <- result
}
}
消息路由处理
func (r *MessageRouter) handleUpMessage(msg *common.DataMessage) {
switch msg.DataType {
case "exec_result":
r.handleExecResult(msg)
case "heartbeat":
r.handleHeartbeat(msg)
case "monitor":
r.handleMonitor(msg)
case "log_result":
r.handleLogResult(msg)
case "alert":
r.handleAlert(msg)
case "register_complete":
r.handleRegisterComplete(msg)
}
}
func (r *MessageRouter) handleExecResult(msg *common.DataMessage) {
result := msg.Payload.(*ExecResult)
// 1. 更新指令追踪状态
r.trackerService.Complete(result.CommandID, result.Status)
// 2. 保存执行结果
r.resultDAO.Create(result)
// 3. 通知同步等待者
r.syncManager.OnResult(result)
// 4. 推送给业务模块(如需要)
// ...
}
项目状态管理
状态机
offline → connecting → verifying → online → disconnecting → offline
↑
└── heartbeat刷新
心跳策略
| 参数 | 默认值 | 说明 |
|---|---|---|
| 心跳间隔 | 5秒 | Watchdog发送心跳的频率 |
| 超时时间 | 30秒 | 无心跳多久判定为离线 |
API接口
| 接口 | 方法 | 说明 |
|---|---|---|
/api/exchange-hub/command/send |
POST | 发送异步指令 |
/api/exchange-hub/command/sync |
POST | 发送同步指令(等待结果) |
/api/exchange-hub/command/:id |
GET | 查询指令状态 |
/api/exchange-hub/project/:id/status |
GET | 查询项目在线状态 |
/api/exchange-hub/projects/online |
GET | 获取所有在线项目 |
常见开发任务
1. 添加新的指令类型
- 在
common包定义Payload结构 - 在Watchdog端添加Handler处理
- 在业务模块调用
CommandService.SendCommand
2. 添加新的数据类型
- 在
MessageRouter.handleUpMessage添加case分支 - 实现对应的处理函数
- 如需持久化,添加Entity和DAO
3. 修改消息格式
- 修改
common包的消息结构 - 同步修改Watchdog端的解析逻辑
- 考虑版本兼容性
相关文档
| 文档 | 内容 |
|---|---|
1-rmdc-exchange-hub-DDS.md |
详细设计 |
4-rmdc-exchange-hub-architecture.md |
架构图 |
5-rmdc-exchange-hub-command-time.md |
指令耗时计算 |