- Remove old review reports (keep latest only) - Move docs/ to deploy/docs-backup/ - Move performance-testing/ to deploy/ - Clean up test output files - Organize root directory
16 KiB
Sub2API 模块分析报告:API Gateway 核心
1. 模块概述
1.1 模块定位
API Gateway 是 Sub2API 的核心模块,负责接收用户请求、选择合适的上游账号、转发请求到 AI 提供商(如 Anthropic、OpenAI、Google Gemini 等),并处理响应。该模块是整个系统流量入口和转发核心。
1.2 核心职责
- 请求接收与路由:接收来自用户的 AI API 请求
- 账号选择:根据负载、可用性、优先级选择合适的上游账号
- 请求转发:将请求转发到上游 AI 服务
- 故障处理:账号故障时的自动重试和切换
- 响应返回:将上游响应返回给用户
2. 代码结构分析
2.1 核心文件
| 文件路径 | 职责 | 代码行数 |
|---|---|---|
handler/gateway_handler.go |
Claude /v1/messages 主入口 | ~1800 行 |
handler/openai_chat_completions.go |
OpenAI /v1/chat/completions 处理器 | ~2500 行 |
handler/gemini_v1beta_handler.go |
Google Gemini /v1beta/* 处理器 | ~600 行 |
handler/sora_gateway_handler.go |
Sora 视频生成处理器 | ~500 行 |
service/gateway_service.go |
核心网关逻辑,账号选择、重试、8516 行 | 8516 行 |
service/openai_gateway_service.go |
OpenAI 特定处理(WebSocket、流式) | ~4000 行 |
service/antigravity_gateway_service.go |
Antigravity 平台支持 | ~1500 行 |
2.2 目录结构
backend/internal/
├── handler/
│ ├── gateway_handler.go # Claude /v1/messages 主入口
│ ├── openai_chat_completions.go # OpenAI /v1/chat/completions
│ ├── openai_gateway_handler.go # OpenAI 兼容接口(含 SSE、WebSocket)
│ ├── gemini_v1beta_handler.go # Google Gemini /v1beta/*
│ ├── sora_gateway_handler.go # Sora 视频生成
│ └── endpoint.go # 端点辅助函数
└── service/
├── gateway_service.go # 核心网关服务(8516行)
├── openai_gateway_service.go # OpenAI 特定处理
├── openai_ws_v2/ # OpenAI WebSocket v2 支持
├── antigravity_gateway_service.go # Antigravity 平台
├── openai_account_scheduler.go # OpenAI 账号调度器
└── gateway_anthropic_apikey_passthrough.go # API Key 透传
3. 功能详细分析
3.1 请求处理流程
用户请求 → 认证中间件 → 网关处理器 → 账号选择 → 请求转发 → 响应处理 → 返回结果
关键步骤:
-
请求认证 (
api_key_auth.go)- 验证 API Key 有效性
- 获取用户和分组信息
- 检查配额和限流
-
账号选择 (
gateway_service.go:SelectAccountWithLoadAwareness)- 检查分组下的可用账号
- 根据负载因子排序
- 支持粘性会话(同一会话路由到同一账号)
-
请求转发 (
forwardRequest)- 构建上游请求
- 添加必要的认证头
- 处理流式/非流式响应
-
故障处理 (
handleUpstreamError)- 单账号重试(同一账号重试)
- 账号切换(切换到其他账号)
- 退避策略(线性退避)
3.2 账号选择算法
// gateway_service.go - SelectAccountWithLoadAwareness (实际实现)
func (s *GatewayService) SelectAccountWithLoadAwareness(
ctx context.Context,
groupID *int64,
sessionHash string,
requestedModel string,
excludedIDs map[int64]struct{},
metadataUserID string, // 已废弃参数
) (*AccountSelectionResult, error) {
// 1. 检查 Claude Code 限制(可能会替换 groupID 为降级分组)
group, groupID, err := s.checkClaudeCodeRestriction(ctx, groupID)
if err != nil {
return nil, err
}
ctx = s.withGroupContext(ctx, group)
// 2. 获取粘性会话绑定的账号
var stickyAccountID int64
if accountID, err := s.cache.GetSessionAccountID(ctx, derefGroupID(groupID), sessionHash); err == nil {
stickyAccountID = accountID
}
// 3. 尝试获取账号槽位
for {
account, err := s.SelectAccountForModelWithExclusions(ctx, groupID, sessionHash, requestedModel, localExcluded)
if err != nil {
return nil, err
}
// 3.1 尝试获取并发槽位
result, err := s.tryAcquireAccountSlot(ctx, account.ID, account.Concurrency)
if err == nil && result.Acquired {
// 3.2 检查会话限制
if !s.checkAndRegisterSession(ctx, account, sessionHash) {
result.ReleaseFunc()
localExcluded[account.ID] = struct{}{}
continue
}
return &AccountSelectionResult{Account: account, Acquired: true, ...}, nil
}
// 3.3 支持等待计划(WaitPlan)
// ...
}
}
实际流程(6 步):
-
Claude Code 限制检查 (
checkClaudeCodeRestriction)- 检测 Claude Code 客户端
- 可能替换 groupID 为降级分组
-
粘性会话获取 (
cache.GetSessionAccountID)- 从 Redis 获取 sessionHash 绑定的账号
- 用于保持同一会话路由到同一账号
-
并发槽位获取 (
tryAcquireAccountSlot)- 检查账号并发限制
- 使用
ConcurrencyService管理槽位
-
会话限制检查 (
checkAndRegisterSession)- 检查每个账号的会话数上限
- 支持等待队列
-
等待计划 (WaitPlan)
- 如果账号繁忙,返回等待计划
- 客户端可以等待重试
-
混合调度 (
selectAccountWithMixedScheduling)- Anthropic/Gemini 分组支持混合调度
- 包含启用了 mixed_scheduling 的 Antigravity 账号
关键特性:
- 负载感知:根据并发槽位和负载因子选择
- 粘性会话:基于 sessionHash 保持会话
- 会话限制:每个账号有最大会话数限制
- 等待计划:支持队列等待而非直接拒绝
- Claude Code 适配:自动降级处理
3.3 故障重试策略
// gateway_service.go - handleUpstreamError
func handleUpstreamError(...) *handleResult {
// 1. 同账号重试(短延迟)
if canRetrySameAccount(statusCode) {
return retryWithShortDelay(account)
}
// 2. 切换账号(稍长延迟)
if hasMoreAccounts(groupID) {
return switchToNextAccount(groupID, failedAccount)
}
// 3. 线性退避
return retryWithLinearBackoff(attemptNum)
}
重试参数(可配置):
- 单账号重试次数:默认 1 次
- 账号切换次数:默认 3 次
- 重试延迟:100ms ~ 8000ms(线性增长)
3.4 支持的上游平台
| 平台 | 端点 | 认证方式 | 特性 |
|---|---|---|---|
| Anthropic Claude | /v1/messages |
OAuth/API Key | 流式响应,消息历史 |
| OpenAI | /v1/chat/completions |
API Key | 多模型支持 |
| Google Gemini | /v1beta/models/:generateContent |
API Key | 多模态支持 |
| Antigravity | /v1/messages, /v1beta/ |
OAuth | 独立配额系统 |
| AWS Bedrock | /model invocation |
AWS 签名 | Claude on Bedrock |
4. 关键数据结构
4.1 GatewayService 依赖的服务
// gateway_service.go - GatewayService 结构体
type GatewayService struct {
// === Repositories ===
accountRepo AccountRepository // 账号数据访问
groupRepo GroupRepository // 分组数据访问
usageLogRepo UsageLogRepository // 用量日志
usageBillingRepo UsageBillingRepository // 用量计费
userRepo UserRepository // 用户数据
userSubRepo UserSubscriptionRepository // 订阅数据
userGroupRateRepo UserGroupRateRepository // 用户分组费率
// === Services ===
billingService *BillingService // 计费服务
rateLimitService *RateLimitService // 限流服务
billingCacheService *BillingCacheService // 计费缓存
identityService *IdentityService // 身份识别
concurrencyService *ConcurrencyService // 并发控制
deferredService *DeferredService // 延迟操作
schedulerSnapshot *SchedulerSnapshotService // 调度快照
settingService SettingService // 系统设置
// === Cache & HTTP ===
cache GatewayCache // 网关缓存(粘性会话)
digestStore DigestSessionStore // 会话存储
httpUpstream HTTPUpstream // 上游 HTTP 客户端
}
4.2 账号选择结果
type AccountSelectionResult struct {
Account *Account // 选中的账号
Acquired bool // 是否成功获取槽位
ReleaseFunc func() // 释放槽位函数
WaitPlan *AccountWaitPlan // 等待计划(如果 Acquired=false)
}
type AccountWaitPlan struct {
AccountID int64 // 账号ID
MaxConcurrency int // 最大并发
Timeout time.Duration // 超时时间
MaxWaiting int // 最大等待数
}
4.3 用量记录输入
type RecordUsageInput struct {
Result *ForwardResult // 转发结果
APIKey *APIKey // API Key
User *User // 用户
Account *Account // 上游账号
Subscription *Subscription // 订阅(如果有)
RequestID string // 请求ID
}
## 5. 性能与优化
### 5.1 性能瓶颈分析
| 瓶颈点 | 影响 | 优化建议 |
|-------|------|----------|
| 账号选择锁 | 高并发下争用 | 使用读写锁或本地缓存 |
| 重试延迟 | 响应时间增加 | 自适应退避算法 |
| 日志写入 | I/O 延迟 | 批量异步写入 |
### 5.2 GatewayService 核心方法
| 方法 | 位置 | 职责 |
|------|------|------|
| `HandleRequest` | gateway_service.go | 处理 Claude /v1/messages 请求 |
| `HandleOpenAIRequest` | openai_gateway_service.go | 处理 OpenAI 格式请求 |
| `SelectAccountWithLoadAwareness` | gateway_service.go:1190 | 负载感知账号选择 |
| `RecordUsage` | gateway_service.go:7483 | 用量记录与计费 |
| `tryAcquireAccountSlot` | gateway_service.go | 尝试获取并发槽位 |
| `checkAndRegisterSession` | gateway_service.go | 检查会话限制 |
### 5.3 缓存策略
- **API Key 缓存**:两级缓存(L1 内存 + L2 Redis)
- **账号状态缓存**:本地内存缓存,5 秒过期
- **配额缓存**:Redis 缓存,实时更新
- **粘性会话缓存**:Redis,TTL 1 小时
- **调度快照缓存**:定期快照加速选择
### 5.4 连接池管理
```go
// gateway_service.go - HTTPUpstream 接口
type HTTPUpstream interface {
Do(req *http.Request) (*http.Response, error)
// 每个账号维护独立的 HTTP 客户端
}
连接管理策略:
- 每个上游账号维护独立的 HTTP 客户端
- 使用连接池复用连接
- 超时配置:连接 10s,读取 120s
- 支持 WebSocket(SSE)长连接
6. 安全考虑
6.1 请求验证
- 模型白名单:验证请求模型是否在分组允许列表中
- IP 限制:支持 API Key 级别的 IP 白名单
- 请求限流:基于用户/分组的 RPM/TPM 限制
6.2 上游保护
- URL 白名单:限制可访问的上游域名
- Header 过滤:移除敏感的上游响应头
- 超时控制:防止上游响应超时
7. 可扩展性设计
7.1 新增上游支持
要支持新的 AI 提供商,需要:
-
实现账号类型(
account.go)type AccountType string const ( AccountTypeOAuth AccountType = "oauth" AccountTypeAPIKey AccountType = "apikey" AccountTypeAnthropic AccountType = "anthropic" // 新增类型 AccountTypeCustom AccountType = "custom" ) -
实现请求转换器(
service/request_transformer.go)- 将标准请求格式转换为目标平台格式
- 处理认证、模型映射等
-
注册到网关(
server/routes/gateway.go)router.POST("/custom/v1/*path", middleware.APIKeyAuth, customHandler)
7.2 插件化设计
当前架构支持:
- 错误处理插件:通过
ErrorPassthroughRule配置 - 请求/响应拦截:通过 Hook 机制
- 自定义认证:支持 OAuth、API Key 等多种方式
8. 测试覆盖
8.1 单元测试
| 测试文件 | 覆盖范围 |
|---|---|
gateway_account_selection_test.go |
账号选择算法 |
gateway_handler_stream_failover_test.go |
流式响应故障转移 |
gateway_helper_backoff_test.go |
退避算法 |
8.2 集成测试
| 测试文件 | 场景 |
|---|---|
e2e_gateway_test.go |
完整请求流程 |
e2e_user_flow_test.go |
用户使用场景 |
9. 配置参数
9.1 网关配置(config.yaml)
gateway:
# 重试配置
max_retries: 3
retry_delay_ms: 100
max_retry_delay_ms: 8000
# 超时配置
request_timeout: 120s
connect_timeout: 10s
# 粘性会话
sticky_session_ttl: 3600
# 流式响应
stream_buffer_size: 32768
9.2 环境变量
| 变量 | 说明 | 默认值 |
|---|---|---|
GATEWAY_MAX_RETRIES |
最大重试次数 | 3 |
GATEWAY_REQUEST_TIMEOUT |
请求超时 | 120s |
GATEWAY_STICKY_SESSION |
启用粘性会话 | true |
10. 监控与运维
10.1 关键指标
| 指标 | 告警阈值 | 说明 |
|---|---|---|
gateway_request_duration |
> 30s | 请求延迟 |
gateway_account_failover |
> 10% | 账号切换率 |
gateway_upstream_errors |
> 1% | 上游错误率 |
10.2 日志分析
关键日志字段:
request_id:请求唯一标识account_id:使用的账号 IDupstream_status:上游响应状态retry_count:重试次数failover_reason:故障切换原因
11. 修改和扩展指南
11.1 常见修改场景
场景 1:修改重试策略
修改文件:service/gateway_service.go
func (s *GatewayService) handleUpstreamError(...) {
// 修改重试次数
maxRetries := 5 // 从 3 改为 5
// 修改退避算法
delay := calculateExponentialBackoff(attempt) // 改为指数退避
}
场景 2:添加新的上游支持
- 在
domain/constants.go添加账号类型 - 实现账号创建/验证逻辑
- 添加请求转发函数
- 注册路由
场景 3:调整账号选择算法
修改文件:service/gateway_service.go
func (s *GatewayService) selectAccountByStrategy(...) {
// 可以添加更多选择策略
switch strategy {
case "least_load":
return selectByLeastLoad(accounts)
case "round_robin":
return selectByRoundRobin(accounts)
case "priority":
return selectByPriority(accounts)
}
}
11.2 注意事项
- 线程安全:账号选择逻辑需要考虑并发安全
- 向后兼容:新增配置需要设置合理的默认值
- 测试覆盖:重大修改需要添加对应的单元测试
12. 总结
API Gateway 是 Sub2API 系统的核心模块,设计具有良好的灵活性和可扩展性。关键特点:
- 多平台支持:统一接口接入多种 AI 提供商
- 智能调度:负载感知 + 粘性会话
- 故障容忍:自动重试 + 账号切换
- 高性能:多级缓存 + 连接池复用
修改建议:
- 重试策略调整相对简单,风险较低
- 新增上游支持需要完善测试
- 账号选择算法优化需充分验证
文档版本:1.1 最后更新:2026-03-23 分析基于:Sub2API v0.1.104 修正内容:文件路径、账号选择算法(6步流程)、GatewayService依赖服务