# 安全设计文档 ## 概述 本文档描述用户管理系统的安全设计,包括数据加密、防攻击策略、合规性要求等。安全是系统的核心考量,所有设计均符合企业级安全标准。 ## 安全架构 ### 安全层次 ``` ┌─────────────────────────────────────────┐ │ 应用层安全 (Application) │ │ • 输入验证 • 输出编码 • 业务安全 │ ├─────────────────────────────────────────┤ │ 服务层安全 (Service) │ │ • 认证授权 • 权限控制 • 数据过滤 │ ├─────────────────────────────────────────┤ │ 网络层安全 (Network) │ │ • HTTPS/TLS • 防火墙 • 网络隔离 │ ├─────────────────────────────────────────┤ │ 数据层安全 (Data) │ │ • 数据加密 • 访问控制 • 审计日志 │ └─────────────────────────────────────────┘ ``` --- ## 1. 数据加密 ### 1.1 密码加密 #### 加密算法选择 | 算法 | 推荐度 | 说明 | |------|--------|------| | Argon2id | ⭐⭐⭐⭐⭐ | 最推荐,抗 GPU/ASIC 攻击 | | bcrypt | ⭐⭐⭐⭐ | 成熟稳定,可配置成本因子 | | PBKDF2 | ⭐⭐⭐ | NIST 认证,但性能较差 | #### 推荐配置(Argon2id) ```yaml argon2: algorithm: argon2id memory_cost: 65536 # 64 MB time_cost: 3 # 迭代次数 parallelism: 4 # 并行线程 hash_length: 32 # Hash 长度 salt_length: 16 # 盐长度 ``` #### 加密流程 ```python # Python 伪代码 import argon2 def hash_password(password: str) -> str: # 生成随机盐 salt = os.urandom(16) # 使用 Argon2id 加密 hasher = argon2.PasswordHasher( time_cost=3, memory_cost=65536, parallelism=4, hash_len=32, salt_len=16 ) return hasher.hash(password) def verify_password(password: str, hash: str) -> bool: try: hasher = argon2.PasswordHasher() hasher.verify(hash, password) return True except: return False ``` #### 密码策略 | 规则 | 要求 | 说明 | |------|------|------| | 最小长度 | 8 字符 | 建议使用 12 字符以上 | | 最大长度 | 32 字符 | 防止 DoS 攻击 | | 大小写 | 至少 1 个 | - | | 数字 | 至少 1 个 | - | | 特殊字符 | 至少 1 个 | !@#$%^&*()_+-=[]{}|;:'",.<>/? | | 常见密码 | 检查黑名单 | 禁止使用 123456、password 等 | | 密码历史 | 检查最近 5 次 | 防止重复使用旧密码 | --- ### 1.2 敏感数据加密 #### 加密数据范围 | 数据类型 | 加密方式 | 说明 | |----------|----------|------| | 手机号 | AES-256-GCM | 部分脱敏 + 加密存储 | | 身份证号 | AES-256-GCM | 完全加密 | | 银行卡号 | AES-256-GCM | 部分脱敏 + 加密存储 | | 邮箱 | AES-256-GCM | 可选加密 | | 私钥/密钥 | HSM | 硬件安全模块 | #### AES-256-GCM 配置 ```yaml encryption: algorithm: AES-256-GCM key_size: 256 key_rotation_days: 90 key_store: HSM # 或 KMS ``` #### 加密实现(Go) ```go package security import ( "crypto/aes" "crypto/cipher" "crypto/rand" "encoding/base64" "io" ) type AESEncryptor struct { key []byte } func NewAESEncryptor(key string) *AESEncryptor { return &AESEncryptor{key: []byte(key)} } func (e *AESEncryptor) Encrypt(plaintext string) (string, error) { block, err := aes.NewCipher(e.key) if err != nil { return "", err } gcm, err := cipher.NewGCM(block) if err != nil { return "", err } nonce := make([]byte, gcm.NonceSize()) if _, err = io.ReadFull(rand.Reader, nonce); err != nil { return "", err } ciphertext := gcm.Seal(nonce, nonce, []byte(plaintext), nil) return base64.StdEncoding.EncodeToString(ciphertext), nil } func (e *AESEncryptor) Decrypt(ciphertext string) (string, error) { data, err := base64.StdEncoding.DecodeString(ciphertext) if err != nil { return "", err } block, err := aes.NewCipher(e.key) if err != nil { return "", err } gcm, err := cipher.NewGCM(block) if err != nil { return "", err } nonceSize := gcm.NonceSize() nonce, ciphertext := data[:nonceSize], data[nonceSize:] plaintext, err := gcm.Open(nil, nonce, ciphertext, nil) if err != nil { return "", err } return string(plaintext), nil } ``` --- ### 1.3 Token 签名 #### JWT 签名算法 | 算法 | 安全性 | 说明 | |------|--------|------| | RS256 | ⭐⭐⭐⭐⭐ | 推荐,非对称加密 | | ES256 | ⭐⭐⭐⭐⭐ | 推荐,ECDSA 签名 | | HS256 | ⭐⭐⭐ | 对称加密,密钥管理复杂 | #### 推荐配置(RS256) ```yaml jwt: algorithm: RS256 access_token_expire: 7200 # 2 小时 refresh_token_expire: 2592000 # 30 天 key_rotation_days: 90 issuer: "user-management-system" ``` #### 密钥管理 - **私钥存储**:使用 HSM 或 KMS - **公钥分发**:通过 JWKS 端点 - **密钥轮换**:每 90 天轮换一次 - **密钥备份**:加密备份,多地存储 --- ### 1.4 数据脱敏 #### 脱敏规则 | 数据类型 | 显示格式 | 示例 | |----------|----------|------| | 手机号 | 138****8000 | 13800138000 → 138****8000 | | 邮箱 | j***e@example.com | john@example.com → j***e@example.com | | 身份证号 | 110***********1234 | 110101199001011234 → 110***********1234 | | 银行卡号 | 6222********1234 | 6222020012345678 → 6222********1234 | #### 脱敏实现(Java) ```java public class DataMasking { public static String maskPhone(String phone) { if (phone == null || phone.length() < 11) { return phone; } return phone.substring(0, 3) + "****" + phone.substring(7); } public static String maskEmail(String email) { if (email == null || !email.contains("@")) { return email; } String[] parts = email.split("@"); String local = parts[0]; String maskedLocal = local.substring(0, 1) + "***" + local.substring(local.length() - 1); return maskedLocal + "@" + parts[1]; } public static String maskIdCard(String idCard) { if (idCard == null || idCard.length() < 18) { return idCard; } return idCard.substring(0, 3) + "***********" + idCard.substring(14); } } ``` --- ## 2. 防攻击策略 ### 2.1 SQL 注入防护 #### 防护措施 1. **使用参数化查询** ```sql -- 错误示例(SQL 注入风险) SELECT * FROM users WHERE username = '" + username + "' -- 正确示例(参数化查询) SELECT * FROM users WHERE username = ? ``` 2. **使用 ORM 框架** ```python # SQLAlchemy(Python) user = session.query(User).filter(User.username == username).first() # Hibernate(Java) User user = session.createQuery( "FROM User WHERE username = :username", User.class) .setParameter("username", username) .uniqueResult(); ``` 3. **输入验证** ```python import re def validate_username(username: str) -> bool: # 只允许字母、数字、下划线 pattern = r'^[a-zA-Z0-9_]{4,20}$' return re.match(pattern, username) is not None ``` --- ### 2.2 XSS 防护 #### 防护措施 1. **输出编码** ```html
{{ user_input }}
{{ user_input | escape }}
``` 2. **Content Security Policy (CSP)** ```http Content-Security-Policy: default-src 'self'; script-src 'self' 'unsafe-inline' 'unsafe-eval'; style-src 'self' 'unsafe-inline'; img-src 'self' data: https:; ``` 3. **输入过滤** ```python import html def sanitize_input(input_str: str) -> str: # 转义 HTML 特殊字符 return html.escape(input_str) ``` --- ### 2.3 CSRF 防护 #### 防护措施 1. **CSRF Token** ```python # 生成 Token def generate_csrf_token(): return secrets.token_urlsafe(32) # 验证 Token def verify_csrf_token(request): token = request.headers.get('X-CSRF-Token') session_token = session.get('csrf_token') return token == session_token ``` 2. **SameSite Cookie** ```http Set-Cookie: session_id=xxx; SameSite=Strict; Secure; HttpOnly ``` 3. **Origin 验证** ```python def validate_origin(request): allowed_origins = ['https://example.com'] origin = request.headers.get('Origin') return origin in allowed_origins ``` --- ### 2.4 接口防刷 #### 限流策略 | 类型 | 算法 | 阈值 | 时间窗口 | 说明 | |------|------|------|----------|------| | 登录接口 | 令牌桶 | 5 次/分钟 | 1 分钟 | 防止暴力破解 | | 注册接口 | 漏桶 | 3 次/小时 | 1 小时 | 防止批量注册 | | 验证码接口 | 固定窗口 | 1 次/分钟 | 1 分钟 | 防止验证码滥用 | | API 接口(普通) | 滑动窗口 | 1000 次/分钟 | 1 分钟 | 普通用户限流 | | API 接口(VIP) | 令牌桶 | 10000 次/分钟 | 1 分钟 | VIP 用户限流 | | API 接口(IP) | 令牌桶 | 10000 次/分钟 | 1 分钟 | 单 IP 限流 | #### 分布式限流实现 ```go package ratelimit import ( "context" "fmt" "time" "github.com/redis/go-redis/v9" ) // 令牌桶算法 type TokenBucket struct { redis *redis.Client capacity int64 // 桶容量 rate int64 // 令牌生成速率(tokens/秒) } func NewTokenBucket(redis *redis.Client, capacity, rate int64) *TokenBucket { return &TokenBucket{ redis: redis, capacity: capacity, rate: rate, } } // 尝试获取令牌 func (tb *TokenBucket) Allow(ctx context.Context, key string) (bool, error) { now := time.Now().Unix() windowStart := now - 1 // 1 秒时间窗口 pipe := tb.redis.Pipeline() // 获取当前令牌数 tokensKey := fmt.Sprintf("rate_limit:tokens:%s", key) tokensCmd := pipe.Get(ctx, tokensKey) // 获取上次刷新时间 lastRefillKey := fmt.Sprintf("rate_limit:last_refill:%s", key) lastRefillCmd := pipe.Get(ctx, lastRefillKey) _, err := pipe.Exec(ctx) if err != nil && err != redis.Nil { return false, err } var tokens float64 if err := tokensCmd.Err(); err == nil { tokens, _ = tokensCmd.Float64() } else { tokens = float64(tb.capacity) } var lastRefill int64 if err := lastRefillCmd.Err(); err == nil { lastRefill, _ = lastRefillCmd.Int64() } else { lastRefill = now } // 计算需要补充的令牌 elapsedTime := now - lastRefill refillTokens := float64(elapsedTime) * float64(tb.rate) tokens += refillTokens if tokens > float64(tb.capacity) { tokens = float64(tb.capacity) } // 尝试消费一个令牌 if tokens >= 1 { tokens -= 1 // 更新 Redis pipe := tb.redis.Pipeline() pipe.Set(ctx, tokensKey, tokens, 2*time.Second) pipe.Set(ctx, lastRefillKey, now, 2*time.Second) pipe.Exec(ctx) return true, nil } return false, nil } ``` #### Redis 限流实现 ```python import redis import time class RateLimiter: def __init__(self, redis_client: redis.Redis): self.redis = redis_client def is_allowed(self, key: str, limit: int, window: int) -> bool: current = int(time.time()) window_start = current - window pipe = self.redis.pipeline() # 移除过期记录 pipe.zremrangebyscore(key, 0, window_start) # 统计当前窗口请求数 pipe.zcard(key) # 添加当前请求 pipe.zadd(key, {str(current): current}) # 设置过期时间 pipe.expire(key, window) results = pipe.execute() count = results[1] return count < limit ``` --- ### 2.5 密码暴力破解防护 #### 防护措施 1. **登录失败限制** ```yaml security: login: max_attempts: 5 lockout_duration: 1800 # 30 分钟 progressive_delay: true ``` 2. **渐进式延迟** ```python def calculate_lockout_time(attempts: int) -> int: if attempts <= 3: return 0 elif attempts == 4: return 30 # 30 秒 elif attempts == 5: return 300 # 5 分钟 else: return 1800 # 30 分钟 ``` 3. **验证码触发** ```python def should_show_captcha(attempts: int) -> bool: return attempts >= 3 ``` --- ### 2.6 中间人攻击防护 #### HTTPS 配置 ```nginx server { listen 443 ssl http2; server_name api.example.com; ssl_certificate /path/to/cert.pem; ssl_certificate_key /path/to/key.pem; ssl_protocols TLSv1.2 TLSv1.3; ssl_ciphers HIGH:!aNULL:!MD5; ssl_prefer_server_ciphers on; # HSTS add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always; # 其他安全头 add_header X-Frame-Options DENY; add_header X-Content-Type-Options nosniff; add_header X-XSS-Protection "1; mode=block"; } ``` --- ## 3. 认证与授权安全 ### 3.1 JWT 安全 #### JWT Payload 结构 ```json { "iss": "user-management-system", "sub": "123456", "aud": "api.example.com", "exp": 1699999999, "iat": 1699992799, "jti": "unique-token-id", "user": { "id": 123456, "username": "john_doe", "roles": ["user"] } } ``` #### JWT 安全最佳实践 1. **不存储敏感信息**:不在 Payload 中存储密码、手机号等 2. **设置合理的过期时间**:Access Token 2 小时,Refresh Token 30 天 3. **使用强签名算法**:RS256 或 ES256 4. **Token 黑名单**:吊销的 Token 存入 Redis 5. **刷新 Token 一次性**:使用后立即失效 #### Token 黑名单实现 ```python class TokenBlacklist: def __init__(self, redis_client: redis.Redis): self.redis = redis_client def revoke(self, token: str, expire_at: int): key = f"blacklist:{token}" self.redis.setex(key, expire_at, "1") def is_revoked(self, token: str) -> bool: key = f"blacklist:{token}" return self.redis.exists(key) == 1 ``` --- ### 3.2 OAuth 2.0 安全 #### 授权码模式流程 ```mermaid sequenceDiagram participant User as 用户 participant App as 应用 participant Auth as 认证服务 participant Resource as 资源服务 User->>App: 1. 点击登录 App->>Auth: 2. 重定向到授权页 User->>Auth: 3. 授权 Auth->>App: 4. 返回授权码 App->>Auth: 5. 用授权码换取 Token Auth->>App: 6. 返回 Token App->>Resource: 7. 用 Token 访问资源 Resource->>App: 8. 返回资源 ``` #### 安全注意事项 1. **state 参数**:防止 CSRF 攻击 2. **PKCE**:移动端推荐使用 3. **HTTPS**:所有通信必须使用 HTTPS 4. **Token 存储**:后端存储,避免前端暴露 5. **Scope 限制**:最小权限原则 --- ## 4. 审计与监控 ### 4.1 审计日志 #### 审计事件 | 事件类型 | 说明 | 优先级 | |----------|------|--------| | 用户注册 | 新用户注册 | 中 | | 用户登录 | 用户登录成功/失败 | 中 | | 密码修改 | 用户修改密码 | 高 | | 角色分配 | 分配/移除角色 | 高 | | 权限变更 | 修改权限 | 高 | | 数据导出 | 导出敏感数据 | 高 | | 异常操作 | 异常行为检测 | 高 | #### 审计日志格式 ```json { "event_id": "uuid", "event_type": "password.changed", "user_id": 123456, "username": "john_doe", "ip": "192.168.1.1", "user_agent": "Mozilla/5.0...", "resource_type": "user", "resource_id": 123456, "action": "update", "old_value": "***", "new_value": "***", "result": "success", "error_message": null, "created_at": "2026-03-10T10:00:00Z" } ``` --- ### 4.2 异常检测 #### 异常登录检测 1. **异地登录检测** ```python def detect_abnormal_login(user_id: str, current_ip: str) -> bool: # 获取用户最近登录 IP 列表 recent_ips = get_user_recent_ips(user_id, limit=5) # 获取当前 IP 的地理位置 current_location = get_ip_location(current_ip) # 检查是否与最近登录位置差异过大 for ip in recent_ips: location = get_ip_location(ip) distance = calculate_distance(location, current_location) if distance > 1000: # 超过 1000 公里 return True return False ``` 2. **异常设备检测** ```python def detect_abnormal_device(user_id: str, device_id: str) -> bool: # 检查设备是否已注册 if not is_device_registered(user_id, device_id): # 新设备,需要二次验证 return True # 检查设备最后活跃时间 last_active = get_device_last_active(device_id) if last_active.days_ago() > 30: # 设备长时间未使用,需要验证 return True return False ``` --- ### 4.3 安全监控指标 | 指标 | 阈值 | 告警级别 | |------|------|----------| | 登录失败率 | > 10% | 警告 | | 单 IP 登录失败次数 | > 20 次/分钟 | 严重 | | 单用户登录失败次数 | > 10 次/小时 | 警告 | | 异常登录次数 | > 5 次/小时 | 严重 | | Token 验证失败率 | > 5% | 警告 | | 接口调用异常 | 错误率 > 1% | 警告 | --- ## 5. 合规性要求 ### 5.1 GDPR 合规 #### 数据主体权利 | 权利 | 实现方式 | |------|----------| | 访问权 | 提供数据导出接口 | | 更正权 | 支持用户更新信息 | | 删除权 | 支持账号删除(数据清理) | | 限制处理权 | 支持数据冻结 | | 数据携带权 | 支持数据导出为标准格式 | | 反对权 | 支持用户撤销授权 | #### 数据最小化原则 ```python # 只收集必要的用户信息 def collect_user_data(): return { "username": username, "email": email, "phone": phone, # 不收集不必要的字段,如家庭住址、收入等 } ``` --- ### 5.2 个人信息保护法 #### 数据分类分级 | 级别 | 数据类型 | 保护措施 | |------|----------|----------| | 一级(一般) | 用户名、昵称 | 访问控制 | | 二级(重要) | 手机号、邮箱 | 加密存储 + 访问控制 | | 三级(敏感) | 身份证号、银行信息 | 强加密 + 审计日志 | | 四级(核心) | 生物识别信息 | HSM + 最小权限 | #### 数据留存策略 ```yaml data_retention: # 活跃用户数据:永久保留 active_users: forever # 注销用户数据:保留 30 天后清理 deleted_users: 30 days # 日志数据:保留 2 年 logs: 2 years # 登录日志:保留 1 年 login_logs: 1 year # 审计日志:保留 5 年 audit_logs: 5 years ``` --- ### 5.3 等保 2.0 #### 安全等级:三级 | 控制项 | 要求 | 实现状态 | |--------|------|----------| | 身份鉴别 | 双因素认证 | ✅ 已实现 | | 访问控制 | RBAC 权限模型 | ✅ 已实现 | | 安全审计 | 完整的审计日志 | ✅ 已实现 | | 数据完整性 | 数据加密 + 校验 | ✅ 已实现 | | 数据保密性 | 敏感数据加密 | ✅ 已实现 | | 入侵防范 | 异常检测 + 告警 | ✅ 已实现 | --- ## 6. 安全开发流程 ### 6.1 安全编码规范 #### OWASP Top 10 防护 1. **A01:2021 – 访问控制失效** - 实施严格的权限检查 - 默认拒绝策略 2. **A02:2021 – 加密失效** - 使用强加密算法 - 禁止弱加密 3. **A03:2021 – 注入** - 参数化查询 - 输入验证 4. **A04:2021 – 不安全设计** - 安全威胁建模 - 安全代码审查 5. **A05:2021 – 安全配置错误** - 默认安全配置 - 定期安全扫描 --- ### 6.2 安全测试 #### 测试类型 | 测试类型 | 频率 | 工具 | |----------|------|------| | 静态代码分析 | 每次提交 | SonarQube | | 动态安全测试 | 每周 | OWASP ZAP | | 依赖漏洞扫描 | 每天 | Snyk | | 渗透测试 | 每季度 | 人工 + 自动 | | 代码安全审查 | 每次 PR | 人工 | --- ### 6.3 应急响应 #### 安全事件响应流程 ```mermaid graph TD A[发现安全事件] --> B[确认事件级别] B --> C{事件级别} C -->|低| D[记录日志] C -->|中| E[隔离受影响系统] C -->|高| F[紧急响应] E --> G[分析原因] F --> G G --> H[修复漏洞] H --> I[验证修复] I --> J[恢复服务] J --> K[事后复盘] ``` --- ## 7. 安全检查清单 ### 部署前检查 - [ ] 所有接口强制使用 HTTPS - [ ] 密码使用 Argon2id 加密 - [ ] 敏感数据使用 AES-256 加密 - [ ] JWT 使用 RS256 签名 - [ ] 实现接口限流 - [ ] 实现登录失败限制 - [ ] 实现审计日志 - [ ] 配置安全响应头 - [ ] 关闭不必要的端口 - [ ] 定期更新依赖包 ### 运维检查 - [ ] 每日检查异常登录日志 - [ ] 每周检查接口调用异常 - [ ] 每月进行安全扫描 - [ ] 每季度进行渗透测试 - [ ] 每年进行安全审计 --- *本文档持续更新中,如有疑问请联系安全团队。*