36 KiB
36 KiB
项目代号: NaughtyMan
文档版本: v1.0
编制日期: 2025年10月21日
技术架构师: [待填写]
系统架构设计
总体架构
系统采用分层微服务架构,由接口层、业务逻辑层、数据访问层和外部服务层构成。架构设计遵循高内聚低耦合原则,确保各模块独立演进能力。
graph TB
subgraph "客户端层"
A1[ProjectOctopus]
A2[ProjectTonyStack]
A3[Telegram用户端]
end
subgraph "接入层"
B1[RESTful API网关<br/>Gin框架]
B2[Telegram Webhook<br/>Long Polling]
end
subgraph "业务逻辑层"
C1[认证服务<br/>AuthService]
C2[消息调度服务<br/>MessageDispatcher]
C3[提醒管理服务<br/>ReminderService]
C4[AI交互服务<br/>AIService]
C5[速率控制服务<br/>RateLimiter]
end
subgraph "数据访问层"
D1[消息队列<br/>PriorityQueue]
D2[SQLite数据库<br/>WAL模式]
D3[缓存层<br/>sync.Map]
end
subgraph "外部服务层"
E1[Telegram Bot API]
E2[OpenAI API]
E3[Google Gemini]
E4[xAI Grok]
E5[SOCKS5/HTTP代理]
end
A1 --> B1
A2 --> B1
A3 --> B2
B1 --> C1
B1 --> C2
B2 --> C3
B2 --> C4
C2 --> C5
C3 --> D2
C4 --> D3
C2 --> D1
C5 --> E1
C4 --> E2
C4 --> E3
C4 --> E4
E1 -.代理.-> E5
技术选型说明
| 技术组件 | 选型方案 | 选型理由 |
|---|---|---|
| 开发语言 | Go 1.21+ | 原生并发支持、高性能、跨平台编译、内存安全 |
| Web框架 | Gin v1.9 | 轻量级、路由高效、中间件生态完善、社区活跃 |
| 数据库 | SQLite 3.40+ | 零配置部署、事务ACID保证、跨平台兼容、适合中小规模数据 |
| Bot SDK | tgbotapi v5 | 官方API完整封装、长连接稳定、支持代理配置 |
| 并发控制 | sync.Map + Channel | 原生并发安全结构、无锁化设计、性能优越 |
| 日志框架 | zap | 结构化日志、高性能、灵活的日志等级控制 |
核心模块详细设计
认证与安全模块
双阶段Token认证流程
sequenceDiagram
participant C as 客户端
participant G as API网关
participant A as 认证服务
participant D as 数据库
C->>G: POST /auth/handshake<br/>{api_key}
G->>A: 验证API Key
A->>D: 查询白名单
D-->>A: 返回权限范围
A->>A: 生成Challenge码<br/>(UUID + 时间戳)
A->>D: 存储Challenge<br/>(60s有效期)
A-->>C: {challenge, expire_at}
Note over C: 计算签名<br/>HMAC-SHA256(challenge+api_secret)
C->>G: POST /auth/token<br/>{api_key, challenge, signature}
G->>A: 验证签名
A->>D: 查询Challenge
D-->>A: Challenge数据
A->>A: 对比签名<br/>检查时效
A->>A: 生成JWT Token<br/>(6h有效期)
A->>D: 记录Token映射
A-->>C: {access_token, refresh_token}
Token数据结构设计
JWT Payload规范:
type TokenClaims struct {
ApiKey string `json:"api_key"`
Whitelist []int64 `json:"whitelist"` // 加密的授权目标ID列表
Permissions []string `json:"permissions"` // send_message, query_status
IssuedAt int64 `json:"iat"`
ExpiresAt int64 `json:"exp"`
RefreshAfter int64 `json:"rfa"` // 允许刷新的时间阈值
}
数据库表结构:
CREATE TABLE api_credentials (
id INTEGER PRIMARY KEY AUTOINCREMENT,
api_key TEXT UNIQUE NOT NULL,
api_secret TEXT NOT NULL, -- BCRYPT加密存储
project TEXT NOT NULL, -- octopus/tonystack
whitelist_ids TEXT, -- JSON数组存储
status INTEGER DEFAULT 1, -- 1:启用 0:禁用
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
last_used DATETIME,
INDEX idx_api_key (api_key)
);
CREATE TABLE token_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
api_key TEXT NOT NULL,
challenge TEXT,
challenge_expire DATETIME,
access_token TEXT UNIQUE,
refresh_token TEXT UNIQUE,
token_expire DATETIME,
client_ip TEXT,
user_agent TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX idx_access_token (access_token),
INDEX idx_challenge (challenge, challenge_expire)
);
白名单验证机制
中间件实现:
func AuthMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
token := c.GetHeader("Authorization")
// 1. 解析JWT Token
claims, err := jwt.Parse(token, secretKey)
if err != nil || time.Now().Unix() > claims.ExpiresAt {
c.JSON(401, gin.H{"error": "无效或过期的Token"})
c.Abort()
return
}
// 2. 验证目标ID在白名单内
var req MessageRequest
c.BindJSON(&req)
if !contains(claims.Whitelist, req.TargetID) {
c.JSON(403, gin.H{"error": "目标未授权"})
c.Abort()
return
}
// 3. 注入上下文
c.Set("api_key", claims.ApiKey)
c.Set("project", claims.Project)
c.Next()
}
}
消息通知系统
消息调度核心架构
graph LR
A[API请求] --> B{消息等级判断}
B -->|INFO| C[时段检查器<br/>08:00-23:00]
B -->|WARNING| D[即时队列]
B -->|CRITICAL| D
C -->|时段内| E[批量聚合器<br/>5分钟窗口]
C -->|时段外| F[延迟队列<br/>次日8:00]
E --> G[优先级队列]
D --> G
F --> G
G --> H[速率限制器]
H --> I[Telegram API]
I -->|成功| J[消息日志]
I -->|失败| K{重试判断}
K -->|次数<3| L[指数退避重试]
K -->|次数≥3| M[死信队列]
L --> H
优先级队列实现
数据结构设计:
type Message struct {
ID string
ChatID int64
Level string // INFO/WARNING/CRITICAL
Content string
Metadata map[string]interface{}
Priority int // CRITICAL:3, WARNING:2, INFO:1
RetryCount int
ScheduledAt time.Time
CreatedAt time.Time
}
type PriorityQueue struct {
heap *priorityHeap // 优先级小顶堆
mutex sync.RWMutex
notEmpty chan struct{} // 通知消费者
deadLetter chan Message // 死信队列通道
}
// 堆排序规则
func (pq *priorityHeap) Less(i, j int) bool {
// 1. 优先级高的优先
if pq[i].Priority != pq[j].Priority {
return pq[i].Priority > pq[j].Priority
}
// 2. 相同优先级按时间戳排序
return pq[i].ScheduledAt.Before(pq[j].ScheduledAt)
}
消息模板渲染引擎
模板定义规范:
type MessageTemplate struct {
Project string
Level string
Template string
}
var templates = map[string]MessageTemplate{
"octopus_critical": {
Project: "octopus",
Level: "CRITICAL",
Template: `🚨 【严重告警】{{.Title}}
📍 服务器: {{.Metadata.server}}
📊 详情: {{.Body}}
⏰ 时间: {{formatTime .Metadata.timestamp}}
🔗 查看详情: {{.Metadata.dashboard_url}}
#服务器监控 #CRITICAL`,
},
"tonystack_warning": {
Project: "tonystack",
Level: "WARNING",
Template: `⚠️ 【风险提示】{{.Title}}
💰 标的: {{.Metadata.symbol}}
📈 触发值: {{.Body}}
📉 当前价格: {{.Metadata.current_price}}
⏰ 时间: {{formatTime .Metadata.timestamp}}
#金融监控 #WARNING`,
},
}
// 渲染函数
func RenderTemplate(msg Message) (string, error) {
key := fmt.Sprintf("%s_%s", msg.Project, strings.ToLower(msg.Level))
tmpl, exists := templates[key]
if !exists {
return "", errors.New("模板不存在")
}
t := template.New("message").Funcs(template.FuncMap{
"formatTime": func(ts string) string {
t, _ := time.Parse(time.RFC3339, ts)
return t.Format("2006-01-02 15:04:05")
},
})
t, _ = t.Parse(tmpl.Template)
var buf bytes.Buffer
t.Execute(&buf, msg)
return buf.String(), nil
}
速率限制器设计
令牌桶算法实现:
type TokenBucket struct {
capacity int // 桶容量
tokens int // 当前令牌数
refillRate time.Duration // 补充速率
lastRefill time.Time
mutex sync.Mutex
}
func (tb *TokenBucket) Take() bool {
tb.mutex.Lock()
defer tb.mutex.Unlock()
// 计算应补充的令牌数
now := time.Now()
elapsed := now.Sub(tb.lastRefill)
refillCount := int(elapsed / tb.refillRate)
if refillCount > 0 {
tb.tokens = min(tb.capacity, tb.tokens + refillCount)
tb.lastRefill = now
}
// 尝试获取令牌
if tb.tokens > 0 {
tb.tokens--
return true
}
return false
}
type RateLimiter struct {
globalBucket *TokenBucket // 全局30/s
chatBuckets sync.Map // map[int64]*TokenBucket
editBucket *TokenBucket // 编辑消息限流
}
func NewRateLimiter() *RateLimiter {
return &RateLimiter{
globalBucket: &TokenBucket{
capacity: 30,
tokens: 30,
refillRate: time.Second / 30,
lastRefill: time.Now(),
},
editBucket: &TokenBucket{
capacity: 20,
tokens: 20,
refillRate: time.Minute / 20,
lastRefill: time.Now(),
},
}
}
func (rl *RateLimiter) AllowSend(chatID int64, isEdit bool) bool {
// 1. 检查全局限流
if !rl.globalBucket.Take() {
return false
}
// 2. 编辑消息检查专用桶
if isEdit && !rl.editBucket.Take() {
rl.globalBucket.tokens++ // 归还全局令牌
return false
}
// 3. 检查单聊天限流
bucket := rl.getChatBucket(chatID)
if !bucket.Take() {
rl.globalBucket.tokens++
return false
}
return true
}
定时提醒系统
状态机设计
stateDiagram-v2
[*] --> 等待输入: /notify命令
等待输入 --> 选择日期: 点击日期按钮
选择日期 --> 选择时间: 确认日期
选择时间 --> 配置重复: 确认时间
配置重复 --> 输入内容: 选择重复规则
输入内容 --> 确认创建: 输入提醒文本
确认创建 --> 已激活: 提交成功
已激活 --> 已触发: 到达触发时间
已触发 --> 已激活: 有重复规则
已触发 --> 已完成: 无重复规则
已激活 --> 已删除: 用户删除
已完成 --> [*]
已删除 --> [*]
日历选择器UI实现
InlineKeyboard布局:
func GenerateCalendar(year int, month time.Month) [][]tgbotapi.InlineKeyboardButton {
firstDay := time.Date(year, month, 1, 0, 0, 0, 0, time.Local)
lastDay := firstDay.AddDate(0, 1, -1)
keyboard := [][]tgbotapi.InlineKeyboardButton{
// 月份导航行
{
tgbotapi.NewInlineKeyboardButtonData("◀", fmt.Sprintf("cal_prev_%d_%d", year, month)),
tgbotapi.NewInlineKeyboardButtonData(fmt.Sprintf("%d年%d月", year, month), "cal_ignore"),
tgbotapi.NewInlineKeyboardButtonData("▶", fmt.Sprintf("cal_next_%d_%d", year, month)),
},
// 星期标题行
{
tgbotapi.NewInlineKeyboardButtonData("日", "cal_ignore"),
tgbotapi.NewInlineKeyboardButtonData("一", "cal_ignore"),
tgbotapi.NewInlineKeyboardButtonData("二", "cal_ignore"),
tgbotapi.NewInlineKeyboardButtonData("三", "cal_ignore"),
tgbotapi.NewInlineKeyboardButtonData("四", "cal_ignore"),
tgbotapi.NewInlineKeyboardButtonData("五", "cal_ignore"),
tgbotapi.NewInlineKeyboardButtonData("六", "cal_ignore"),
},
}
// 日期网格生成
currentWeek := []tgbotapi.InlineKeyboardButton{}
for weekday := 0; weekday < int(firstDay.Weekday()); weekday++ {
currentWeek = append(currentWeek, tgbotapi.NewInlineKeyboardButtonData(" ", "cal_ignore"))
}
for day := 1; day <= lastDay.Day(); day++ {
date := time.Date(year, month, day, 0, 0, 0, 0, time.Local)
callback := fmt.Sprintf("cal_select_%s", date.Format("2006-01-02"))
// 过期日期禁用
label := fmt.Sprintf("%d", day)
if date.Before(time.Now().Truncate(24 * time.Hour)) {
callback = "cal_ignore"
label = "×"
}
currentWeek = append(currentWeek, tgbotapi.NewInlineKeyboardButtonData(label, callback))
// 每行7天
if len(currentWeek) == 7 {
keyboard = append(keyboard, currentWeek)
currentWeek = []tgbotapi.InlineKeyboardButton{}
}
}
if len(currentWeek) > 0 {
keyboard = append(keyboard, currentWeek)
}
return keyboard
}
提醒调度器实现
定时扫描机制:
type ReminderScheduler struct {
db *sql.DB
bot *tgbotapi.BotAPI
ticker *time.Ticker
stopChan chan struct{}
}
func (rs *ReminderScheduler) Start() {
rs.ticker = time.NewTicker(30 * time.Second)
go func() {
for {
select {
case <-rs.ticker.C:
rs.processPendingReminders()
case <-rs.stopChan:
return
}
}
}()
}
func (rs *ReminderScheduler) processPendingReminders() {
// 查询需触发的提醒(未来5分钟内)
rows, _ := rs.db.Query(`
SELECT id, chat_id, user_id, content, repeat_rule, next_trigger
FROM reminders
WHERE status = 1
AND next_trigger <= datetime('now', '+5 minutes')
ORDER BY next_trigger
`)
defer rows.Close()
for rows.Next() {
var r Reminder
rows.Scan(&r.ID, &r.ChatID, &r.UserID, &r.Content, &r.RepeatRule, &r.NextTrigger)
// 等待到精确触发时间
wait := r.NextTrigger.Sub(time.Now())
if wait > 0 {
time.Sleep(wait)
}
// 发送提醒消息
msg := tgbotapi.NewMessage(r.ChatID, fmt.Sprintf("⏰ 提醒\\n\\n%s", r.Content))
rs.bot.Send(msg)
// 更新下次触发时间
if r.RepeatRule != "" {
nextTrigger := calculateNextTrigger(r.NextTrigger, r.RepeatRule)
rs.db.Exec(`UPDATE reminders SET next_trigger = ? WHERE id = ?`, nextTrigger, r.ID)
} else {
rs.db.Exec(`UPDATE reminders SET status = 0 WHERE id = ?`, r.ID)
}
}
}
func calculateNextTrigger(current time.Time, rule string) time.Time {
switch rule {
case "daily":
return current.AddDate(0, 0, 1)
case "weekly":
return current.AddDate(0, 0, 7)
case "biweekly":
return current.AddDate(0, 0, 14)
case "monthly":
return current.AddDate(0, 1, 0)
default:
// 解析RRULE格式
return parseRRule(current, rule)
}
}
AI智能体系统
多厂商适配层架构
graph TB
A[Telegram消息] --> B{触发条件判断}
B -->|私聊| C[直接触发]
B -->|"群聊@Bot"| C
B -->|其他| D[忽略]
C --> E[上下文提取器]
E --> F[消息历史查询<br/>最近3条]
F --> G[提示词构建器]
G --> H[AI路由器]
H --> I1[OpenAI适配器]
H --> I2[Gemini适配器]
H --> I3[Grok适配器]
H --> I4[OpenRouter适配器]
I1 --> J[流式响应处理器]
I2 --> J
I3 --> J
I4 --> J
J --> K[消息分段器<br/>500字符/5秒]
K --> L[editMessageText]
L --> M[速率限制器]
M --> N[Telegram API]
AI适配器接口定义
type AIProvider interface {
Name() string
Chat(ctx context.Context, messages []ChatMessage, stream bool) (<-chan string, error)
SupportStream() bool
SupportReasoning() bool
}
type ChatMessage struct {
Role string `json:"role"` // system/user/assistant
Content string `json:"content"`
Name string `json:"name,omitempty"`
Meta map[string]interface{} `json:"-"`
}
// OpenAI适配器实现
type OpenAIProvider struct {
apiKey string
model string
client *http.Client
}
func (p *OpenAIProvider) Chat(ctx context.Context, messages []ChatMessage, stream bool) (<-chan string, error) {
reqBody := map[string]interface{}{
"model": p.model,
"messages": messages,
"stream": stream,
}
if stream {
return p.streamChat(ctx, reqBody)
}
return p.blockingChat(ctx, reqBody)
}
func (p *OpenAIProvider) streamChat(ctx context.Context, reqBody map[string]interface{}) (<-chan string, error) {
ch := make(chan string, 10)
req, _ := http.NewRequestWithContext(ctx, "POST",
"<https://api.openai.com/v1/chat/completions>",
toJSON(reqBody))
req.Header.Set("Authorization", "Bearer "+p.apiKey)
req.Header.Set("Content-Type", "application/json")
resp, err := p.client.Do(req)
if err != nil {
return nil, err
}
go func() {
defer close(ch)
defer resp.Body.Close()
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
if !strings.HasPrefix(line, "data: ") {
continue
}
data := strings.TrimPrefix(line, "data: ")
if data == "[DONE]" {
break
}
var chunk struct {
Choices []struct {
Delta struct {
Content string `json:"content"`
} `json:"delta"`
} `json:"choices"`
}
json.Unmarshal([]byte(data), &chunk)
if len(chunk.Choices) > 0 {
ch <- chunk.Choices[^0].Delta.Content
}
}
}()
return ch, nil
}
流式响应处理策略
分段更新逻辑:
type StreamHandler struct {
bot *tgbotapi.BotAPI
rateLimiter *RateLimiter
updateBuffer strings.Builder
lastUpdate time.Time
messageID int
chatID int64
}
func (sh *StreamHandler) ProcessStream(ctx context.Context, stream <-chan string) error {
initialMsg := tgbotapi.NewMessage(sh.chatID, "🤔 思考中...")
sent, _ := sh.bot.Send(initialMsg)
sh.messageID = sent.MessageID
sh.lastUpdate = time.Now()
for {
select {
case chunk, ok := <-stream:
if !ok {
// 流结束,发送最终消息
sh.finalUpdate("✅ 回答完成")
return nil
}
sh.updateBuffer.WriteString(chunk)
// 触发更新条件: 500字符 OR 5秒间隔
shouldUpdate := sh.updateBuffer.Len() >= 500 ||
time.Since(sh.lastUpdate) >= 5*time.Second
if shouldUpdate {
sh.sendUpdate()
}
case <-ctx.Done():
return ctx.Err()
}
}
}
func (sh *StreamHandler) sendUpdate() {
content := sh.updateBuffer.String()
// 等待速率限制器允许
for !sh.rateLimiter.AllowSend(sh.chatID, true) {
time.Sleep(100 * time.Millisecond)
}
edit := tgbotapi.NewEditMessageText(sh.chatID, sh.messageID, content)
edit.ParseMode = "Markdown"
sh.bot.Send(edit)
sh.lastUpdate = time.Now()
}
数据库设计
ER图
erDiagram
API_CREDENTIALS ||--o{ TOKEN_SESSIONS : generates
API_CREDENTIALS ||--o{ MESSAGE_LOG : sends
WHITELIST ||--o{ API_CREDENTIALS : authorizes
REMINDERS }o--|| WHITELIST : belongs_to
API_CREDENTIALS {
int id PK
string api_key UK
string api_secret
string project
json whitelist_ids
int status
datetime created_at
}
TOKEN_SESSIONS {
int id PK
string api_key FK
string challenge
datetime challenge_expire
string access_token UK
string refresh_token UK
datetime token_expire
string client_ip
}
WHITELIST {
int id PK
string type
bigint entity_id UK
string alias
int status
datetime added_at
}
MESSAGE_LOG {
int id PK
string message_id
bigint chat_id
string level
string project FK
text content
string status
int retry_count
datetime sent_at
}
REMINDERS {
int id PK
bigint user_id
bigint chat_id FK
text content
datetime trigger_time
string repeat_rule
datetime next_trigger
int status
}
AI_CONVERSATIONS {
int id PK
bigint chat_id
bigint user_id
text user_message
text ai_response
string provider
string model
int tokens_used
datetime created_at
}
核心表索引策略
-- 消息日志表优化索引
CREATE INDEX idx_message_log_composite ON message_log(status, created_at);
CREATE INDEX idx_message_log_chat ON message_log(chat_id, sent_at DESC);
-- 提醒表优化索引
CREATE INDEX idx_reminders_trigger ON reminders(next_trigger, status) WHERE status = 1;
CREATE INDEX idx_reminders_user ON reminders(user_id, status);
-- Token会话表优化索引
CREATE INDEX idx_token_expire ON token_sessions(token_expire) WHERE token_expire > datetime('now');
-- AI对话表优化索引
CREATE INDEX idx_ai_conversations_chat ON ai_conversations(chat_id, created_at DESC);
接口设计
RESTful API规范
认证接口
1. 握手接口
POST /api/v1/auth/handshake
Content-Type: application/json
{
"api_key": "octopus_ak_1234567890"
}
响应:
{
"challenge": "uuid-v4-string",
"expire_at": "2025-10-21T17:01:00Z",
"algorithm": "HMAC-SHA256"
}
2. Token获取接口
POST /api/v1/auth/token
Content-Type: application/json
{
"api_key": "octopus_ak_1234567890",
"challenge": "uuid-v4-string",
"signature": "hex-encoded-hmac-sha256"
}
响应:
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "Bearer",
"expires_in": 21600,
"refresh_token": "refresh-token-string"
}
消息发送接口
POST /api/v1/message/send
Authorization: Bearer <access_token>
Content-Type: application/json
{
"target_type": "user",
"target_id": "123456789",
"level": "warning",
"project": "octopus",
"content": {
"title": "CPU使用率告警",
"body": "服务器CPU使用率达85%",
"metadata": {
"server": "prod-web-01",
"cpu_usage": 85.3,
"threshold": 80,
"timestamp": "2025-10-21T16:58:00Z",
"dashboard_url": "<https://monitor.example.com/server/prod-web-01>"
}
}
}
响应:
{
"message_id": "msg_abc123",
"status": "queued",
"estimated_send_time": "2025-10-21T17:00:00Z",
"priority": 2
}
消息状态查询接口
GET /api/v1/message/status?message_id=msg_abc123
Authorization: Bearer <access_token>
响应:
{
"message_id": "msg_abc123",
"status": "sent",
"telegram_message_id": 98765,
"sent_at": "2025-10-21T17:00:05Z",
"retry_count": 0
}
Telegram Bot指令接口
| 指令 | 功能 | 权限要求 | 响应方式 |
|---|---|---|---|
/start |
激活机器人 | 白名单 | 欢迎消息+功能菜单 |
/help |
帮助文档 | 白名单 | 富文本说明 |
/notify |
创建提醒 | 白名单 | InlineKeyboard交互 |
/notify_list |
查看提醒列表 | 白名单 | 列表+删除按钮 |
/status |
系统状态 | 管理员 | 队列长度/限流状态 |
/whitelist add <ID> |
添加白名单 | 管理员 | 确认消息 |
部署架构
系统部署拓扑
graph TB
subgraph "生产环境"
A[Nginx反向代理<br/>:443 HTTPS]
B[NaughtyMan服务<br/>:8080]
C[SQLite数据库<br/>bot.db]
D[日志文件<br/>/var/log/naughty_man]
end
subgraph "网络层"
E[SOCKS5代理<br/>:1080]
end
subgraph "外部服务"
F[Telegram API<br/>api.telegram.org]
G[OpenAI API]
end
A -->|TLS| B
B --> C
B --> D
B -->|代理| E
E --> F
B -->|直连| G
subgraph "监控系统"
H[Prometheus<br/>:9090]
I[Grafana<br/>:3000]
end
B -->|metrics| H
H --> I
配置文件结构
config.yaml:
server:
host: "0.0.0.0"
port: 8080
mode: "release" # debug/release
telegram:
bot_token: "${TELEGRAM_BOT_TOKEN}"
proxy:
enabled: true
type: "socks5"
host: "127.0.0.1"
port: 1080
username: ""
password: ""
rate_limit:
global_rate: 30 # 消息/秒
chat_rate: 1 # 私聊消息/秒
group_rate: 20 # 群聊消息/分钟
edit_rate: 20 # 编辑消息/分钟
database:
path: "./data/bot.db"
wal_mode: true
max_open_conns: 25
max_idle_conns: 5
conn_max_lifetime: 3600 # 秒
ai:
default_provider: "openai"
providers:
openai:
api_key: "${OPENAI_API_KEY}"
model: "gpt-4-turbo"
base_url: "<https://api.openai.com/v1>"
timeout: 30
gemini:
api_key: "${GEMINI_API_KEY}"
model: "gemini-1.5-pro"
grok:
api_key: "${GROK_API_KEY}"
model: "grok-2"
context_window: 3 # 历史消息条数
stream_enabled: true
security:
jwt_secret: "${JWT_SECRET}"
token_expire_hours: 6
challenge_expire_seconds: 60
admin_chat_ids: [^123456789]
logging:
level: "info" # debug/info/warn/error
format: "json"
output: "./logs/app.log"
max_size: 100 # MB
max_backups: 10
max_age: 30 # 天
compress: true
monitoring:
prometheus_enabled: true
metrics_path: "/metrics"
环境变量清单
# Telegram配置
export TELEGRAM_BOT_TOKEN="1234567890:ABCdefGHIjklMNOpqrsTUVwxyz"
# AI服务密钥
export OPENAI_API_KEY="sk-proj-..."
export GEMINI_API_KEY="AIza..."
export GROK_API_KEY="xai-..."
# 安全密钥
export JWT_SECRET="random-256-bit-secret"
# 数据库路径
export DB_PATH="./data/bot.db"
# 运行模式
export GIN_MODE="release"
非功能性需求实现
性能优化策略
数据库连接池配置
func InitDB(config DBConfig) *sql.DB {
db, _ := sql.Open("sqlite3", config.Path+"?_journal=WAL&_busy_timeout=5000")
db.SetMaxOpenConns(config.MaxOpenConns)
db.SetMaxIdleConns(config.MaxIdleConns)
db.SetConnMaxLifetime(time.Duration(config.ConnMaxLifetime) * time.Second)
// 启用WAL模式提升并发性能
db.Exec("PRAGMA journal_mode=WAL")
db.Exec("PRAGMA synchronous=NORMAL")
db.Exec("PRAGMA cache_size=-64000") // 64MB缓存
return db
}
消息批量处理
type BatchProcessor struct {
messages []Message
timer *time.Timer
mutex sync.Mutex
batchSize int
batchDelay time.Duration
}
func (bp *BatchProcessor) Add(msg Message) {
bp.mutex.Lock()
defer bp.mutex.Unlock()
bp.messages = append(bp.messages, msg)
// 达到批量大小或超时则触发发送
if len(bp.messages) >= bp.batchSize {
bp.flush()
} else if bp.timer == nil {
bp.timer = time.AfterFunc(bp.batchDelay, bp.flush)
}
}
func (bp *BatchProcessor) flush() {
bp.mutex.Lock()
msgs := bp.messages
bp.messages = nil
bp.timer = nil
bp.mutex.Unlock()
// 聚合同类消息
aggregated := aggregateMessages(msgs)
for _, msg := range aggregated {
sendToTelegram(msg)
}
}
可靠性保障
消息重试机制
type RetryPolicy struct {
MaxRetries int
InitialDelay time.Duration
MaxDelay time.Duration
Multiplier float64
}
var defaultPolicy = RetryPolicy{
MaxRetries: 3,
InitialDelay: 1 * time.Second,
MaxDelay: 30 * time.Second,
Multiplier: 2.0,
}
func SendWithRetry(msg Message, policy RetryPolicy) error {
var lastErr error
for attempt := 0; attempt <= policy.MaxRetries; attempt++ {
err := sendToTelegram(msg)
if err == nil {
return nil
}
lastErr = err
// 判断是否可重试(非4xx错误)
if !isRetryable(err) {
return err
}
// 计算退避延迟
delay := policy.InitialDelay * time.Duration(math.Pow(policy.Multiplier, float64(attempt)))
if delay > policy.MaxDelay {
delay = policy.MaxDelay
}
log.Warn("消息发送失败,重试中",
"attempt", attempt+1,
"delay", delay,
"error", err)
time.Sleep(delay)
}
// 转入死信队列
deadLetterQueue <- msg
return fmt.Errorf("重试%d次后仍失败: %w", policy.MaxRetries, lastErr)
}
优雅关闭机制
func (app *Application) Shutdown(ctx context.Context) error {
log.Info("开始优雅关闭...")
// 1. 停止接收新请求
app.httpServer.SetKeepAlivesEnabled(false)
// 2. 停止Telegram轮询
app.bot.StopReceivingUpdates()
// 3. 等待消息队列清空(最多30秒)
queueCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
for {
select {
case <-queueCtx.Done():
log.Warn("消息队列未完全处理完毕", "remaining", app.queue.Len())
goto CLEANUP
default:
if app.queue.Len() == 0 {
goto CLEANUP
}
time.Sleep(100 * time.Millisecond)
}
}
CLEANUP:
// 4. 关闭HTTP服务器
app.httpServer.Shutdown(ctx)
// 5. 关闭数据库连接
app.db.Close()
log.Info("优雅关闭完成")
return nil
}
监控与可观测性
Prometheus指标定义
var (
// 消息发送指标
messagesSent = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "naughty_man_messages_sent_total",
Help: "消息发送总数",
},
[]string{"project", "level", "status"},
)
// 消息发送延迟
messageSendDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "naughty_man_message_send_duration_seconds",
Help: "消息发送耗时",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 10),
},
[]string{"project", "level"},
)
// 队列长度
queueLength = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "naughty_man_queue_length",
Help: "消息队列长度",
},
[]string{"priority"},
)
// 速率限制器状态
rateLimiterTokens = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "naughty_man_rate_limiter_tokens",
Help: "速率限制器剩余令牌数",
},
[]string{"limiter_type"},
)
// AI调用指标
aiRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "naughty_man_ai_requests_total",
Help: "AI API调用总数",
},
[]string{"provider", "model", "status"},
)
)
结构化日志规范
// 使用zap实现结构化日志
logger, _ := zap.NewProduction()
defer logger.Sync()
sugar := logger.Sugar()
// 业务日志示例
sugar.Infow("消息入队",
"message_id", msg.ID,
"chat_id", msg.ChatID,
"level", msg.Level,
"priority", msg.Priority,
"scheduled_at", msg.ScheduledAt,
)
sugar.Errorw("消息发送失败",
"message_id", msg.ID,
"error", err.Error(),
"retry_count", msg.RetryCount,
"will_retry", msg.RetryCount < 3,
)
测试策略
单元测试覆盖计划
| 模块 | 测试重点 | 目标覆盖率 |
|---|---|---|
| 认证模块 | 签名验证、Token生成/解析、白名单校验 | >90% |
| 速率限制器 | 令牌桶算法、并发安全性 | >95% |
| 消息队列 | 优先级排序、入队出队操作 | >90% |
| 模板引擎 | 变量替换、错误处理 | >85% |
| AI适配器 | 流式解析、超时处理 | >80% |
集成测试场景
场景1: 端到端消息发送流程
func TestE2EMessageFlow(t *testing.T) {
// 1. 创建测试环境
app := setupTestApp()
defer app.Cleanup()
// 2. 获取Token
token := authenticateTestClient(app)
// 3. 发送CRITICAL消息
msgID := sendTestMessage(app, token, Message{
TargetID: testChatID,
Level: "CRITICAL",
Content: testContent,
})
// 4. 验证消息入队
assert.Eventually(t, func() bool {
return app.Queue.Contains(msgID)
}, 5*time.Second, 100*time.Millisecond)
// 5. 模拟Telegram API响应
mockTelegramResponse(testChatID, msgID)
// 6. 验证消息状态更新
status := queryMessageStatus(app, msgID)
assert.Equal(t, "sent", status)
}
压力测试指标
测试工具: Apache JMeter / Vegeta
测试用例:
# 并发消息发送压测(1000 QPS持续1分钟)
echo "POST <http://localhost:8080/api/v1/message/send>" | \\
vegeta attack -rate=1000 -duration=60s -header="Authorization: Bearer $TOKEN" \\
| vegeta report
# 期望指标:
# - 成功率 > 99%
# - P95延迟 < 200ms
# - P99延迟 < 500ms
# - 无内存泄漏
风险评估与对策
| 风险项 | 发生概率 | 影响程度 | 应对策略 | 责任人 |
|---|---|---|---|---|
| Telegram API限流导致消息积压 | 高 | 高 | 1. 实现自适应速率限制 2. 优先级队列保证CRITICAL优先 3. 消息聚合减少调用 |
后端负责人 |
| SQLite写入冲突 | 中 | 中 | 1. 启用WAL模式 2. 写操作串行化 3. 必要时迁移PostgreSQL |
数据库架构师 |
| AI服务不稳定 | 中 | 低 | 1. 多厂商自动切换 2. 超时熔断机制 3. 降级为普通回复 |
AI集成负责人 |
| 代理服务中断 | 低 | 高 | 1. 备用代理配置 2. 心跳检测自动切换 3. 监控告警 |
运维负责人 |
| 死信队列溢出 | 低 | 中 | 1. 定时任务清理老数据 2. 人工审核重试 3. 容量告警 |
值班工程师 |
项目实施计划
迭代排期
gantt
title NaughtyMan开发计划
dateFormat YYYY-MM-DD
section 阶段一:基础设施
数据库表结构设计 :done, db, 2025-10-22, 2d
速率限制器实现 :active, rate, 2025-10-24, 3d
Token认证系统 :auth, after rate, 3d
白名单中间件 :whitelist, after auth, 2d
代理配置与测试 :proxy, after whitelist, 2d
section 阶段二:核心功能
消息通知API开发 :api, after proxy, 4d
模板渲染引擎 :template, after api, 2d
优先级队列实现 :queue, after template, 3d
消息重试机制 :retry, after queue, 2d
定时提醒CRUD :reminder, after retry, 4d
section 阶段三:高级功能
InlineKeyboard交互 :inline, after reminder, 4d
AI多厂商适配 :ai, after inline, 5d
流式响应处理 :stream, after ai, 3d
性能优化与压测 :perf, after stream, 3d
section 阶段四:上线准备
集成测试 :integration, after perf, 3d
文档编写 :doc, after integration, 2d
监控告警配置 :monitor, after doc, 2d
灰度发布 :release, after monitor, 2d
人员分工
| 角色 | 职责 | 人数 |
|---|---|---|
| 技术负责人 | 架构设计审核、技术选型决策、风险评估 | 1 |
| 后端工程师 | 核心业务逻辑开发、API接口实现 | 2 |
| AI集成工程师 | 多厂商适配层、流式响应处理 | 1 |
| 测试工程师 | 单元测试、集成测试、压力测试 | 1 |
| 运维工程师 | 部署脚本、监控配置、应急响应 | 1 |
交付物清单
- 源代码仓库(Git)
- 数据库迁移脚本
- API接口文档(OpenAPI 3.0)
- 部署操作手册
- 监控告警配置文件
- 性能测试报告
- 用户使用指南
附录
术语表
| 术语 | 英文 | 说明 |
|---|---|---|
| 令牌桶 | Token Bucket | 速率限制算法,以固定速率补充令牌实现流控 |
| 死信队列 | Dead Letter Queue | 存储多次失败消息的特殊队列 |
| 指数退避 | Exponential Backoff | 重试间隔呈指数增长的策略 |
| WAL模式 | Write-Ahead Logging | SQLite日志模式,提升并发写入性能 |
| RRULE | Recurrence Rule | iCalendar重复规则标准(RFC 5545) |
参考资料
文档编制完成 | 总计约1.2万字 | 包含8个Mermaid图表 | 覆盖12个核心设计模块