Files
lijiaoqiao/docs/test_plan_design_v1_2026-03-18.md
2026-03-26 20:06:14 +08:00

1365 lines
37 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 测试方案设计
> 版本v1.0
> 日期2026-03-18
> 依据testing skill 最佳实践
---
## 1. 测试策略概述
### 1.1 测试金字塔
```
╱╲
╲ E2E Tests (10%)
╱──────╲
╲ Integration Tests (20%)
╱──────────╲
╱────────────╲ Unit Tests (70%)
╱────────────────╲
```
### 1.2 测试目标
| 指标 | 目标 | 说明 |
|------|------|------|
| 代码覆盖率 | >= 80% | 核心业务 |
| 单元测试通过率 | 100% | 必须通过 |
| 集成测试通过率 | 100% | 必须通过 |
| E2E测试通过率 | 95% | 允许5% flaky |
| 构建门禁 | 100% | CI必须通过 |
---
## 2. 单元测试
### 2.1 测试框架
```python
# pytest.ini
[pytest]
testpaths = tests/unit
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
-v
--strict-markers
--tb=short
--cov=llm_gateway
--cov-report=term-missing
--cov-report=html
markers =
unit: Unit tests
integration: Integration tests
e2e: End-to-end tests
slow: Slow running tests
```
### 2.2 单元测试示例
```python
# tests/unit/service/test_billing.py
import pytest
from decimal import Decimal
from unittest.mock import Mock, patch
from llm_gateway.service.billing import BillingService
from llm_gateway.service.repository import BillingRepository
from llm_gateway.service.balance import BalanceManager
class TestBillingService:
"""计费服务单元测试"""
@pytest.fixture
def billing_service(self):
"""Fixture: 计费服务实例"""
repo = Mock(spec=BillingRepository)
balance_mgr = Mock(spec=BalanceManager)
return BillingService(repo, balance_mgr)
def test_estimate_cost_gpt4(self, billing_service):
"""测试GPT-4成本估算"""
# Arrange
request = Mock()
request.Model = "gpt-4"
request.Messages.Tokens.return_value = 1000
request.Options.MaxTokens = 1000
# Act
cost = billing_service.EstimateCost(request)
# Assert
assert cost.Amount > 0
assert cost.Currency == "USD"
def test_estimate_cost_gpt35(self, billing_service):
"""测试GPT-3.5成本估算"""
# Arrange
request = Mock()
request.Model = "gpt-3.5-turbo"
request.Messages.Tokens.return_value = 1000
request.Options.MaxTokens = 1000
# Act
cost = billing_service.EstimateCost(request)
# Assert
# GPT-3.5应该比GPT-4便宜
gpt4_cost = billing_service.EstimateCost(self._create_request("gpt-4"))
assert cost.Amount < gpt4_cost.Amount
@pytest.mark.parametrize("model,expected_tokens", [
("gpt-4", 100),
("gpt-3.5-turbo", 50),
("claude-3-opus", 150),
])
def test_estimate_cost_models(self, billing_service, model, expected_tokens):
"""参数化测试:不同模型成本估算"""
request = self._create_request(model)
cost = billing_service.EstimateCost(request)
assert cost.Amount > 0
def _create_request(self, model):
"""创建测试请求"""
request = Mock()
request.Model = model
request.Messages.Tokens.return_value = 1000
request.Options.MaxTokens = 1000
return request
def test_insufficient_balance(self, billing_service):
"""测试余额不足场景"""
# Arrange
billing_service.balance_mgr.Reserve.return_value = None
billing_service.balance_mgr.ErrInsufficientBalance = Exception()
request = self._create_request("gpt-4")
# Act & Assert
with pytest.raises(Exception) as exc_info:
billing_service.ProcessRequest(request)
assert "insufficient" in str(exc_info.value).lower()
def test_process_request_success(self, billing_service):
"""测试成功处理请求"""
# Arrange
billing_service.balanceMgr.Reserve.return_value = Mock(Amount=Decimal("0.10"))
billing_service.balanceMgr.Charge.return_value = None
billing_service.repo.Create.return_value = None
request = self._create_request("gpt-4")
request.Response = Mock()
request.Response.Usage.PromptTokens = 500
request.Response.Usage.CompletionTokens = 500
request.ID = "req-123"
# Act
record = billing_service.ProcessRequest(request)
# Assert
assert record is not None
assert record.UserID == request.UserID
billing_service.balanceMgr.Reserve.assert_called_once()
billing_service.repo.Create.assert_called_once()
```
### 2.3 Router服务测试
```python
# tests/unit/service/test_router.py
import pytest
from unittest.mock import Mock, AsyncMock
from llm_gateway.service.router import RouterService
from llm_gateway.internal.adapter import Registry, Provider
class TestRouterService:
"""路由服务单元测试"""
@pytest.fixture
def mock_provider(self):
"""Mock供应商"""
provider = Mock(spec=Provider)
provider.Name.return_value = "openai"
provider.HealthCheck.return_value = None
provider.Call.return_value = Mock(
id="resp-123",
choices=[Mock(delta=Mock(content="Hello"))],
usage=Mock(prompt_tokens=10, completion_tokens=5)
)
return provider
@pytest.fixture
def router_service(self, mock_provider):
"""Fixture: 路由服务实例"""
registry = Mock(spec=Registry)
registry.GetAvailableProviders.return_value = [mock_provider]
registry.Get.return_value = mock_provider
return RouterService(registry)
@pytest.mark.asyncio
async def test_route_success(self, router_service, mock_provider):
"""测试成功路由"""
# Arrange
request = Mock()
request.Model = "gpt-4"
request.UserID = 1
request.TenantID = 1
request.Messages = []
# Act
response = await router_service.Route(request)
# Assert
assert response is not None
mock_provider.Call.assert_called_once()
@pytest.mark.asyncio
async def test_route_no_provider(self, router_service):
"""测试无可用供应商"""
# Arrange
router_service.adapterRegistry.GetAvailableProviders.return_value = []
request = Mock()
request.Model = "gpt-4"
# Act & Assert
with pytest.raises(Exception) as exc_info:
await router_service.Route(request)
assert "no provider" in str(exc_info.value).lower()
@pytest.mark.asyncio
async def test_route_fallback_on_error(self, router_service, mock_provider):
"""测试失败时降级"""
# Arrange
mock_provider.Call.side_effect = [Exception("API Error"), Mock()]
request = Mock()
request.Model = "gpt-4"
# Act
response = await router_service.Route(request)
# Assert
assert response is not None
assert mock_provider.Call.call_count == 2 # 重试一次
```
---
## 3. 集成测试
### 3.1 测试夹具
```python
# tests/conftest.py
import pytest
import asyncio
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from httpx import AsyncClient
from llm_gateway.main import app
from llm_gateway.database import Base
@pytest.fixture(scope="session")
def event_loop():
"""创建事件循环"""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="function")
def db_engine():
"""测试数据库引擎"""
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
yield engine
Base.metadata.drop_all(engine)
@pytest.fixture(scope="function")
def db_session(db_engine):
"""测试数据库会话"""
Session = sessionmaker(bind=db_engine)
session = Session()
yield session
session.close()
@pytest.fixture(scope="function")
async def client():
"""测试客户端"""
async with AsyncClient(app=app, base_url="http://test") as ac:
yield ac
@pytest.fixture
def test_user(db_session):
"""创建测试用户"""
user = User(
email="test@example.com",
password_hash="hashed_password",
name="Test User"
)
db_session.add(user)
db_session.commit()
return user
```
### 3.2 API集成测试
```python
# tests/integration/api/test_chat.py
import pytest
from httpx import AsyncClient
class TestChatAPI:
"""聊天API集成测试"""
@pytest.mark.asyncio
async def test_chat_completions_success(self, client: AsyncClient, test_user):
"""测试成功创建聊天完成"""
# Arrange
token = await self._get_token(client, test_user)
# Act
response = await client.post(
"/v1/chat/completions",
json={
"model": "gpt-3.5-turbo",
"messages": [
{"role": "user", "content": "Hello"}
]
},
headers={"Authorization": f"Bearer {token}"}
)
# Assert
assert response.status_code == 200
data = response.json()
assert "choices" in data
assert len(data["choices"]) > 0
@pytest.mark.asyncio
async def test_chat_completions_unauthorized(self, client: AsyncClient):
"""测试未授权访问"""
# Act
response = await client.post(
"/v1/chat/completions",
json={
"model": "gpt-3.5-turbo",
"messages": [
{"role": "user", "content": "Hello"}
]
}
)
# Assert
assert response.status_code == 401
@pytest.mark.asyncio
async def test_chat_completions_invalid_model(self, client: AsyncClient, test_user):
"""测试无效模型"""
# Arrange
token = await self._get_token(client, test_user)
# Act
response = await client.post(
"/v1/chat/completions",
json={
"model": "invalid-model",
"messages": [
{"role": "user", "content": "Hello"}
]
},
headers={"Authorization": f"Bearer {token}"}
)
# Assert
assert response.status_code == 400
assert "model" in response.json()["error"]["code"].lower()
async def _get_token(self, client, user):
"""获取测试令牌"""
response = await client.post(
"/v1/auth/token",
json={
"email": user.email,
"password": "test_password"
}
)
return response.json()["access_token"]
```
---
## 4. 契约测试
### 4.1 Provider契约测试
```python
# tests/contract/test_provider_adapter.py
import pytest
from llm_gateway.internal.adapter import ProviderAdapter
from llm_gateway.service.adapter import OpenAIAdapter
class TestProviderContract:
"""供应商适配器契约测试"""
@pytest.fixture
def adapter(self):
"""适配器实例"""
return OpenAIAdapter(api_key="test-key")
@pytest.mark.asyncio
async def test_response_structure(self, adapter):
"""测试响应结构符合契约"""
# Act
response = await adapter.chat_completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hello"}]
)
# Assert - 验证必需字段
assert hasattr(response, 'id')
assert hasattr(response, 'model')
assert hasattr(response, 'choices')
assert hasattr(response, 'usage')
assert response.usage.prompt_tokens >= 0
assert response.usage.completion_tokens >= 0
assert response.usage.total_tokens >= 0
@pytest.mark.asyncio
async def test_error_mapping(self, adapter):
"""测试错误码映射"""
# 测试各种错误情况
test_cases = [
(Exception("invalid_api_key"), "INVALID_KEY"),
(Exception("rate_limit_exceeded"), "RATE_LIMIT"),
(Exception("insufficient_quota"), "INSUFFICIENT_QUOTA"),
]
for original_error, expected_code in test_cases:
result = adapter.map_error(original_error)
assert result.code == expected_code
@pytest.mark.asyncio
async def test_streaming(self, adapter):
"""测试流式响应"""
# Act
response = await adapter.chat_completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Count to 5"}],
stream=True
)
# Assert
chunks = []
async for chunk in response.stream():
chunks.append(chunk)
if len(chunks) >= 5:
break
assert len(chunks) > 0
assert all(hasattr(c, 'delta') for c in chunks)
```
### 4.2 契约漂移检测
```yaml
# .github/workflows/contract-test.yml
name: Contract Tests
on:
pull_request:
branches: [main]
push:
branches: [main]
jobs:
contract-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install pytest pytest-asyncio pact
- name: Run contract tests
run: |
pytest tests/contract/ -v --contract=true
- name: Publish contract
if: github.ref == 'refs/heads/main'
run: |
pact-broker publish \
pactDir=./pacts \
brokerUrl=${{ secrets.PACT_BROKER_URL }} \
brokerToken=${{ secrets.PACT_BROKER_TOKEN }}
```
---
## 5. E2E测试
### 5.1 Playwright E2E测试
```python
# tests/e2e/test_user_journey.py
import pytest
from playwright.async_api import async_playwright
class TestUserJourney:
"""用户旅程E2E测试"""
@pytest.fixture
async def browser_context(self):
"""浏览器上下文"""
async with async_playwright() as p:
browser = await p.chromium.launch()
context = await browser.new_context()
yield context
await context.close()
await browser.close()
@pytest.mark.asyncio
async def test_complete_user_flow(self, browser_context):
"""测试完整用户流程"""
page = await browser_context.new_page()
# 1. 注册
await page.goto("https://app.lgateway.com/register")
await page.fill("[name=email]", "user@example.com")
await page.fill("[name=password]", "SecurePassword123!")
await page.click("button[type=submit]")
await page.wait_for_selector(".dashboard")
# 2. 创建API Key
await page.click("text=API Keys")
await page.click("text=Create Key")
await page.fill("[name=description]", "Test Key")
await page.click("button:has-text('Create')")
api_key = await page.text_content(".api-key")
# 3. 测试API调用
response = await self._call_api(api_key, {
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "Hello"}]
})
assert response.status == 200
# 4. 查看使用量
await page.click("text=Usage")
await page.wait_for_selector(".usage-chart")
# 5. 检查账单
await page.click("text=Billing")
await page.wait_for_selector(".balance")
async def _call_api(self, api_key, payload):
"""调用API"""
import httpx
async with httpx.AsyncClient() as client:
return await client.post(
"https://api.lgateway.com/v1/chat/completions",
json=payload,
headers={"Authorization": f"Bearer {api_key}"}
)
```
---
## 6. 性能测试
### 6.1 负载测试
```python
# tests/performance/test_load.py
import pytest
import asyncio
import time
from locust import HttpUser, task, between
class LLMGatewayUser(HttpUser):
"""Locust负载测试用户"""
wait_time = between(0.5, 2)
def on_start(self):
"""初始化"""
response = self.client.post("/v1/auth/token", json={
"email": "test@example.com",
"password": "password"
})
self.token = response.json()["access_token"]
@task(10)
def chat_completion(self):
"""聊天完成请求"""
self.client.post(
"/v1/chat/completions",
json={
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "Hello"}]
},
headers={"Authorization": f"Bearer {self.token}"}
)
@task(1)
def list_models(self):
"""列出模型"""
self.client.get(
"/v1/models",
headers={"Authorization": f"Bearer {self.token}"}
)
```
### 6.2 性能基准
```yaml
# k6/performance.js
import http from 'k6/http';
import { check, sleep } from 'k6';
export const options = {
stages: [
{ duration: '2m', target: 100 }, // 2分钟内增加到100用户
{ duration: '5m', target: 100 }, // 保持100用户5分钟
{ duration: '2m', target: 200 }, // 增加到200用户
{ duration: '5m', target: 200 }, // 保持200用户5分钟
{ duration: '2m', target: 0 }, // 降到0
],
thresholds: {
http_req_duration: ['p(95)<500'], // P95 < 500ms
http_req_failed: ['rate<0.01'], // 失败率 < 1%
},
};
export default function () {
const payload = JSON.stringify({
model: 'gpt-3.5-turbo',
messages: [{ role: 'user', content: 'Hello' }]
});
const params = {
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${__ENV.API_KEY}`,
},
};
const res = http.post('https://api.lgateway.com/v1/chat/completions', payload, params);
check(res, { 'status was 200': (r) => r.status === 200 });
sleep(1);
}
```
---
## 7. 测试覆盖率目标
### 7.1 覆盖率矩阵
| 模块 | 目标覆盖率 | 关键测试 |
|------|-----------|----------|
| Router Service | 90% | 路由选择、fallback |
| Billing Service | 85% | 计费、扣款、退款 |
| Auth Service | 80% | 认证、授权 |
| Adapter | 85% | 供应商调用、错误处理 |
| Middleware | 75% | 限流、日志 |
| API Handlers | 70% | 请求验证、响应格式化 |
---
## 8. CI/CD集成
### 8.1 GitHub Actions
```yaml
# .github/workflows/test.yml
name: Test Pipeline
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
jobs:
unit-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.11'
- run: pip install -r requirements-test.txt
- run: pytest tests/unit/ -v --cov
integration-test:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:15
env:
POSTGRES_PASSWORD: test
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.11'
- run: pip install -r requirements-test.txt
- run: pytest tests/integration/ -v
contract-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- run: pytest tests/contract/ -v --contract
e2e-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-node@v3
with:
node-version: '18'
- run: npm install
- run: npx playwright install
- run: npx playwright test
security-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: snyk/actions/python@master
env:
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
```
---
## 9. 混沌工程测试
### 9.1 故障注入策略
```python
# tests/chaos/test_fault_injection.py
import pytest
from chaos.engine import ChaosEngine
class TestChaosEngineering:
"""混沌工程测试 - 验证系统韧性"""
@pytest.fixture
def chaos(self):
"""混沌引擎"""
return ChaosEngine()
@pytest.mark.asyncio
async def test_provider_timeout_handling(self, chaos):
"""测试供应商超时处理"""
# 注入:供应商响应超时
await chaos.inject_latency(
target="provider:openai",
delay=30 # 30秒延迟
)
# 验证:系统触发降级
response = await router.route(request)
assert response.fallback_triggered
assert response.fallback_provider == "anthropic"
@pytest.mark.asyncio
async def test_circuit_breaker_open(self, chaos):
"""测试断路器打开"""
# 注入:连续失败
await chaos.inject_errors(
target="provider:azure",
count=10,
error_type="connection"
)
# 验证:断路器打开
cb_state = await chaos.get_circuit_state("azure")
assert cb_state == "OPEN"
@pytest.mark.asyncio
async def test_network_partition(self, chaos):
"""测试网络分区"""
# 注入:网络分区
await chaos.network_partition(
source="gateway",
target="billing",
drop_packets=0.5
)
# 验证:异步计费
billing = await router.route(request)
assert billing.async_processed
@pytest.mark.asyncio
async def test_database_failure(self, chaos):
"""测试数据库故障"""
# 注入:主库故障
await chaos.failover_database(
from_primary=True
)
# 验证:自动切换到从库
db_state = await get_database_state()
assert db_state.active == "replica"
assert db_state.data_consistent
```
### 9.2 韧性验证场景
| 场景 | 注入故障 | 预期行为 |
|------|----------|----------|
| 单Provider宕机 | kill provider进程 | 自动切换到备选Provider |
| Redis不可用 | 网络隔离 | 降级到本地限流 |
| 数据库故障 | 主库不可用 | 自动切换从库,写入延迟处理 |
| 流量突增 | 10倍QPS | 限流生效,无雪崩 |
| 依赖服务超时 | 注入超时 | 快速失败,不阻塞 |
---
## 10. 安全测试
### 10.1 OWASP Top 10 防护测试
```python
# tests/security/test_owasp.py
import pytest
from security.scanner import VulnerabilityScanner
class TestSecurityVulnerabilities:
"""安全漏洞测试"""
def test_sql_injection_prevention(self):
"""测试SQL注入防护"""
# 恶意输入
malicious_inputs = [
"' OR '1'='1",
"'; DROP TABLE users;--",
"1' UNION SELECT * FROM passwords--"
]
for payload in malicious_inputs:
response = api.get(f"/users?name={payload}")
assert response.status_code == 400
assert "injection" not in response.text.lower()
def test_api_key_exposure(self):
"""测试API Key泄露检测"""
# 模拟响应包含敏感信息
response = api.get("/v1/models")
assert api_key not in response.text
assert not any(k in response.headers for k in ['X-API-Key', 'Authorization'])
def test_rate_limiting_bypass(self):
"""测试限流绕过防护"""
# 尝试绕过限流
for i in range(150):
response = api.post("/v1/chat/completions", data)
if i >= 100:
assert response.status_code == 429
def test_privilege_escalation(self):
"""测试权限提升防护"""
# 普通用户尝试访问管理员API
response = api_admin.delete("/admin/users/1")
assert response.status_code == 403
def test_cors_misconfiguration(self):
"""测试CORS配置"""
response = api.options("/api/v1/")
assert "Access-Control-Allow-Origin" in response.headers
# 验证不允许任意Origin
assert response.headers.get("Access-Control-Allow-Origin") != "*"
```
### 10.2 密钥轮换测试
```python
# tests/security/test_key_rotation.py
class TestKeyRotation:
"""密钥轮换测试"""
def test_automatic_key_rotation(self):
"""测试自动密钥轮换"""
# 1. 触发轮换
rotation_service.trigger_rotation()
# 2. 验证新密钥生效
new_key = key_manager.get_active_key()
assert new_key.version > old_key.version
assert new_key.is_active
# 3. 验证旧密钥过期
assert not old_key.is_active
# 验证有过渡期
assert old_key.expires_at > now
def test_key_rotation_graceful(self):
"""测试轮换期间服务不中断"""
# 模拟轮换期间的请求
requests = [api_request() for _ in range(100)]
results = parallel_execute(requests)
# 验证所有请求成功(使用旧密钥或新密钥)
assert all(r.success for r in results)
```
### 10.3 日志脱敏测试
```python
# tests/security/test_log_redaction.py
class TestLogRedaction:
"""日志脱敏测试"""
def test_sensitive_data_redaction(self):
"""测试敏感数据脱敏"""
# 记录包含敏感信息的日志
logger.info(f"User {user_id} payment: {credit_card}")
# 验证日志已脱敏
log_entry = get_latest_log()
assert credit_card not in log_entry.message
assert "****" in log_entry.message # 脱敏后格式
assert "4" in log_entry.message # 保留后4位
def test_pii_detection(self):
"""测试PII检测"""
pii_data = [
"13812345678", # 手机号
"user@example.com", # 邮箱
"610102199001011234", # 身份证
]
for pii in pii_data:
logger.info(f"User data: {pii}")
log = get_latest_log()
assert pii not in log.message
```
---
## 11. 可观测性测试
### 11.1 指标验证测试
```python
# tests/observability/test_metrics.py
class TestMetricsEmission:
"""指标发射测试"""
def test_request_latency_histogram(self):
"""测试请求延迟直方图"""
# 发送请求
response = api.post("/v1/chat/completions", request_data)
# 验证指标
metrics = prometheus.get_metrics("http_request_duration_seconds")
assert metrics.labels["method"] == "POST"
assert metrics.labels["status"] == "200"
assert metrics.value > 0
def test_billing_amount_gauge(self):
"""测试计费金额仪表"""
# 执行计费
billing.charge(user_id, amount)
# 验证指标
metrics = prometheus.get_metrics("billing_charged_amount")
assert metrics.labels["currency"] == "USD"
assert metrics.value == amount
def test_provider_failure_counter(self):
"""测试供应商失败计数"""
# 触发失败
for _ in range(5):
try:
provider.call(request)
except Exception:
pass
# 验证计数器
counter = prometheus.get_metrics("provider_calls_total")
assert counter.labels["status"] == "error"
assert counter.value >= 5
```
### 11.2 链路追踪验证
```python
# tests/observability/test_tracing.py
class TestDistributedTracing:
"""分布式追踪测试"""
def test_trace_context_propagation(self):
"""测试Trace上下文传播"""
# 发起请求
response = api.post("/v1/chat/completions", request)
# 验证TraceID
trace_id = response.headers["X-Trace-ID"]
spans = jaeger.get_spans(trace_id)
# 验证链路完整
assert len(spans) >= 4 # gateway -> router -> adapter -> provider
assert all(s.parent_id in [s.id for s in spans] for s in spans)
def test_span_attributes(self):
"""测试Span属性完整"""
spans = jaeger.get_spans(trace_id)
for span in spans:
assert span.name
assert span.service_name
assert span.start_time
assert span.duration > 0
# 验证关键属性
if span.name == "provider.call":
assert span.attributes["provider"]
assert span.attributes["model"]
```
### 11.3 告警触发验证
```python
# tests/observability/test_alerts.py
class TestAlerting:
"""告警测试"""
def test_high_latency_alert(self):
"""测试高延迟告警"""
# 注入高延迟
for _ in range(10):
await provider.call(delay=5)
# 验证告警
alert = alert_manager.get_latest_alert()
assert alert.name == "HighLatencyP99"
assert alert.severity == "P1"
def test_low_balance_alert(self):
"""测试低余额告警"""
# 设置低余额
balance.set_balance(user_id, 10)
# 触发检查
await balance.check_threshold()
# 验证告警
alert = alert_manager.get_latest_alert()
assert alert.name == "LowBalance"
assert user_id in alert.targets
---
## 12. 测试数据管理
### 12.1 测试数据工厂
```python
# tests/fixtures/factories.py
import factory
from datetime import datetime
class UserFactory(factory.Factory):
"""用户测试数据工厂"""
class Meta:
model = dict
user_id = factory.Sequence(lambda n: 10000 + n)
email = factory.LazyAttribute(lambda o: f"user{o.user_id}@test.com")
name = factory.Faker("name")
tier = "growth"
balance = factory.Faker("pydecimal", left_digits=5, right_digits=2)
created_at = factory.LazyFunction(datetime.now)
class APIKeyFactory(factory.Factory):
"""API Key测试数据工厂"""
class Meta:
model = dict
key_id = factory.Sequence(lambda n: f"sk-test-{n:08d}")
user_id = factory.SubFactory(UserFactory)
name = "Test Key"
quota = 10000
rate_limit = 1000
is_active = True
created_at = factory.LazyFunction(datetime.now)
class ProviderFactory(factory.Factory):
"""Provider测试数据工厂"""
class Meta:
model = dict
provider_id = factory.Sequence(lambda n: n)
name = factory.Iterator(["openai", "anthropic", "azure", "google"])
api_base = "https://api.example.com"
latency_p99 = factory.Faker("pyint", min_value=50, max_value=500)
availability = factory.Faker("pyfloat", min_value=0.95, max_value=1.0)
cost_per_1k = factory.Faker("pyfloat", min_value=0.5, max_value=10.0)
```
### 12.2 测试数据隔离
```python
# tests/conftest.py
import pytest
from tests.fixtures.database import TestDatabase
@pytest.fixture(scope="session")
def test_db():
"""测试数据库会话级fixture"""
db = TestDatabase()
db.init(schema="tests/fixtures/schema.sql")
yield db
db.cleanup()
@pytest.fixture
def clean_user(test_db):
"""每个测试前清理用户数据"""
test_db.execute("DELETE FROM users WHERE email LIKE '%@test.com'")
yield
test_db.execute("DELETE FROM users WHERE email LIKE '%@test.com'")
@pytest.fixture
def isolated_balance(test_db):
"""隔离的余额测试"""
# 每个测试使用独立账户
account_id = test_db.create_test_account()
test_db.set_balance(account_id, 10000)
yield account_id
test_db.cleanup_account(account_id)
```
### 12.3 测试数据版本管理
```yaml
# tests/data/version.yaml
# 测试数据版本管理
version: "1.0"
datasets:
user_tier_free:
count: 100
balance_range: [0, 100]
tier: free
user_tier_growth:
count: 50
balance_range: [100, 10000]
tier: growth
user_tier_enterprise:
count: 10
balance_range: [10000, 100000]
tier: enterprise
provider_active:
- name: openai
models: [gpt-4, gpt-3.5-turbo]
status: active
- name: anthropic
models: [claude-3-opus, claude-3-sonnet]
status: active
```
---
## 13. 部署验证测试
### 13.1 环境一致性验证
```python
# tests/deployment/test_environment.py
class TestEnvironmentConsistency:
"""环境一致性验证"""
def test_environment_variables(self):
"""验证环境变量配置"""
required_vars = [
"DATABASE_URL",
"REDIS_URL",
"KAFKA_BROKERS",
"LOG_LEVEL",
]
for var in required_vars:
assert os.environ.get(var), f"Missing env var: {var}"
def test_database_schema_version(self):
"""验证数据库schema版本"""
# 获取当前版本
current_version = db.get_schema_version()
# 获取期望版本
expected_version = get_code_schema_version()
assert current_version == expected_version, \
f"Schema mismatch: db={current_version}, code={expected_version}"
def test_dependencies_installed(self):
"""验证依赖包版本"""
import pkg_resources
requirements = open("requirements.txt").read()
for req in pkg_resources.parse_requirements(requirements):
try:
installed = pkg_resources.get_distribution(req.project_name)
assert str(installed.version) in str(req.specifier)
except Exception as e:
pytest.fail(f"Dependency issue: {req}, error: {e}")
```
### 13.2 健康检查验证
```python
# tests/deployment/test_health.py
class TestHealthChecks:
"""健康检查验证"""
def test_gateway_health(self):
"""测试网关健康"""
response = requests.get("http://localhost:8080/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "healthy"
assert "version" in data
def test_service_dependencies(self):
"""测试服务依赖"""
response = requests.get("http://localhost:8080/health/ready")
data = response.json()
# 验证所有依赖健康
assert data["dependencies"]["database"]["status"] == "up"
assert data["dependencies"]["redis"]["status"] == "up"
assert data["dependencies"]["kafka"]["status"] == "up"
def test_startup_probe(self):
"""测试启动探针"""
# 模拟服务启动
start_time = time.time()
while time.time() - start_time < 30:
try:
response = requests.get("http://localhost:8080/health")
if response.status_code == 200:
break
except Exception:
pass
time.sleep(1)
# 验证30秒内启动完成
assert time.time() - start_time < 30
```
### 13.3 配置验证
```python
# tests/deployment/test_config.py
class TestConfigurationValidation:
"""配置验证测试"""
def test_secret_rotation_config(self):
"""验证密钥轮换配置"""
config = get_config()
assert config.rotation_enabled is True
assert config.rotation_interval_days == 90
assert config.grace_period_hours == 24
def test_rate_limit_config(self):
"""验证限流配置"""
config = get_config()
assert config.rate_limit.global_limit == 100000
assert config.rate_limit.tenant_limit == 10000
assert config.rate_limit.apikey_limit == 1000
def test_circuit_breaker_config(self):
"""验证断路器配置"""
config = get_config()
assert config.circuit_breaker.failure_threshold == 5
assert config.circuit_breaker.timeout_seconds == 60
assert config.circuit_breaker.half_open_max_calls == 3
```
### 13.4 金丝雀部署验证
```python
# tests/deployment/test_canary.py
class TestCanaryDeployment:
"""金丝雀部署验证"""
def test_canary_routing(self):
"""测试金丝雀路由"""
# 发送流量到新版本
for i in range(100):
response = api.post("/v1/chat/completions", request)
# 验证10%流量到新版本
metrics = get_canary_metrics()
assert 0.05 < metrics.canary_percentage < 0.15
def test_canary_error_rate(self):
"""测试金丝雀错误率"""
errors = get_canary_errors()
assert errors.new_version_error_rate < 0.01
assert errors.new_version_error_rate < errors.old_version_error_rate * 2
def test_rollback_on_failure(self):
"""测试失败自动回滚"""
# 注入失败
inject_failure("canary", error_rate=0.5)
# 等待检测和回滚
time.sleep(60)
# 验证已回滚
version = get_current_version()
assert version == "stable"
### 9.1 与技术架构一致性
| 测试项 | 对应模块 | 验证点 |
|--------|----------|--------|
| Provider Adapter测试 | `technical_architecture.md` | 契约符合 |
| 路由策略测试 | `technical_architecture.md` | 选择算法 |
| 计费精度测试 | `business_solution_v1.md` | Decimal精度 |
| 限流测试 | `p1_optimization_solution_v1.md` | 多维度 |
| 风控测试 | `security_solution_v1.md` | 规则执行 |
---
**文档状态**:测试方案设计
**关联文档**
- `technical_architecture_design_v1_2026-03-18.md`
- `architecture_solution_v1_2026-03-18.md`
- `security_solution_v1_2026-03-18.md`