# Watchdog Module Development Prompt

> This document gives a large language model the development context for the rmdc-watchdog module.

---

## Module Overview

**rmdc-watchdog** is an edge agent module deployed inside an external project's environment. It is responsible for:

1. **Second-tier authorization center**: manages authorization for the services and hosts within the project
2. **K8S operation proxy**: performs K8S API operations (resource CRUD, log retrieval, and so on)
3. **Command reception and execution**: receives commands issued by Exchange-Hub and executes them
4. **Monitoring data reporting**: collects and reports the runtime status of hosts and services

---

## Component Architecture

```mermaid
graph TB
    subgraph "rmdc-watchdog components"
        MQTT[MQTT Client]
        Router[Message Router]

        subgraph "Handler"
            AuthH[Auth Handler]
            K8sH[K8S Handler]
            HostH[Host Handler]
            LogH[Log Handler]
        end

        subgraph "Service"
            AuthS[Auth Service]
            K8sS[K8S Service]
            NodeS[Node Communication Service]
        end

        K8sClient[K8S Client]
        NodeClient[Node HTTP Client]
    end

    subgraph "watchdog-node (DaemonSet)"
        NodeServer[HTTP Server :8081]
        NodeInfo[Info Collection]
        NodeExec[Command Execution]
    end

    subgraph "watchdog-agent (business launcher)"
        AgentHB[Heartbeat Module]
        AgentBiz[Business Process Management]
    end

    MQTT --> Router
    Router --> AuthH & K8sH & HostH & LogH
    AuthH --> AuthS
    K8sH --> K8sS --> K8sClient
    HostH --> NodeS --> NodeClient --> NodeServer
    NodeServer --> NodeInfo & NodeExec
    AgentHB <--> AuthS
```

---

## Related Components

| Component | Runs as | Responsibilities |
|------|----------|------|
| **rmdc-watchdog** | K8S Deployment | Second-tier authorization center, K8S operations, command dispatch |
| **rmdc-watchdog-node** | K8S DaemonSet | Host information collection, host command execution |
| **rmdc-watchdog-agent** | Sidecar in the business Pod | Business process management, authorization heartbeat, dead-hand system |

---

## Project Structure

```
rmdc-watchdog/
├── cmd/
│   └── main.go
├── configs/
│   └── config.yaml
├── internal/
│   ├── config/
│   │   └── config.go
│   ├── dao/
│   │   ├── auth_dao.go
│   │   └── host_dao.go
│   ├── handler/
│   │   ├── router.go
│   │   ├── auth_handler.go
│   │   ├── heartbeat_handler.go
│   │   ├── k8s_handler.go
│   │   └── node_handler.go
│   ├── model/
│   │   ├── dto/
│   │   │   └── watchdog_dto.go
│   │   └── entity/
│   │       └── auth_info.go
│   └── service/
│       ├── mqtt_service.go
│       ├── auth_service.go
│       ├── k8s_service.go
│       ├── node_service.go
│       └── message_router.go
└── pkg/
    ├── k8s/
    │   └── client.go    # K8S API wrapper
    └── totp/
        └── totp.go      # TOTP utilities
```

---

## Core Data Structures

### Heartbeat Request/Response

```go
// HeartbeatRequest is the heartbeat request sent by the Agent.
type HeartbeatRequest struct {
    HostInfo  common.HostInfo `json:"host_info"`
    EnvInfo   common.EnvInfo  `json:"env_info"`
    Timestamp int64           `json:"timestamp"` // Unix time in milliseconds
    TOTPCode  string          `json:"totp_code"` // empty on first connection
}

// HeartbeatResponse is the heartbeat response.
type HeartbeatResponse struct {
    Authorized       bool   `json:"authorized"`
    TOTPCode         string `json:"totp_code"` // server-side TOTP (mutual verification)
    Timestamp        int64  `json:"timestamp"`
    SecondTOTPSecret string `json:"second_totp_secret,omitempty"` // returned on first connection only
}
```

### K8S Exec Command

```go
// K8sExecCommand is a K8S execution command.
type K8sExecCommand struct {
    CommandID  string   `json:"command_id"`
    Namespace  string   `json:"namespace"`
    Resource   string   `json:"resource"` // deployment, pod, statefulset...
    Name       string   `json:"name"`
    Action     string   `json:"action"` // logs, exec, scale, restart, delete, get, apply
    Container  string   `json:"container,omitempty"`
    Command    []string `json:"command,omitempty"`
    Timeout    int      `json:"timeout"`
    TailLines  int      `json:"tail_lines,omitempty"`
    FollowLogs bool     `json:"follow_logs,omitempty"`
    // Scale is the target replica count for the scale action. ExecuteCommand
    // reads cmd.Scale, but the original struct omitted it; the JSON key is assumed.
    Scale int `json:"scale,omitempty"`
}

// ExecResult is the execution result.
type ExecResult struct {
    CommandID string `json:"command_id"`
    Status    string `json:"status"` // success, failure, timeout
    ExitCode  int    `json:"exit_code"`
    Output    string `json:"output"`
    Error     string `json:"error"`
    StartTime int64  `json:"start_time"` // Unix time in milliseconds
    EndTime   int64  `json:"end_time"`   // Unix time in milliseconds
    Duration  int64  `json:"duration"`   // milliseconds
}
```

---

## Two-Tier TOTP Authorization

### Tiers

```
Tier-One: project-management ↔ watchdog
  - 8-digit code
  - 30-minute validity
  - SHA256

Tier-Two: watchdog ↔ agent/node
  - 6-digit code
  - 30-second validity
  - SHA1
```

### Authorization Flow

```go
// VerifyHeartbeat validates an Agent heartbeat.
func (s *AuthService) VerifyHeartbeat(req *HeartbeatRequest) (*HeartbeatResponse, error) {
    // 1. Validate the timestamp (request is in milliseconds, tolerance is 300 seconds).
    diff := time.Now().Unix() - req.Timestamp/1000
    if diff < 0 {
        diff = -diff
    }
    if diff > 300 {
        return nil, errors.New("timestamp expired")
    }

    // 2. First connection (no TOTP code): register the host and hand out the tier-two secret.
    if req.TOTPCode == "" {
        s.addHost(req.HostInfo)
        return &HeartbeatResponse{
            Authorized:       false,
            SecondTOTPSecret: s.tierTwoSecret,
            Timestamp:        time.Now().UnixMilli(),
        }, nil
    }

    // 3. Verify the TOTP code.
    if !s.verifyTierTwoTOTP(req.TOTPCode) {
        return nil, errors.New("invalid totp code")
    }

    // 4. Check the host's authorization status.
    authorized := s.isHostAuthorized(req.HostInfo)

    // 5. Generate the response TOTP (mutual verification).
    responseTOTP := s.generateTierTwoTOTP()

    return &HeartbeatResponse{
        Authorized: authorized,
        TOTPCode:   responseTOTP,
        Timestamp:  time.Now().UnixMilli(),
    }, nil
}
```

---

## K8S Operation Wrapper

### Supported Actions

| Action | Description | Target resource |
|--------|------|----------|
| `logs` | Fetch logs | Pod |
| `exec` | Execute a command | Pod |
| `scale` | Scale up or down | Deployment/StatefulSet |
| `restart` | Rolling restart | Deployment/StatefulSet |
| `delete` | Delete a resource | Any resource |
| `get` | Get resource information | Any resource |
| `apply` | Apply YAML | Any resource |

### Using the K8S Client

```go
func (s *K8sService) ExecuteCommand(cmd *K8sExecCommand) (*ExecResult, error) {
    startTime := time.Now()
    result := &ExecResult{
        CommandID: cmd.CommandID,
        StartTime: startTime.UnixMilli(),
    }

    var output string
    var err error

    switch cmd.Action {
    case "logs":
        output, err = s.k8sClient.GetPodLogs(cmd.Namespace, cmd.Name, cmd.Container, cmd.TailLines)
    case "exec":
        output, err = s.k8sClient.ExecCommand(cmd.Namespace, cmd.Name, cmd.Container, cmd.Command)
    case "scale":
        output, err = s.k8sClient.Scale(cmd.Namespace, cmd.Resource, cmd.Name, cmd.Scale)
    case "restart":
        output, err = s.k8sClient.RolloutRestart(cmd.Namespace, cmd.Resource, cmd.Name)
    case "delete":
        output, err = s.k8sClient.Delete(cmd.Namespace, cmd.Resource, cmd.Name)
    default:
        // get and apply are listed in the table above but are not wired up in this snippet.
        err = fmt.Errorf("unsupported action: %s", cmd.Action)
    }

    result.EndTime = time.Now().UnixMilli()
    result.Duration = result.EndTime - result.StartTime

    // Execution failures are reported through the result, not the returned error.
    if err != nil {
        result.Status = "failure"
        result.Error = err.Error()
    } else {
        result.Status = "success"
        result.Output = output
    }

    return result, nil
}
```

---

## Node Communication

### Node HTTP API

| Endpoint | Method | Description |
|------|------|------|
| `/api/exec` | POST | Execute a command |
| `/api/info` | GET | Get host information |
| `/api/metrics` | GET | Get runtime metrics |
| `/api/dltu` | POST | Image operations (Download-Load-Tag-Upload) |

### Talking to a Node

```go
func (s *NodeService) ExecuteOnNode(nodeID string, script string, args []string) (*ExecResult, error) {
    // 1. Look up the Node address.
    node := s.getNode(nodeID)
    if node == nil {
        return nil, errors.New("node not found")
    }

    // 2. Generate the TOTP code.
    totp := s.generateTierTwoTOTP()

    // 3. Send the request.
    req := &NodeExecRequest{
        Script:    script,
        Args:      args,
        Timestamp: time.Now().UnixMilli(),
        TOTPCode:  totp,
    }

    resp, err := s.httpClient.Post(fmt.Sprintf("http://%s:8081/api/exec", node.IP), req)
    if err != nil {
        return nil, err
    }

    // 4. Verify the response TOTP.
    if !s.verifyTierTwoTOTP(resp.TOTPCode) {
        return nil, errors.New("invalid response totp")
    }

    return resp.Result, nil
}
```

---

## Dead-Hand System

The Agent has a built-in dead-hand system (a dead man's switch) that protects business authorization: once heartbeats have failed too many times in a row, the Agent terminates the business process.

| Parameter | Value | Description |
|------|------|------|
| `maxRetryCount` | 12 | Maximum number of consecutive failures |
| `successInterval` | 2 hours | Heartbeat interval after a success |
| `failInterval` | 1 hour | Heartbeat interval after a failure |

```go
const (
    maxRetryCount   = 12
    successInterval = 2 * time.Hour
    failInterval    = 1 * time.Hour
)

// heartbeatLoop is the Agent heartbeat loop.
func (a *Agent) heartbeatLoop() {
    failCount := 0

    for {
        resp, err := a.sendHeartbeat()

        if err != nil || !resp.Authorized {
            failCount++
            if failCount >= maxRetryCount {
                // Trigger the dead-hand system.
                a.killBusiness()
                return
            }
            time.Sleep(failInterval)
        } else {
            // Reset the consecutive-failure counter after a successful heartbeat.
            failCount = 0
            time.Sleep(successInterval)
        }
    }
}

func (a *Agent) killBusiness() {
    // Send SIGTERM to terminate the business process.
    a.businessProcess.Signal(syscall.SIGTERM)
}
```

---

## MQTT Subscribe and Publish

### Subscriptions at Startup

```go
func (s *MQTTService) Start() {
    // Subscribe to the downlink topics.
    s.client.Subscribe(fmt.Sprintf("wdd/RDMC/command/down/%s", s.projectID), s.onCommand)
    s.client.Subscribe(fmt.Sprintf("wdd/RDMC/message/down/%s", s.projectID), s.onMessage)
}
```

### Publishing Uplink Messages

```go
func (s *MQTTService) PublishResult(result *ExecResult) error {
    msg := &common.DataMessage{
        BaseMessage: common.BaseMessage{
            MessageID: uuid.New().String(),
            Type:      "message",
            ProjectID: s.projectID,
            Timestamp: time.Now().UnixMilli(),
        },
        DataType: "exec_result",
        Payload:  result,
    }

    return s.client.Publish("wdd/RDMC/message/up", msg)
}
```

---

## Configuration

```yaml
# configs/config.yaml
server:
  host: 0.0.0.0
  port: 8990

mqtt:
  broker: tcp://mqtt-broker:1883
  client_id: watchdog-${PROJECT_ID}
  username: ${MQTT_USERNAME}
  password: ${MQTT_PASSWORD}

project:
  id: ${PROJECT_ID}
  namespace: ${NAMESPACE}

auth:
  tier_one_secret: ${TIER_ONE_SECRET}
  tier_two_secret: ${TIER_TWO_SECRET}
  time_offset_allowed: 300
  totp_verification_enabled: true

k8s:
  kubeconfig_path: ""  # empty means use the in-cluster configuration
```

---

## Common Development Tasks

### 1. Adding a New K8S Operation

1. Add the K8S API call method in `pkg/k8s/client.go`
2. Add a case to the switch in `internal/service/k8s_service.go`
3. Update the `K8sExecCommand` struct (if new parameters are needed)

### 2. Adding a New Command Type

1. Add a routing branch in `message_router.go`
2. Create the corresponding Handler and Service
3. Update command dispatch on the Exchange-Hub side to match

### 3. Changing the Heartbeat Logic

1. Modify the `VerifyHeartbeat` method in `auth_service.go`
2. Update the heartbeat sending logic on the Agent side to match
3. If new fields are needed, update the DTO structs

---

## Related Documents

| Document | Contents |
|------|------|
| `1-rmdc-watchdog-DDS.md` | Detailed design |
| `2-rmdc-watchdog-业务流程图.md` | Business flow diagrams |
| `3-rmdc-watchdog-内部交互流程.md` | Internal interaction flows |
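
---

## Example: Two-Tier TOTP Parameters

The TOTP section above specifies only the digit count, validity period, and hash algorithm for each tier. The sketch below shows how those three parameters could map onto a standard RFC 6238 computation using only the Go standard library. It is an illustration under assumptions, not the contents of `pkg/totp/totp.go`: the names `totpParams`, `tierOne`, `tierTwo`, and `totpAt` are hypothetical, the secret is treated as raw bytes, and the real module may encode secrets or handle clock skew differently.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha1"
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"hash"
	"time"
)

// totpParams bundles the per-tier settings (hypothetical type, not from the module).
type totpParams struct {
	digits  int              // number of decimal digits in the code
	period  int64            // validity window in seconds
	newHash func() hash.Hash // hash used inside HMAC
}

var (
	// Tier-One: 8 digits, 30 minutes, SHA256.
	tierOne = totpParams{digits: 8, period: 30 * 60, newHash: sha256.New}
	// Tier-Two: 6 digits, 30 seconds, SHA1.
	tierTwo = totpParams{digits: 6, period: 30, newHash: sha1.New}
)

// totpAt computes the RFC 6238 code for a Unix time given in seconds.
func totpAt(secret []byte, unixSec int64, p totpParams) string {
	// The moving factor is the number of whole periods since the epoch.
	var msg [8]byte
	binary.BigEndian.PutUint64(msg[:], uint64(unixSec/p.period))

	mac := hmac.New(p.newHash, secret)
	mac.Write(msg[:])
	sum := mac.Sum(nil)

	// Dynamic truncation as defined in RFC 4226, section 5.3.
	offset := int(sum[len(sum)-1] & 0x0f)
	bin := binary.BigEndian.Uint32(sum[offset:offset+4]) & 0x7fffffff

	mod := uint32(1)
	for i := 0; i < p.digits; i++ {
		mod *= 10
	}
	// Left-pad with zeros so the code always has p.digits characters.
	return fmt.Sprintf("%0*d", p.digits, bin%mod)
}

func main() {
	secret := []byte("demo-secret-not-for-production") // placeholder value
	now := time.Now().Unix()
	fmt.Println("tier-one:", totpAt(secret, now, tierOne))
	fmt.Println("tier-two:", totpAt(secret, now, tierTwo))
}
```

Keeping the three settings in one struct means both tiers share a single code path, so a verifier for either tier can recompute the code with the matching parameters and compare it to the received value.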