Files
ProjectAGiPrompt/8-CMII-RMDC/3-rmdc-exchange-hub/exchange-hub-prompt.md
2026-01-21 16:15:49 +08:00

9.5 KiB
Raw Permalink Blame History

Exchange-Hub模块开发提示词

本文档为大模型提供rmdc-exchange-hub模块的开发上下文。


模块概述

rmdc-exchange-hub 是RMDC平台的消息网关模块负责

  1. MQTT消息中继: 连接内网业务模块与外网Watchdog
  2. 指令生命周期管理: 追踪指令从下发到执行完成的全过程
  3. 项目在线状态管理: 维护各项目的连接状态
  4. 同步/异步指令支持: 支持两种指令执行模式

核心概念

消息方向

方向 描述 Topic
上行 Watchdog → Exchange-Hub wdd/RDMC/command/up, wdd/RDMC/message/up
下行 Exchange-Hub → Watchdog wdd/RDMC/command/down/{project_id}, wdd/RDMC/message/down/{project_id}

消息类型

类型 描述
Command 指令消息,需要执行的操作
Message 数据消息,执行结果/心跳/监控等

技术栈

组件 技术
MQTT客户端 Eclipse Paho
Web框架 Gin
ORM GORM
数据库 PostgreSQL

项目结构

rmdc-exchange-hub/
├── internal/
│   ├── config/
│   │   └── config.go
│   ├── dao/
│   │   ├── command_tracker_dao.go  # 指令追踪
│   │   ├── command_result_dao.go   # 执行结果
│   │   └── project_status_dao.go   # 项目状态
│   ├── handler/
│   │   ├── router.go
│   │   ├── command_handler.go
│   │   └── status_handler.go
│   ├── model/
│   │   ├── dto/
│   │   │   └── exchange_dto.go
│   │   └── entity/
│   │       ├── command_tracker.go
│   │       ├── command_result.go
│   │       └── project_status.go
│   └── service/
│       ├── mqtt_service.go         # MQTT客户端
│       ├── message_router.go       # 消息路由
│       ├── command_service.go      # 指令管理
│       ├── sync_command_manager.go # 同步指令
│       └── state_manager.go        # 状态管理
└── pkg/
    └── mqtt/
        └── client.go

核心数据结构

统一消息格式

// BaseMessage 基础消息结构
type BaseMessage struct {
    MessageID   string `json:"message_id"`   // UUID
    Type        string `json:"type"`         // "command" or "message"
    ProjectID   string `json:"project_id"`
    Timestamp   int64  `json:"timestamp"`    // 毫秒时间戳
    Version     string `json:"version"`
}

// CommandMessage 指令消息
type CommandMessage struct {
    BaseMessage
    CommandType string      `json:"command_type"` // register, k8s_exec, host_exec...
    Payload     interface{} `json:"payload"`
    Signature   string      `json:"signature"`    // HMAC-SHA256签名
}

// DataMessage 数据消息
type DataMessage struct {
    BaseMessage
    DataType  string      `json:"data_type"` // exec_result, heartbeat, monitor...
    Payload   interface{} `json:"payload"`
    Encrypted bool        `json:"encrypted"`
}

指令追踪实体

// CommandTracker 指令追踪器
type CommandTracker struct {
    ID           int64     `gorm:"primaryKey"`
    CommandID    string    `gorm:"uniqueIndex;size:100"`
    ProjectID    string    `gorm:"index;size:100"`
    CommandType  string    `gorm:"size:50"`
    Module       string    `gorm:"size:50"`       // 来源模块
    OperatorID   int64                            // 操作人
    Status       string    `gorm:"size:20"`       // pending, sent, acked, completed, failed, timeout
    SentAt       time.Time                        // 发送时间
    AckedAt      *time.Time                       // 确认时间
    StartedAt    *time.Time                       // 开始执行
    CompletedAt  *time.Time                       // 完成时间
    TimeoutAt    time.Time                        // 超时时间
    IsSync       bool                             // 是否同步指令
    CreatedAt    time.Time
    UpdatedAt    time.Time
}

// CommandResult 指令执行结果
type CommandResult struct {
    ID          int64     `gorm:"primaryKey"`
    CommandID   string    `gorm:"index;size:100"`
    Status      string    `gorm:"size:20"`        // success, failure, timeout
    ExitCode    int
    Output      string    `gorm:"type:text"`
    Error       string    `gorm:"type:text"`
    StartTime   int64
    EndTime     int64
    Duration    int64
    ReceivedAt  time.Time
    CreatedAt   time.Time
}

MQTT Topic设计

上行TopicWatchdog → Exchange-Hub

wdd/RDMC/command/up     # 指令上行(注册、授权请求)
wdd/RDMC/message/up     # 数据上行(心跳、执行结果、监控)

下行TopicExchange-Hub → Watchdog

