Files
user-system/validate_code.py

643 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
用户管理系统 - 代码结构验证脚本
验证所有模块、接口、服务的完整性
"""
import os
import sys
from pathlib import Path
from typing import List, Dict, Tuple
import json
# 颜色输出
class Colors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
# 项目根目录
PROJECT_ROOT = Path(__file__).parent.absolute()
# 预期的文件结构
EXPECTED_STRUCTURE = {
"cmd/server/main.go": "主程序入口",
"go.mod": "Go模块依赖",
"Makefile": "构建脚本",
# Domain层
"internal/domain/user.go": "用户实体",
"internal/domain/role.go": "角色实体",
"internal/domain/permission.go": "权限实体",
"internal/domain/device.go": "设备实体",
"internal/domain/user_role.go": "用户角色关联",
"internal/domain/role_permission.go": "角色权限关联",
"internal/domain/login_log.go": "登录日志",
"internal/domain/operation_log.go": "操作日志",
# Repository层
"internal/repository/user.go": "用户数据访问",
"internal/repository/role.go": "角色数据访问",
"internal/repository/permission.go": "权限数据访问",
"internal/repository/device.go": "设备数据访问",
"internal/repository/user_role.go": "用户角色关联",
"internal/repository/role_permission.go": "角色权限关联",
# Service层
"internal/service/auth.go": "认证服务",
"internal/service/user.go": "用户服务",
"internal/service/role.go": "角色服务",
"internal/service/permission.go": "权限服务",
"internal/service/device.go": "设备服务",
# Handler层
"internal/api/handler/auth.go": "认证处理器",
"internal/api/handler/user.go": "用户处理器",
"internal/api/handler/role.go": "角色处理器",
"internal/api/handler/permission.go": "权限处理器",
"internal/api/handler/device.go": "设备处理器",
# Middleware层
"internal/api/middleware/auth.go": "认证中间件",
"internal/api/middleware/ratelimit.go": "限流中间件",
"internal/api/middleware/cors.go": "CORS中间件",
"internal/api/middleware/error.go": "错误处理中间件",
"internal/api/middleware/logger.go": "日志中间件",
# 路由
"internal/api/router/router.go": "路由配置",
# 缓存
"internal/cache/cache_manager.go": "缓存管理器",
"internal/cache/l1.go": "L1本地缓存",
"internal/cache/l2.go": "L2 Redis缓存",
# 监控
"internal/monitoring/metrics.go": "Prometheus指标",
"internal/monitoring/health.go": "健康检查",
"internal/monitoring/middleware.go": "监控中间件",
# 认证
"internal/auth/jwt.go": "JWT管理",
"internal/auth/oauth.go": "OAuth",
"internal/auth/password.go": "密码加密",
# 安全
"internal/security/ratelimit.go": "限流",
"internal/security/validator.go": "参数验证",
"internal/security/encryption.go": "加密工具",
# 数据库
"internal/database/db.go": "数据库初始化",
# 配置
"internal/config/config.go": "配置管理",
# 响应
"internal/response/response.go": "响应封装",
# 错误
"pkg/errors/errors.go": "错误定义",
"pkg/response/response.go": "响应定义",
}
# 预期的API接口
EXPECTED_API_ENDPOINTS = {
"认证接口": [
"POST /api/v1/auth/register",
"POST /api/v1/auth/login",
"POST /api/v1/auth/refresh",
"POST /api/v1/auth/logout",
"POST /api/v1/auth/login/code",
],
"用户接口": [
"GET /api/v1/users",
"GET /api/v1/users/me",
"GET /api/v1/users/:id",
"POST /api/v1/users",
"PUT /api/v1/users/:id",
"DELETE /api/v1/users/:id",
"PUT /api/v1/users/:id/password",
"PUT /api/v1/users/:id/status",
"GET /api/v1/users/:id/roles",
"PUT /api/v1/users/:id/roles",
],
"角色接口": [
"GET /api/v1/roles",
"GET /api/v1/roles/:id",
"POST /api/v1/roles",
"PUT /api/v1/roles/:id",
"DELETE /api/v1/roles/:id",
"PUT /api/v1/roles/:id/status",
"GET /api/v1/roles/:id/permissions",
"PUT /api/v1/roles/:id/permissions",
],
"权限接口": [
"GET /api/v1/permissions",
"GET /api/v1/permissions/tree",
"GET /api/v1/permissions/:id",
"POST /api/v1/permissions",
"PUT /api/v1/permissions/:id",
"DELETE /api/v1/permissions/:id",
"PUT /api/v1/permissions/:id/status",
],
"设备接口": [
"GET /api/v1/devices",
"GET /api/v1/devices/:id",
"POST /api/v1/devices",
"PUT /api/v1/devices/:id",
"DELETE /api/v1/devices/:id",
"PUT /api/v1/devices/:id/status",
"GET /api/v1/devices/users/:id",
],
}
def print_success(msg: str):
print(f"{Colors.OKGREEN}{Colors.ENDC} {msg}")
def print_error(msg: str):
print(f"{Colors.FAIL}{Colors.ENDC} {msg}")
def print_warning(msg: str):
print(f"{Colors.WARNING}{Colors.ENDC} {msg}")
def print_info(msg: str):
print(f"{Colors.OKBLUE}{Colors.ENDC} {msg}")
def verify_file_structure() -> Tuple[int, int, List[str]]:
"""验证文件结构"""
print(f"\n{Colors.HEADER}{'='*60}{Colors.ENDC}")
print(f"{Colors.HEADER}1. 文件结构验证{Colors.ENDC}")
print(f"{Colors.HEADER}{'='*60}{Colors.ENDC}\n")
total = len(EXPECTED_STRUCTURE)
exists = 0
missing_files = []
for file_path, description in EXPECTED_STRUCTURE.items():
full_path = PROJECT_ROOT / file_path
if full_path.exists():
exists += 1
size = full_path.stat().st_size
print_success(f"{file_path:<50} ({size} bytes) - {description}")
else:
missing_files.append(file_path)
print_error(f"{file_path:<50} - {description}")
print(f"\n{Colors.OKCYAN}文件结构完成度: {exists}/{total} ({exists/total*100:.1f}%){Colors.ENDC}")
return total, exists, missing_files
def verify_service_methods() -> Dict[str, List[str]]:
"""验证Service层方法"""
print(f"\n{Colors.HEADER}{'='*60}{Colors.ENDC}")
print(f"{Colors.HEADER}2. Service层方法验证{Colors.ENDC}")
print(f"{Colors.HEADER}{'='*60}{Colors.ENDC}\n")
# Service文件及其预期方法
service_methods = {
"auth.go": [
"Register",
"Login",
"LoginByCode",
"SocialLogin",
"RefreshToken",
"Logout",
"GetUserInfo",
"SendVerificationCode",
],
"user.go": [
"GetUser",
"GetUserByID",
"ListUsers",
"CreateUser",
"UpdateUser",
"DeleteUser",
"UpdateUserStatus",
"UpdatePassword",
"GetUserRoles",
"AssignRoles",
],
"role.go": [
"GetRole",
"ListRoles",
"CreateRole",
"UpdateRole",
"DeleteRole",
"UpdateRoleStatus",
"GetRolePermissions",
"AssignPermissions",
],
"permission.go": [
"GetPermission",
"ListPermissions",
"GetPermissionTree",
"CreatePermission",
"UpdatePermission",
"DeletePermission",
"UpdatePermissionStatus",
"Search",
],
"device.go": [
"GetDevice",
"ListDevices",
"CreateDevice",
"UpdateDevice",
"DeleteDevice",
"UpdateDeviceStatus",
"GetUserDevices",
],
}
results = {}
for service_file, expected_methods in service_methods.items():
print(f"\n{Colors.OKCYAN}📁 internal/service/{service_file}{Colors.ENDC}")
file_path = PROJECT_ROOT / "internal/service" / service_file
if not file_path.exists():
print_error(f"文件不存在")
results[service_file] = {"exists": False, "methods": []}
continue
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
found_methods = []
missing_methods = []
for method in expected_methods:
if f"func.*{method}" in content:
found_methods.append(method)
print_success(f"{method}")
else:
missing_methods.append(method)
print_error(f"{method}")
print(f" {Colors.OKCYAN}完成度: {len(found_methods)}/{len(expected_methods)}{Colors.ENDC}")
results[service_file] = {
"exists": True,
"found": found_methods,
"missing": missing_methods,
"total": len(expected_methods),
}
return results
def verify_handler_methods() -> Dict[str, List[str]]:
"""验证Handler层方法"""
print(f"\n{Colors.HEADER}{'='*60}{Colors.ENDC}")
print(f"{Colors.HEADER}3. Handler层方法验证{Colors.ENDC}")
print(f"{Colors.HEADER}{'='*60}{Colors.ENDC}\n")
handler_methods = {
"auth.go": [
"Register",
"Login",
"LoginByCode",
"SocialLogin",
"RefreshToken",
"Logout",
"SendVerificationCode",
],
"user.go": [
"GetUser",
"GetCurrentUser",
"ListUsers",
"CreateUser",
"UpdateUser",
"DeleteUser",
"UpdateUserStatus",
"UpdatePassword",
"GetUserRoles",
"AssignRoles",
],
"role.go": [
"CreateRole",
"ListRoles",
"GetRole",
"UpdateRole",
"DeleteRole",
"UpdateRoleStatus",
"GetRolePermissions",
"AssignPermissions",
],
"permission.go": [
"CreatePermission",
"ListPermissions",
"GetPermissionTree",
"GetPermission",
"UpdatePermission",
"DeletePermission",
"UpdatePermissionStatus",
],
"device.go": [
"GetMyDevices",
"CreateDevice",
"GetDevice",
"UpdateDevice",
"DeleteDevice",
"UpdateDeviceStatus",
"GetUserDevices",
],
}
results = {}
for handler_file, expected_methods in handler_methods.items():
print(f"\n{Colors.OKCYAN}📁 internal/api/handler/{handler_file}{Colors.ENDC}")
file_path = PROJECT_ROOT / "internal/api/handler" / handler_file
if not file_path.exists():
print_error(f"文件不存在")
results[handler_file] = {"exists": False, "methods": []}
continue
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
found_methods = []
missing_methods = []
for method in expected_methods:
if f"func.*{method}" in content:
found_methods.append(method)
print_success(f"{method}")
else:
missing_methods.append(method)
print_error(f"{method}")
print(f" {Colors.OKCYAN}完成度: {len(found_methods)}/{len(expected_methods)}{Colors.ENDC}")
results[handler_file] = {
"exists": True,
"found": found_methods,
"missing": missing_methods,
"total": len(expected_methods),
}
return results
def verify_middleware() -> None:
"""验证中间件"""
print(f"\n{Colors.HEADER}{'='*60}{Colors.ENDC}")
print(f"{Colors.HEADER}4. 中间件验证{Colors.ENDC}")
print(f"{Colors.HEADER}{'='*60}{Colors.ENDC}\n")
middlewares = [
("auth.go", "认证中间件"),
("ratelimit.go", "限流中间件"),
("cors.go", "CORS中间件"),
("logger.go", "日志中间件"),
("error.go", "错误处理中间件"),
]
middleware_dir = PROJECT_ROOT / "internal/api/middleware"
for middleware_file, description in middlewares:
file_path = middleware_dir / middleware_file
if file_path.exists():
print_success(f"{middleware_file:<20} - {description}")
else:
print_error(f"{middleware_file:<20} - {description}")
def verify_cache() -> None:
"""验证缓存"""
print(f"\n{Colors.HEADER}{'='*60}{Colors.ENDC}")
print(f"{Colors.HEADER}5. 缓存模块验证{Colors.ENDC}")
print(f"{Colors.HEADER}{'='*60}{Colors.ENDC}\n")
cache_files = [
("cache_manager.go", "缓存管理器 - L1+L2多级缓存"),
("l1.go", "L1本地缓存实现"),
("l2.go", "L2 Redis缓存实现"),
]
cache_dir = PROJECT_ROOT / "internal/cache"
for cache_file, description in cache_files:
file_path = cache_dir / cache_file
if file_path.exists():
print_success(f"{cache_file:<20} - {description}")
else:
print_error(f"{cache_file:<20} - {description}")
def verify_monitoring() -> None:
"""验证监控"""
print(f"\n{Colors.HEADER}{'='*60}{Colors.ENDC}")
print(f"{Colors.HEADER}6. 监控模块验证{Colors.ENDC}")
print(f"{Colors.HEADER}{'='*60}{Colors.ENDC}\n")
monitoring_files = [
("metrics.go", "Prometheus指标采集"),
("health.go", "健康检查"),
("middleware.go", "监控中间件"),
]
monitoring_dir = PROJECT_ROOT / "internal/monitoring"
for monitoring_file, description in monitoring_files:
file_path = monitoring_dir / monitoring_file
if file_path.exists():
print_success(f"{monitoring_file:<20} - {description}")
else:
print_error(f"{monitoring_file:<20} - {description}")
def verify_database() -> None:
"""验证数据库"""
print(f"\n{Colors.HEADER}{'='*60}{Colors.ENDC}")
print(f"{Colors.HEADER}7. 数据库验证{Colors.ENDC}")
print(f"{Colors.HEADER}{'='*60}{Colors.ENDC}\n")
# 检查迁移文件
migration_files = [
("migrations/sqlite/V1__init.sql", "SQLite数据库迁移文件"),
]
for migration_file, description in migration_files:
file_path = PROJECT_ROOT / migration_file
if file_path.exists():
print_success(f"{migration_file:<50} - {description}")
# 检查表定义
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
expected_tables = ["users", "roles", "permissions", "user_roles", "role_permissions", "devices", "login_logs", "operation_logs"]
print(f"\n {Colors.OKCYAN}数据表检查:{Colors.ENDC}")
for table in expected_tables:
if table in content:
print_success(f"{table}")
else:
print_error(f"{table}")
else:
print_error(f"{migration_file:<50} - {description}")
def verify_config() -> None:
"""验证配置"""
print(f"\n{Colors.HEADER}{'='*60}{Colors.ENDC}")
print(f"{Colors.HEADER}8. 配置文件验证{Colors.ENDC}")
print(f"{Colors.HEADER}{'='*60}{Colors.ENDC}\n")
config_files = [
("configs/config.yaml", "应用配置文件"),
("deployment/docker-compose.yml", "Docker Compose配置"),
("deployment/alertmanager/alertmanager.yml", "AlertManager配置"),
("deployment/alertmanager/alerts.yml", "告警规则"),
("deployment/grafana/dashboards/user-management.json", "Grafana仪表板"),
]
for config_file, description in config_files:
file_path = PROJECT_ROOT / config_file
if file_path.exists():
print_success(f"{config_file:<50} - {description}")
else:
print_error(f"{config_file:<50} - {description}")
def count_lines_of_code() -> Dict[str, int]:
"""统计代码行数"""
print(f"\n{Colors.HEADER}{'='*60}{Colors.ENDC}")
print(f"{Colors.HEADER}9. 代码统计{Colors.ENDC}")
print(f"{Colors.HEADER}{'='*60}{Colors.ENDC}\n")
stats = {
"total": 0,
"go": 0,
"sql": 0,
"yaml": 0,
"json": 0,
"md": 0,
}
for file_path in PROJECT_ROOT.rglob("*"):
if file_path.is_file():
# 跳过特定目录
if any(part in file_path.parts for part in [".git", "node_modules", "vendor", "__pycache__"]):
continue
ext = file_path.suffix.lower()
try:
with open(file_path, 'r', encoding='utf-8') as f:
lines = len(f.readlines())
stats["total"] += lines
if ext == ".go":
stats["go"] += lines
elif ext == ".sql":
stats["sql"] += lines
elif ext in [".yaml", ".yml"]:
stats["yaml"] += lines
elif ext == ".json":
stats["json"] += lines
elif ext == ".md":
stats["md"] += lines
except:
pass
print(f"{Colors.OKCYAN}总代码行数: {stats['total']:,}{Colors.ENDC}")
print(f" Go: {stats['go']:,}")
print(f" SQL: {stats['sql']:,}")
print(f" YAML: {stats['yaml']:,}")
print(f" JSON: {stats['json']:,}")
print(f" Markdown:{stats['md']:,}")
return stats
def generate_report() -> Dict:
"""生成验证报告"""
print(f"\n{Colors.HEADER}{'='*60}{Colors.ENDC}")
print(f"{Colors.HEADER}10. 生成验证报告{Colors.ENDC}")
print(f"{Colors.HEADER}{'='*60}{Colors.ENDC}\n")
# 运行所有验证
total_files, exists_files, missing_files = verify_file_structure()
service_results = verify_service_methods()
handler_results = verify_handler_methods()
verify_middleware()
verify_cache()
verify_monitoring()
verify_database()
verify_config()
code_stats = count_lines_of_code()
# 计算完成度
file_completion = exists_files / total_files * 100
service_total = sum(r["total"] for r in service_results.values() if r.get("exists"))
service_found = sum(len(r.get("found", [])) for r in service_results.values() if r.get("exists"))
service_completion = service_found / service_total * 100 if service_total > 0 else 0
handler_total = sum(r["total"] for r in handler_results.values() if r.get("exists"))
handler_found = sum(len(r.get("found", [])) for r in handler_results.values() if r.get("exists"))
handler_completion = handler_found / handler_total * 100 if handler_total > 0 else 0
overall_completion = (file_completion + service_completion + handler_completion) / 3
# 打印总结
print(f"\n{Colors.HEADER}{'='*60}{Colors.ENDC}")
print(f"{Colors.HEADER}📊 验证总结{Colors.ENDC}")
print(f"{Colors.HEADER}{'='*60}{Colors.ENDC}\n")
print(f"{Colors.OKCYAN}文件结构:{Colors.ENDC} {file_completion:.1f}% ({exists_files}/{total_files})")
print(f"{Colors.OKCYAN}Service层:{Colors.ENDC} {service_completion:.1f}% ({service_found}/{service_total})")
print(f"{Colors.OKCYAN}Handler层:{Colors.ENDC} {handler_completion:.1f}% ({handler_found}/{handler_total})")
print(f"{Colors.OKCYAN}代码总量:{Colors.ENDC} {code_stats['total']:,}")
print(f"\n{Colors.BOLD}{Colors.OKGREEN}总体完成度: {overall_completion:.1f}%{Colors.ENDC}")
if overall_completion >= 95:
print(f"{Colors.OKGREEN}✓ 项目完整度优秀,可以进行验收!{Colors.ENDC}")
elif overall_completion >= 80:
print(f"{Colors.WARNING}⚠ 项目基本完成,但还有部分功能需要补充{Colors.ENDC}")
else:
print(f"{Colors.FAIL}✗ 项目完成度较低,需要继续完善{Colors.ENDC}")
return {
"file_completion": file_completion,
"service_completion": service_completion,
"handler_completion": handler_completion,
"overall_completion": overall_completion,
"total_files": total_files,
"exists_files": exists_files,
"missing_files": missing_files,
"service_results": service_results,
"handler_results": handler_results,
"code_stats": code_stats,
}
if __name__ == "__main__":
print(f"{Colors.BOLD}{Colors.HEADER}")
print("╔════════════════════════════════════════════════════════╗")
print("║ 用户管理系统 - 代码结构验证工具 ║")
print("║ User Management System ║")
print("╚════════════════════════════════════════════════════════╝")
print(f"{Colors.ENDC}")
try:
report = generate_report()
# 保存报告到JSON
report_file = PROJECT_ROOT / "validation_report.json"
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
print(f"\n{Colors.OKCYAN}✓ 验证报告已保存到: {report_file}{Colors.ENDC}")
# 退出码
if report["overall_completion"] >= 95:
sys.exit(0)
else:
sys.exit(1)
except Exception as e:
print(f"\n{Colors.FAIL}✗ 验证过程出错: {e}{Colors.ENDC}")
sys.exit(1)