RMDC-Watchdog Business Flow Diagrams
1. Project Registration Flow
1.1 Complete Registration Sequence Diagram
sequenceDiagram
autonumber
participant Admin as Administrator<br/>(RMDC Portal)
participant PM as rmdc-project-management<br/>(Project Management)
participant ExHub as rmdc-exchange-hub<br/>(Message Gateway)
participant MQTT as MQTT Broker
participant WD as rmdc-watchdog<br/>(Tier-Two Authorization Center)
participant Node as watchdog-node<br/>(Host Daemon)
participant Agent as watchdog-agent<br/>(Business Agent)
rect rgb(200, 220, 255)
Note over Admin,Agent: ===== Phase 1: Project Creation (RMDC Platform Side) =====
end
Admin->>PM: 1. Create new project
PM->>PM: 2. Generate project info<br/>project_id = namespace_<8-digit random number>
PM->>PM: 3. Generate Tier-One secret<br/>tier_one_secret = generateSecret()
PM->>PM: 4. Generate Tier-Two secret<br/>tier_two_secret = generateSecret()
PM->>PM: 5. Generate allowed time offset<br/>time_offset_allowed
PM->>PM: 6. Persist project info<br/>{project_id, namespace,<br/>tier_one_secret, tier_two_secret,<br/>auth_duration, time_offset_allowed}
PM-->>Admin: 7. Return project configuration file<br/>(contains tier_one_secret and tier_two_secret<br/>for deploying the Watchdog)
rect rgb(220, 255, 220)
Note over Admin,Agent: ===== Phase 2: Watchdog Startup and MQTT Connection =====
end
Note over WD: Deployed with the project configuration file<br/>tier_one_secret and tier_two_secret are already local<br/>(secrets are never sent over the public network)
WD->>MQTT: 8. Connect to the MQTT Broker
WD->>WD: 9. Create MQTT subscriptions<br/>Subscribe: wdd/RDMC/command/down/{project_id}<br/>Subscribe: wdd/RDMC/message/down/{project_id}
rect rgb(255, 240, 220)
Note over Admin,Agent: ===== Phase 3: Project Registration (Challenge-Response) =====
end
WD->>WD: 10. Generate Tier-One TOTP code<br/>(8 digits, valid for 30 minutes)<br/>using the local tier_one_secret
WD->>MQTT: 11. Publish register Command to<br/>wdd/RDMC/command/up<br/>{type:"register", project_id,<br/>namespace, totp_code, env_info}
MQTT->>ExHub: 12. Forward register Command
ExHub->>PM: 13. Validate project info<br/>GetProjectInfo(project_id)
PM-->>ExHub: 14. Return project secrets and related info<br/>{tier_one_secret, tier_two_secret}
ExHub->>ExHub: 15. Verify TOTP code (can be disabled by config)<br/>VerifyTierOneTOTP(totp_code, tier_one_secret)
alt TOTP verified or verification disabled
ExHub->>ExHub: 16. Generate 32-character random challenge<br/>challenge = randomString(32)
ExHub->>ExHub: 17. Generate server TOTP response code<br/>server_totp = GenerateTierOneTOTP(tier_one_secret)
ExHub->>ExHub: 18. Store challenge in cache<br/>cache[project_id] = challenge
ExHub->>MQTT: 19. Publish register-ack Message to<br/>wdd/RDMC/message/down/{project_id}<br/>{type:"register_ack", challenge, server_totp}
ExHub->>MQTT: 20. Publish authorization Command to<br/>wdd/RDMC/command/down/{project_id}<br/>{type:"auth_response", success:true}<br/>(no secrets transmitted, they are already local)
else TOTP verification failed
ExHub->>MQTT: Publish registration rejection<br/>{type:"register_reject", reason}
end
MQTT->>WD: 21. Deliver register-ack Message
MQTT->>WD: 22. Deliver authorization Command
WD->>WD: 23. Verify server TOTP (can be disabled by config)<br/>VerifyTierOneTOTP(server_totp, tier_one_secret)
alt Server TOTP verified or verification disabled
WD->>WD: 24. Confirm tier_two_secret is valid<br/>(using the locally stored secret)
WD->>WD: 25. Parse the challenge
WD->>MQTT: 26. Publish register-complete Message to<br/>wdd/RDMC/message/up<br/>{type:"register_complete",<br/>project_id, challenge}
else Server TOTP verification failed
Note over WD: Registration failed, server verification did not pass<br/>(possible man-in-the-middle attack)
end
MQTT->>ExHub: 27. Forward register-complete Message
ExHub->>ExHub: 28. Verify the challenge matches<br/>cache[project_id] == received_challenge
alt Challenge verified
ExHub->>ExHub: 29. Set project state to Online<br/>UpdateProjectState(project_id, "online")
ExHub->>PM: 30. Notify that the project is online
PM->>PM: 31. Update project online status
Note over WD: Registration succeeded, normal operation begins
else Challenge verification failed
ExHub->>MQTT: Publish verification-failure message
Note over WD: Registration failed, waiting to retry
end
rect rgb(220, 240, 255)
Note over Admin,Agent: ===== Phase 4: Heartbeat =====
end
loop Every 5 seconds
WD->>MQTT: Publish heartbeat Message to<br/>wdd/RDMC/message/up<br/>{type:"heartbeat", metrics}
MQTT->>ExHub: Forward heartbeat
ExHub->>ExHub: Refresh project online status
end
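1.2 Key Design Points of the Registration Flow
| Design point | Description |
|---|---|
| Secret generation | Both tier_one_secret and tier_two_secret are generated by rmdc-project-management when the project is created |
| Secret distribution | Secrets reach the Watchdog offline, inside the project configuration file, and are never sent over public MQTT |
| Mutual TOTP verification | The Watchdog sends a TOTP code for ExHub to verify, and ExHub returns a TOTP code for the Watchdog to verify (each check can be disabled by configuration) |
| Challenge-response | A 32-character random challenge, issued by ExHub and echoed back by the Watchdog, confirms the authenticity of both parties |
| Security benefit | Even if MQTT traffic is intercepted, an attacker who does not hold tier_one_secret cannot compute a valid TOTP code |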
2. Authorization System End-to-End Flow
2.1 Authorization System Architecture Overview
graph TB
subgraph "RMDC Platform (internal network)"
PM["rmdc-project-management<br/>Project Management"]
CENTER["rmdc-watchdog-center<br/>Tier-One Authorization Center"]
EXHUB["rmdc-exchange-hub<br/>Message Gateway"]
DB[(Authorization Database)]
end
subgraph "Project Environment (external/isolated network)"
WATCHDOG["rmdc-watchdog<br/>Tier-Two Authorization Center"]
AGENT1["watchdog-agent<br/>Business A"]
AGENT2["watchdog-agent<br/>Business B"]
NODE1["watchdog-node<br/>Host 1"]
NODE2["watchdog-node<br/>Host 2"]
end
PM --"1. Project creation"--> CENTER
CENTER --"2. Authorization config"--> PM
CENTER <--"3. Authorization request/issuance<br/>Tier-One TOTP"--> EXHUB
EXHUB <=="4. MQTT over public network"==> WATCHDOG
NODE1 --"5. Host info report"--> WATCHDOG
NODE2 --"5. Host info report"--> WATCHDOG
AGENT1 <--"6. Heartbeat/authorization<br/>Tier-Two TOTP"--> WATCHDOG
AGENT2 <--"6. Heartbeat/authorization<br/>Tier-Two TOTP"--> WATCHDOG
CENTER --> DB
PM --> DB
style CENTER fill:#ff6b6b
style WATCHDOG fill:#4ecdc4
style EXHUB fill:#ffd43b
style PM fill:#a9e34b
2.2 Authorization Request and Issuance Flow
sequenceDiagram
autonumber
participant Node as rmdc-watchdog-node<br/>(Host)
participant Watchdog as rmdc-watchdog<br/>(Tier-Two Authorization Center)
participant MQTT as Exchange-Hub<br/>(MQTT)
participant Center as rmdc-watchdog-center<br/>(Tier-One Authorization Center)
participant DB as Authorization Database
rect rgb(200, 220, 255)
Note over Node,DB: ===== Phase 1: Host Information Collection =====
end
Note over Node: Project starts, node runs as a DaemonSet
Node->>Watchdog: 1. Report host hardware info<br/>{MachineID, CPU, Memory, Serial, IP}
Watchdog->>Watchdog: 2. Encrypt host info<br/>EncryptHostInfo(hostInfo, tierOneSecret)
Watchdog->>Watchdog: 3. Generate authorization request file<br/>GenerateAuthorizationFile()
Note over Watchdog: The authorization file contains:<br/>- EncryptedHostMap<br/>- TOTPCode (8 digits, valid for 30 minutes)<br/>- EncryptedNamespace
rect rgb(220, 255, 220)
Note over Node,DB: ===== Phase 2: Authorization Request =====
end
Watchdog->>MQTT: 4. Publish authorization-request Command to<br/>wdd/RDMC/command/up<br/>{type:"auth_request", AuthorizationFile}
MQTT->>Center: 5. Forward authorization request
Center->>Center: 6. Decrypt project namespace<br/>Decrypt(EncryptedNamespace)
Center->>DB: 7. Fetch project info<br/>GetProjectInfo(namespace)
DB-->>Center: Return project secrets and related info
Center->>Center: 8. Verify TOTP code<br/>VerifyTierOneTOTPCode()
Center->>Center: 9. Verify host info integrity<br/>DecryptHostInfo() for each host
rect rgb(255, 240, 220)
Note over Node,DB: ===== Phase 3: Authorization Issuance =====
end
alt Verification succeeded
Center->>Center: 10. Generate new TOTP code
Center->>Center: 11. Build authorization code<br/>{authorized_hosts, expire_time, tier_two_secret}
Center->>DB: 12. Persist authorization record
Center->>MQTT: 13. Publish authorization code to<br/>wdd/RDMC/command/down/{project_id}<br/>{type:"auth_response", AuthorizationCode}
else Verification failed
Center->>MQTT: Publish authorization rejection<br/>{type:"auth_reject", reason}
end
MQTT->>Watchdog: 14. Deliver authorization code
Watchdog->>Watchdog: 15. Verify the returned TOTP
Watchdog->>Watchdog: 16. Decrypt and verify the namespace
Watchdog->>Watchdog: 17. Decrypt each host's info
Watchdog->>Watchdog: 18. Compute time offset<br/>timeOffset = now - firstAuthTime
Watchdog->>Watchdog: 19. Persist authorization info<br/>saveAuthorizationInfo()
Note over Watchdog: The stored authorization contains:<br/>- EncryptedAuthorizationCode<br/>- FirstAuthTime<br/>- TimeOffset<br/>- AuthorizedHostMap<br/>- TierTwoTOTPSecret
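
Step 2 (`EncryptHostInfo`) and the Center's per-host `DecryptHostInfo` in step 9 could look like the following. The sketch uses AES-256-GCM, as listed in section 6.2. Several details are assumptions for illustration: deriving the key as SHA-256 of tierOneSecret, the JSON field names, and the base64(nonce || ciphertext) layout. GCM's authentication tag is what lets the Center reject a tampered or foreign host entry.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"crypto/sha256"
	"encoding/base64"
	"encoding/json"
	"errors"
)

// HostInfo mirrors the fields reported by the node in step 1 (JSON names are assumptions).
type HostInfo struct {
	MachineID string `json:"machine_id"`
	CPU       string `json:"cpu"`
	Memory    string `json:"memory"`
	Serial    string `json:"serial"`
	IP        string `json:"ip"`
}

// newGCM derives a 32-byte AES-256 key from the secret with SHA-256 (assumed KDF).
func newGCM(secret []byte) (cipher.AEAD, error) {
	key := sha256.Sum256(secret)
	block, err := aes.NewCipher(key[:])
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

// EncryptHostInfo seals the JSON-encoded host info and returns base64(nonce || ciphertext).
func EncryptHostInfo(h HostInfo, secret []byte) (string, error) {
	plain, err := json.Marshal(h)
	if err != nil {
		return "", err
	}
	gcm, err := newGCM(secret)
	if err != nil {
		return "", err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return "", err
	}
	return base64.StdEncoding.EncodeToString(gcm.Seal(nonce, nonce, plain, nil)), nil
}

// DecryptHostInfo reverses EncryptHostInfo; Open fails if any byte was altered.
func DecryptHostInfo(s string, secret []byte) (HostInfo, error) {
	var h HostInfo
	raw, err := base64.StdEncoding.DecodeString(s)
	if err != nil {
		return h, err
	}
	gcm, err := newGCM(secret)
	if err != nil {
		return h, err
	}
	ns := gcm.NonceSize()
	if len(raw) < ns {
		return h, errors.New("ciphertext too short")
	}
	plain, err := gcm.Open(nil, raw[:ns], raw[ns:], nil)
	if err != nil {
		return h, err
	}
	err = json.Unmarshal(plain, &h)
	return h, err
}
```

A fresh random nonce per call means encrypting the same host twice yields different ciphertexts. The Center should therefore compare decrypted fields, not ciphertext strings.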
2.3 Agent Authorization Heartbeat Flow
sequenceDiagram
autonumber
participant Agent as rmdc-watchdog-agent<br/>(Business Launcher)
participant Watchdog as rmdc-watchdog<br/>(Tier-Two Authorization Center)
participant Business as Business Process<br/>(Java/Python)
rect rgb(200, 255, 200)
Note over Agent,Business: ===== First Connection: Obtain the Secret =====
end
Agent->>Agent: 1. Collect host info<br/>GetAllInfo()
Agent->>Watchdog: 2. Send heartbeat request<br/>{HostInfo, EnvInfo, Timestamp, TOTPCode=""}
Watchdog->>Watchdog: 3. Check timestamp validity<br/>|now - timestamp| < 5 minutes
Watchdog->>Watchdog: 4. Add host to the host set<br/>AddHostInfo()
Watchdog-->>Agent: 5. Return response<br/>{Authorized:false, TierTwoSecret:secret}
Agent->>Agent: 6. Save the TOTP secret<br/>tierTwoTotpSecret = secret
rect rgb(220, 255, 220)
Note over Agent,Business: ===== Subsequent Heartbeats: Authorization Check =====
end
loop Heartbeat loop (every 2 hours after success, every 1 hour after failure)
Agent->>Agent: 7. Generate TOTP code<br/>GenerateTierTwoTOTPCode(secret)<br/>6 digits, valid for 30 seconds
Agent->>Watchdog: 8. Send heartbeat request<br/>{HostInfo, Timestamp, TOTPCode}
Watchdog->>Watchdog: 9. Verify TOTP code<br/>VerifyTierTwoTOTPCode()
alt TOTP verified
Watchdog->>Watchdog: 10. Check host authorization status<br/>IsHostAuthorized(hostInfo)
Watchdog->>Watchdog: 11. Generate response TOTP
Watchdog-->>Agent: 12. Return {Authorized:true/false, TOTPCode}
Agent->>Agent: 13. Verify server TOTP<br/>(mutual verification)
alt Authorized
Agent->>Agent: 14. failCount = 1<br/>wait 2 hours
else Not authorized
Agent->>Agent: 15. failCount++<br/>wait 1 hour
end
else TOTP verification failed
Watchdog-->>Agent: Return error: invalid TOTP code
Agent->>Agent: failCount++
end
alt failCount >= 12
Agent->>Business: 16. Send SIGTERM
Note over Business: Business process terminates<br/>(dead-hand system triggered)
end
end
2.4 Authorization Revocation Flow
sequenceDiagram
autonumber
participant Admin as Administrator<br/>(RMDC Portal)
participant PM as project-management
participant Center as rmdc-watchdog-center<br/>(Tier-One Authorization Center)
participant MQTT as Exchange-Hub<br/>(MQTT)
participant Watchdog as rmdc-watchdog<br/>(Tier-Two Authorization Center)
participant Agent as rmdc-watchdog-agent
participant Business as Business Process
rect rgb(255, 220, 220)
Note over Admin,Business: ===== Authorization Revocation =====
end
Admin->>PM: 1. Submit revocation request<br/>{project_id, reason}
PM->>Center: 2. Request revocation of project authorization
Center->>Center: 3. Update project authorization status<br/>status = revoked
Center->>Center: 4. Build revoke Command<br/>{type:"auth_revoke", project_id, totp}
Center->>MQTT: 5. Publish revoke Command to<br/>wdd/RDMC/command/down/{project_id}
MQTT->>Watchdog: 6. Deliver revoke command
Watchdog->>Watchdog: 7. Verify the revoke command TOTP
Watchdog->>Watchdog: 8. Clear the local authorization store<br/>deleteAuthorizationInfo()
Watchdog->>Watchdog: 9. Set authorization state to unauthorized<br/>initialized = false
Note over Agent: On the next heartbeat...
Agent->>Watchdog: 10. Send heartbeat request
Watchdog-->>Agent: 11. Return {Authorized:false}
Agent->>Agent: 12. failCount++
Note over Agent: After 12 consecutive failures...
Agent->>Business: 13. Send SIGTERM<br/>to terminate the business process
Note over Business: Business process self-destructs<br/>(dead-hand system takes effect)
Watchdog->>MQTT: 14. Report revocation complete<br/>{type:"auth_revoke_ack"}
MQTT->>Center: 15. Forward revocation acknowledgment
Center->>PM: 16. Update project status
PM->>Admin: 17. Notify that revocation succeeded
3. K8S Command Execution Flow
3.1 K8S Command Flow Diagram
sequenceDiagram
autonumber
participant User as User<br/>(RMDC Portal)
participant Api as rmdc-core<br/>(API Gateway)
participant Operator as octopus-operator<br/>(Execution Center)
participant ExHub as rmdc-exchange-hub<br/>(Message Gateway)
participant MQTT as MQTT Broker
participant WD as rmdc-watchdog<br/>(K8s Operator)
participant K8s as Kubernetes API
rect rgb(200, 220, 255)
Note over User,K8s: ===== Phase 1: Command Initiation =====
end
User->>Api: 1. Submit K8S operation request<br/>{project_id, action, resource, name}
Api->>Api: 2. Check user permission<br/>CheckPermission(user, project, action)
Api->>Operator: 3. Call the Execution Center API
Operator->>Operator: 4. Build K8S exec command<br/>K8sExecCommand{<br/> command_id, namespace,<br/> resource, name, action,<br/> command, timeout<br/>}
Operator->>ExHub: 5. Call the command dispatch API<br/>POST /api/command/send
ExHub->>ExHub: 6. Generate a unique CommandID<br/>record the command in the database<br/>status: Pending
ExHub->>MQTT: 7. Publish K8S exec Command to<br/>wdd/RDMC/command/down/{project_id}<br/>{type:"k8s_exec", payload}
rect rgb(220, 255, 220)
Note over User,K8s: ===== Phase 2: Command Execution =====
end
MQTT->>WD: 8. Deliver K8S exec command
WD->>WD: 9. Parse the command<br/>route to K8sHandler
WD->>WD: 10. Record start time<br/>status = running
alt action == "logs"
WD->>K8s: 11a. Call K8S API<br/>GetPodLogs(namespace, name, container)
else action == "exec"
WD->>K8s: 11b. Call K8S API<br/>ExecCommand(pod, container, command)
else action == "scale"
WD->>K8s: 11c. Call K8S API<br/>ScaleDeployment(name, replicas)
else action == "restart"
WD->>K8s: 11d. Call K8S API<br/>RolloutRestart(deployment)
else action == "delete"
WD->>K8s: 11e. Call K8S API<br/>DeleteResource(resource, name)
end
K8s-->>WD: 12. Return execution result
rect rgb(255, 240, 220)
Note over User,K8s: ===== Phase 3: Result Return =====
end
WD->>WD: 13. Build execution result<br/>ExecResult{command_id, status,<br/>exit_code, output, error, duration}
WD->>MQTT: 14. Publish result Message to<br/>wdd/RDMC/message/up<br/>{type:"exec_result", payload}
MQTT->>ExHub: 15. Forward execution result
ExHub->>ExHub: 16. Update command status<br/>record execution duration
ExHub->>Operator: 17. Push result to the Execution Center
Operator->>Api: 18. Return execution result
Api->>User: 19. Display execution result
3.2 Supported K8S Operation Types
| Action | Description | Target resource | Parameters |
|---|---|---|---|
| logs | Get logs | Pod | container, tail_lines, follow |
| exec | Execute a command | Pod | container, command[], timeout |
| scale | Scale in/out | Deployment/StatefulSet | scale_count |
| restart | Rolling restart | Deployment/StatefulSet | - |
| delete | Delete a resource | Pod/Deployment/Service, etc. | - |
| get | Get information | Any resource | output_format |
| apply | Apply configuration | Any resource | yaml_content |
4. Host Command Execution Flow
4.1 Host Command Flow Diagram
sequenceDiagram
autonumber
participant User as User<br/>(RMDC Portal)
participant Api as rmdc-core<br/>(API Gateway)
participant Operator as octopus-operator<br/>(Execution Center)
participant ExHub as rmdc-exchange-hub<br/>(Message Gateway)
participant MQTT as MQTT Broker
participant WD as rmdc-watchdog<br/>(Tier-Two Authorization Center)
participant Node as watchdog-node<br/>(Host Daemon)
rect rgb(200, 220, 255)
Note over User,Node: ===== Phase 1: Command Initiation =====
end
User->>Api: 1. Submit host operation request<br/>{project_id, host_id, action, script}
Api->>Api: 2. Check user permission<br/>CheckPermission(user, project, "host_exec")
Api->>Operator: 3. Call the Execution Center API
Operator->>Operator: 4. Build host exec command<br/>HostExecCommand{<br/> command_id, host_id,<br/> action, script, args, timeout<br/>}
Operator->>ExHub: 5. Call the command dispatch API<br/>POST /api/command/send
ExHub->>ExHub: 6. Generate a unique CommandID<br/>record the command in the database<br/>status: Pending
ExHub->>MQTT: 7. Publish host exec Command to<br/>wdd/RDMC/command/down/{project_id}<br/>{type:"host_exec", payload}
rect rgb(220, 255, 220)
Note over User,Node: ===== Phase 2: Command Forwarding and Execution =====
end
MQTT->>WD: 8. Deliver host exec command
WD->>WD: 9. Parse the command<br/>route to HostHandler
WD->>WD: 10. Check that the target host is online<br/>CheckHostOnline(host_id)
alt Host online
WD->>Node: 11. Forward the command to the target Node<br/>HTTP POST /api/exec<br/>(internal network, TOTP-authenticated)
Node->>Node: 12. Validate the request<br/>VerifyTOTP()
Node->>Node: 13. Run the command/script<br/>ExecuteScript(script, args)
Node-->>WD: 14. Return execution result<br/>{exit_code, stdout, stderr}
else Host offline
WD->>WD: Build error result<br/>error = "target host offline"
end
rect rgb(255, 240, 220)
Note over User,Node: ===== Phase 3: Result Return =====
end
WD->>WD: 15. Build execution result<br/>ExecResult{command_id, status,<br/>exit_code, output, error, duration}
WD->>MQTT: 16. Publish result Message to<br/>wdd/RDMC/message/up<br/>{type:"exec_result", payload}
MQTT->>ExHub: 17. Forward execution result
ExHub->>ExHub: 18. Update command status<br/>record execution duration
ExHub->>Operator: 19. Push result to the Execution Center
Operator->>Api: 20. Return execution result
Api->>User: 21. Display execution result
4.2 Watchdog-to-Node Communication Architecture
graph TB
subgraph "rmdc-watchdog"
WD_Router[Message Router]
WD_HostH[HostHandler]
WD_NodeClient[NodeClient<br/>HTTP Client]
end
subgraph "watchdog-node (DaemonSet)"
Node_Server[HTTP Server<br/>:8081]
Node_Auth[TOTP Verification Middleware]
Node_Exec[Command Executor]
Node_Info[Info Collector]
end
WD_Router --> WD_HostH
WD_HostH --> WD_NodeClient
WD_NodeClient ==HTTP/TOTP==> Node_Server
Node_Server --> Node_Auth
Node_Auth --> Node_Exec
Node_Auth --> Node_Info
style WD_NodeClient fill:#4ecdc4,stroke:#087f5b
style Node_Server fill:#a9e34b,stroke:#5c940d
4.3 Supported Host Operation Types
| Action | Description | Parameters |
|---|---|---|
| exec | Execute a shell command | script, args[], timeout |
| info | Get host information | info_type (cpu/memory/disk/network) |
| service | Service management | service_name, operation (start/stop/restart) |
| file | File operations | path, operation (read/write/delete) |
| dltu | Image operations | operation (download/load/tag/upload), params |
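5. Synchronous Command Design
5.1 Feasibility Analysis of Synchronous Commands
Applicable Scenarios
| Scenario | Current mode | Need for synchronous results | Recommended approach |
|---|---|---|---|
| Log viewing | Async | High: log output must be visible in real time | Long polling or push |
| Host command execution | Async | Medium: interactive commands need real-time results | Timeout-bounded wait |
| K8S resource query | Async | Medium: queries are expected to return quickly | Timeout-bounded wait |
| Business update | Async | Low: long-running operations suit async handling | Keep async + notification |
| Monitoring data | Async | Low: periodic reporting is sufficient | Keep async |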
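Technical Challenges
- Network latency: MQTT communication across the public internet has unpredictable latency
- Timeout handling: timeouts must be chosen carefully to avoid blocking for long periods
- Connection keep-alive: long waits require the connection to stay open
- Resource usage: each synchronous wait holds server connection resources until it completes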
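5.2 Synchronous Command Implementation on the Existing Architecture
Design (Go channel + HTTP long polling)
Given the project's existing Go stack, the recommended approach is HTTP long polling with a result cache. Exchange-Hub holds the HTTP request open while it waits on a per-command Go channel, and responds as soon as the result arrives or the timeout expires: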
sequenceDiagram
autonumber
participant User as User
participant Api as API Gateway
participant ExHub as Exchange-Hub
participant Cache as ResultCache<br/>(sync.Map)
participant MQTT as MQTT Broker
participant WD as Watchdog
rect rgb(200, 220, 255)
Note over User,WD: ===== Send Synchronous Command =====
end
User->>Api: 1. POST /api/command/sync<br/>{project_id, type, payload, timeout:30s}
Api->>ExHub: 2. Call the synchronous command API
ExHub->>ExHub: 3. Generate CommandID
ExHub->>Cache: 4. Create wait channel<br/>waitChan[cmdId] = make(chan *ExecResult, 1)
ExHub->>MQTT: 5. Publish command
MQTT->>WD: 6. Deliver to Watchdog
rect rgb(220, 255, 220)
Note over User,WD: ===== Execute and Wait =====
end
WD->>WD: 7. Execute command
WD->>MQTT: 8. Return result
MQTT->>ExHub: 9. Receive result
ExHub->>Cache: 10. Send result to the channel<br/>waitChan[cmdId] <- result
rect rgb(255, 240, 220)
Note over User,WD: ===== Return Result =====
end
ExHub->>ExHub: 11. select on result or timeout
ExHub->>Api: 12. Return execution result
Api->>User: 13. Return result to the user
Exchange-Hub Synchronous Command Manager Design
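// CommandMessage, ExecResult and GetMQTTService are assumed to be defined elsewhere in Exchange-Hub.
import (
	"errors"
	"sync"
	"time"
)
// ErrCommandTimeout is returned when no result arrives before the deadline.
var ErrCommandTimeout = errors.New("command execution timeout")
// SyncCommandManager tracks commands whose callers are blocked waiting for a result.
type SyncCommandManager struct {
	waitChannels sync.Map      // map[commandID]chan *ExecResult
	timeout      time.Duration // default wait, used when the caller passes timeout <= 0
}
// SendAndWait publishes cmd and blocks until its result arrives or the timeout expires.
// The Watchdog must echo cmd.MessageID back as ExecResult.CommandID so OnResult can find the channel.
func (m *SyncCommandManager) SendAndWait(projectID string, cmd *CommandMessage, timeout time.Duration) (*ExecResult, error) {
	if timeout <= 0 {
		timeout = m.timeout
	}
	// 1. Register the wait channel before publishing so a fast result cannot be missed.
	waitChan := make(chan *ExecResult, 1)
	m.waitChannels.Store(cmd.MessageID, waitChan)
	defer m.waitChannels.Delete(cmd.MessageID)
	// 2. Publish the command.
	mqttService := GetMQTTService()
	if err := mqttService.PublishCommand(projectID, cmd); err != nil {
		return nil, err
	}
	// 3. Wait for the result or the timeout; Stop releases the timer early on success.
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case result := <-waitChan:
		return result, nil
	case <-timer.C:
		// Only this wait has ended; the command may still complete on the Watchdog.
		return nil, ErrCommandTimeout
	}
}
// OnResult is called by the MQTT handler for each exec_result message.
// The send is non-blocking, so a duplicate or late result never stalls the MQTT callback.
func (m *SyncCommandManager) OnResult(result *ExecResult) {
	if ch, ok := m.waitChannels.Load(result.CommandID); ok {
		select {
		case ch.(chan *ExecResult) <- result:
		default: // a result was already delivered for this command; drop the duplicate
		}
	}
}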
5.3 Real-Time Log Viewing
For real-time log viewing, a streaming log push approach is recommended:
sequenceDiagram
autonumber
participant User as User
participant Frontend as Frontend
participant Api as API Gateway
participant ExHub as Exchange-Hub
participant MQTT as MQTT Broker
participant WD as Watchdog
participant K8s as K8S API
User->>Frontend: 1. Click to view live logs
Frontend->>Api: 2. POST /api/logs/stream<br/>{project_id, pod, container, follow:true}
Api->>ExHub: 3. Create log stream session<br/>sessionId = uuid()
ExHub->>MQTT: 4. Publish log query command<br/>{type:"log_query", follow:true, session_id}
MQTT->>WD: 5. Deliver command
WD->>K8s: 6. Call K8S Logs API (follow=true)
loop Log stream
K8s-->>WD: 7. Return log lines
WD->>MQTT: 8. Publish log Message<br/>{type:"log_result", session_id, lines, is_complete:false}
MQTT->>ExHub: 9. Forward logs
ExHub->>ExHub: 10. Route by session_id
ExHub->>Api: 11. Push to the API session
Api->>Frontend: 12. Return logs via SSE/polling
Frontend->>User: 13. Display logs in real time
end
User->>Frontend: 14. Stop viewing
Frontend->>Api: 15. DELETE /api/logs/stream/{session_id}
Api->>ExHub: 16. Close the log stream session
ExHub->>MQTT: 17. Publish stop command
Frontend Implementation Suggestion
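// Receive live logs via Server-Sent Events (SSE).
// appendLogToUI is an application-specific rendering helper assumed to exist elsewhere.
declare function appendLogToUI(lines: string[]): void;
async function streamLogs(projectId: string, pod: string, container: string) {
  // 1. Create the log stream session; the body is JSON, so send the matching Content-Type.
  const response = await fetch('/api/logs/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ project_id: projectId, pod, container, follow: true }),
  });
  if (!response.ok) {
    throw new Error(`failed to create log stream: HTTP ${response.status}`);
  }
  const { session_id: sessionId } = await response.json();
  const streamUrl = `/api/logs/stream/${encodeURIComponent(sessionId)}`;
  // 2. Subscribe to the session via SSE (long polling is an alternative).
  const eventSource = new EventSource(streamUrl);
  eventSource.onmessage = (event) => {
    const logData = JSON.parse(event.data);
    appendLogToUI(logData.lines);
    // Stop the browser from auto-reconnecting once the server marks the stream complete.
    if (logData.is_complete) eventSource.close();
  };
  // 3. Return a handle that closes the stream and releases the server-side session.
  return {
    stop: () => {
      eventSource.close();
      void fetch(streamUrl, { method: 'DELETE' });
    },
  };
}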
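
On the server side, the log stream has to reach the frontend's `EventSource`. Each `log_result` must be routed by `session_id` (step 10) and written to the browser as Server-Sent Events whose `data` is JSON with a `lines` field, which is what the frontend code above parses. The sketch below rests on three assumptions: an in-memory session map, a 64-message buffer per session, and dropping a batch when a slow client falls behind rather than blocking the MQTT callback. The type and method names are hypothetical.

```go
package main

import (
	"encoding/json"
	"net/http"
	"sync"
)

// LogResult mirrors the log_result message in step 8 of the diagram.
type LogResult struct {
	SessionID  string   `json:"session_id"`
	Lines      []string `json:"lines"`
	IsComplete bool     `json:"is_complete"`
}

// LogSessions routes MQTT log_result messages to the open stream for their session_id.
type LogSessions struct {
	mu       sync.Mutex
	sessions map[string]chan LogResult
}

func NewLogSessions() *LogSessions {
	return &LogSessions{sessions: make(map[string]chan LogResult)}
}

// Open registers a session created by POST /api/logs/stream.
func (s *LogSessions) Open(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.sessions[id] = make(chan LogResult, 64)
}

// Close handles DELETE /api/logs/stream/{session_id}; closing ends ServeSSE.
func (s *LogSessions) Close(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if ch, ok := s.sessions[id]; ok {
		delete(s.sessions, id)
		close(ch)
	}
}

// Route never blocks: unknown sessions and full buffers drop the batch.
// Holding the lock while sending prevents a send on a channel Close just closed.
func (s *LogSessions) Route(msg LogResult) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	ch, ok := s.sessions[msg.SessionID]
	if !ok {
		return false
	}
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

// sseFrame encodes one event; json.Marshal output contains no raw newlines.
func sseFrame(msg LogResult) ([]byte, error) {
	b, err := json.Marshal(msg)
	if err != nil {
		return nil, err
	}
	return append(append([]byte("data: "), b...), '\n', '\n'), nil
}

// ServeSSE streams one session until it completes, is closed, or the client leaves.
func (s *LogSessions) ServeSSE(w http.ResponseWriter, r *http.Request, id string) {
	s.mu.Lock()
	ch, ok := s.sessions[id]
	s.mu.Unlock()
	flusher, canFlush := w.(http.Flusher)
	if !ok || !canFlush {
		http.Error(w, "unknown session or streaming unsupported", http.StatusNotFound)
		return
	}
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	for {
		select {
		case <-r.Context().Done():
			return
		case msg, open := <-ch:
			if !open {
				return
			}
			frame, err := sseFrame(msg)
			if err != nil {
				return
			}
			if _, err := w.Write(frame); err != nil {
				return
			}
			flusher.Flush()
			if msg.IsComplete {
				return
			}
		}
	}
}
```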
6. Data Structures and Security Mechanisms Summary
6.1 MQTT Topics and Message Types
| Direction | Topic | Message type | Description |
|---|---|---|---|
| Uplink | wdd/RDMC/command/up | register | Project registration |
| Uplink | wdd/RDMC/command/up | auth_request | Authorization request |
| Uplink | wdd/RDMC/message/up | register_complete | Registration-complete confirmation |
| Uplink | wdd/RDMC/message/up | heartbeat | Heartbeat data |
| Uplink | wdd/RDMC/message/up | monitor | Monitoring data report |
| Uplink | wdd/RDMC/message/up | exec_result | Command execution result |
| Uplink | wdd/RDMC/message/up | log_result | Log query result |
| Uplink | wdd/RDMC/message/up | alert | Alert |
| Downlink | wdd/RDMC/command/down/{id} | auth_response | Authorization response |
| Downlink | wdd/RDMC/command/down/{id} | auth_revoke | Authorization revocation |
| Downlink | wdd/RDMC/command/down/{id} | log_query | Log query command |
| Downlink | wdd/RDMC/command/down/{id} | host_exec | Host exec command |
| Downlink | wdd/RDMC/command/down/{id} | k8s_exec | K8S exec command |
| Downlink | wdd/RDMC/command/down/{id} | update | Business update command |
| Downlink | wdd/RDMC/message/down/{id} | register_ack | Registration acknowledgment |
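
Keeping the table's type-to-topic mapping in one place helps avoid publishing a message on the wrong topic. The sketch below copies the topic strings and type lists from the table above, including the `RDMC` spelling. It assumes `{id}` is the project_id, and its function and variable names are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// Topic names exactly as listed in the table.
const (
	TopicCommandUp  = "wdd/RDMC/command/up"
	TopicMessageUp  = "wdd/RDMC/message/up"
	commandDownRoot = "wdd/RDMC/command/down/"
	messageDownRoot = "wdd/RDMC/message/down/"
)

// upTopic maps each uplink message type to its topic.
var upTopic = map[string]string{
	"register": TopicCommandUp, "auth_request": TopicCommandUp,
	"register_complete": TopicMessageUp, "heartbeat": TopicMessageUp,
	"monitor": TopicMessageUp, "exec_result": TopicMessageUp,
	"log_result": TopicMessageUp, "alert": TopicMessageUp,
}

// downIsCommand maps each downlink type to true (command topic) or false (message topic).
var downIsCommand = map[string]bool{
	"auth_response": true, "auth_revoke": true, "log_query": true,
	"host_exec": true, "k8s_exec": true, "update": true,
	"register_ack": false,
}

// UpTopic returns the uplink topic for a message type.
func UpTopic(msgType string) (string, error) {
	t, ok := upTopic[msgType]
	if !ok {
		return "", fmt.Errorf("unknown uplink type %q", msgType)
	}
	return t, nil
}

// DownTopic returns the per-project downlink topic for a message type.
func DownTopic(msgType, projectID string) (string, error) {
	isCmd, ok := downIsCommand[msgType]
	if !ok {
		return "", fmt.Errorf("unknown downlink type %q", msgType)
	}
	if isCmd {
		return commandDownRoot + projectID, nil
	}
	return messageDownRoot + projectID, nil
}

// ProjectIDFromTopic extracts {id} from a downlink topic.
func ProjectIDFromTopic(topic string) (string, bool) {
	for _, root := range []string{commandDownRoot, messageDownRoot} {
		if strings.HasPrefix(topic, root) {
			id := strings.TrimPrefix(topic, root)
			return id, id != "" && !strings.Contains(id, "/")
		}
	}
	return "", false
}
```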
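6.2 Security Mechanisms Summary
| Scenario | Security mechanism | Parameters |
|---|---|---|
| Center ↔ Watchdog | Tier-One TOTP + AES-GCM | 8-digit code, 30-minute validity, SHA256 |
| Watchdog ↔ Agent | Tier-Two TOTP | 6-digit code, 30-second validity, SHA1 |
| Watchdog ↔ Node | Reuses Tier-Two TOTP | Internal-network HTTP + TOTP authentication |
| Fallback HTTP API | Reuses the Tier-Two TOTP secret | TOTP authentication required |
| Host information | Hardware fingerprint binding | MachineID+CPU+Memory+Serial |
| Dead-hand system | Self-destruct on heartbeat failure | Triggered after 12 consecutive failures |
| Message transport | TLS encryption | MQTT over TLS |
| Sensitive data | AES-256-GCM encryption | Authorization codes, secrets, etc. |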
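
The hardware fingerprint binding row combines MachineID, CPU, Memory and Serial. One way to turn those into a single comparable value is a SHA-256 digest over normalized, length-prefixed fields. Both the normalization (trim and lowercase) and the length-prefix encoding are assumptions. The length prefix is there so that moving characters between fields, as in `"a|b","c"` versus `"a","b|c"`, cannot produce the same digest input.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"encoding/hex"
	"strings"
)

// HardwareFingerprint returns a hex SHA-256 digest over the normalized host fields.
// Each field is prefixed with its byte length so the encoding is unambiguous.
func HardwareFingerprint(machineID, cpu, memory, serial string) string {
	h := sha256.New()
	for _, field := range []string{machineID, cpu, memory, serial} {
		v := strings.ToLower(strings.TrimSpace(field))
		var n [8]byte
		binary.BigEndian.PutUint64(n[:], uint64(len(v)))
		h.Write(n[:])
		h.Write([]byte(v))
	}
	return hex.EncodeToString(h.Sum(nil))
}
```

A fingerprint recorded when authorization is issued can be compared with one recomputed from freshly collected host data. A change in any of the four fields changes the digest.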