Files
tokens-reef/deploy/docs-backup/MODULE_01_API_GATEWAY.md

487 lines
16 KiB
Markdown
Raw Permalink Normal View History

# Sub2API 模块分析报告API Gateway 核心
## 1. 模块概述
### 1.1 模块定位
API Gateway 是 Sub2API 的核心模块,负责接收用户请求、选择合适的上游账号、转发请求到 AI 提供商(如 Anthropic、OpenAI、Google Gemini 等),并处理响应。该模块是整个系统流量入口和转发核心。
### 1.2 核心职责
- **请求接收与路由**:接收来自用户的 AI API 请求
- **账号选择**:根据负载、可用性、优先级选择合适的上游账号
- **请求转发**:将请求转发到上游 AI 服务
- **故障处理**:账号故障时的自动重试和切换
- **响应返回**:将上游响应返回给用户
## 2. 代码结构分析
### 2.1 核心文件
| 文件路径 | 职责 | 代码行数 |
|---------|------|----------|
| `handler/gateway_handler.go` | Claude /v1/messages 主入口 | ~1800 行 |
| `handler/openai_chat_completions.go` | OpenAI /v1/chat/completions 处理器 | ~2500 行 |
| `handler/gemini_v1beta_handler.go` | Google Gemini /v1beta/* 处理器 | ~600 行 |
| `handler/sora_gateway_handler.go` | Sora 视频生成处理器 | ~500 行 |
| `service/gateway_service.go` | 核心网关逻辑,账号选择、重试、**8516 行** | 8516 行 |
| `service/openai_gateway_service.go` | OpenAI 特定处理WebSocket、流式 | ~4000 行 |
| `service/antigravity_gateway_service.go` | Antigravity 平台支持 | ~1500 行 |
### 2.2 目录结构
```
backend/internal/
├── handler/
│ ├── gateway_handler.go # Claude /v1/messages 主入口
│ ├── openai_chat_completions.go # OpenAI /v1/chat/completions
│ ├── openai_gateway_handler.go # OpenAI 兼容接口(含 SSE、WebSocket
│ ├── gemini_v1beta_handler.go # Google Gemini /v1beta/*
│ ├── sora_gateway_handler.go # Sora 视频生成
│ └── endpoint.go # 端点辅助函数
└── service/
├── gateway_service.go # 核心网关服务8516行
├── openai_gateway_service.go # OpenAI 特定处理
├── openai_ws_v2/ # OpenAI WebSocket v2 支持
├── antigravity_gateway_service.go # Antigravity 平台
├── openai_account_scheduler.go # OpenAI 账号调度器
└── gateway_anthropic_apikey_passthrough.go # API Key 透传
```
## 3. 功能详细分析
### 3.1 请求处理流程
```
用户请求 → 认证中间件 → 网关处理器 → 账号选择 → 请求转发 → 响应处理 → 返回结果
```
**关键步骤:**
1. **请求认证** (`api_key_auth.go`)
- 验证 API Key 有效性
- 获取用户和分组信息
- 检查配额和限流
2. **账号选择** (`gateway_service.go:SelectAccountWithLoadAwareness`)
- 检查分组下的可用账号
- 根据负载因子排序
- 支持粘性会话(同一会话路由到同一账号)
3. **请求转发** (`forwardRequest`)
- 构建上游请求
- 添加必要的认证头
- 处理流式/非流式响应
4. **故障处理** (`handleUpstreamError`)
- 单账号重试(同一账号重试)
- 账号切换(切换到其他账号)
- 退避策略(线性退避)
### 3.2 账号选择算法
```go
// gateway_service.go - SelectAccountWithLoadAwareness (实际实现)
func (s *GatewayService) SelectAccountWithLoadAwareness(
ctx context.Context,
groupID *int64,
sessionHash string,
requestedModel string,
excludedIDs map[int64]struct{},
metadataUserID string, // 已废弃参数
) (*AccountSelectionResult, error) {
// 1. 检查 Claude Code 限制(可能会替换 groupID 为降级分组)
group, groupID, err := s.checkClaudeCodeRestriction(ctx, groupID)
if err != nil {
return nil, err
}
ctx = s.withGroupContext(ctx, group)
// 2. 获取粘性会话绑定的账号
var stickyAccountID int64
if accountID, err := s.cache.GetSessionAccountID(ctx, derefGroupID(groupID), sessionHash); err == nil {
stickyAccountID = accountID
}
// 3. 尝试获取账号槽位
for {
account, err := s.SelectAccountForModelWithExclusions(ctx, groupID, sessionHash, requestedModel, localExcluded)
if err != nil {
return nil, err
}
// 3.1 尝试获取并发槽位
result, err := s.tryAcquireAccountSlot(ctx, account.ID, account.Concurrency)
if err == nil && result.Acquired {
// 3.2 检查会话限制
if !s.checkAndRegisterSession(ctx, account, sessionHash) {
result.ReleaseFunc()
localExcluded[account.ID] = struct{}{}
continue
}
return &AccountSelectionResult{Account: account, Acquired: true, ...}, nil
}
// 3.3 支持等待计划WaitPlan
// ...
}
}
```
**实际流程6 步):**
1. **Claude Code 限制检查** (`checkClaudeCodeRestriction`)
- 检测 Claude Code 客户端
- 可能替换 groupID 为降级分组
2. **粘性会话获取** (`cache.GetSessionAccountID`)
- 从 Redis 获取 sessionHash 绑定的账号
- 用于保持同一会话路由到同一账号
3. **并发槽位获取** (`tryAcquireAccountSlot`)
- 检查账号并发限制
- 使用 `ConcurrencyService` 管理槽位
4. **会话限制检查** (`checkAndRegisterSession`)
- 检查每个账号的会话数上限
- 支持等待队列
5. **等待计划 (WaitPlan)**
- 如果账号繁忙,返回等待计划
- 客户端可以等待重试
6. **混合调度** (`selectAccountWithMixedScheduling`)
- Anthropic/Gemini 分组支持混合调度
- 包含启用了 mixed_scheduling 的 Antigravity 账号
**关键特性:**
- **负载感知**:根据并发槽位和负载因子选择
- **粘性会话**:基于 sessionHash 保持会话
- **会话限制**:每个账号有最大会话数限制
- **等待计划**:支持队列等待而非直接拒绝
- **Claude Code 适配**:自动降级处理
### 3.3 故障重试策略
```go
// gateway_service.go - handleUpstreamError
func handleUpstreamError(...) *handleResult {
// 1. 同账号重试(短延迟)
if canRetrySameAccount(statusCode) {
return retryWithShortDelay(account)
}
// 2. 切换账号(稍长延迟)
if hasMoreAccounts(groupID) {
return switchToNextAccount(groupID, failedAccount)
}
// 3. 线性退避
return retryWithLinearBackoff(attemptNum)
}
```
**重试参数(可配置):**
- 单账号重试次数:默认 1 次
- 账号切换次数:默认 3 次
- 重试延迟100ms ~ 8000ms线性增长
### 3.4 支持的上游平台
| 平台 | 端点 | 认证方式 | 特性 |
|------|------|----------|------|
| **Anthropic Claude** | `/v1/messages` | OAuth/API Key | 流式响应,消息历史 |
| **OpenAI** | `/v1/chat/completions` | API Key | 多模型支持 |
| **Google Gemini** | `/v1beta/models/:generateContent` | API Key | 多模态支持 |
| **Antigravity** | `/v1/messages`, `/v1beta/` | OAuth | 独立配额系统 |
| **AWS Bedrock** | `/model invocation` | AWS 签名 | Claude on Bedrock |
## 4. 关键数据结构
### 4.1 GatewayService 依赖的服务
```go
// gateway_service.go - GatewayService 结构体
type GatewayService struct {
// === Repositories ===
accountRepo AccountRepository // 账号数据访问
groupRepo GroupRepository // 分组数据访问
usageLogRepo UsageLogRepository // 用量日志
usageBillingRepo UsageBillingRepository // 用量计费
userRepo UserRepository // 用户数据
userSubRepo UserSubscriptionRepository // 订阅数据
userGroupRateRepo UserGroupRateRepository // 用户分组费率
// === Services ===
billingService *BillingService // 计费服务
rateLimitService *RateLimitService // 限流服务
billingCacheService *BillingCacheService // 计费缓存
identityService *IdentityService // 身份识别
concurrencyService *ConcurrencyService // 并发控制
deferredService *DeferredService // 延迟操作
schedulerSnapshot *SchedulerSnapshotService // 调度快照
settingService SettingService // 系统设置
// === Cache & HTTP ===
cache GatewayCache // 网关缓存(粘性会话)
digestStore DigestSessionStore // 会话存储
httpUpstream HTTPUpstream // 上游 HTTP 客户端
}
```
### 4.2 账号选择结果
```go
type AccountSelectionResult struct {
Account *Account // 选中的账号
Acquired bool // 是否成功获取槽位
ReleaseFunc func() // 释放槽位函数
WaitPlan *AccountWaitPlan // 等待计划(如果 Acquired=false
}
type AccountWaitPlan struct {
AccountID int64 // 账号ID
MaxConcurrency int // 最大并发
Timeout time.Duration // 超时时间
MaxWaiting int // 最大等待数
}
```
### 4.3 用量记录输入
```go
type RecordUsageInput struct {
Result *ForwardResult // 转发结果
APIKey *APIKey // API Key
User *User // 用户
Account *Account // 上游账号
Subscription *Subscription // 订阅(如果有)
RequestID string // 请求ID
}
```
```
## 5. 性能与优化
### 5.1 性能瓶颈分析
| 瓶颈点 | 影响 | 优化建议 |
|-------|------|----------|
| 账号选择锁 | 高并发下争用 | 使用读写锁或本地缓存 |
| 重试延迟 | 响应时间增加 | 自适应退避算法 |
| 日志写入 | I/O 延迟 | 批量异步写入 |
### 5.2 GatewayService 核心方法
| 方法 | 位置 | 职责 |
|------|------|------|
| `HandleRequest` | gateway_service.go | 处理 Claude /v1/messages 请求 |
| `HandleOpenAIRequest` | openai_gateway_service.go | 处理 OpenAI 格式请求 |
| `SelectAccountWithLoadAwareness` | gateway_service.go:1190 | 负载感知账号选择 |
| `RecordUsage` | gateway_service.go:7483 | 用量记录与计费 |
| `tryAcquireAccountSlot` | gateway_service.go | 尝试获取并发槽位 |
| `checkAndRegisterSession` | gateway_service.go | 检查会话限制 |
### 5.3 缓存策略
- **API Key 缓存**两级缓存L1 内存 + L2 Redis
- **账号状态缓存**本地内存缓存5 秒过期
- **配额缓存**Redis 缓存,实时更新
- **粘性会话缓存**RedisTTL 1 小时
- **调度快照缓存**:定期快照加速选择
### 5.4 连接池管理
```go
// gateway_service.go - HTTPUpstream 接口
type HTTPUpstream interface {
Do(req *http.Request) (*http.Response, error)
// 每个账号维护独立的 HTTP 客户端
}
```
**连接管理策略:**
- 每个上游账号维护独立的 HTTP 客户端
- 使用连接池复用连接
- 超时配置:连接 10s读取 120s
- 支持 WebSocketSSE长连接
## 6. 安全考虑
### 6.1 请求验证
- **模型白名单**:验证请求模型是否在分组允许列表中
- **IP 限制**:支持 API Key 级别的 IP 白名单
- **请求限流**:基于用户/分组的 RPM/TPM 限制
### 6.2 上游保护
- **URL 白名单**:限制可访问的上游域名
- **Header 过滤**:移除敏感的上游响应头
- **超时控制**:防止上游响应超时
## 7. 可扩展性设计
### 7.1 新增上游支持
要支持新的 AI 提供商,需要:
1. **实现账号类型**`account.go`
```go
type AccountType string
const (
AccountTypeOAuth AccountType = "oauth"
AccountTypeAPIKey AccountType = "apikey"
AccountTypeAnthropic AccountType = "anthropic"
// 新增类型
AccountTypeCustom AccountType = "custom"
)
```
2. **实现请求转换器**`service/request_transformer.go`
- 将标准请求格式转换为目标平台格式
- 处理认证、模型映射等
3. **注册到网关**`server/routes/gateway.go`
```go
router.POST("/custom/v1/*path", middleware.APIKeyAuth, customHandler)
```
### 7.2 插件化设计
当前架构支持:
- **错误处理插件**:通过 `ErrorPassthroughRule` 配置
- **请求/响应拦截**:通过 Hook 机制
- **自定义认证**:支持 OAuth、API Key 等多种方式
## 8. 测试覆盖
### 8.1 单元测试
| 测试文件 | 覆盖范围 |
|----------|----------|
| `gateway_account_selection_test.go` | 账号选择算法 |
| `gateway_handler_stream_failover_test.go` | 流式响应故障转移 |
| `gateway_helper_backoff_test.go` | 退避算法 |
### 8.2 集成测试
| 测试文件 | 场景 |
|----------|------|
| `e2e_gateway_test.go` | 完整请求流程 |
| `e2e_user_flow_test.go` | 用户使用场景 |
## 9. 配置参数
### 9.1 网关配置config.yaml
```yaml
gateway:
# 重试配置
max_retries: 3
retry_delay_ms: 100
max_retry_delay_ms: 8000
# 超时配置
request_timeout: 120s
connect_timeout: 10s
# 粘性会话
sticky_session_ttl: 3600
# 流式响应
stream_buffer_size: 32768
```
### 9.2 环境变量
| 变量 | 说明 | 默认值 |
|------|------|--------|
| `GATEWAY_MAX_RETRIES` | 最大重试次数 | 3 |
| `GATEWAY_REQUEST_TIMEOUT` | 请求超时 | 120s |
| `GATEWAY_STICKY_SESSION` | 启用粘性会话 | true |
## 10. 监控与运维
### 10.1 关键指标
| 指标 | 告警阈值 | 说明 |
|------|----------|------|
| `gateway_request_duration` | > 30s | 请求延迟 |
| `gateway_account_failover` | > 10% | 账号切换率 |
| `gateway_upstream_errors` | > 1% | 上游错误率 |
### 10.2 日志分析
关键日志字段:
- `request_id`:请求唯一标识
- `account_id`:使用的账号 ID
- `upstream_status`:上游响应状态
- `retry_count`:重试次数
- `failover_reason`:故障切换原因
## 11. 修改和扩展指南
### 11.1 常见修改场景
**场景 1修改重试策略**
修改文件:`service/gateway_service.go`
```go
func (s *GatewayService) handleUpstreamError(...) {
// 修改重试次数
maxRetries := 5 // 从 3 改为 5
// 修改退避算法
delay := calculateExponentialBackoff(attempt) // 改为指数退避
}
```
**场景 2添加新的上游支持**
1.`domain/constants.go` 添加账号类型
2. 实现账号创建/验证逻辑
3. 添加请求转发函数
4. 注册路由
**场景 3调整账号选择算法**
修改文件:`service/gateway_service.go`
```go
func (s *GatewayService) selectAccountByStrategy(...) {
// 可以添加更多选择策略
switch strategy {
case "least_load":
return selectByLeastLoad(accounts)
case "round_robin":
return selectByRoundRobin(accounts)
case "priority":
return selectByPriority(accounts)
}
}
```
### 11.2 注意事项
1. **线程安全**:账号选择逻辑需要考虑并发安全
2. **向后兼容**:新增配置需要设置合理的默认值
3. **测试覆盖**:重大修改需要添加对应的单元测试
## 12. 总结
API Gateway 是 Sub2API 系统的核心模块,设计具有良好的灵活性和可扩展性。关键特点:
- **多平台支持**:统一接口接入多种 AI 提供商
- **智能调度**:负载感知 + 粘性会话
- **故障容忍**:自动重试 + 账号切换
- **高性能**:多级缓存 + 连接池复用
修改建议:
- 重试策略调整相对简单,风险较低
- 新增上游支持需要完善测试
- 账号选择算法优化需充分验证
---
*文档版本1.1*
*最后更新2026-03-23*
*分析基于Sub2API v0.1.104*
*修正内容文件路径、账号选择算法6步流程、GatewayService依赖服务*