Files
tokens-reef/deploy/docs-backup/MODULE_01_API_GATEWAY.md
Developer 349d783fd1 refactor: clean up project structure
- Remove old review reports (keep latest only)
- Move docs/ to deploy/docs-backup/
- Move performance-testing/ to deploy/
- Clean up test output files
- Organize root directory
2026-04-06 23:36:03 +08:00

16 KiB
Raw Permalink Blame History

Sub2API 模块分析报告API Gateway 核心

1. 模块概述

1.1 模块定位

API Gateway 是 Sub2API 的核心模块,负责接收用户请求、选择合适的上游账号、转发请求到 AI 提供商(如 Anthropic、OpenAI、Google Gemini 等),并处理响应。该模块是整个系统流量入口和转发核心。

1.2 核心职责

  • 请求接收与路由:接收来自用户的 AI API 请求
  • 账号选择:根据负载、可用性、优先级选择合适的上游账号
  • 请求转发:将请求转发到上游 AI 服务
  • 故障处理:账号故障时的自动重试和切换
  • 响应返回:将上游响应返回给用户

2. 代码结构分析

2.1 核心文件

文件路径 职责 代码行数
handler/gateway_handler.go Claude /v1/messages 主入口 ~1800 行
handler/openai_chat_completions.go OpenAI /v1/chat/completions 处理器 ~2500 行
handler/gemini_v1beta_handler.go Google Gemini /v1beta/* 处理器 ~600 行
handler/sora_gateway_handler.go Sora 视频生成处理器 ~500 行
service/gateway_service.go 核心网关逻辑,账号选择、重试、8516 行 8516 行
service/openai_gateway_service.go OpenAI 特定处理WebSocket、流式 ~4000 行
service/antigravity_gateway_service.go Antigravity 平台支持 ~1500 行

2.2 目录结构

backend/internal/
├── handler/
│   ├── gateway_handler.go              # Claude /v1/messages 主入口
│   ├── openai_chat_completions.go      # OpenAI /v1/chat/completions
│   ├── openai_gateway_handler.go       # OpenAI 兼容接口(含 SSE、WebSocket
│   ├── gemini_v1beta_handler.go        # Google Gemini /v1beta/*
│   ├── sora_gateway_handler.go         # Sora 视频生成
│   └── endpoint.go                     # 端点辅助函数
└── service/
    ├── gateway_service.go              # 核心网关服务8516行
    ├── openai_gateway_service.go       # OpenAI 特定处理
    ├── openai_ws_v2/                   # OpenAI WebSocket v2 支持
    ├── antigravity_gateway_service.go  # Antigravity 平台
    ├── openai_account_scheduler.go     # OpenAI 账号调度器
    └── gateway_anthropic_apikey_passthrough.go # API Key 透传

3. 功能详细分析

3.1 请求处理流程

用户请求 → 认证中间件 → 网关处理器 → 账号选择 → 请求转发 → 响应处理 → 返回结果

关键步骤:

  1. 请求认证 (api_key_auth.go)

    • 验证 API Key 有效性
    • 获取用户和分组信息
    • 检查配额和限流
  2. 账号选择 (gateway_service.go:SelectAccountWithLoadAwareness)

    • 检查分组下的可用账号
    • 根据负载因子排序
    • 支持粘性会话(同一会话路由到同一账号)
  3. 请求转发 (forwardRequest)

    • 构建上游请求
    • 添加必要的认证头
    • 处理流式/非流式响应
  4. 故障处理 (handleUpstreamError)

    • 单账号重试(同一账号重试)
    • 账号切换(切换到其他账号)
    • 退避策略(线性退避)

3.2 账号选择算法

// gateway_service.go - SelectAccountWithLoadAwareness (实际实现)
func (s *GatewayService) SelectAccountWithLoadAwareness(
    ctx context.Context,
    groupID *int64,
    sessionHash string,
    requestedModel string,
    excludedIDs map[int64]struct{},
    metadataUserID string,  // 已废弃参数
) (*AccountSelectionResult, error) {
    
    // 1. 检查 Claude Code 限制(可能会替换 groupID 为降级分组)
    group, groupID, err := s.checkClaudeCodeRestriction(ctx, groupID)
    if err != nil {
        return nil, err
    }
    ctx = s.withGroupContext(ctx, group)
    
    // 2. 获取粘性会话绑定的账号
    var stickyAccountID int64
    if accountID, err := s.cache.GetSessionAccountID(ctx, derefGroupID(groupID), sessionHash); err == nil {
        stickyAccountID = accountID
    }
    
    // 3. 尝试获取账号槽位
    for {
        account, err := s.SelectAccountForModelWithExclusions(ctx, groupID, sessionHash, requestedModel, localExcluded)
        if err != nil {
            return nil, err
        }
        
        // 3.1 尝试获取并发槽位
        result, err := s.tryAcquireAccountSlot(ctx, account.ID, account.Concurrency)
        if err == nil && result.Acquired {
            // 3.2 检查会话限制
            if !s.checkAndRegisterSession(ctx, account, sessionHash) {
                result.ReleaseFunc()
                localExcluded[account.ID] = struct{}{}
                continue
            }
            return &AccountSelectionResult{Account: account, Acquired: true, ...}, nil
        }
        
        // 3.3 支持等待计划WaitPlan
        // ...
    }
}

实际流程6 步):

  1. Claude Code 限制检查 (checkClaudeCodeRestriction)

    • 检测 Claude Code 客户端
    • 可能替换 groupID 为降级分组
  2. 粘性会话获取 (cache.GetSessionAccountID)

    • 从 Redis 获取 sessionHash 绑定的账号
    • 用于保持同一会话路由到同一账号
  3. 并发槽位获取 (tryAcquireAccountSlot)

    • 检查账号并发限制
    • 使用 ConcurrencyService 管理槽位
  4. 会话限制检查 (checkAndRegisterSession)

    • 检查每个账号的会话数上限
    • 支持等待队列
  5. 等待计划 (WaitPlan)

    • 如果账号繁忙,返回等待计划
    • 客户端可以等待重试
  6. 混合调度 (selectAccountWithMixedScheduling)

    • Anthropic/Gemini 分组支持混合调度
    • 包含启用了 mixed_scheduling 的 Antigravity 账号

关键特性:

  • 负载感知:根据并发槽位和负载因子选择
  • 粘性会话:基于 sessionHash 保持会话
  • 会话限制:每个账号有最大会话数限制
  • 等待计划:支持队列等待而非直接拒绝
  • Claude Code 适配:自动降级处理

3.3 故障重试策略

// gateway_service.go - handleUpstreamError
func handleUpstreamError(...) *handleResult {
    // 1. 同账号重试(短延迟)
    if canRetrySameAccount(statusCode) {
        return retryWithShortDelay(account)
    }
    
    // 2. 切换账号(稍长延迟)
    if hasMoreAccounts(groupID) {
        return switchToNextAccount(groupID, failedAccount)
    }
    
    // 3. 线性退避
    return retryWithLinearBackoff(attemptNum)
}

重试参数(可配置):

  • 单账号重试次数:默认 1 次
  • 账号切换次数:默认 3 次
  • 重试延迟100ms ~ 8000ms线性增长

3.4 支持的上游平台

平台 端点 认证方式 特性
Anthropic Claude /v1/messages OAuth/API Key 流式响应,消息历史
OpenAI /v1/chat/completions API Key 多模型支持
Google Gemini /v1beta/models/:generateContent API Key 多模态支持
Antigravity /v1/messages, /v1beta/ OAuth 独立配额系统
AWS Bedrock /model invocation AWS 签名 Claude on Bedrock

4. 关键数据结构

4.1 GatewayService 依赖的服务

// gateway_service.go - GatewayService 结构体
type GatewayService struct {
    // === Repositories ===
    accountRepo          AccountRepository      // 账号数据访问
    groupRepo            GroupRepository         // 分组数据访问
    usageLogRepo         UsageLogRepository      // 用量日志
    usageBillingRepo     UsageBillingRepository  // 用量计费
    userRepo             UserRepository         // 用户数据
    userSubRepo          UserSubscriptionRepository  // 订阅数据
    userGroupRateRepo    UserGroupRateRepository // 用户分组费率
    
    // === Services ===
    billingService       *BillingService        // 计费服务
    rateLimitService     *RateLimitService       // 限流服务
    billingCacheService  *BillingCacheService    // 计费缓存
    identityService      *IdentityService         // 身份识别
    concurrencyService   *ConcurrencyService      // 并发控制
    deferredService      *DeferredService         // 延迟操作
    schedulerSnapshot    *SchedulerSnapshotService // 调度快照
    settingService       SettingService           // 系统设置
    
    // === Cache & HTTP ===
    cache                GatewayCache            // 网关缓存(粘性会话)
    digestStore          DigestSessionStore      // 会话存储
    httpUpstream         HTTPUpstream            // 上游 HTTP 客户端
}

4.2 账号选择结果

type AccountSelectionResult struct {
    Account     *Account           // 选中的账号
    Acquired    bool               // 是否成功获取槽位
    ReleaseFunc func()             // 释放槽位函数
    WaitPlan    *AccountWaitPlan   // 等待计划(如果 Acquired=false
}

type AccountWaitPlan struct {
    AccountID      int64       // 账号ID
    MaxConcurrency int         // 最大并发
    Timeout        time.Duration  // 超时时间
    MaxWaiting     int         // 最大等待数
}

4.3 用量记录输入

type RecordUsageInput struct {
    Result        *ForwardResult    // 转发结果
    APIKey        *APIKey           // API Key
    User          *User              // 用户
    Account       *Account           // 上游账号
    Subscription  *Subscription      // 订阅(如果有)
    RequestID     string             // 请求ID
}

## 5. 性能与优化

### 5.1 性能瓶颈分析

| 瓶颈点 | 影响 | 优化建议 |
|-------|------|----------|
| 账号选择锁 | 高并发下争用 | 使用读写锁或本地缓存 |
| 重试延迟 | 响应时间增加 | 自适应退避算法 |
| 日志写入 | I/O 延迟 | 批量异步写入 |

### 5.2 GatewayService 核心方法

| 方法 | 位置 | 职责 |
|------|------|------|
| `HandleRequest` | gateway_service.go | 处理 Claude /v1/messages 请求 |
| `HandleOpenAIRequest` | openai_gateway_service.go | 处理 OpenAI 格式请求 |
| `SelectAccountWithLoadAwareness` | gateway_service.go:1190 | 负载感知账号选择 |
| `RecordUsage` | gateway_service.go:7483 | 用量记录与计费 |
| `tryAcquireAccountSlot` | gateway_service.go | 尝试获取并发槽位 |
| `checkAndRegisterSession` | gateway_service.go | 检查会话限制 |

### 5.3 缓存策略

- **API Key 缓存**两级缓存L1 内存 + L2 Redis
- **账号状态缓存**本地内存缓存5 秒过期
- **配额缓存**Redis 缓存,实时更新
- **粘性会话缓存**RedisTTL 1 小时
- **调度快照缓存**:定期快照加速选择

### 5.4 连接池管理

```go
// gateway_service.go - HTTPUpstream 接口
type HTTPUpstream interface {
    Do(req *http.Request) (*http.Response, error)
    // 每个账号维护独立的 HTTP 客户端
}

连接管理策略:

  • 每个上游账号维护独立的 HTTP 客户端
  • 使用连接池复用连接
  • 超时配置:连接 10s读取 120s
  • 支持 WebSocketSSE长连接

6. 安全考虑

6.1 请求验证

  • 模型白名单:验证请求模型是否在分组允许列表中
  • IP 限制:支持 API Key 级别的 IP 白名单
  • 请求限流:基于用户/分组的 RPM/TPM 限制

6.2 上游保护

  • URL 白名单:限制可访问的上游域名
  • Header 过滤:移除敏感的上游响应头
  • 超时控制:防止上游响应超时

7. 可扩展性设计

7.1 新增上游支持

要支持新的 AI 提供商,需要:

  1. 实现账号类型account.go

    type AccountType string
    const (
        AccountTypeOAuth    AccountType = "oauth"
        AccountTypeAPIKey   AccountType = "apikey"
        AccountTypeAnthropic AccountType = "anthropic"
        // 新增类型
        AccountTypeCustom  AccountType = "custom"
    )
    
  2. 实现请求转换器service/request_transformer.go

    • 将标准请求格式转换为目标平台格式
    • 处理认证、模型映射等
  3. 注册到网关server/routes/gateway.go

    router.POST("/custom/v1/*path", middleware.APIKeyAuth, customHandler)
    

7.2 插件化设计

当前架构支持:

  • 错误处理插件:通过 ErrorPassthroughRule 配置
  • 请求/响应拦截:通过 Hook 机制
  • 自定义认证:支持 OAuth、API Key 等多种方式

8. 测试覆盖

8.1 单元测试

测试文件 覆盖范围
gateway_account_selection_test.go 账号选择算法
gateway_handler_stream_failover_test.go 流式响应故障转移
gateway_helper_backoff_test.go 退避算法

8.2 集成测试

测试文件 场景
e2e_gateway_test.go 完整请求流程
e2e_user_flow_test.go 用户使用场景

9. 配置参数

9.1 网关配置config.yaml

gateway:
  # 重试配置
  max_retries: 3
  retry_delay_ms: 100
  max_retry_delay_ms: 8000
  
  # 超时配置
  request_timeout: 120s
  connect_timeout: 10s
  
  # 粘性会话
  sticky_session_ttl: 3600
  
  # 流式响应
  stream_buffer_size: 32768

9.2 环境变量

变量 说明 默认值
GATEWAY_MAX_RETRIES 最大重试次数 3
GATEWAY_REQUEST_TIMEOUT 请求超时 120s
GATEWAY_STICKY_SESSION 启用粘性会话 true

10. 监控与运维

10.1 关键指标

指标 告警阈值 说明
gateway_request_duration > 30s 请求延迟
gateway_account_failover > 10% 账号切换率
gateway_upstream_errors > 1% 上游错误率

10.2 日志分析

关键日志字段:

  • request_id:请求唯一标识
  • account_id:使用的账号 ID
  • upstream_status:上游响应状态
  • retry_count:重试次数
  • failover_reason:故障切换原因

11. 修改和扩展指南

11.1 常见修改场景

场景 1修改重试策略

修改文件:service/gateway_service.go

func (s *GatewayService) handleUpstreamError(...) {
    // 修改重试次数
    maxRetries := 5 // 从 3 改为 5
    
    // 修改退避算法
    delay := calculateExponentialBackoff(attempt) // 改为指数退避
}

场景 2添加新的上游支持

  1. domain/constants.go 添加账号类型
  2. 实现账号创建/验证逻辑
  3. 添加请求转发函数
  4. 注册路由

场景 3调整账号选择算法

修改文件:service/gateway_service.go

func (s *GatewayService) selectAccountByStrategy(...) {
    // 可以添加更多选择策略
    switch strategy {
    case "least_load":
        return selectByLeastLoad(accounts)
    case "round_robin":
        return selectByRoundRobin(accounts)
    case "priority":
        return selectByPriority(accounts)
    }
}

11.2 注意事项

  1. 线程安全:账号选择逻辑需要考虑并发安全
  2. 向后兼容:新增配置需要设置合理的默认值
  3. 测试覆盖:重大修改需要添加对应的单元测试

12. 总结

API Gateway 是 Sub2API 系统的核心模块,设计具有良好的灵活性和可扩展性。关键特点:

  • 多平台支持:统一接口接入多种 AI 提供商
  • 智能调度:负载感知 + 粘性会话
  • 故障容忍:自动重试 + 账号切换
  • 高性能:多级缓存 + 连接池复用

修改建议:

  • 重试策略调整相对简单,风险较低
  • 新增上游支持需要完善测试
  • 账号选择算法优化需充分验证

文档版本1.1 最后更新2026-03-23 分析基于Sub2API v0.1.104 修正内容文件路径、账号选择算法6步流程、GatewayService依赖服务