diff --git a/cmii-uav-watchdog-agent/rpc/rpc.go b/cmii-uav-watchdog-agent/rpc/rpc.go index 9ab1e3e..9eac94e 100644 --- a/cmii-uav-watchdog-agent/rpc/rpc.go +++ b/cmii-uav-watchdog-agent/rpc/rpc.go @@ -1 +1,197 @@ package rpc + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "time" + + "cmii-uav-watchdog-common/models" +) + +// 错误类型常量定义 +var ( + ErrRequestFailed = errors.New("请求失败") + ErrResponseParsing = errors.New("响应解析失败") + ErrInvalidStatusCode = errors.New("无效的状态码") + ErrTimeout = errors.New("请求超时") + ErrCancelled = errors.New("请求被取消") +) + +const ( + DefaultHeartbeatURL = "http://cmii-uav-watchdog/heartbeat" +) + +// ClientOptions HTTP客户端配置选项 +type ClientOptions struct { + Timeout time.Duration // 请求超时时间 + RetryCount int // 重试次数 + RetryWaitTime time.Duration // 重试等待时间 + MaxIdleConns int // 最大空闲连接数 + IdleConnTimeout time.Duration // 空闲连接超时时间 +} + +// DefaultClientOptions 返回默认的客户端配置 +func DefaultClientOptions() *ClientOptions { + return &ClientOptions{ + Timeout: 10 * time.Second, + RetryCount: 3, + RetryWaitTime: 1 * time.Second, + MaxIdleConns: 10, + IdleConnTimeout: 90 * time.Second, + } +} + +// Client HTTP客户端封装 +type Client struct { + httpClient *http.Client + options *ClientOptions +} + +// NewClient 创建一个新的HTTP客户端 +// 参数: +// - options: 客户端配置选项,如果为nil则使用默认配置 +// +// 返回: +// - *Client: HTTP客户端实例 +func NewClient(options *ClientOptions) *Client { + if options == nil { + options = DefaultClientOptions() + } + + // 创建自定义的Transport + transport := &http.Transport{ + MaxIdleConns: options.MaxIdleConns, + IdleConnTimeout: options.IdleConnTimeout, + } + + // 创建HTTP客户端 + httpClient := &http.Client{ + Timeout: options.Timeout, + Transport: transport, + } + + return &Client{ + httpClient: httpClient, + options: options, + } +} + +// SendHeartbeat 发送心跳请求并处理响应 +// 参数: +// - ctx: 上下文,用于取消请求 +// - url: 心跳请求的URL地址 +// - request: 心跳请求数据 +// +// 返回: +// - *models.HeartbeatResponse: 心跳响应 +// - error: 错误信息 +func (c *Client) SendHeartbeat(ctx context.Context, request *models.HeartbeatRequest) (*models.HeartbeatResponse, error) { + // 将请求结构体序列化为JSON + requestBody, err := json.Marshal(request) + if err != nil { + return nil, fmt.Errorf("序列化请求失败: %w", err) + } + + // 重试逻辑 + var resp *http.Response + var responseBody []byte + var lastError error + + for attempt := 0; attempt <= c.options.RetryCount; attempt++ { + // 如果不是第一次尝试,则等待一段时间 + if attempt > 0 { + select { + case <-ctx.Done(): + return nil, fmt.Errorf("%w: %v", ErrCancelled, ctx.Err()) + case <-time.After(c.options.RetryWaitTime): + // 继续下一次尝试 + } + } + + // 创建HTTP请求 + req, err := http.NewRequestWithContext(ctx, http.MethodPost, DefaultHeartbeatURL, bytes.NewBuffer(requestBody)) + if err != nil { + lastError = fmt.Errorf("创建请求失败: %w", err) + continue + } + + // 设置请求头 + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", "cmii-uav-watchdog-agent") + + // 发送请求 + resp, err = c.httpClient.Do(req) + if err != nil { + if ctx.Err() == context.DeadlineExceeded { + lastError = fmt.Errorf("%w: %v", ErrTimeout, err) + } else if ctx.Err() == context.Canceled { + lastError = fmt.Errorf("%w: %v", ErrCancelled, err) + } else { + lastError = fmt.Errorf("%w: %v", ErrRequestFailed, err) + } + continue + } + + // 确保响应体被关闭 + defer resp.Body.Close() + + // 检查HTTP状态码 + if resp.StatusCode != http.StatusOK { + lastError = fmt.Errorf("%w: 状态码 %d", ErrInvalidStatusCode, resp.StatusCode) + // 读取并丢弃响应体,避免连接泄漏 + _, _ = io.Copy(io.Discard, resp.Body) + continue + } + + // 读取响应体 + responseBody, err = io.ReadAll(resp.Body) + if err != nil { + lastError = fmt.Errorf("读取响应体失败: %w", err) + continue + } + + // 成功获取响应,跳出重试循环 + lastError = nil + break + } + + // 如果最后依然有错误,返回错误 + if lastError != nil { + return nil, lastError + } + + // 如果没有响应体,返回错误 + if responseBody == nil { + return nil, fmt.Errorf("%w: 没有响应体", ErrResponseParsing) + } + + // 解析响应JSON + var heartbeatResponse models.HeartbeatResponse + if err := json.Unmarshal(responseBody, &heartbeatResponse); err != nil { + return nil, fmt.Errorf("%w: %v", ErrResponseParsing, err) + } + + return &heartbeatResponse, nil +} + +// SendHeartbeatWithRetry 发送心跳请求并自动处理超时和重试 +// 参数: +// - url: 心跳请求的URL地址 +// - request: 心跳请求数据 +// - timeout: 整体操作超时时间 +// +// 返回: +// - *models.HeartbeatResponse: 心跳响应 +// - error: 错误信息 +func (c *Client) SendHeartbeatWithRetry(request *models.HeartbeatRequest, timeout time.Duration) (*models.HeartbeatResponse, error) { + // 创建带超时的上下文 + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + return c.SendHeartbeat(ctx, request) +}