package rpc import ( "bytes" "context" "encoding/json" "errors" "fmt" "io" "net/http" "strings" "time" "cmii-uav-watchdog-common/models" ) // 错误类型常量定义 var ( ErrRequestFailed = errors.New("请求失败") ErrResponseParsing = errors.New("响应解析失败") ErrInvalidStatusCode = errors.New("无效的状态码") ErrTimeout = errors.New("请求超时") ErrCancelled = errors.New("请求被取消") ) const ( DefaultHeartbeatURL = "http://cmii-uav-watchdog:8080/api/heartbeat" // DefaultHeartbeatURL = "http://192.168.35.70:8990/api/heartbeat" ) // ClientOptions HTTP客户端配置选项 type ClientOptions struct { Timeout time.Duration // 请求超时时间 RetryCount int // 重试次数 RetryWaitTime time.Duration // 重试等待时间 MaxIdleConns int // 最大空闲连接数 IdleConnTimeout time.Duration // 空闲连接超时时间 } // DefaultClientOptions 返回默认的客户端配置 func DefaultClientOptions() *ClientOptions { return &ClientOptions{ Timeout: 10 * time.Second, RetryCount: 3, RetryWaitTime: 1 * time.Second, MaxIdleConns: 10, IdleConnTimeout: 90 * time.Second, } } // Client HTTP客户端封装 type Client struct { httpClient *http.Client options *ClientOptions heartbeatURL string // 心跳请求的URL地址 } // NewClient 创建一个新的HTTP客户端 // 参数: // - options: 客户端配置选项,如果为nil则使用默认配置 // // 返回: // - *Client: HTTP客户端实例 func NewClient(options *ClientOptions, heartbeatURL string) *Client { if options == nil { options = DefaultClientOptions() } // 创建自定义的Transport transport := &http.Transport{ MaxIdleConns: options.MaxIdleConns, IdleConnTimeout: options.IdleConnTimeout, } // 创建HTTP客户端 httpClient := &http.Client{ Timeout: options.Timeout, Transport: transport, } // 解析heartbeatURL heartbeatURL = strings.TrimSpace(heartbeatURL) if heartbeatURL == "" { heartbeatURL = DefaultHeartbeatURL } return &Client{ httpClient: httpClient, options: options, heartbeatURL: heartbeatURL, } } // GetHeartbeatURL 获取心跳请求的URL地址 func (c *Client) GetHeartbeatURL() string { return c.heartbeatURL } // SendHeartbeat 发送心跳请求并处理响应 // 参数: // - ctx: 上下文,用于取消请求 // - url: 心跳请求的URL地址 // - request: 心跳请求数据 // // 返回: // - *models.HeartbeatResponse: 心跳响应 // - error: 错误信息 func (c *Client) SendHeartbeat(ctx context.Context, request *models.HeartbeatRequest) (*models.HeartbeatResponse, error) { // 将请求结构体序列化为JSON requestBody, err := json.Marshal(request) if err != nil { return nil, fmt.Errorf("序列化请求失败: %w", err) } // 重试逻辑 var resp *http.Response var responseBody []byte var lastError error for attempt := 0; attempt <= c.options.RetryCount; attempt++ { // 如果不是第一次尝试,则等待一段时间 if attempt > 0 { select { case <-ctx.Done(): return nil, fmt.Errorf("%w: %v", ErrCancelled, ctx.Err()) case <-time.After(c.options.RetryWaitTime): // 继续下一次尝试 } } // 创建HTTP请求 req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.heartbeatURL, bytes.NewBuffer(requestBody)) if err != nil { lastError = fmt.Errorf("创建请求失败: %w", err) continue } // 设置请求头 req.Header.Set("Content-Type", "application/json") req.Header.Set("User-Agent", "cmii-uav-watchdog-agent") // 发送请求 resp, err = c.httpClient.Do(req) if err != nil { if ctx.Err() == context.DeadlineExceeded { lastError = fmt.Errorf("%w: %v", ErrTimeout, err) } else if ctx.Err() == context.Canceled { lastError = fmt.Errorf("%w: %v", ErrCancelled, err) } else { lastError = fmt.Errorf("%w: %v", ErrRequestFailed, err) } continue } // 确保响应体被关闭 defer resp.Body.Close() // 使用switch检查HTTP状态码 switch resp.StatusCode { case http.StatusOK: // 状态码为200,继续处理响应 case http.StatusBadRequest: lastError = fmt.Errorf("%w: 状态码 %d,请求无效", ErrInvalidStatusCode, resp.StatusCode) // 读取并丢弃响应体,避免连接泄漏 _, _ = io.Copy(io.Discard, resp.Body) continue case http.StatusUnauthorized: lastError = fmt.Errorf("%w: 状态码 %d,未授权", ErrInvalidStatusCode, resp.StatusCode) // 读取并丢弃响应体,避免连接泄漏 _, _ = io.Copy(io.Discard, resp.Body) continue case http.StatusForbidden: lastError = fmt.Errorf("%w: 状态码 %d,禁止访问", ErrInvalidStatusCode, resp.StatusCode) // 读取并丢弃响应体,避免连接泄漏 _, _ = io.Copy(io.Discard, resp.Body) continue case http.StatusNotFound: lastError = fmt.Errorf("%w: 状态码 %d,未找到资源", ErrInvalidStatusCode, resp.StatusCode) // 读取并丢弃响应体,避免连接泄漏 _, _ = io.Copy(io.Discard, resp.Body) continue case http.StatusInternalServerError: lastError = fmt.Errorf("%w: 状态码 %d,服务器内部错误", ErrInvalidStatusCode, resp.StatusCode) // 读取并丢弃响应体,避免连接泄漏 _, _ = io.Copy(io.Discard, resp.Body) continue default: lastError = fmt.Errorf("%w: 状态码 %d,未知错误", ErrInvalidStatusCode, resp.StatusCode) // 读取并丢弃响应体,避免连接泄漏 _, _ = io.Copy(io.Discard, resp.Body) continue } // 读取响应体 responseBody, err = io.ReadAll(resp.Body) if err != nil { lastError = fmt.Errorf("读取响应体失败: %w", err) continue } // 成功获取响应,跳出重试循环 lastError = nil break } // 如果最后依然有错误,返回错误 if lastError != nil { return nil, lastError } // 如果没有响应体,返回错误 if responseBody == nil { return nil, fmt.Errorf("%w: 没有响应体", ErrResponseParsing) } // 解析响应JSON var heartbeatResponse models.HeartbeatResponse if err := json.Unmarshal(responseBody, &heartbeatResponse); err != nil { return nil, fmt.Errorf("%w: %v", ErrResponseParsing, err) } return &heartbeatResponse, nil } // SendHeartbeatWithRetry 发送心跳请求并自动处理超时和重试 // 参数: // - url: 心跳请求的URL地址 // - request: 心跳请求数据 // - timeout: 整体操作超时时间 // // 返回: // - *models.HeartbeatResponse: 心跳响应 // - error: 错误信息 func (c *Client) SendHeartbeatWithRetry(request *models.HeartbeatRequest, timeout time.Duration) (*models.HeartbeatResponse, error) { // 创建带超时的上下文 ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() return c.SendHeartbeat(ctx, request) }