新增rpc调用

This commit is contained in:
zeaslity
2025-03-18 14:19:43 +08:00
parent da0ec7e81a
commit 6abb488622

View File

@@ -1 +1,197 @@
package rpc
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"time"
"cmii-uav-watchdog-common/models"
)
// 错误类型常量定义
var (
ErrRequestFailed = errors.New("请求失败")
ErrResponseParsing = errors.New("响应解析失败")
ErrInvalidStatusCode = errors.New("无效的状态码")
ErrTimeout = errors.New("请求超时")
ErrCancelled = errors.New("请求被取消")
)
const (
DefaultHeartbeatURL = "http://cmii-uav-watchdog/heartbeat"
)
// ClientOptions HTTP客户端配置选项
type ClientOptions struct {
Timeout time.Duration // 请求超时时间
RetryCount int // 重试次数
RetryWaitTime time.Duration // 重试等待时间
MaxIdleConns int // 最大空闲连接数
IdleConnTimeout time.Duration // 空闲连接超时时间
}
// DefaultClientOptions 返回默认的客户端配置
func DefaultClientOptions() *ClientOptions {
return &ClientOptions{
Timeout: 10 * time.Second,
RetryCount: 3,
RetryWaitTime: 1 * time.Second,
MaxIdleConns: 10,
IdleConnTimeout: 90 * time.Second,
}
}
// Client HTTP客户端封装
type Client struct {
httpClient *http.Client
options *ClientOptions
}
// NewClient 创建一个新的HTTP客户端
// 参数:
// - options: 客户端配置选项如果为nil则使用默认配置
//
// 返回:
// - *Client: HTTP客户端实例
func NewClient(options *ClientOptions) *Client {
if options == nil {
options = DefaultClientOptions()
}
// 创建自定义的Transport
transport := &http.Transport{
MaxIdleConns: options.MaxIdleConns,
IdleConnTimeout: options.IdleConnTimeout,
}
// 创建HTTP客户端
httpClient := &http.Client{
Timeout: options.Timeout,
Transport: transport,
}
return &Client{
httpClient: httpClient,
options: options,
}
}
// SendHeartbeat 发送心跳请求并处理响应
// 参数:
// - ctx: 上下文,用于取消请求
// - url: 心跳请求的URL地址
// - request: 心跳请求数据
//
// 返回:
// - *models.HeartbeatResponse: 心跳响应
// - error: 错误信息
func (c *Client) SendHeartbeat(ctx context.Context, request *models.HeartbeatRequest) (*models.HeartbeatResponse, error) {
// 将请求结构体序列化为JSON
requestBody, err := json.Marshal(request)
if err != nil {
return nil, fmt.Errorf("序列化请求失败: %w", err)
}
// 重试逻辑
var resp *http.Response
var responseBody []byte
var lastError error
for attempt := 0; attempt <= c.options.RetryCount; attempt++ {
// 如果不是第一次尝试,则等待一段时间
if attempt > 0 {
select {
case <-ctx.Done():
return nil, fmt.Errorf("%w: %v", ErrCancelled, ctx.Err())
case <-time.After(c.options.RetryWaitTime):
// 继续下一次尝试
}
}
// 创建HTTP请求
req, err := http.NewRequestWithContext(ctx, http.MethodPost, DefaultHeartbeatURL, bytes.NewBuffer(requestBody))
if err != nil {
lastError = fmt.Errorf("创建请求失败: %w", err)
continue
}
// 设置请求头
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", "cmii-uav-watchdog-agent")
// 发送请求
resp, err = c.httpClient.Do(req)
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
lastError = fmt.Errorf("%w: %v", ErrTimeout, err)
} else if ctx.Err() == context.Canceled {
lastError = fmt.Errorf("%w: %v", ErrCancelled, err)
} else {
lastError = fmt.Errorf("%w: %v", ErrRequestFailed, err)
}
continue
}
// 确保响应体被关闭
defer resp.Body.Close()
// 检查HTTP状态码
if resp.StatusCode != http.StatusOK {
lastError = fmt.Errorf("%w: 状态码 %d", ErrInvalidStatusCode, resp.StatusCode)
// 读取并丢弃响应体,避免连接泄漏
_, _ = io.Copy(io.Discard, resp.Body)
continue
}
// 读取响应体
responseBody, err = io.ReadAll(resp.Body)
if err != nil {
lastError = fmt.Errorf("读取响应体失败: %w", err)
continue
}
// 成功获取响应,跳出重试循环
lastError = nil
break
}
// 如果最后依然有错误,返回错误
if lastError != nil {
return nil, lastError
}
// 如果没有响应体,返回错误
if responseBody == nil {
return nil, fmt.Errorf("%w: 没有响应体", ErrResponseParsing)
}
// 解析响应JSON
var heartbeatResponse models.HeartbeatResponse
if err := json.Unmarshal(responseBody, &heartbeatResponse); err != nil {
return nil, fmt.Errorf("%w: %v", ErrResponseParsing, err)
}
return &heartbeatResponse, nil
}
// SendHeartbeatWithRetry 发送心跳请求并自动处理超时和重试
// 参数:
// - url: 心跳请求的URL地址
// - request: 心跳请求数据
// - timeout: 整体操作超时时间
//
// 返回:
// - *models.HeartbeatResponse: 心跳响应
// - error: 错误信息
func (c *Client) SendHeartbeatWithRetry(request *models.HeartbeatRequest, timeout time.Duration) (*models.HeartbeatResponse, error) {
// 创建带超时的上下文
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return c.SendHeartbeat(ctx, request)
}