11 KiB
11 KiB
Watchdog模块开发提示词
本文档为大模型提供rmdc-watchdog模块的开发上下文。
模块概述
rmdc-watchdog 是部署在外部项目环境的边缘代理模块,负责:
- 二级授权中心: 管理项目内业务和主机的授权
- K8S操作代理: 执行K8S API操作(CRUD资源、日志获取等)
- 指令接收与执行: 接收Exchange-Hub下发的指令并执行
- 监控数据上报: 收集并上报主机/业务运行状态
组件架构
graph TB
subgraph "rmdc-watchdog 组件"
MQTT[MQTT Client]
Router[消息路由器]
subgraph "Handler"
AuthH[授权Handler]
K8sH[K8S Handler]
HostH[主机Handler]
LogH[日志Handler]
end
subgraph "Service"
AuthS[授权服务]
K8sS[K8S服务]
NodeS[Node通信服务]
end
K8sClient[K8S Client]
NodeClient[Node HTTP Client]
end
subgraph "watchdog-node (DaemonSet)"
NodeServer[HTTP Server :8081]
NodeInfo[信息采集]
NodeExec[命令执行]
end
subgraph "watchdog-agent (业务启动器)"
AgentHB[心跳模块]
AgentBiz[业务进程管理]
end
MQTT --> Router
Router --> AuthH & K8sH & HostH & LogH
AuthH --> AuthS
K8sH --> K8sS --> K8sClient
HostH --> NodeS --> NodeClient --> NodeServer
NodeServer --> NodeInfo & NodeExec
AgentHB <--> AuthS
关联组件说明
| 组件 | 运行位置 | 职责 |
|---|---|---|
| rmdc-watchdog | K8S Deployment | 二级授权中心、K8S操作、指令调度 |
| rmdc-watchdog-node | K8S DaemonSet | 主机信息采集、主机命令执行 |
| rmdc-watchdog-agent | 业务Pod Sidecar | 业务进程管理、授权心跳、死手系统 |
项目结构
rmdc-watchdog/
├── cmd/
│ └── main.go
├── configs/
│ └── config.yaml
├── internal/
│ ├── config/
│ │ └── config.go
│ ├── dao/
│ │ ├── auth_dao.go
│ │ └── host_dao.go
│ ├── handler/
│ │ ├── router.go
│ │ ├── auth_handler.go
│ │ ├── heartbeat_handler.go
│ │ ├── k8s_handler.go
│ │ └── node_handler.go
│ ├── model/
│ │ ├── dto/
│ │ │ └── watchdog_dto.go
│ │ └── entity/
│ │ └── auth_info.go
│ └── service/
│ ├── mqtt_service.go
│ ├── auth_service.go
│ ├── k8s_service.go
│ ├── node_service.go
│ └── message_router.go
└── pkg/
├── k8s/
│ └── client.go # K8S API封装
└── totp/
└── totp.go # TOTP工具
核心数据结构
心跳请求/响应
// HeartbeatRequest Agent心跳请求
type HeartbeatRequest struct {
HostInfo common.HostInfo `json:"host_info"`
EnvInfo common.EnvInfo `json:"env_info"`
Timestamp int64 `json:"timestamp"`
TOTPCode string `json:"totp_code"`
}
// HeartbeatResponse 心跳响应
type HeartbeatResponse struct {
Authorized bool `json:"authorized"`
TOTPCode string `json:"totp_code"` // 服务端TOTP(双向验证)
Timestamp int64 `json:"timestamp"`
SecondTOTPSecret string `json:"second_totp_secret,omitempty"` // 首次连接返回
}
K8S执行指令
// K8sExecCommand K8S执行指令
type K8sExecCommand struct {
CommandID string `json:"command_id"`
Namespace string `json:"namespace"`
Resource string `json:"resource"` // deployment, pod, statefulset...
Name string `json:"name"`
Action string `json:"action"` // logs, exec, scale, restart, delete, get, apply
Container string `json:"container,omitempty"`
Command []string `json:"command,omitempty"`
Timeout int `json:"timeout"`
TailLines int `json:"tail_lines,omitempty"`
FollowLogs bool `json:"follow_logs,omitempty"`
}
// ExecResult 执行结果
type ExecResult struct {
CommandID string `json:"command_id"`
Status string `json:"status"` // success, failure, timeout
ExitCode int `json:"exit_code"`
Output string `json:"output"`
Error string `json:"error"`
StartTime int64 `json:"start_time"`
EndTime int64 `json:"end_time"`
Duration int64 `json:"duration"`
}
TOTP双层授权机制
层级说明
一级授权(Tier-One): project-management ↔ watchdog
- 8位验证码
- 30分钟有效期
- SHA256算法
二级授权(Tier-Two): watchdog ↔ agent/node
- 6位验证码
- 30秒有效期
- SHA1算法
授权流程
// 验证Agent心跳
func (s *AuthService) VerifyHeartbeat(req *HeartbeatRequest) (*HeartbeatResponse, error) {
// 1. 验证时间戳
if abs(time.Now().Unix() - req.Timestamp/1000) > 300 {
return nil, errors.New("timestamp expired")
}
// 2. 首次连接(无TOTP码)
if req.TOTPCode == "" {
s.addHost(req.HostInfo)
return &HeartbeatResponse{
Authorized: false,
SecondTOTPSecret: s.tierTwoSecret,
Timestamp: time.Now().UnixMilli(),
}, nil
}
// 3. 验证TOTP
if !s.verifyTierTwoTOTP(req.TOTPCode) {
return nil, errors.New("invalid totp code")
}
// 4. 检查主机授权状态
authorized := s.isHostAuthorized(req.HostInfo)
// 5. 生成响应TOTP(双向验证)
responseTOTP := s.generateTierTwoTOTP()
return &HeartbeatResponse{
Authorized: authorized,
TOTPCode: responseTOTP,
Timestamp: time.Now().UnixMilli(),
}, nil
}
K8S操作封装
支持的操作
| Action | 说明 | 目标资源 |
|---|---|---|
logs |
获取日志 | Pod |
exec |
执行命令 | Pod |
scale |
扩缩容 | Deployment/StatefulSet |
restart |
滚动重启 | Deployment/StatefulSet |
delete |
删除资源 | 任意资源 |
get |
获取信息 | 任意资源 |
apply |
应用YAML | 任意资源 |
K8S客户端使用
func (s *K8sService) ExecuteCommand(cmd *K8sExecCommand) (*ExecResult, error) {
startTime := time.Now()
result := &ExecResult{
CommandID: cmd.CommandID,
StartTime: startTime.UnixMilli(),
}
var output string
var err error
switch cmd.Action {
case "logs":
output, err = s.k8sClient.GetPodLogs(cmd.Namespace, cmd.Name, cmd.Container, cmd.TailLines)
case "exec":
output, err = s.k8sClient.ExecCommand(cmd.Namespace, cmd.Name, cmd.Container, cmd.Command)
case "scale":
output, err = s.k8sClient.Scale(cmd.Namespace, cmd.Resource, cmd.Name, cmd.Scale)
case "restart":
output, err = s.k8sClient.RolloutRestart(cmd.Namespace, cmd.Resource, cmd.Name)
case "delete":
output, err = s.k8sClient.Delete(cmd.Namespace, cmd.Resource, cmd.Name)
}
result.EndTime = time.Now().UnixMilli()
result.Duration = result.EndTime - result.StartTime
if err != nil {
result.Status = "failure"
result.Error = err.Error()
} else {
result.Status = "success"
result.Output = output
}
return result, nil
}
Node通信
Node HTTP接口
| 接口 | 方法 | 说明 |
|---|---|---|
/api/exec |
POST | 执行命令 |
/api/info |
GET | 获取主机信息 |
/api/metrics |
GET | 获取运行指标 |
/api/dltu |
POST | 镜像操作(Download-Load-Tag-Upload) |
与Node通信
func (s *NodeService) ExecuteOnNode(nodeID string, script string, args []string) (*ExecResult, error) {
// 1. 获取Node地址
node := s.getNode(nodeID)
if node == nil {
return nil, errors.New("node not found")
}
// 2. 生成TOTP
totp := s.generateTierTwoTOTP()
// 3. 发送请求
req := &NodeExecRequest{
Script: script,
Args: args,
Timestamp: time.Now().UnixMilli(),
TOTPCode: totp,
}
resp, err := s.httpClient.Post(fmt.Sprintf("http://%s:8081/api/exec", node.IP), req)
if err != nil {
return nil, err
}
// 4. 验证响应TOTP
if !s.verifyTierTwoTOTP(resp.TOTPCode) {
return nil, errors.New("invalid response totp")
}
return resp.Result, nil
}
死手系统
Agent内置死手系统,保护业务授权安全:
| 参数 | 值 | 说明 |
|---|---|---|
maxRetryCount |
12 | 最大失败次数 |
successInterval |
2小时 | 成功后心跳间隔 |
failInterval |
1小时 | 失败后心跳间隔 |
// Agent心跳循环
func (a *Agent) heartbeatLoop() {
failCount := 0
for {
resp, err := a.sendHeartbeat()
if err != nil || !resp.Authorized {
failCount++
if failCount >= 12 {
// 触发死手系统
a.killBusiness()
return
}
time.Sleep(1 * time.Hour)
} else {
failCount = 1
time.Sleep(2 * time.Hour)
}
}
}
func (a *Agent) killBusiness() {
// 发送SIGTERM信号终止业务进程
a.businessProcess.Signal(syscall.SIGTERM)
}
MQTT订阅与发布
启动时订阅
func (s *MQTTService) Start() {
// 订阅下行Topic
s.client.Subscribe(fmt.Sprintf("wdd/RDMC/command/down/%s", s.projectID), s.onCommand)
s.client.Subscribe(fmt.Sprintf("wdd/RDMC/message/down/%s", s.projectID), s.onMessage)
}
发布上行消息
func (s *MQTTService) PublishResult(result *ExecResult) error {
msg := &common.DataMessage{
BaseMessage: common.BaseMessage{
MessageID: uuid.New().String(),
Type: "message",
ProjectID: s.projectID,
Timestamp: time.Now().UnixMilli(),
},
DataType: "exec_result",
Payload: result,
}
return s.client.Publish("wdd/RDMC/message/up", msg)
}
配置项
# configs/config.yaml
server:
host: 0.0.0.0
port: 8990
mqtt:
broker: tcp://mqtt-broker:1883
client_id: watchdog-${PROJECT_ID}
username: ${MQTT_USERNAME}
password: ${MQTT_PASSWORD}
project:
id: ${PROJECT_ID}
namespace: ${NAMESPACE}
auth:
tier_one_secret: ${TIER_ONE_SECRET}
tier_two_secret: ${TIER_TWO_SECRET}
time_offset_allowed: 300
totp_verification_enabled: true
k8s:
kubeconfig_path: "" # 空则使用InCluster配置
常见开发任务
1. 添加新的K8S操作
- 在
pkg/k8s/client.go添加K8S API调用方法 - 在
internal/service/k8s_service.go的switch中添加case - 更新
K8sExecCommand结构(如需新参数)
2. 添加新的指令类型
- 在
message_router.go添加路由分支 - 创建对应的Handler和Service
- 同步更新Exchange-Hub端的指令下发
3. 修改心跳逻辑
- 修改
auth_service.go的VerifyHeartbeat方法 - 同步修改Agent端的心跳发送逻辑
- 如需新增字段,更新DTO结构
相关文档
| 文档 | 内容 |
|---|---|
1-rmdc-watchdog-DDS.md |
详细设计 |
2-rmdc-watchdog-业务流程图.md |
业务流程图 |
3-rmdc-watchdog-内部交互流程.md |
内部交互流程 |