wdd/RDMC/command/down/{project_id}  # 指令下行
wdd/RDMC/message/down/{project_id}  # 数据下行

关键业务流程

指令下发流程

func (s *CommandService) SendCommand(ctx context.Context, projectID string, cmdType string, payload interface{}) (*CommandTracker, error) {
    // 1. 构造消息
    msg := &common.CommandMessage{
        BaseMessage: common.BaseMessage{
            MessageID: uuid.New().String(),
            Type:      "command",
            ProjectID: projectID,
            Timestamp: time.Now().UnixMilli(),
            Version:   "1.0",
        },
        CommandType: cmdType,
        Payload:     payload,
    }
    
    // 2. 签名
    msg.Signature = s.sign(msg)
    
    // 3. 记录追踪
    tracker := &entity.CommandTracker{
        CommandID:   msg.MessageID,
        ProjectID:   projectID,
        CommandType: cmdType,
        Status:      "pending",
        SentAt:      time.Now(),
        TimeoutAt:   time.Now().Add(30 * time.Second),
    }
    s.trackerDAO.Create(ctx, tracker)
    
    // 4. 发布到MQTT
    topic := fmt.Sprintf("wdd/RDMC/command/down/%s", projectID)
    s.mqttService.Publish(topic, msg)
    
    // 5. 更新状态
    tracker.Status = "sent"
    s.trackerDAO.Update(ctx, tracker)
    
    return tracker, nil
}

同步指令实现

// SyncCommandManager 同步指令管理器
type SyncCommandManager struct {
    waitChannels sync.Map // map[commandID]chan *ExecResult
}

func (m *SyncCommandManager) SendAndWait(ctx context.Context, projectID string, cmd *CommandMessage, timeout time.Duration) (*ExecResult, error) {
    // 1. 创建等待通道
    waitChan := make(chan *ExecResult, 1)
    m.waitChannels.Store(cmd.MessageID, waitChan)
    defer m.waitChannels.Delete(cmd.MessageID)
    
    // 2. 发送指令
    if err := m.mqttService.PublishCommand(projectID, cmd); err != nil {
        return nil, err
    }
    
    // 3. 等待结果或超时
    select {
    case result := <-waitChan:
        return result, nil
    case <-time.After(timeout):
        return nil, errors.New("command execution timeout")
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}

// OnResult 接收结果回调
func (m *SyncCommandManager) OnResult(result *ExecResult) {
    if ch, ok := m.waitChannels.Load(result.CommandID); ok {
        ch.(chan *ExecResult) <- result
    }
}

消息路由处理

func (r *MessageRouter) handleUpMessage(msg *common.DataMessage) {
    switch msg.DataType {
    case "exec_result":
        r.handleExecResult(msg)
    case "heartbeat":
        r.handleHeartbeat(msg)
    case "monitor":
        r.handleMonitor(msg)
    case "log_result":
        r.handleLogResult(msg)
    case "alert":
        r.handleAlert(msg)
    case "register_complete":
        r.handleRegisterComplete(msg)
    }
}

func (r *MessageRouter) handleExecResult(msg *common.DataMessage) {
    result := msg.Payload.(*ExecResult)
    
    // 1. 更新指令追踪状态
    r.trackerService.Complete(result.CommandID, result.Status)
    
    // 2. 保存执行结果
    r.resultDAO.Create(result)
    
    // 3. 通知同步等待者
    r.syncManager.OnResult(result)
    
    // 4. 推送给业务模块(如需要)
    // ...
}

项目状态管理

状态机

offline → connecting → verifying → online → disconnecting → offline
                                     ↑
                                     └── heartbeat刷新

心跳策略

参数 默认值 说明
心跳间隔 5秒 Watchdog发送心跳的频率
超时时间 30秒 无心跳多久判定为离线

API接口

接口 方法 说明
/api/exchange-hub/command/send POST 发送异步指令
/api/exchange-hub/command/sync POST 发送同步指令(等待结果)
/api/exchange-hub/command/:id GET 查询指令状态
/api/exchange-hub/project/:id/status GET 查询项目在线状态
/api/exchange-hub/projects/online GET 获取所有在线项目

常见开发任务

1. 添加新的指令类型

  1. common包定义Payload结构
  2. 在Watchdog端添加Handler处理
  3. 在业务模块调用CommandService.SendCommand

2. 添加新的数据类型

  1. MessageRouter.handleUpMessage添加case分支
  2. 实现对应的处理函数
  3. 如需持久化添加Entity和DAO

3. 修改消息格式

  1. 修改common包的消息结构
  2. 同步修改Watchdog端的解析逻辑
  3. 考虑版本兼容性

相关文档

文档 内容
1-rmdc-exchange-hub-DDS.md 详细设计
4-rmdc-exchange-hub-architecture.md 架构图
5-rmdc-exchange-hub-command-time.md 指令耗时计算