ProjectAGiPrompt/8-CMII-RMDC/6-rmdc-watchdog/2-rmdc-watchdog-业务流程图.md
2026-01-21 16:15:49 +08:00

RMDC-Watchdog Business Flow Diagrams

1. Project Registration Flow

1.1 Complete Registration Sequence Diagram

sequenceDiagram
    autonumber
    participant Admin as Administrator<br/>(RMDC Portal)
    participant PM as rmdc-project-management<br/>(Project Management)
    participant ExHub as rmdc-exchange-hub<br/>(Message Gateway)
    participant MQTT as MQTT Broker
    participant WD as rmdc-watchdog<br/>(Tier-Two Authorization Center)
    participant Node as watchdog-node<br/>(Host Daemon)
    participant Agent as watchdog-agent<br/>(Business Agent)

    rect rgb(200, 220, 255)
        Note over Admin,Agent: ===== Phase 1: Project creation (RMDC platform side) =====
    end

    Admin->>PM: 1. Create a new project
    PM->>PM: 2. Generate project info<br/>project_id = namespace_<8-digit random number>
    PM->>PM: 3. Generate the tier-one authorization secret<br/>tier_one_secret = generateSecret()
    PM->>PM: 4. Generate the tier-two authorization secret<br/>tier_two_secret = generateSecret()
    PM->>PM: 5. Generate the allowed time offset<br/>time_offset_allowed
    PM->>PM: 6. Persist project info<br/>{project_id, namespace,<br/>tier_one_secret, tier_two_secret,<br/>auth_duration, time_offset_allowed}
    PM-->>Admin: 7. Return the project configuration file<br/>(contains tier_one_secret and tier_two_secret<br/>for deploying Watchdog)
    
    rect rgb(220, 255, 220)
        Note over Admin,Agent: ===== Phase 2: Watchdog startup and MQTT connection =====
    end

    Note over WD: Deployed with the project configuration file<br/>tier_one_secret and tier_two_secret are already local<br/>(secrets never cross the public network)
    WD->>MQTT: 8. Connect to the MQTT Broker
    WD->>WD: 9. Create MQTT subscriptions<br/>Subscribe: wdd/RDMC/command/down/<project_id><br/>Subscribe: wdd/RDMC/message/down/<project_id>
    
    rect rgb(255, 240, 220)
        Note over Admin,Agent: ===== Phase 3: Project registration with challenge-response =====
    end

    WD->>WD: 10. Generate a Tier-One TOTP code<br/>(8 digits, valid for 30 minutes)<br/>using the local tier_one_secret
    WD->>MQTT: 11. Publish the register Command to<br/>wdd/RDMC/command/up<br/>{type:"register", project_id,<br/>namespace, totp_code, env_info}
    MQTT->>ExHub: 12. Forward the register Command

    ExHub->>PM: 13. Validate the project info<br/>GetProjectInfo(project_id)
    PM-->>ExHub: 14. Return project secrets and related info<br/>{tier_one_secret, tier_two_secret}

    ExHub->>ExHub: 15. Verify the TOTP code (configurable switch)<br/>VerifyTierOneTOTP(totp_code, tier_one_secret)

    alt TOTP verified or verification disabled
        ExHub->>ExHub: 16. Generate a 32-character random challenge<br/>challenge = randomString(32)
        ExHub->>ExHub: 17. Generate the server-side TOTP response code<br/>server_totp = GenerateTierOneTOTP(tier_one_secret)
        ExHub->>ExHub: 18. Store the challenge in the cache<br/>cache[project_id] = challenge
        ExHub->>MQTT: 19. Publish the register-ack Message to<br/>wdd/RDMC/message/down/<project_id><br/>{type:"register_ack", challenge, server_totp}
        ExHub->>MQTT: 20. Publish the authorization Command to<br/>wdd/RDMC/command/down/<project_id><br/>{type:"auth_response", success:true}<br/>(no secret is transmitted; secrets are already local)
    else TOTP verification failed
        ExHub->>MQTT: Publish the registration rejection<br/>{type:"register_reject", reason}
    end
    
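    MQTT->>WD: 21. Deliver the register-ack Message
    MQTT->>WD: 22. Deliver the authorization Command

    WD->>WD: 23. Verify the server TOTP (configurable switch)<br/>VerifyTierOneTOTP(server_totp, tier_one_secret)

    alt Server TOTP verified or verification disabled
        WD->>WD: 24. Confirm tier_two_secret is valid<br/>(using the secret already stored locally)
        WD->>WD: 25. Parse the challenge
        WD->>MQTT: 26. Publish the register-complete Message to<br/>wdd/RDMC/message/up<br/>{type:"register_complete",<br/>project_id, challenge}
    else Server TOTP verification failed
        Note over WD: Registration failed: the server could not be verified<br/>(possible man-in-the-middle attack)
    end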
    
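    MQTT->>ExHub: 27. Forward the register-complete Message
    ExHub->>ExHub: 28. Check that the challenge matches<br/>cache[project_id] == received_challenge

    alt Challenge verified
        ExHub->>ExHub: 29. Set the project state to Online<br/>UpdateProjectState(project_id, "online")
        ExHub->>PM: 30. Notify that the project is online
        PM->>PM: 31. Update the project's online status
        Note over WD: Registration succeeded; normal operation begins
    else Challenge verification failed
        ExHub->>MQTT: Publish a verification-failed message
        Note over WD: Registration failed; wait and retry
    end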
    
    rect rgb(220, 240, 255)
        Note over Admin,Agent: ===== Phase 4: Heartbeat keep-alive =====
    end

    loop Every 5 seconds
        WD->>MQTT: Publish the heartbeat Message to<br/>wdd/RDMC/message/up<br/>{type:"heartbeat", metrics}
        MQTT->>ExHub: Forward the heartbeat
        ExHub->>ExHub: Refresh the project's online status
    end
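
Steps 10, 15, 17, and 23 all depend on both sides deriving the same TOTP code from a shared secret. The sketch below shows one way to do that with the Go standard library, following RFC 6238. The digit counts, validity windows, and hash functions are the ones this document lists (8 digits / 30 minutes / SHA256 for Tier-One, 6 digits / 30 seconds / SHA1 for Tier-Two). The function names mirror the diagram, but their signatures (raw secret bytes and an explicit Unix time) are assumptions made for illustration, and no clock-skew tolerance is included.

```go
package main

import (
    "crypto/hmac"
    "crypto/sha1"
    "crypto/sha256"
    "crypto/subtle"
    "encoding/binary"
    "fmt"
    "hash"
)

// totp computes an RFC 6238 code for the given Unix time, step length, and digit count.
func totp(secret []byte, unixSec, stepSec int64, digits int, newHash func() hash.Hash) string {
    var counter [8]byte
    binary.BigEndian.PutUint64(counter[:], uint64(unixSec/stepSec))

    mac := hmac.New(newHash, secret)
    mac.Write(counter[:])
    sum := mac.Sum(nil)

    // Dynamic truncation as defined in RFC 4226.
    offset := int(sum[len(sum)-1] & 0x0f)
    value := binary.BigEndian.Uint32(sum[offset:offset+4]) & 0x7fffffff

    mod := uint32(1)
    for i := 0; i < digits; i++ {
        mod *= 10
    }
    return fmt.Sprintf("%0*d", digits, value%mod)
}

// GenerateTierOneTOTP returns an 8-digit code with a 30-minute step (HMAC-SHA256).
func GenerateTierOneTOTP(secret []byte, unixSec int64) string {
    return totp(secret, unixSec, 30*60, 8, sha256.New)
}

// GenerateTierTwoTOTP returns a 6-digit code with a 30-second step (HMAC-SHA1).
func GenerateTierTwoTOTP(secret []byte, unixSec int64) string {
    return totp(secret, unixSec, 30, 6, sha1.New)
}

// VerifyTierOneTOTP compares the received code with the expected one in constant time.
// Assumption: only the current time step is accepted.
func VerifyTierOneTOTP(code string, secret []byte, unixSec int64) bool {
    want := GenerateTierOneTOTP(secret, unixSec)
    return subtle.ConstantTimeCompare([]byte(code), []byte(want)) == 1
}
```

With the RFC 6238 test secrets, counter value 1 yields `46119246` for the SHA256 variant and `287082` for the 6-digit SHA1 variant, which gives a quick way to check any alternative implementation against this one.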

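1.2 Key Design Points of the Registration Flow

| Design point | Description |
| --- | --- |
| Key generation timing | Both tier_one_secret and tier_two_secret are generated by rmdc-project-management when the project is created |
| Key distribution | Keys are deployed to Watchdog offline through the project configuration file and are never sent over public-network MQTT |
| Mutual TOTP verification | Watchdog sends a TOTP code for ExHub to verify, and ExHub returns a TOTP code for Watchdog to verify (each check has a configurable switch) |
| Challenge-response | A 32-character random challenge confirms the identity of both communicating parties |
| Security hardening | Even if MQTT traffic is intercepted, an attacker without tier_one_secret cannot forge a valid TOTP code |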
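
The challenge-response step (generate, cache by project_id, compare on `register_complete`) can be sketched as follows. `ChallengeCache` and its methods are hypothetical names. Consuming the challenge on the first verification attempt and the hex encoding of the 32 characters are assumptions, since the document only specifies a 32-character random string stored in `cache[project_id]`.

```go
package main

import (
    "crypto/rand"
    "crypto/subtle"
    "encoding/hex"
    "sync"
)

// ChallengeCache keeps at most one pending challenge per project (hypothetical helper).
type ChallengeCache struct {
    mu      sync.Mutex
    pending map[string]string
}

func NewChallengeCache() *ChallengeCache {
    return &ChallengeCache{pending: make(map[string]string)}
}

// Issue creates a 32-character random challenge and stores it under projectID.
func (c *ChallengeCache) Issue(projectID string) (string, error) {
    buf := make([]byte, 16) // 16 random bytes encode to 32 hex characters
    if _, err := rand.Read(buf); err != nil {
        return "", err
    }
    challenge := hex.EncodeToString(buf)

    c.mu.Lock()
    c.pending[projectID] = challenge
    c.mu.Unlock()
    return challenge, nil
}

// Verify checks the echoed challenge and removes it, so it cannot be replayed.
func (c *ChallengeCache) Verify(projectID, received string) bool {
    c.mu.Lock()
    defer c.mu.Unlock()

    want, ok := c.pending[projectID]
    if !ok {
        return false
    }
    delete(c.pending, projectID) // single use, even when the comparison fails
    return subtle.ConstantTimeCompare([]byte(want), []byte(received)) == 1
}
```

A production cache would probably also expire entries after a short time; that is left out here because the document does not state a lifetime for the challenge.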

2. Complete Authorization System Flow

2.1 Authorization System Architecture Overview

graph TB
    subgraph "RMDC Platform (internal network)"
        PM["rmdc-project-management<br/>Project Management"]
        CENTER["rmdc-watchdog-center<br/>Tier-One Authorization Center"]
        EXHUB["rmdc-exchange-hub<br/>Message Gateway"]
        DB[(Authorization Database)]
    end

    subgraph "Project Environment (external/isolated network)"
        WATCHDOG["rmdc-watchdog<br/>Tier-Two Authorization Center"]
        AGENT1["watchdog-agent<br/>Business A"]
        AGENT2["watchdog-agent<br/>Business B"]
        NODE1["watchdog-node<br/>Host 1"]
        NODE2["watchdog-node<br/>Host 2"]
    end

    PM --"1.Project creation"--> CENTER
    CENTER --"2.Authorization config"--> PM
    CENTER <--"3.Authorization request/issuance<br/>Tier-One TOTP"--> EXHUB
    EXHUB <=="4.MQTT over public network"==> WATCHDOG

    NODE1 --"5.Host info report"--> WATCHDOG
    NODE2 --"5.Host info report"--> WATCHDOG

    AGENT1 <--"6.Heartbeat/authorization<br/>Tier-Two TOTP"--> WATCHDOG
    AGENT2 <--"6.Heartbeat/authorization<br/>Tier-Two TOTP"--> WATCHDOG

    CENTER --> DB
    PM --> DB
    
    style CENTER fill:#ff6b6b
    style WATCHDOG fill:#4ecdc4
    style EXHUB fill:#ffd43b
    style PM fill:#a9e34b

2.2 Authorization Request and Issuance Flow

sequenceDiagram
    autonumber
    participant Node as rmdc-watchdog-node<br/>(Host)
    participant Watchdog as rmdc-watchdog<br/>(Tier-Two Authorization Center)
    participant MQTT as Exchange-Hub<br/>(MQTT)
    participant Center as rmdc-watchdog-center<br/>(Tier-One Authorization Center)
    participant DB as Authorization Database

    rect rgb(200, 220, 255)
        Note over Node,DB: ===== Phase 1: Host information collection =====
    end

    Note over Node: Project starts; Node runs as a DaemonSet
    Node->>Watchdog: 1. Report host hardware info<br/>{MachineID, CPU, Memory, Serial, IP}
    Watchdog->>Watchdog: 2. Encrypt the host info<br/>EncryptHostInfo(hostInfo, tierOneSecret)
    Watchdog->>Watchdog: 3. Generate the authorization request file<br/>GenerateAuthorizationFile()

    Note over Watchdog: The authorization file contains:<br/>- EncryptedHostMap<br/>- TOTPCode (8 digits, valid for 30 minutes)<br/>- EncryptedNamespace
    
    rect rgb(220, 255, 220)
        Note over Node,DB: ===== Phase 2: Authorization request =====
    end

    Watchdog->>MQTT: 4. Publish the authorization request Command to<br/>wdd/RDMC/command/up<br/>{type:"auth_request", AuthorizationFile}
    MQTT->>Center: 5. Forward the authorization request

    Center->>Center: 6. Decrypt the project namespace<br/>Decrypt(EncryptedNamespace)
    Center->>DB: 7. Fetch the project info<br/>GetProjectInfo(namespace)
    DB-->>Center: Return project secrets and related info

    Center->>Center: 8. Verify the TOTP code<br/>VerifyTierOneTOTPCode()
    Center->>Center: 9. Verify host info integrity<br/>DecryptHostInfo(), one host at a time
    
    rect rgb(255, 240, 220)
        Note over Node,DB: ===== Phase 3: Authorization issuance =====
    end

    alt Verification succeeded
        Center->>Center: 10. Generate a new TOTP code
        Center->>Center: 11. Build the authorization code<br/>{authorized_hosts, expire_time, tier_two_secret}
        Center->>DB: 12. Persist the authorization record
        Center->>MQTT: 13. Publish the authorization code to<br/>wdd/RDMC/command/down/{project_id}<br/>{type:"auth_response", AuthorizationCode}
    else Verification failed
        Center->>MQTT: Publish the authorization rejection<br/>{type:"auth_reject", reason}
    end
    
    MQTT->>Watchdog: 14. Deliver the authorization code

    Watchdog->>Watchdog: 15. Verify the returned TOTP
    Watchdog->>Watchdog: 16. Decrypt and verify the namespace
    Watchdog->>Watchdog: 17. Decrypt each host's info
    Watchdog->>Watchdog: 18. Compute the time offset<br/>timeOffset = now - firstAuthTime
    Watchdog->>Watchdog: 19. Persist the authorization info<br/>saveAuthorizationInfo()

    Note over Watchdog: The authorization store contains:<br/>- EncryptedAuthorizationCode<br/>- FirstAuthTime<br/>- TimeOffset<br/>- AuthorizedHostMap<br/>- TierTwoTOTPSecret
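
Steps 2 and 9 encrypt and decrypt host information with the tier-one secret. A minimal sketch of that pair using AES-256-GCM from the Go standard library is shown below. The `HostInfo` field types, the JSON encoding, the base64 envelope, and deriving the AES key as SHA-256 of the secret are all assumptions for illustration; the document only names the functions and the cipher.

```go
package main

import (
    "crypto/aes"
    "crypto/cipher"
    "crypto/rand"
    "crypto/sha256"
    "encoding/base64"
    "encoding/json"
    "errors"
)

// HostInfo carries the fields reported by watchdog-node (types are assumed).
type HostInfo struct {
    MachineID string `json:"machine_id"`
    CPU       string `json:"cpu"`
    Memory    string `json:"memory"`
    Serial    string `json:"serial"`
    IP        string `json:"ip"`
}

// newGCM builds an AES-256-GCM cipher. Assumption: key = SHA-256(secret).
func newGCM(secret string) (cipher.AEAD, error) {
    key := sha256.Sum256([]byte(secret))
    block, err := aes.NewCipher(key[:])
    if err != nil {
        return nil, err
    }
    return cipher.NewGCM(block)
}

// EncryptHostInfo returns base64(nonce || ciphertext || tag).
func EncryptHostInfo(info HostInfo, tierOneSecret string) (string, error) {
    gcm, err := newGCM(tierOneSecret)
    if err != nil {
        return "", err
    }
    plain, err := json.Marshal(info)
    if err != nil {
        return "", err
    }
    nonce := make([]byte, gcm.NonceSize())
    if _, err := rand.Read(nonce); err != nil {
        return "", err
    }
    sealed := gcm.Seal(nonce, nonce, plain, nil)
    return base64.StdEncoding.EncodeToString(sealed), nil
}

// DecryptHostInfo fails if the secret is wrong or the ciphertext was modified.
func DecryptHostInfo(encoded, tierOneSecret string) (HostInfo, error) {
    var info HostInfo
    gcm, err := newGCM(tierOneSecret)
    if err != nil {
        return info, err
    }
    raw, err := base64.StdEncoding.DecodeString(encoded)
    if err != nil {
        return info, err
    }
    if len(raw) < gcm.NonceSize() {
        return info, errors.New("ciphertext too short")
    }
    nonce, body := raw[:gcm.NonceSize()], raw[gcm.NonceSize():]
    plain, err := gcm.Open(nil, nonce, body, nil)
    if err != nil {
        return info, err
    }
    err = json.Unmarshal(plain, &info)
    return info, err
}
```

Because GCM authenticates the ciphertext, the "verify integrity, one host at a time" check in step 9 reduces to whether `DecryptHostInfo` returns an error for each entry.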

2.3 Agent Authorization Heartbeat Flow

sequenceDiagram
    autonumber
    participant Agent as rmdc-watchdog-agent<br/>(Business Launcher)
    participant Watchdog as rmdc-watchdog<br/>(Tier-Two Authorization Center)
    participant Business as Business Process<br/>(Java/Python)

    rect rgb(200, 255, 200)
        Note over Agent,Business: ===== First connection: obtain the secret =====
    end

    Agent->>Agent: 1. Collect host info<br/>GetAllInfo()
    Agent->>Watchdog: 2. Send a heartbeat request<br/>{HostInfo, EnvInfo, Timestamp, TOTPCode=""}

    Watchdog->>Watchdog: 3. Check that the timestamp is valid<br/>|now - timestamp| < 5 minutes
    Watchdog->>Watchdog: 4. Add the host to the host set<br/>AddHostInfo()
    Watchdog-->>Agent: 5. Return the response<br/>{Authorized:false, TierTwoSecret:secret}

    Agent->>Agent: 6. Save the TOTP secret<br/>tierTwoTotpSecret = secret
    
    rect rgb(220, 255, 220)
        Note over Agent,Business: ===== Subsequent heartbeats: authorization check =====
    end

    loop Heartbeat loop (2 hours after success, 1 hour after failure)
        Agent->>Agent: 7. Generate a TOTP code<br/>GenerateTierTwoTOTPCode(secret)<br/>6 digits, valid for 30 seconds
        Agent->>Watchdog: 8. Send a heartbeat request<br/>{HostInfo, Timestamp, TOTPCode}

        Watchdog->>Watchdog: 9. Verify the TOTP code<br/>VerifyTierTwoTOTPCode()

        alt TOTP verified
            Watchdog->>Watchdog: 10. Check the host's authorization status<br/>IsHostAuthorized(hostInfo)
            Watchdog->>Watchdog: 11. Generate the response TOTP
            Watchdog-->>Agent: 12. Return {Authorized:true/false, TOTPCode}

            Agent->>Agent: 13. Verify the server TOTP<br/>(mutual verification)

            alt Authorized
                Agent->>Agent: 14. failCount = 0<br/>Wait 2 hours
            else Not authorized
                Agent->>Agent: 15. failCount++<br/>Wait 1 hour
            end
        else TOTP verification failed
            Watchdog-->>Agent: Return error: invalid TOTP code
            Agent->>Agent: failCount++
        end

        alt failCount >= 12
            Agent->>Business: 16. Send SIGTERM
            Note over Business: Business process terminated<br/>(dead man's switch triggered)
        end
    end
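
The agent-side bookkeeping in this loop is small enough to express as a pure state transition, which keeps it easy to test separately from networking and signal handling. In the sketch below, `HeartbeatState`, `Next`, and `TimestampValid` are hypothetical names. The thresholds (12 failures, 2 hours, 1 hour, 5 minutes) come from the diagram, while resetting the counter to zero after a successful heartbeat is an assumption chosen so that only 12 consecutive failures trigger termination.

```go
package main

import "time"

const (
    maxFailCount    = 12            // consecutive failures before SIGTERM
    successInterval = 2 * time.Hour // wait after an authorized heartbeat
    failureInterval = 1 * time.Hour // wait after a failed heartbeat
    maxClockSkewSec = 5 * 60        // |now - timestamp| must be below this
)

// HeartbeatState holds the agent's consecutive failure counter.
type HeartbeatState struct {
    FailCount int
}

// Next records one heartbeat outcome. It returns how long to wait before the
// next heartbeat and whether the business process should now be terminated.
func (s *HeartbeatState) Next(authorized bool) (wait time.Duration, terminate bool) {
    if authorized {
        s.FailCount = 0 // assumption: success clears the failure streak
        return successInterval, false
    }
    s.FailCount++
    return failureInterval, s.FailCount >= maxFailCount
}

// TimestampValid implements the Watchdog-side check |now - timestamp| < 5 minutes.
func TimestampValid(nowUnix, timestampUnix int64) bool {
    diff := nowUnix - timestampUnix
    if diff < 0 {
        diff = -diff
    }
    return diff < maxClockSkewSec
}
```

A caller would treat both "TOTP verification failed" and "Authorized:false" as `Next(false)`, sleep for the returned duration, and send SIGTERM to the business process when `terminate` is true.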

2.4 Authorization Revocation Flow

sequenceDiagram
    autonumber
    participant Admin as Administrator<br/>(RMDC Portal)
    participant PM as project-management
    participant Center as rmdc-watchdog-center<br/>(Tier-One Authorization Center)
    participant MQTT as Exchange-Hub<br/>(MQTT)
    participant Watchdog as rmdc-watchdog<br/>(Tier-Two Authorization Center)
    participant Agent as rmdc-watchdog-agent
    participant Business as Business Process

    rect rgb(255, 220, 220)
        Note over Admin,Business: ===== Authorization revocation flow =====
    end

    Admin->>PM: 1. Submit a revocation request<br/>{project_id, reason}
    PM->>Center: 2. Request revocation of the project's authorization
    Center->>Center: 3. Update the project's authorization status<br/>status = revoked
    Center->>Center: 4. Build the revoke Command<br/>{type:"auth_revoke", project_id, totp}

    Center->>MQTT: 5. Publish the revoke Command to<br/>wdd/RDMC/command/down/{project_id}
    MQTT->>Watchdog: 6. Deliver the revoke command

    Watchdog->>Watchdog: 7. Verify the revoke command's TOTP
    Watchdog->>Watchdog: 8. Clear the local authorization store<br/>deleteAuthorizationInfo()
    Watchdog->>Watchdog: 9. Mark the state as unauthorized<br/>initialized = false

    Note over Agent: On the next heartbeat...
    Agent->>Watchdog: 10. Send a heartbeat request
    Watchdog-->>Agent: 11. Return {Authorized:false}

    Agent->>Agent: 12. failCount++
    Note over Agent: After 12 consecutive failures...

    Agent->>Business: 13. Send SIGTERM<br/>to terminate the business process
    Note over Business: Business self-destructs<br/>(dead man's switch takes effect)

    Watchdog->>MQTT: 14. Report revocation complete<br/>{type:"auth_revoke_ack"}
    MQTT->>Center: 15. Forward the revocation acknowledgement
    Center->>PM: 16. Update the project status
    PM->>Admin: 17. Notify that revocation succeeded
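
On the Watchdog side, steps 8, 9, and 11 amount to clearing an in-memory authorization table and answering every later heartbeat with `Authorized:false`. The sketch below illustrates that behaviour only. `AuthStore` and its methods are hypothetical, host identity is reduced to a string ID, and persistence to disk (which `deleteAuthorizationInfo()` presumably also handles) is omitted.

```go
package main

import "sync"

// AuthStore is a hypothetical in-memory view of the Watchdog's authorization state.
type AuthStore struct {
    mu          sync.RWMutex
    initialized bool
    hosts       map[string]bool
}

// Save records the hosts covered by a freshly issued authorization code.
func (s *AuthStore) Save(hostIDs []string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.hosts = make(map[string]bool, len(hostIDs))
    for _, id := range hostIDs {
        s.hosts[id] = true
    }
    s.initialized = true
}

// Revoke clears the store and marks the Watchdog as unauthorized.
func (s *AuthStore) Revoke() {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.hosts = nil
    s.initialized = false
}

// IsHostAuthorized is what the heartbeat handler consults for each request.
func (s *AuthStore) IsHostAuthorized(hostID string) bool {
    s.mu.RLock()
    defer s.mu.RUnlock()
    return s.initialized && s.hosts[hostID]
}
```

Revocation therefore takes effect for agents only at their next heartbeat, and the business process stops once the agent's failure counter reaches the threshold.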

3. K8S Command Execution Flow

3.1 K8S Command Business Flow Diagram

sequenceDiagram
    autonumber
    participant User as User<br/>(RMDC Portal)
    participant Api as rmdc-core<br/>(API Gateway)
    participant Operator as octopus-operator<br/>(Execution Center)
    participant ExHub as rmdc-exchange-hub<br/>(Message Gateway)
    participant MQTT as MQTT Broker
    participant WD as rmdc-watchdog<br/>(K8s Operator)
    participant K8s as Kubernetes API

    rect rgb(200, 220, 255)
        Note over User,K8s: ===== Phase 1: Command initiation =====
    end

    User->>Api: 1. Submit a K8S operation request<br/>{project_id, action, resource, name}
    Api->>Api: 2. Check user permissions<br/>CheckPermission(user, project, action)
    Api->>Operator: 3. Call the Execution Center API

    Operator->>Operator: 4. Build the K8S execution command<br/>K8sExecCommand{<br/>  command_id, namespace,<br/>  resource, name, action,<br/>  command, timeout<br/>}
    Operator->>ExHub: 5. Call the command dispatch API<br/>POST /api/command/send

    ExHub->>ExHub: 6. Generate a unique CommandID<br/>Record the command in the database<br/>Status: Pending
    ExHub->>MQTT: 7. Publish the K8S execution Command to<br/>wdd/RDMC/command/down/{project_id}<br/>{type:"k8s_exec", payload}
    
    rect rgb(220, 255, 220)
        Note over User,K8s: ===== Phase 2: Command execution =====
    end

    MQTT->>WD: 8. Deliver the K8S execution command
    WD->>WD: 9. Parse the command<br/>Route it to K8sHandler
    WD->>WD: 10. Record the start time<br/>status = running

    alt action == "logs"
        WD->>K8s: 11a. Call the K8S API<br/>GetPodLogs(namespace, name, container)
    else action == "exec"
        WD->>K8s: 11b. Call the K8S API<br/>ExecCommand(pod, container, command)
    else action == "scale"
        WD->>K8s: 11c. Call the K8S API<br/>ScaleDeployment(name, replicas)
    else action == "restart"
        WD->>K8s: 11d. Call the K8S API<br/>RolloutRestart(deployment)
    else action == "delete"
        WD->>K8s: 11e. Call the K8S API<br/>DeleteResource(resource, name)
    end

    K8s-->>WD: 12. Return the execution result
    
    rect rgb(255, 240, 220)
        Note over User,K8s: ===== Phase 3: Result return =====
    end

    WD->>WD: 13. Build the execution result<br/>ExecResult{command_id, status,<br/>exit_code, output, error, duration}
    WD->>MQTT: 14. Publish the result Message to<br/>wdd/RDMC/message/up<br/>{type:"exec_result", payload}

    MQTT->>ExHub: 15. Forward the execution result
    ExHub->>ExHub: 16. Update the command status<br/>Record the execution duration
    ExHub->>Operator: 17. Push the result to the Execution Center
    Operator->>Api: 18. Return the execution result
    Api->>User: 19. Display the execution result

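3.2 Supported K8S Operation Types

| Action | Description | Target resource | Parameters |
| --- | --- | --- | --- |
| logs | Fetch logs | Pod | container, tail_lines, follow |
| exec | Execute a command | Pod | container, command[], timeout |
| scale | Scale up or down | Deployment/StatefulSet | scale_count |
| restart | Rolling restart | Deployment/StatefulSet | - |
| delete | Delete a resource | Pod/Deployment/Service, etc. | - |
| get | Get information | Any resource | output_format |
| apply | Apply configuration | Any resource | yaml_content |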
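
Before K8sHandler dispatches a command, it is worth rejecting action/resource combinations that the table above does not allow. The sketch below shows one way to encode the table. The struct fields follow the `K8sExecCommand` shown in the sequence diagram, but their Go types, the `Validate` function, and the treatment of `delete`, `get`, and `apply` as valid for any resource kind are assumptions.

```go
package main

import "fmt"

// K8sExecCommand mirrors the fields named in the flow diagram (types are assumed).
type K8sExecCommand struct {
    CommandID string
    Namespace string
    Resource  string // resource kind, e.g. "Pod" or "Deployment"
    Name      string
    Action    string
    Command   []string
    Timeout   int // seconds
}

// allowedKinds maps each action to the resource kinds it supports.
// A nil slice means the action applies to any resource kind.
var allowedKinds = map[string][]string{
    "logs":    {"Pod"},
    "exec":    {"Pod"},
    "scale":   {"Deployment", "StatefulSet"},
    "restart": {"Deployment", "StatefulSet"},
    "delete":  nil,
    "get":     nil,
    "apply":   nil,
}

// Validate rejects unknown actions and actions applied to the wrong resource kind.
func Validate(cmd K8sExecCommand) error {
    kinds, ok := allowedKinds[cmd.Action]
    if !ok {
        return fmt.Errorf("unsupported action %q", cmd.Action)
    }
    if kinds == nil {
        return nil
    }
    for _, kind := range kinds {
        if kind == cmd.Resource {
            return nil
        }
    }
    return fmt.Errorf("action %q does not apply to resource %q", cmd.Action, cmd.Resource)
}
```

Running this check in Watchdog before calling the Kubernetes API means an invalid command can be answered with a failed `exec_result` immediately instead of surfacing as an API error.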

4. Host Command Execution Flow

4.1 Host Command Business Flow Diagram

sequenceDiagram
    autonumber
    participant User as User<br/>(RMDC Portal)
    participant Api as rmdc-core<br/>(API Gateway)
    participant Operator as octopus-operator<br/>(Execution Center)
    participant ExHub as rmdc-exchange-hub<br/>(Message Gateway)
    participant MQTT as MQTT Broker
    participant WD as rmdc-watchdog<br/>(Tier-Two Authorization Center)
    participant Node as watchdog-node<br/>(Host Daemon)

    rect rgb(200, 220, 255)
        Note over User,Node: ===== Phase 1: Command initiation =====
    end

    User->>Api: 1. Submit a host operation request<br/>{project_id, host_id, action, script}
    Api->>Api: 2. Check user permissions<br/>CheckPermission(user, project, "host_exec")
    Api->>Operator: 3. Call the Execution Center API

    Operator->>Operator: 4. Build the host execution command<br/>HostExecCommand{<br/>  command_id, host_id,<br/>  action, script, args, timeout<br/>}
    Operator->>ExHub: 5. Call the command dispatch API<br/>POST /api/command/send

    ExHub->>ExHub: 6. Generate a unique CommandID<br/>Record the command in the database<br/>Status: Pending
    ExHub->>MQTT: 7. Publish the host execution Command to<br/>wdd/RDMC/command/down/{project_id}<br/>{type:"host_exec", payload}
    
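    rect rgb(220, 255, 220)
        Note over User,Node: ===== Phase 2: Command forwarding and execution =====
    end

    MQTT->>WD: 8. Deliver the host execution command
    WD->>WD: 9. Parse the command<br/>Route it to HostHandler
    WD->>WD: 10. Check that the target host is online<br/>CheckHostOnline(host_id)

    alt Host online
        WD->>Node: 11. Forward the command to the target Node<br/>HTTP POST /api/exec<br/>(internal network, TOTP-verified)
        Node->>Node: 12. Verify that the request is legitimate<br/>VerifyTOTP()
        Node->>Node: 13. Execute the command/script<br/>ExecuteScript(script, args)
        Node-->>WD: 14. Return the execution result<br/>{exit_code, stdout, stderr}
    else Host offline
        WD->>WD: Build an error result<br/>error = "target host offline"
    end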
    
    rect rgb(255, 240, 220)
        Note over User,Node: ===== Phase 3: Result return =====
    end

    WD->>WD: 15. Build the execution result<br/>ExecResult{command_id, status,<br/>exit_code, output, error, duration}
    WD->>MQTT: 16. Publish the result Message to<br/>wdd/RDMC/message/up<br/>{type:"exec_result", payload}

    MQTT->>ExHub: 17. Forward the execution result
    ExHub->>ExHub: 18. Update the command status<br/>Record the execution duration
    ExHub->>Operator: 19. Push the result to the Execution Center
    Operator->>Api: 20. Return the execution result
    Api->>User: 21. Display the execution result
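
The branch in phase 2 (forward when the host is online, otherwise build an error result) and the `ExecResult` construction in step 15 can be sketched as a single handler. The struct fields follow the names in the diagram. `ExecFunc` stands in for the HTTP call to watchdog-node and, like the `"success"`/`"failed"` status values and the `-1` exit code used when nothing ran, is an assumption made for illustration.

```go
package main

import "time"

// HostExecCommand carries the fields needed by this sketch (subset of the diagram).
type HostExecCommand struct {
    CommandID string
    HostID    string
    Script    string
    Args      []string
}

// ExecResult mirrors the result structure from the flow diagram.
type ExecResult struct {
    CommandID string
    Status    string // "success" or "failed" (assumed values)
    ExitCode  int
    Output    string
    Error     string
    Duration  time.Duration
}

// ExecFunc forwards a command to one watchdog-node (hypothetical abstraction
// over the HTTP POST /api/exec call).
type ExecFunc func(cmd HostExecCommand) (exitCode int, stdout, stderr string, err error)

// HostHandler routes host_exec commands to online nodes.
type HostHandler struct {
    Online map[string]bool
    Exec   ExecFunc
}

// Handle always returns a result, so every command gets an exec_result reply.
func (h *HostHandler) Handle(cmd HostExecCommand) ExecResult {
    start := time.Now()
    res := ExecResult{CommandID: cmd.CommandID, Status: "failed", ExitCode: -1}

    if !h.Online[cmd.HostID] {
        res.Error = "target host offline"
        return res
    }

    code, stdout, stderr, err := h.Exec(cmd)
    res.Duration = time.Since(start)
    res.ExitCode, res.Output = code, stdout
    switch {
    case err != nil:
        res.Error = err.Error() // transport-level failure
    case code != 0:
        res.Error = stderr // script ran but reported failure
    default:
        res.Status = "success"
    }
    return res
}
```

Returning a populated `ExecResult` on every path lets Exchange-Hub move the command out of the Pending state even when the target host cannot be reached.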

4.2 Watchdog-to-Node Communication Architecture

graph TB
    subgraph "rmdc-watchdog"
        WD_Router[Message Router]
        WD_HostH[HostHandler]
        WD_NodeClient[NodeClient<br/>HTTP Client]
    end

    subgraph "watchdog-node (DaemonSet)"
        Node_Server[HTTP Server<br/>:8081]
        Node_Auth[TOTP Verification Middleware]
        Node_Exec[Command Executor]
        Node_Info[Info Collector]
    end
    
    WD_Router --> WD_HostH
    WD_HostH --> WD_NodeClient
    WD_NodeClient ==HTTP/TOTP==> Node_Server
    Node_Server --> Node_Auth
    Node_Auth --> Node_Exec
    Node_Auth --> Node_Info
    
    style WD_NodeClient fill:#4ecdc4,stroke:#087f5b
    style Node_Server fill:#a9e34b,stroke:#5c940d
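
The "TOTP Verification Middleware" box in front of the command executor maps naturally onto a `net/http` wrapper. The sketch below is one possible shape, not the project's actual code. The `X-TOTP-Code` header name, the `CodeFunc` hook that supplies the currently valid code, and the `probe` helper (which only exists to exercise the middleware in-process) are all assumptions.

```go
package main

import (
    "crypto/subtle"
    "net/http"
    "net/http/httptest"
)

// CodeFunc returns the TOTP code that is valid right now (hypothetical hook,
// expected to wrap the Tier-Two TOTP generator).
type CodeFunc func() string

// RequireTOTP rejects requests whose TOTP header is missing or wrong.
func RequireTOTP(current CodeFunc, next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        got := r.Header.Get("X-TOTP-Code") // header name is an assumption
        want := current()
        if got == "" || subtle.ConstantTimeCompare([]byte(got), []byte(want)) != 1 {
            http.Error(w, "invalid TOTP code", http.StatusUnauthorized)
            return
        }
        next.ServeHTTP(w, r)
    })
}

// probe sends one request through the middleware without opening a socket
// and returns the resulting HTTP status code.
func probe(validCode, sentCode string) int {
    executor := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
        w.WriteHeader(http.StatusOK)
    })
    handler := RequireTOTP(func() string { return validCode }, executor)

    req := httptest.NewRequest(http.MethodPost, "/api/exec", nil)
    if sentCode != "" {
        req.Header.Set("X-TOTP-Code", sentCode)
    }
    rec := httptest.NewRecorder()
    handler.ServeHTTP(rec, req)
    return rec.Code
}
```

Wrapping both the executor and the info collector with the same middleware matches the diagram, where every request passes through TOTP verification before reaching either component.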

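4.3 Supported Host Operation Types

| Action | Description | Parameters |
| --- | --- | --- |
| exec | Execute a shell command | script, args[], timeout |
| info | Get host information | info_type (cpu/memory/disk/network) |
| service | Service management | service_name, operation (start/stop/restart) |
| file | File operations | path, operation (read/write/delete) |
| dltu | Image operations | operation (download/load/tag/upload), params |
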

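5. Synchronous Command Design

5.1 Feasibility Analysis of Synchronous Commands

Applicable scenarios

| Scenario | Current mode | Need for synchronous behaviour | Suggested implementation |
| --- | --- | --- | --- |
| Log viewing | Async | High: log output must be visible in real time | Long polling or push |
| Host command execution | Async | Medium: interactive commands need real-time feedback | Wait with timeout |
| K8S resource query | Async | Medium: queries are expected to return quickly | Wait with timeout |
| Business update | Async | Low: long-running operations suit async | Stay async + notification |
| Monitoring data | Async | Low: periodic reporting is sufficient | Stay async |

Technical challenges

  1. Network latency: MQTT traffic across the public network has unpredictable latency
  2. Timeout handling: timeouts must be chosen carefully to avoid blocking for long periods
  3. Connection keep-alive: long waits require the connection to stay open
  4. Resource usage: synchronous waits hold server connection resources

5.2 Synchronous Command Implementation on the Existing Architecture

Design (Go channels + HTTP long polling)

Given the project's existing Go stack, the recommended approach is HTTP long polling plus a result cache: the HTTP request blocks on a per-command Go channel until the result arrives or the timeout expires.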

sequenceDiagram
    autonumber
    participant User as User
    participant Api as API Gateway
    participant ExHub as Exchange-Hub
    participant Cache as ResultCache<br/>(sync.Map)
    participant MQTT as MQTT Broker
    participant WD as Watchdog

    rect rgb(200, 220, 255)
        Note over User,WD: ===== Sending the synchronous command =====
    end

    User->>Api: 1. POST /api/command/sync<br/>{project_id, type, payload, timeout:30s}
    Api->>ExHub: 2. Call the synchronous command API
    ExHub->>ExHub: 3. Generate the CommandID
    ExHub->>Cache: 4. Create the wait channel<br/>waitChan[cmdId] = make(chan Result)
    ExHub->>MQTT: 5. Publish the command
    MQTT->>WD: 6. Deliver it to Watchdog

    rect rgb(220, 255, 220)
        Note over User,WD: ===== Execution and waiting =====
    end

    WD->>WD: 7. Execute the command
    WD->>MQTT: 8. Return the result
    MQTT->>ExHub: 9. Receive the result
    ExHub->>Cache: 10. Send the result to the channel<br/>waitChan[cmdId] <- result

    rect rgb(255, 240, 220)
        Note over User,WD: ===== Result return =====
    end

    ExHub->>ExHub: 11. select: wait for the result or the timeout
    ExHub->>Api: 12. Return the execution result
    Api->>User: 13. Return the result to the user

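Exchange-Hub Synchronous Command Manager Design

import (
    "errors"
    "sync"
    "time"
)

// SyncCommandManager tracks commands that are waiting for a synchronous result.
type SyncCommandManager struct {
    waitChannels sync.Map      // map[commandID]chan *ExecResult
    timeout      time.Duration // default used when the caller passes timeout <= 0
}

// SendAndWait publishes a command and blocks until its result arrives or the timeout expires.
// CommandMessage, ExecResult, and GetMQTTService are defined elsewhere in Exchange-Hub.
func (m *SyncCommandManager) SendAndWait(projectID string, cmd *CommandMessage, timeout time.Duration) (*ExecResult, error) {
    if timeout <= 0 {
        timeout = m.timeout
    }

    // 1. Register the wait channel before publishing so a fast reply cannot be missed.
    //    The buffer of 1 lets OnResult deliver without blocking.
    waitChan := make(chan *ExecResult, 1)
    m.waitChannels.Store(cmd.MessageID, waitChan)
    defer m.waitChannels.Delete(cmd.MessageID)

    // 2. Publish the command.
    mqttService := GetMQTTService()
    if err := mqttService.PublishCommand(projectID, cmd); err != nil {
        return nil, err
    }

    // 3. Wait for the result or the timeout; stop the timer on return to release it promptly.
    timer := time.NewTimer(timeout)
    defer timer.Stop()
    select {
    case result := <-waitChan:
        return result, nil
    case <-timer.C:
        return nil, errors.New("command execution timeout")
    }
}

// OnResult is the callback for incoming results. The lookup only succeeds when
// result.CommandID equals the MessageID of the command that was sent.
func (m *SyncCommandManager) OnResult(result *ExecResult) {
    if ch, ok := m.waitChannels.Load(result.CommandID); ok {
        select {
        case ch.(chan *ExecResult) <- result:
        default: // a result was already delivered; drop the duplicate instead of blocking
        }
    }
}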
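
To try the wait-or-timeout pattern without an MQTT broker, the same idea can be reduced to a self-contained form. In the sketch below, `Waiter`, `Result`, `ErrTimeout`, and the `publish` callback are hypothetical stand-ins for the manager, `ExecResult`, and the MQTT service; only the channel registration and `select` logic are meant to carry over.

```go
package main

import (
    "errors"
    "sync"
    "time"
)

// Result is a simplified stand-in for ExecResult.
type Result struct {
    CommandID string
    Output    string
}

// ErrTimeout is returned when no result arrives in time.
var ErrTimeout = errors.New("command execution timeout")

// demoTimeout keeps the example fast; real commands would use seconds.
const demoTimeout = 50 * time.Millisecond

// Waiter pairs each in-flight command ID with a one-slot result channel.
type Waiter struct {
    pending sync.Map // map[string]chan Result
}

// SendAndWait registers the channel, calls publish, then waits for the result.
func (w *Waiter) SendAndWait(id string, publish func() error, timeout time.Duration) (Result, error) {
    ch := make(chan Result, 1)
    w.pending.Store(id, ch)
    defer w.pending.Delete(id)

    if err := publish(); err != nil {
        return Result{}, err
    }

    timer := time.NewTimer(timeout)
    defer timer.Stop()
    select {
    case r := <-ch:
        return r, nil
    case <-timer.C:
        return Result{}, ErrTimeout
    }
}

// OnResult delivers a result if someone is still waiting for it.
// It reports whether the result was handed over.
func (w *Waiter) OnResult(r Result) bool {
    v, ok := w.pending.Load(r.CommandID)
    if !ok {
        return false // nobody is waiting (timed out or unknown ID)
    }
    select {
    case v.(chan Result) <- r:
        return true
    default:
        return false // duplicate result; the slot is already filled
    }
}
```

A result that arrives after the timeout finds no registered channel and is dropped by `OnResult`; in the real system that is the point where the asynchronous path (updating the command record in the database) would still apply.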

5.3 Real-Time Log Viewing

For real-time log viewing, a streaming log push approach is recommended:

sequenceDiagram
    autonumber
    participant User as User
    participant Frontend as Frontend
    participant Api as API Gateway
    participant ExHub as Exchange-Hub
    participant MQTT as MQTT Broker
    participant WD as Watchdog
    participant K8s as K8S API

    User->>Frontend: 1. Click to view real-time logs
    Frontend->>Api: 2. POST /api/logs/stream<br/>{project_id, pod, container, follow:true}
    Api->>ExHub: 3. Create a log stream session<br/>sessionId = uuid()
    ExHub->>MQTT: 4. Publish the log query command<br/>{type:"log_query", follow:true, session_id}
    MQTT->>WD: 5. Deliver the command
    WD->>K8s: 6. Call the K8S Logs API (follow=true)

    loop Log stream
        K8s-->>WD: 7. Return log lines
        WD->>MQTT: 8. Publish the log Message<br/>{type:"log_result", session_id, lines, is_complete:false}
        MQTT->>ExHub: 9. Forward the logs
        ExHub->>ExHub: 10. Route by session_id
        ExHub->>Api: 11. Push to the API session
        Api->>Frontend: 12. Return logs via SSE or polling
        Frontend->>User: 13. Display logs in real time
    end

    User->>Frontend: 14. Stop viewing
    Frontend->>Api: 15. DELETE /api/logs/stream/{session_id}
    Api->>ExHub: 16. Close the log stream session
    ExHub->>MQTT: 17. Publish the stop command
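Frontend implementation suggestion

// Receive real-time logs with Server-Sent Events (SSE)
async function streamLogs(projectId: string, pod: string, container: string) {
  const response = await fetch(`/api/logs/stream`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ project_id: projectId, pod, container, follow: true })
  });
  if (!response.ok) {
    throw new Error(`failed to create log stream: HTTP ${response.status}`);
  }

  const sessionId = (await response.json()).session_id;

  // Receive logs via SSE (or long polling)
  const eventSource = new EventSource(`/api/logs/stream/${sessionId}`);

  eventSource.onmessage = (event) => {
    const logData = JSON.parse(event.data);
    appendLogToUI(logData.lines); // appendLogToUI is provided by the page
    if (logData.is_complete) {
      eventSource.close(); // stop before EventSource reconnects automatically
    }
  };

  return {
    stop: () => {
      eventSource.close();
      fetch(`/api/logs/stream/${sessionId}`, { method: 'DELETE' });
    }
  };
}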
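
On the server side, steps 10 to 12 of the log flow need two pieces: a registry that routes each `log_result` to the right session, and a handler that writes the chunks to the browser as SSE frames. The Go sketch below illustrates both under stated assumptions: `LogHub`, `LogChunk`, `sseFrame`, and `ServeSSE` are hypothetical names, the JSON field names follow the `log_result` message in the diagram, and chunks are dropped when a session's 16-slot buffer is full.

```go
package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "sync"
)

// LogChunk mirrors the log_result payload from the diagram.
type LogChunk struct {
    SessionID  string   `json:"session_id"`
    Lines      []string `json:"lines"`
    IsComplete bool     `json:"is_complete"`
}

// LogHub routes chunks to open sessions by session ID.
type LogHub struct {
    mu       sync.Mutex
    sessions map[string]chan LogChunk
}

func NewLogHub() *LogHub {
    return &LogHub{sessions: make(map[string]chan LogChunk)}
}

// Open registers a session (called when POST /api/logs/stream creates it).
func (h *LogHub) Open(id string) <-chan LogChunk {
    h.mu.Lock()
    defer h.mu.Unlock()
    ch := make(chan LogChunk, 16)
    h.sessions[id] = ch
    return ch
}

// Close removes a session; calling it twice is harmless.
func (h *LogHub) Close(id string) {
    h.mu.Lock()
    defer h.mu.Unlock()
    if ch, ok := h.sessions[id]; ok {
        delete(h.sessions, id)
        close(ch)
    }
}

// Route hands a chunk to its session without blocking the MQTT callback.
func (h *LogHub) Route(c LogChunk) bool {
    h.mu.Lock()
    defer h.mu.Unlock()
    ch, ok := h.sessions[c.SessionID]
    if !ok {
        return false
    }
    select {
    case ch <- c:
        return true
    default:
        return false // buffer full: drop rather than stall other sessions
    }
}

// sseFrame encodes one chunk as a Server-Sent Events "data:" frame.
func sseFrame(c LogChunk) (string, error) {
    b, err := json.Marshal(c)
    if err != nil {
        return "", err
    }
    return fmt.Sprintf("data: %s\n\n", b), nil
}

// ServeSSE streams chunks from an open session until it closes or the client leaves.
func ServeSSE(w http.ResponseWriter, r *http.Request, ch <-chan LogChunk) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming unsupported", http.StatusInternalServerError)
        return
    }
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    for {
        select {
        case <-r.Context().Done():
            return
        case c, open := <-ch:
            if !open {
                return
            }
            frame, err := sseFrame(c)
            if err != nil {
                return
            }
            fmt.Fprint(w, frame)
            flusher.Flush()
        }
    }
}
```

The `DELETE /api/logs/stream/{session_id}` handler would call `Close`, which ends `ServeSSE` for that session and is the natural place to publish the stop command to Watchdog.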

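6. Data Structures and Security Mechanisms Summary

6.1 MQTT Topics and Message Types

| Direction | Topic | Message type | Description |
| --- | --- | --- | --- |
| Uplink | wdd/RDMC/command/up | register | Project registration |
| Uplink | wdd/RDMC/command/up | auth_request | Authorization request |
| Uplink | wdd/RDMC/message/up | register_complete | Registration completion confirmation |
| Uplink | wdd/RDMC/message/up | heartbeat | Heartbeat data |
| Uplink | wdd/RDMC/message/up | monitor | Monitoring data report |
| Uplink | wdd/RDMC/message/up | exec_result | Command execution result |
| Uplink | wdd/RDMC/message/up | log_result | Log query result |
| Uplink | wdd/RDMC/message/up | alert | Alert information |
| Downlink | wdd/RDMC/command/down/{id} | auth_response | Authorization response |
| Downlink | wdd/RDMC/command/down/{id} | auth_revoke | Authorization revocation |
| Downlink | wdd/RDMC/command/down/{id} | log_query | Log query command |
| Downlink | wdd/RDMC/command/down/{id} | host_exec | Host execution command |
| Downlink | wdd/RDMC/command/down/{id} | k8s_exec | K8S execution command |
| Downlink | wdd/RDMC/command/down/{id} | update | Business update command |
| Downlink | wdd/RDMC/message/down/{id} | register_ack | Registration acknowledgement message |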
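
Encoding this table once in code keeps publishers from hand-building topic strings. The sketch below maps each message type to its channel and direction exactly as listed above. The `TopicFor` function and the `route` type are hypothetical names, and requiring a non-empty project ID for downlink topics is an added safety check rather than something the document specifies.

```go
package main

import "fmt"

// route describes where a message type travels.
type route struct {
    channel string // "command" or "message"
    down    bool   // true for platform -> Watchdog
}

// routes encodes the topic table: message type -> channel and direction.
var routes = map[string]route{
    "register":          {"command", false},
    "auth_request":      {"command", false},
    "register_complete": {"message", false},
    "heartbeat":         {"message", false},
    "monitor":           {"message", false},
    "exec_result":       {"message", false},
    "log_result":        {"message", false},
    "alert":             {"message", false},
    "auth_response":     {"command", true},
    "auth_revoke":       {"command", true},
    "log_query":         {"command", true},
    "host_exec":         {"command", true},
    "k8s_exec":          {"command", true},
    "update":            {"command", true},
    "register_ack":      {"message", true},
}

// TopicFor returns the MQTT topic for a message type.
// projectID is only used for downlink topics.
func TopicFor(msgType, projectID string) (string, error) {
    r, ok := routes[msgType]
    if !ok {
        return "", fmt.Errorf("unknown message type %q", msgType)
    }
    if !r.down {
        return "wdd/RDMC/" + r.channel + "/up", nil
    }
    if projectID == "" {
        return "", fmt.Errorf("message type %q requires a project ID", msgType)
    }
    return "wdd/RDMC/" + r.channel + "/down/" + projectID, nil
}
```

For example, `TopicFor("k8s_exec", "demo_12345678")` yields `wdd/RDMC/command/down/demo_12345678`, while `TopicFor("heartbeat", ...)` always yields the shared uplink topic `wdd/RDMC/message/up`.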

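6.2 Security Mechanisms Summary

| Scenario | Security mechanism | Parameters |
| --- | --- | --- |
| Center ↔ Watchdog | Tier-One TOTP + AES-GCM | 8-digit code, 30-minute validity, SHA256 |
| Watchdog ↔ Agent | Tier-Two TOTP | 6-digit code, 30-second validity, SHA1 |
| Watchdog ↔ Node | Tier-Two TOTP (reused) | Internal-network HTTP + TOTP authentication |
| HTTP fallback interface | Reuses the Tier-Two TOTP secret | TOTP authentication required |
| Host information | Hardware fingerprint binding | MachineID+CPU+Memory+Serial |
| Dead man's switch | Self-destruct on heartbeat failure | Triggered after 12 consecutive failures |
| Message transport | TLS encryption | MQTT over TLS |
| Sensitive data | AES-256-GCM encryption | Authorization codes, secrets, etc. |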
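
The hardware fingerprint row combines four host attributes into one identity. One plausible way to do that, shown below, is to hash the fields with a separator so that different field splits cannot collide. The `Fingerprint` function, the use of SHA-256, and the NUL separator are assumptions; the document only lists which fields are bound together.

```go
package main

import (
    "crypto/sha256"
    "encoding/hex"
    "strings"
)

// Fingerprint derives a stable host identity from MachineID+CPU+Memory+Serial.
// A NUL byte separates the fields so ("ab", "c") and ("a", "bc") hash differently.
func Fingerprint(machineID, cpu, memory, serial string) string {
    joined := strings.Join([]string{machineID, cpu, memory, serial}, "\x00")
    sum := sha256.Sum256([]byte(joined))
    return hex.EncodeToString(sum[:])
}
```

Comparing the fingerprint computed from a heartbeat's `HostInfo` with the one stored at authorization time is enough to detect a host whose hardware attributes no longer match the authorized set.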