21 KiB
21 KiB
安全设计文档
概述
本文档描述用户管理系统的安全设计,包括数据加密、防攻击策略、合规性要求等。安全是系统的核心考量,所有设计均符合企业级安全标准。
安全架构
安全层次
┌─────────────────────────────────────────┐
│ 应用层安全 (Application) │
│ • 输入验证 • 输出编码 • 业务安全 │
├─────────────────────────────────────────┤
│ 服务层安全 (Service) │
│ • 认证授权 • 权限控制 • 数据过滤 │
├─────────────────────────────────────────┤
│ 网络层安全 (Network) │
│ • HTTPS/TLS • 防火墙 • 网络隔离 │
├─────────────────────────────────────────┤
│ 数据层安全 (Data) │
│ • 数据加密 • 访问控制 • 审计日志 │
└─────────────────────────────────────────┘
1. 数据加密
1.1 密码加密
加密算法选择
| 算法 | 推荐度 | 说明 |
|---|---|---|
| Argon2id | ⭐⭐⭐⭐⭐ | 最推荐,抗 GPU/ASIC 攻击 |
| bcrypt | ⭐⭐⭐⭐ | 成熟稳定,可配置成本因子 |
| PBKDF2 | ⭐⭐⭐ | NIST 认证,但性能较差 |
推荐配置(Argon2id)
argon2:
algorithm: argon2id
memory_cost: 65536 # 64 MB
time_cost: 3 # 迭代次数
parallelism: 4 # 并行线程
hash_length: 32 # Hash 长度
salt_length: 16 # 盐长度
加密流程
# Python 伪代码
import argon2
def hash_password(password: str) -> str:
# 生成随机盐
salt = os.urandom(16)
# 使用 Argon2id 加密
hasher = argon2.PasswordHasher(
time_cost=3,
memory_cost=65536,
parallelism=4,
hash_len=32,
salt_len=16
)
return hasher.hash(password)
def verify_password(password: str, hash: str) -> bool:
try:
hasher = argon2.PasswordHasher()
hasher.verify(hash, password)
return True
except:
return False
密码策略
| 规则 | 要求 | 说明 |
|---|---|---|
| 最小长度 | 8 字符 | 建议使用 12 字符以上 |
| 最大长度 | 32 字符 | 防止 DoS 攻击 |
| 大小写 | 至少 1 个 | - |
| 数字 | 至少 1 个 | - |
| 特殊字符 | 至少 1 个 | !@#$%^&*()_+-=[]{} |
| 常见密码 | 检查黑名单 | 禁止使用 123456、password 等 |
| 密码历史 | 检查最近 5 次 | 防止重复使用旧密码 |
1.2 敏感数据加密
加密数据范围
| 数据类型 | 加密方式 | 说明 |
|---|---|---|
| 手机号 | AES-256-GCM | 部分脱敏 + 加密存储 |
| 身份证号 | AES-256-GCM | 完全加密 |
| 银行卡号 | AES-256-GCM | 部分脱敏 + 加密存储 |
| 邮箱 | AES-256-GCM | 可选加密 |
| 私钥/密钥 | HSM | 硬件安全模块 |
AES-256-GCM 配置
encryption:
algorithm: AES-256-GCM
key_size: 256
key_rotation_days: 90
key_store: HSM # 或 KMS
加密实现(Go)
package security
import (
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/base64"
"io"
)
type AESEncryptor struct {
key []byte
}
func NewAESEncryptor(key string) *AESEncryptor {
return &AESEncryptor{key: []byte(key)}
}
func (e *AESEncryptor) Encrypt(plaintext string) (string, error) {
block, err := aes.NewCipher(e.key)
if err != nil {
return "", err
}
gcm, err := cipher.NewGCM(block)
if err != nil {
return "", err
}
nonce := make([]byte, gcm.NonceSize())
if _, err = io.ReadFull(rand.Reader, nonce); err != nil {
return "", err
}
ciphertext := gcm.Seal(nonce, nonce, []byte(plaintext), nil)
return base64.StdEncoding.EncodeToString(ciphertext), nil
}
func (e *AESEncryptor) Decrypt(ciphertext string) (string, error) {
data, err := base64.StdEncoding.DecodeString(ciphertext)
if err != nil {
return "", err
}
block, err := aes.NewCipher(e.key)
if err != nil {
return "", err
}
gcm, err := cipher.NewGCM(block)
if err != nil {
return "", err
}
nonceSize := gcm.NonceSize()
nonce, ciphertext := data[:nonceSize], data[nonceSize:]
plaintext, err := gcm.Open(nil, nonce, ciphertext, nil)
if err != nil {
return "", err
}
return string(plaintext), nil
}
1.3 Token 签名
JWT 签名算法
| 算法 | 安全性 | 说明 |
|---|---|---|
| RS256 | ⭐⭐⭐⭐⭐ | 推荐,非对称加密 |
| ES256 | ⭐⭐⭐⭐⭐ | 推荐,ECDSA 签名 |
| HS256 | ⭐⭐⭐ | 对称加密,密钥管理复杂 |
推荐配置(RS256)
jwt:
algorithm: RS256
access_token_expire: 7200 # 2 小时
refresh_token_expire: 2592000 # 30 天
key_rotation_days: 90
issuer: "user-management-system"
密钥管理
- 私钥存储:使用 HSM 或 KMS
- 公钥分发:通过 JWKS 端点
- 密钥轮换:每 90 天轮换一次
- 密钥备份:加密备份,多地存储
1.4 数据脱敏
脱敏规则
| 数据类型 | 显示格式 | 示例 |
|---|---|---|
| 手机号 | 138****8000 | 13800138000 → 138****8000 |
| 邮箱 | j***e@example.com | john@example.com → j***e@example.com |
| 身份证号 | 110***********1234 | 110101199001011234 → 110***********1234 |
| 银行卡号 | 6222********1234 | 6222020012345678 → 6222********1234 |
脱敏实现(Java)
public class DataMasking {
public static String maskPhone(String phone) {
if (phone == null || phone.length() < 11) {
return phone;
}
return phone.substring(0, 3) + "****" + phone.substring(7);
}
public static String maskEmail(String email) {
if (email == null || !email.contains("@")) {
return email;
}
String[] parts = email.split("@");
String local = parts[0];
String maskedLocal = local.substring(0, 1) + "***" +
local.substring(local.length() - 1);
return maskedLocal + "@" + parts[1];
}
public static String maskIdCard(String idCard) {
if (idCard == null || idCard.length() < 18) {
return idCard;
}
return idCard.substring(0, 3) + "***********" + idCard.substring(14);
}
}
2. 防攻击策略
2.1 SQL 注入防护
防护措施
- 使用参数化查询
-- 错误示例(SQL 注入风险)
SELECT * FROM users WHERE username = '" + username + "'
-- 正确示例(参数化查询)
SELECT * FROM users WHERE username = ?
- 使用 ORM 框架
# SQLAlchemy(Python)
user = session.query(User).filter(User.username == username).first()
# Hibernate(Java)
User user = session.createQuery(
"FROM User WHERE username = :username", User.class)
.setParameter("username", username)
.uniqueResult();
- 输入验证
import re
def validate_username(username: str) -> bool:
# 只允许字母、数字、下划线
pattern = r'^[a-zA-Z0-9_]{4,20}$'
return re.match(pattern, username) is not None
2.2 XSS 防护
防护措施
- 输出编码
<!-- 错误示例 -->
<div>{{ user_input }}</div>
<!-- 正确示例(HTML 编码) -->
<div>{{ user_input | escape }}</div>
- Content Security Policy (CSP)
Content-Security-Policy: default-src 'self';
script-src 'self' 'unsafe-inline' 'unsafe-eval';
style-src 'self' 'unsafe-inline';
img-src 'self' data: https:;
- 输入过滤
import html
def sanitize_input(input_str: str) -> str:
# 转义 HTML 特殊字符
return html.escape(input_str)
2.3 CSRF 防护
防护措施
- CSRF Token
# 生成 Token
def generate_csrf_token():
return secrets.token_urlsafe(32)
# 验证 Token
def verify_csrf_token(request):
token = request.headers.get('X-CSRF-Token')
session_token = session.get('csrf_token')
return token == session_token
- SameSite Cookie
Set-Cookie: session_id=xxx; SameSite=Strict; Secure; HttpOnly
- Origin 验证
def validate_origin(request):
allowed_origins = ['https://example.com']
origin = request.headers.get('Origin')
return origin in allowed_origins
2.4 接口防刷
限流策略
| 类型 | 算法 | 阈值 | 时间窗口 | 说明 |
|---|---|---|---|---|
| 登录接口 | 令牌桶 | 5 次/分钟 | 1 分钟 | 防止暴力破解 |
| 注册接口 | 漏桶 | 3 次/小时 | 1 小时 | 防止批量注册 |
| 验证码接口 | 固定窗口 | 1 次/分钟 | 1 分钟 | 防止验证码滥用 |
| API 接口(普通) | 滑动窗口 | 1000 次/分钟 | 1 分钟 | 普通用户限流 |
| API 接口(VIP) | 令牌桶 | 10000 次/分钟 | 1 分钟 | VIP 用户限流 |
| API 接口(IP) | 令牌桶 | 10000 次/分钟 | 1 分钟 | 单 IP 限流 |
分布式限流实现
package ratelimit
import (
"context"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
// 令牌桶算法
type TokenBucket struct {
redis *redis.Client
capacity int64 // 桶容量
rate int64 // 令牌生成速率(tokens/秒)
}
func NewTokenBucket(redis *redis.Client, capacity, rate int64) *TokenBucket {
return &TokenBucket{
redis: redis,
capacity: capacity,
rate: rate,
}
}
// 尝试获取令牌
func (tb *TokenBucket) Allow(ctx context.Context, key string) (bool, error) {
now := time.Now().Unix()
windowStart := now - 1 // 1 秒时间窗口
pipe := tb.redis.Pipeline()
// 获取当前令牌数
tokensKey := fmt.Sprintf("rate_limit:tokens:%s", key)
tokensCmd := pipe.Get(ctx, tokensKey)
// 获取上次刷新时间
lastRefillKey := fmt.Sprintf("rate_limit:last_refill:%s", key)
lastRefillCmd := pipe.Get(ctx, lastRefillKey)
_, err := pipe.Exec(ctx)
if err != nil && err != redis.Nil {
return false, err
}
var tokens float64
if err := tokensCmd.Err(); err == nil {
tokens, _ = tokensCmd.Float64()
} else {
tokens = float64(tb.capacity)
}
var lastRefill int64
if err := lastRefillCmd.Err(); err == nil {
lastRefill, _ = lastRefillCmd.Int64()
} else {
lastRefill = now
}
// 计算需要补充的令牌
elapsedTime := now - lastRefill
refillTokens := float64(elapsedTime) * float64(tb.rate)
tokens += refillTokens
if tokens > float64(tb.capacity) {
tokens = float64(tb.capacity)
}
// 尝试消费一个令牌
if tokens >= 1 {
tokens -= 1
// 更新 Redis
pipe := tb.redis.Pipeline()
pipe.Set(ctx, tokensKey, tokens, 2*time.Second)
pipe.Set(ctx, lastRefillKey, now, 2*time.Second)
pipe.Exec(ctx)
return true, nil
}
return false, nil
}
Redis 限流实现
import redis
import time
class RateLimiter:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
def is_allowed(self, key: str, limit: int, window: int) -> bool:
current = int(time.time())
window_start = current - window
pipe = self.redis.pipeline()
# 移除过期记录
pipe.zremrangebyscore(key, 0, window_start)
# 统计当前窗口请求数
pipe.zcard(key)
# 添加当前请求
pipe.zadd(key, {str(current): current})
# 设置过期时间
pipe.expire(key, window)
results = pipe.execute()
count = results[1]
return count < limit
2.5 密码暴力破解防护
防护措施
- 登录失败限制
security:
login:
max_attempts: 5
lockout_duration: 1800 # 30 分钟
progressive_delay: true
- 渐进式延迟
def calculate_lockout_time(attempts: int) -> int:
if attempts <= 3:
return 0
elif attempts == 4:
return 30 # 30 秒
elif attempts == 5:
return 300 # 5 分钟
else:
return 1800 # 30 分钟
- 验证码触发
def should_show_captcha(attempts: int) -> bool:
return attempts >= 3
2.6 中间人攻击防护
HTTPS 配置
server {
listen 443 ssl http2;
server_name api.example.com;
ssl_certificate /path/to/cert.pem;
ssl_certificate_key /path/to/key.pem;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers HIGH:!aNULL:!MD5;
ssl_prefer_server_ciphers on;
# HSTS
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
# 其他安全头
add_header X-Frame-Options DENY;
add_header X-Content-Type-Options nosniff;
add_header X-XSS-Protection "1; mode=block";
}
3. 认证与授权安全
3.1 JWT 安全
JWT Payload 结构
{
"iss": "user-management-system",
"sub": "123456",
"aud": "api.example.com",
"exp": 1699999999,
"iat": 1699992799,
"jti": "unique-token-id",
"user": {
"id": 123456,
"username": "john_doe",
"roles": ["user"]
}
}
JWT 安全最佳实践
- 不存储敏感信息:不在 Payload 中存储密码、手机号等
- 设置合理的过期时间:Access Token 2 小时,Refresh Token 30 天
- 使用强签名算法:RS256 或 ES256
- Token 黑名单:吊销的 Token 存入 Redis
- 刷新 Token 一次性:使用后立即失效
Token 黑名单实现
class TokenBlacklist:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
def revoke(self, token: str, expire_at: int):
key = f"blacklist:{token}"
self.redis.setex(key, expire_at, "1")
def is_revoked(self, token: str) -> bool:
key = f"blacklist:{token}"
return self.redis.exists(key) == 1
3.2 OAuth 2.0 安全
授权码模式流程
sequenceDiagram
participant User as 用户
participant App as 应用
participant Auth as 认证服务
participant Resource as 资源服务
User->>App: 1. 点击登录
App->>Auth: 2. 重定向到授权页
User->>Auth: 3. 授权
Auth->>App: 4. 返回授权码
App->>Auth: 5. 用授权码换取 Token
Auth->>App: 6. 返回 Token
App->>Resource: 7. 用 Token 访问资源
Resource->>App: 8. 返回资源
安全注意事项
- state 参数:防止 CSRF 攻击
- PKCE:移动端推荐使用
- HTTPS:所有通信必须使用 HTTPS
- Token 存储:后端存储,避免前端暴露
- Scope 限制:最小权限原则
4. 审计与监控
4.1 审计日志
审计事件
| 事件类型 | 说明 | 优先级 |
|---|---|---|
| 用户注册 | 新用户注册 | 中 |
| 用户登录 | 用户登录成功/失败 | 中 |
| 密码修改 | 用户修改密码 | 高 |
| 角色分配 | 分配/移除角色 | 高 |
| 权限变更 | 修改权限 | 高 |
| 数据导出 | 导出敏感数据 | 高 |
| 异常操作 | 异常行为检测 | 高 |
审计日志格式
{
"event_id": "uuid",
"event_type": "password.changed",
"user_id": 123456,
"username": "john_doe",
"ip": "192.168.1.1",
"user_agent": "Mozilla/5.0...",
"resource_type": "user",
"resource_id": 123456,
"action": "update",
"old_value": "***",
"new_value": "***",
"result": "success",
"error_message": null,
"created_at": "2026-03-10T10:00:00Z"
}
4.2 异常检测
异常登录检测
- 异地登录检测
def detect_abnormal_login(user_id: str, current_ip: str) -> bool:
# 获取用户最近登录 IP 列表
recent_ips = get_user_recent_ips(user_id, limit=5)
# 获取当前 IP 的地理位置
current_location = get_ip_location(current_ip)
# 检查是否与最近登录位置差异过大
for ip in recent_ips:
location = get_ip_location(ip)
distance = calculate_distance(location, current_location)
if distance > 1000: # 超过 1000 公里
return True
return False
- 异常设备检测
def detect_abnormal_device(user_id: str, device_id: str) -> bool:
# 检查设备是否已注册
if not is_device_registered(user_id, device_id):
# 新设备,需要二次验证
return True
# 检查设备最后活跃时间
last_active = get_device_last_active(device_id)
if last_active.days_ago() > 30:
# 设备长时间未使用,需要验证
return True
return False
4.3 安全监控指标
| 指标 | 阈值 | 告警级别 |
|---|---|---|
| 登录失败率 | > 10% | 警告 |
| 单 IP 登录失败次数 | > 20 次/分钟 | 严重 |
| 单用户登录失败次数 | > 10 次/小时 | 警告 |
| 异常登录次数 | > 5 次/小时 | 严重 |
| Token 验证失败率 | > 5% | 警告 |
| 接口调用异常 | 错误率 > 1% | 警告 |
5. 合规性要求
5.1 GDPR 合规
数据主体权利
| 权利 | 实现方式 |
|---|---|
| 访问权 | 提供数据导出接口 |
| 更正权 | 支持用户更新信息 |
| 删除权 | 支持账号删除(数据清理) |
| 限制处理权 | 支持数据冻结 |
| 数据携带权 | 支持数据导出为标准格式 |
| 反对权 | 支持用户撤销授权 |
数据最小化原则
# 只收集必要的用户信息
def collect_user_data():
return {
"username": username,
"email": email,
"phone": phone,
# 不收集不必要的字段,如家庭住址、收入等
}
5.2 个人信息保护法
数据分类分级
| 级别 | 数据类型 | 保护措施 |
|---|---|---|
| 一级(一般) | 用户名、昵称 | 访问控制 |
| 二级(重要) | 手机号、邮箱 | 加密存储 + 访问控制 |
| 三级(敏感) | 身份证号、银行信息 | 强加密 + 审计日志 |
| 四级(核心) | 生物识别信息 | HSM + 最小权限 |
数据留存策略
data_retention:
# 活跃用户数据:永久保留
active_users: forever
# 注销用户数据:保留 30 天后清理
deleted_users: 30 days
# 日志数据:保留 2 年
logs: 2 years
# 登录日志:保留 1 年
login_logs: 1 year
# 审计日志:保留 5 年
audit_logs: 5 years
5.3 等保 2.0
安全等级:三级
| 控制项 | 要求 | 实现状态 |
|---|---|---|
| 身份鉴别 | 双因素认证 | ✅ 已实现 |
| 访问控制 | RBAC 权限模型 | ✅ 已实现 |
| 安全审计 | 完整的审计日志 | ✅ 已实现 |
| 数据完整性 | 数据加密 + 校验 | ✅ 已实现 |
| 数据保密性 | 敏感数据加密 | ✅ 已实现 |
| 入侵防范 | 异常检测 + 告警 | ✅ 已实现 |
6. 安全开发流程
6.1 安全编码规范
OWASP Top 10 防护
-
A01:2021 – 访问控制失效
- 实施严格的权限检查
- 默认拒绝策略
-
A02:2021 – 加密失效
- 使用强加密算法
- 禁止弱加密
-
A03:2021 – 注入
- 参数化查询
- 输入验证
-
A04:2021 – 不安全设计
- 安全威胁建模
- 安全代码审查
-
A05:2021 – 安全配置错误
- 默认安全配置
- 定期安全扫描
6.2 安全测试
测试类型
| 测试类型 | 频率 | 工具 |
|---|---|---|
| 静态代码分析 | 每次提交 | SonarQube |
| 动态安全测试 | 每周 | OWASP ZAP |
| 依赖漏洞扫描 | 每天 | Snyk |
| 渗透测试 | 每季度 | 人工 + 自动 |
| 代码安全审查 | 每次 PR | 人工 |
6.3 应急响应
安全事件响应流程
graph TD
A[发现安全事件] --> B[确认事件级别]
B --> C{事件级别}
C -->|低| D[记录日志]
C -->|中| E[隔离受影响系统]
C -->|高| F[紧急响应]
E --> G[分析原因]
F --> G
G --> H[修复漏洞]
H --> I[验证修复]
I --> J[恢复服务]
J --> K[事后复盘]
7. 安全检查清单
部署前检查
- 所有接口强制使用 HTTPS
- 密码使用 Argon2id 加密
- 敏感数据使用 AES-256 加密
- JWT 使用 RS256 签名
- 实现接口限流
- 实现登录失败限制
- 实现审计日志
- 配置安全响应头
- 关闭不必要的端口
- 定期更新依赖包
运维检查
- 每日检查异常登录日志
- 每周检查接口调用异常
- 每月进行安全扫描
- 每季度进行渗透测试
- 每年进行安全审计
本文档持续更新中,如有疑问请联系安全团队。