- Remove old review reports (keep latest only) - Move docs/ to deploy/docs-backup/ - Move performance-testing/ to deploy/ - Clean up test output files - Organize root directory
19 KiB
19 KiB
Sub2API 模块分析报告:计费与配额模块
1. 模块概述
1.1 模块定位
计费与配额模块是Sub2API系统的经济核心,负责追踪用户和API Key的使用量、计算费用、管理配额、控制访问。该模块与网关模块紧密配合,在每个请求完成后进行用量记录和费用扣除。
1.2 核心职责
- 用量追踪:记录每个请求的Token消耗和费用
- 配额控制:用户和API Key级别的配额限制
- 计费计算:根据模型和用量计算费用
- 余额管理:用户余额的充值和扣减
- 速率限制:RPM/TPM级别的请求限制
2. 代码结构分析
2.1 核心文件
| 文件路径 | 职责 | 代码行数 |
|---|---|---|
service/billing_service.go |
计费核心服务 | ~500行 |
service/billing_cache_service.go |
计费缓存服务(余额、配额检查) | ~700行 |
service/gateway_service.go |
用量记录(RecordUsage 在此文件中) | 8516行 |
repository/usage_log_repo.go |
用量数据访问层 | ~4000行 |
repository/usage_billing_repo.go |
计费数据访问层 | ~2000行 |
middleware/rate_limiter.go |
速率限制中间件 | ~400行 |
service/api_key_rate_limit.go |
API Key级别限流 | ~300行 |
⚠️ 重要修正:用量记录功能
RecordUsage位于gateway_service.go:7483,不是独立文件。
2.2 核心数据模型
// 用量日志 - 每次请求的详细记录
type UsageLog struct {
ID int64
RequestID string // 请求唯一ID
UserID int64 // 用户ID
APIKeyID *int64 // API Key ID
AccountID int64 // 上游账号ID
GroupID int64 // 分组ID
Model string // 使用模型
UpstreamModel string // 上游实际模型
RequestType string // 请求类型:chat/completion/embedding等
InputTokens int // 输入Token数
OutputTokens int // 输出Token数
TotalTokens int // 总Token数
Cost float64 // 费用
Currency string // 货币:USD
Status string // 状态:success/error
DurationMs int // 请求耗时(毫秒)
CreatedAt time.Time
}
// 计费配置 - 模型价格定义
type PricingRule struct {
Model string // 模型名称(支持通配符)
InputPrice float64 // 输入价格(per 1M tokens)
OutputPrice float64 // 输出价格(per 1M tokens)
PromptPrice float64 // 提示价格(per 1M tokens)
CacheDiscount float64 // 缓存读取折扣
}
3. 功能详细分析
3.1 用量记录流程(后置处理)
⚠️ 重要说明:用量记录发生在请求转发成功后,是后置处理步骤。
// gateway_service.go:7483 - RecordUsage
func (s *GatewayService) RecordUsage(ctx context.Context, input *RecordUsageInput) error {
// 1. 强制缓存计费(粘性会话切换时)
if input.ForceCacheBilling && result.Usage.InputTokens > 0 {
result.Usage.CacheReadInputTokens += result.Usage.InputTokens
result.Usage.InputTokens = 0
}
// 2. 计算费用(根据媒体类型选择计费方式)
var cost *CostBreakdown
if result.MediaType == "image" || result.MediaType == "video" {
cost = s.billingService.CalculateSoraImageCost(...)
} else if result.ImageCount > 0 {
cost = s.billingService.CalculateImageCost(...)
} else {
// Token 计费
cost, err = s.billingService.CalculateCost(billingModel, tokens, multiplier)
}
// 3. 创建用量日志(异步,不阻塞响应)
usageLog := &UsageLog{...}
go func() {
s.usageLogRepo.Create(context.Background(), usageLog)
}()
// 4. 异步计费扣费
billingCmd := &UsageBillingCommand{
UserID: user.ID,
APIKeyID: apiKey.ID,
Cost: cost.TotalCost,
RequestID: input.RequestID,
BillingFingerprint: fingerprint,
}
go func() {
s.usageBillingRepo.Apply(context.Background(), billingCmd)
}()
return nil
}
调用时机:RecordUsage 在 handler 层请求成功返回后调用:
// gateway_handler.go
if err := h.gatewayService.RecordUsage(ctx, &service.RecordUsageInput{...}); err != nil {
logger.Error("record usage failed", ...); // 不阻塞响应
}
特点:
- 后置处理:请求成功后才记录,不影响响应延迟
- 异步执行:日志和计费都异步执行
- 幂等设计:使用 BillingFingerprint 防止重复计费
3.2 计费计算逻辑
⚠️ 修正:计费通过
UsageBillingRepository.Apply()原子执行,不是简单的CalculateCost。
// gateway_service.go - RecordUsage 中的计费流程
func (s *GatewayService) RecordUsage(ctx context.Context, input *RecordUsageInput) error {
// 1. 计算费用
var cost *CostBreakdown
if result.MediaType == "image" {
cost = s.billingService.CalculateSoraImageCost(...)
} else {
cost, err = s.billingService.CalculateCost(billingModel, tokens, multiplier)
}
// 2. 构建计费命令
billingCmd := &UsageBillingCommand{
UserID: user.ID,
APIKeyID: apiKey.ID,
Cost: cost.TotalCost,
RequestID: input.RequestID,
// 幂等指纹,防止重复计费
BillingFingerprint: generateFingerprint(...),
}
// 3. 异步执行原子扣费
go func() {
result := s.usageBillingRepo.Apply(context.Background(), billingCmd)
if result.Applied {
logger.Info("billing_applied", "user_id", userID, "cost", cost)
}
}()
return nil
}
// repository/usage_billing_repo.go - Apply (原子操作)
func (r *usageBillingRepository) Apply(ctx context.Context, cmd *UsageBillingCommand) (*UsageBillingResult) {
// 使用事务确保原子性
tx, err := r.client.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead})
if err != nil {
return &UsageBillingResult{Applied: false}
}
// 1. 检查幂等(防止重复计费)
if r.existsByFingerprint(ctx, cmd.BillingFingerprint) {
return &UsageBillingResult{Applied: false, Reason: "duplicate"}
}
// 2. 扣减余额
if err := tx.User.UpdateOneID(cmd.UserID).
AddBalance(-cmd.Cost).
Exec(ctx); err != nil {
tx.Rollback()
return &UsageBillingResult{Applied: false, Error: err}
}
// 3. 记录计费
r.createBillingRecord(ctx, tx, cmd)
// 4. 记录幂等指纹
r.createFingerprint(ctx, tx, cmd)
tx.Commit()
return &UsageBillingResult{Applied: true}
}
计费因子优先级:
费率倍数 = 系统默认值 (1.0)
或 分组配置 (group.RateMultiplier)
或 用户专属配置 (userGroupRateRepo)
Token 类型与计费:
标准输入 Token: InputTokens × InputPrice
标准输出 Token: OutputTokens × OutputPrice
缓存创建 (5分钟): CacheCreation5mTokens × CachePrice × 0.8
缓存创建 (1小时): CacheCreation1hTokens × CachePrice × 0.9
缓存读取: CacheReadInputTokens × CachePrice × 0.5
定价配置示例(config.yaml):
billing:
pricing:
claude-3-5-sonnet-20241022:
input: 3.00 # $3.00 / 1M tokens
output: 15.00 # $15.00 / 1M tokens
cache_read: 0.30 # 缓存读取折扣
3.3 配额控制
⚠️ 修正:配额检查分散在多个位置,发生在请求处理前(前置验证)。
3.3.1 实际配额检查流程
请求入口 (API Key Auth Middleware)
│
▼
┌─────────────────────────────────────────┐
│ api_key_auth.go: APIKeyAuth() │
│ │
│ 1. APIKeyService.GetByKey() │
│ → 验证 Key 有效性 │
│ │
│ 2. SubscriptionService.Validate() │
│ → 检查订阅状态和有效期 │
│ → 检查订阅配额 │
│ │
│ 3. BillingCacheService.CheckUserBalance()
│ → 检查用户余额 │
│ │
│ 4. APIKeyRateLimit.CheckRateLimits() │
│ → 检查 5h/1d/7d 限制 │
└─────────────────────────────────────────┘
│
▼
通过 → 继续请求
拒绝 → 返回错误 (402/429)
3.3.2 API Key 验证流程(详细)
// service/api_key_auth.go - APIKeyAuth()
func APIKeyAuth() gin.HandlerFunc {
return func(c *gin.Context) {
apiKey, user, err := apiKeyService.GetByKey(ctx, key)
if err != nil {
c.AbortWithStatusJSON(401, ...)
return
}
// 1. 订阅验证
if err := subscriptionService.Validate(ctx, user.ID, apiKey.GroupID); err != nil {
c.AbortWithStatusJSON(402, ...) // Payment Required
return
}
// 2. 余额检查
if err := billingCacheService.CheckUserBalance(ctx, user.ID); err != nil {
c.AbortWithStatusJSON(402, ...) // Payment Required
return
}
// 3. 速率限制检查
if err := apiKeyRateLimit.CheckRateLimits(ctx, apiKey.ID); err != nil {
c.AbortWithStatusJSON(429, ...) // Too Many Requests
return
}
// 4. 设置上下文
c.Set("api_key", apiKey)
c.Set("user", user)
c.Next()
}
}
3.3.3 BillingCacheService 余额检查
// service/billing_cache_service.go - CheckUserBalance
func (s *BillingCacheService) CheckUserBalance(ctx context.Context, userID int64) error {
// 1. 获取缓存的余额
balance, err := s.cache.GetUserBalance(ctx, userID)
if err != nil {
// 2. 缓存未命中,从数据库加载
user, err := s.userRepo.GetByID(ctx, userID)
if err != nil {
return err
}
balance = user.Balance
s.cache.SetUserBalance(ctx, userID, balance)
}
// 3. 检查是否有足够余额(允许少量超支)
if balance < 0 {
return ErrInsufficientBalance
}
return nil
}
3.3.4 配额更新(异步)
// gateway_service.go - RecordUsage 中的配额更新
// 异步更新,不阻塞响应
go func() {
s.usageBillingRepo.Apply(ctx, &UsageBillingCommand{
UserID: userID,
APIKeyID: apiKeyID,
Cost: cost,
// ...
})
}()
3.4 速率限制
3.4.1 多级限流架构
┌─────────────────────────────────────┐
│ 第一级:API Key 限流 │
│ 检查:5h/1d/7d 累计使用量 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 第二级:用户限流 │
│ 检查:RPM (requests per minute) │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 第三级:IP 限流 │
│ 检查:防止暴力请求 │
└─────────────────────────────────────┘
3.4.2 限流实现
// middleware/rate_limiter.go - AllowRequest
func (r *RateLimiter) AllowRequest(key string, limit int, window time.Duration) bool {
// 1. 获取当前计数
count := r.getCount(key, window)
// 2. 检查是否超限
if count >= int64(limit) {
return false
}
// 3. 原子递增
return r.incr(key, window)
}
// 实际检查逻辑
func (s *BillingCacheService) checkRateLimits(ctx context.Context, apiKey *APIKey) error {
// 检查 5h 限制
if apiKey.RateLimit5h > 0 {
used5h := s.getUsage5h(apiKey.ID)
if used5h >= apiKey.RateLimit5h {
return ErrRateLimitExceeded
}
}
// 检查 1d 限制
if apiKey.RateLimit1d > 0 {
used1d := s.getUsage1d(apiKey.ID)
if used1d >= apiKey.RateLimit1d {
return ErrRateLimitExceeded
}
}
return nil
}
3.5 余额管理
3.5.1 余额充值
// service/admin_service.go - RechargeBalance
func (s *AdminService) RechargeBalance(ctx context.Context, userID int64, amount float64, reason string) error {
// 1. 获取用户
user, err := s.userRepo.GetByID(ctx, userID)
if err != nil {
return err
}
// 2. 更新余额
newBalance := user.Balance + amount
err = s.userRepo.UpdateBalance(ctx, userID, amount)
if err != nil {
return err
}
// 3. 记录充值日志
s.recordTransaction(ctx, &Transaction{
UserID: userID,
Type: "recharge",
Amount: amount,
Balance: newBalance,
Reason: reason,
Operator: getCurrentUser(ctx),
})
return nil
}
3.5.2 余额扣减
// 在每次请求完成后自动扣减
func (s *BillingService) DeductBalance(ctx context.Context, userID int64, cost float64) error {
// 1. 原子操作扣减余额
err := s.userRepo.DeductBalance(ctx, userID, cost)
if err != nil {
// 余额不足,记录欠费
s.recordOverage(ctx, userID, cost)
return ErrInsufficientBalance
}
return nil
}
4. 缓存策略
4.1 多级缓存架构
// 计费缓存设计
type BillingCache struct {
// L1: 本地内存缓存
// - 用户余额缓存 (TTL: 10s)
// - API Key配额缓存 (TTL: 5s)
// - 用量计数缓存 (TTL: 1s)
// L2: Redis缓存
// - 实时用量统计
// - 速率限制计数
// L3: 数据库
// - 详细用量日志
// - 余额变动历史
}
4.2 缓存更新策略
// 用量记录更新策略
const (
CacheUpdateSync = "sync" // 同步更新
CacheUpdateAsync = "async" // 异步更新
CacheUpdateBatch = "batch" // 批量更新
)
// 当前策略:异步更新
func (s *BillingCacheService) QueueUpdateQuotaUsage(apiKeyID int64, cost float64) {
// 放入更新队列
s.updateQueue <- &QuotaUpdate{
APIKeyID: apiKeyID,
Cost: cost,
}
}
5. 配置参数
5.1 计费配置(config.yaml)
billing:
# 默认计费配置
default:
rate_multiplier: 1.0
# 价格配置(每百万Token价格,USD)
pricing:
claude-3-5-sonnet-20241022:
input: 3.00
output: 15.00
claude-3-haiku-20240307:
input: 0.25
output: 1.25
gpt-4o:
input: 2.50
output: 10.00
# 缓存配置
cache:
quota_ttl: 5s
balance_ttl: 10s
rate_limit_ttl: 1s
# 熔断配置
circuit_breaker:
enabled: true
error_threshold: 0.1
timeout: 30s
5.2 速率限制配置
rate_limit:
# API Key 默认限制
api_key:
rpm: 1000
tpm: 100000 # tokens per minute
# 用户默认限制
user:
rpm: 2000
# IP 默认限制
ip:
rpm: 5000
6. 修改和扩展指南
6.1 常见修改场景
场景1:调整模型价格
// service/billing_service.go - getPricingRule
func (s *BillingService) getPricingRule(model string, groupID int64) *PricingRule {
// 添加新模型定价
customPricing := map[string]PricingRule{
"gpt-4-turbo": {
InputPrice: 10.00,
OutputPrice: 30.00,
},
}
if rule, ok := customPricing[model]; ok {
return &rule
}
return s.defaultPricing[model]
}
场景2:调整API Key配额
// 修改默认配额
const (
DefaultRateLimit5h = 500000 // 从 100000 改为 500000
DefaultRateLimit1d = 2000000 // 从 500000 改为 2000000
)
场景3:添加新的计费维度
// 例如:按请求次数计费
type RequestPricing struct {
Model string
PerRequestCost float64 // 每次请求固定费用
PerTokenCost float64 // Token费用
}
func (s *BillingService) CalculateWithRequestFee(model string, tokens int) float64 {
pricing := s.getRequestPricing(model)
return pricing.PerRequestCost + (float64(tokens) / 1_000_000 * pricing.PerTokenCost)
}
6.2 注意事项
- 计费准确性:费用计算必须精确,建议保留更多小数位
- 并发安全:余额更新需要原子操作
- 数据一致性:缓存和数据库需要定期同步
7. 测试覆盖
7.1 单元测试
| 测试文件 | 覆盖范围 |
|---|---|
billing_service_test.go |
计费计算逻辑 |
billing_cache_service_test.go |
缓存机制 |
rate_limiter_test.go |
速率限制 |
7.2 集成测试
| 测试文件 | 场景 |
|---|---|
e2e_gateway_test.go |
完整计费流程 |
8. 监控与运维
8.1 关键指标
| 指标 | 告警阈值 | 说明 |
|---|---|---|
billing_balance_zero |
> 10% | 余额为0用户比例 |
billing_quota_exhausted |
> 20% | 配额耗尽比例 |
billing_rate_limited |
> 15% | 触发限流比例 |
billing_cache_hit_rate |
< 90% | 缓存命中率 |
8.2 运维任务
| 任务 | 频率 | 说明 |
|---|---|---|
| 余额对账 | 每天 | 核对余额一致性 |
| 用量统计 | 每小时 | 生成统计报表 |
| 异常检测 | 持续 | 检测异常消费模式 |
9. 总结
计费与配额模块特点:
- 精细化计费:按模型、Token类型、用户分组等多维度计费
- 实时配额控制:多级限流保障系统稳定性
- 高性能缓存:两级缓存确保高并发下的性能
- 余额保护:支持少量超支,平衡用户体验和风险
潜在改进点:
- 可增加更灵活的计费规则(如包月套餐)
- 可增加更详细的费用分析报表
修改建议:
- 价格调整需要同步更新配置
- 限流参数可根据实际流量调整
文档版本:1.1 最后更新:2026-03-23 分析基于:Sub2API v0.1.104 修正内容:用量记录位置(gateway_service.go)、配额检查流程(前置验证)、计费原子操作