Files
sub2api-cn-relay-manager/scripts/acceptance/migrate_historical_artifacts.py
2026-05-27 09:39:05 +08:00

211 lines
7.2 KiB
Python

#!/usr/bin/env python3
"""Normalize historical real-host artifacts into repo-safe form."""
import json
import pathlib
import shutil
import sys
from typing import Iterable
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent))
from artifact_redaction import sanitize_group_state, sanitize_headers, sanitize_runtime_context, sanitize_nested, redact_key # noqa: E402
# Exact file names that main() moves out of the artifact tree into the
# sensitive mirror (the SQL file is summarized first).
SENSITIVE_FILE_NAMES = {
    "00-managed-key.txt",
    "00-raw-user-key.txt",
    "05-subscription-access-prep.sql",
}
# Substrings looked for in the lower-cased name of a ``.txt`` file by
# should_mirror_sensitive_text(); any hit marks the file as sensitive.
SENSITIVE_TEXT_PATTERNS = (
    "managed-key",
    "raw-user-key",
    "probe-key",
    "key-preview",
    "key-corrected",
)
# JSON file names that should_sanitize_json() always routes through the
# generic nested sanitizer.
ROOT_SENSITIVE_JSON_NAMES = {
    "deepseek.json",
    "minimax.json",
    "summary.json",
    "99-summary.json",
    "99-semantic-summary.json",
}
def write_json(path: pathlib.Path, payload) -> None:
    """Serialize *payload* as indented UTF-8 JSON and store it at *path*.

    Non-ASCII characters are written verbatim (``ensure_ascii=False``) with a
    two-space indent. Any existing content of *path* is replaced.
    """
    rendered = json.dumps(payload, ensure_ascii=False, indent=2)
    path.write_text(rendered, encoding="utf-8")
def migrate_key_source(path: pathlib.Path) -> None:
    """Rewrite a key-source JSON file so it only carries a redacted key.

    A file that already contains a ``redacted`` entry is left untouched.
    Otherwise the ``upstream_key_prefix`` and ``upstream_key_suffix`` values
    are concatenated, passed through ``redact_key``, and the file is replaced
    by a document holding only ``source``, ``provider_id`` and ``redacted``.
    """
    record = json.loads(path.read_text(encoding="utf-8"))
    if "redacted" in record:
        return

    # Missing or falsy fragments collapse to "" so the join is always a string.
    fragments = (
        str(record.get("upstream_key_prefix") or ""),
        str(record.get("upstream_key_suffix") or ""),
    )
    replacement = {
        "source": record.get("source"),
        "provider_id": record.get("provider_id"),
        "redacted": redact_key("".join(fragments)),
    }
    write_json(path, replacement)
def migrate_runtime_context(path: pathlib.Path) -> None:
    """Replace the JSON document at *path* with its ``sanitize_runtime_context`` result."""
    original = json.loads(path.read_text(encoding="utf-8"))
    sanitized = sanitize_runtime_context(original)
    write_json(path, sanitized)
def migrate_redis_invalidation(path: pathlib.Path) -> None:
    """Replace a raw Redis invalidation log with a JSON summary of flags.

    The summary is written beside *path* using a ``.json`` suffix and records
    whether each ``*_cache_key=`` marker occurs in the log text. The original
    text file is deleted once the summary has been written.
    """
    log_text = path.read_text(encoding="utf-8")
    summary_path = path.with_suffix(".json")

    # A log whose last non-blank character is "3" or "0" is recorded as exit
    # code 0; anything else is recorded as unknown (JSON null).
    if log_text.strip().endswith(("3", "0")):
        exit_code = 0
    else:
        exit_code = None

    summary = {
        "auth_cache_invalidated": "auth_cache_key=" in log_text,
        "balance_cache_invalidated": "balance_cache_key=" in log_text,
        "subscription_cache_invalidated": "subscription_cache_key=" in log_text,
        "redis_del_exit_code": exit_code,
    }
    write_json(summary_path, summary)
    path.unlink()
def migrate_group_state(path: pathlib.Path) -> None:
    """Replace the JSON document at *path* with its ``sanitize_group_state`` result."""
    original = json.loads(path.read_text(encoding="utf-8"))
    sanitized = sanitize_group_state(original)
    write_json(path, sanitized)
def _int_or_none(text: str):
    """Return ``int(text)``, or ``None`` when *text* is not an integer literal."""
    try:
        return int(text)
    except ValueError:
        return None


def _first_token(text: str) -> str:
    """Return the first whitespace-delimited token of *text* with surrounding
    ``,``/``;`` removed, or ``""`` when *text* holds no token."""
    tokens = text.split()
    return tokens[0].strip(",;") if tokens else ""


def migrate_sql_summary(path: pathlib.Path) -> None:
    """Write a redacted JSON summary of the values found in a SQL prep script.

    The script is scanned line by line for four markers:

    * ``group_id = <int>``      -> ``subscription_group_id``
    * ``balance < <int>``       -> ``min_balance``
    * ``interval '<int> days'`` -> ``subscription_days``
    * ``WHERE key = '<value>'`` -> ``api_key`` (passed through ``redact_key``)

    For each field the first line that yields a usable value wins; a line
    whose value cannot be parsed is skipped and later lines are still tried.
    Fields that are never found stay ``None`` (the key stays ``""`` before
    redaction). The summary is written next to *path* as
    ``05-subscription-access-prep.summary.json``; *path* itself is not
    modified or removed here.
    """
    group_id = None
    min_balance = None
    subscription_days = None
    key_value = ""

    for line in path.read_text(encoding="utf-8").splitlines():
        if group_id is None and "group_id = " in line:
            group_id = _int_or_none(_first_token(line.partition("group_id = ")[2]))
        if min_balance is None and "balance < " in line:
            min_balance = _int_or_none(_first_token(line.partition("balance < ")[2]))
        if subscription_days is None and "interval '" in line:
            # The day count is everything between "interval '" and " days'".
            after_interval = line.partition("interval '")[2]
            subscription_days = _int_or_none(after_interval.partition(" days'")[0])
        if not key_value and "WHERE key = '" in line:
            # Take the text up to the closing quote of the key literal.
            key_value = line.partition("WHERE key = '")[2].partition("'")[0]

    summary = {
        "subscription_group_id": group_id,
        "min_balance": min_balance,
        "subscription_days": subscription_days,
        "api_key": redact_key(key_value),
    }
    write_json(path.with_name("05-subscription-access-prep.summary.json"), summary)
def maybe_update_guide(path: pathlib.Path) -> None:
    """Insert the artifact-security banner into a guide file, at most once.

    A file that already contains ``artifact security mode:`` is left alone.
    Otherwise the banner is inserted directly after the first occurrence of
    the guide heading (heading line plus blank line) and the file is written
    back. When the heading is absent the text is written back unchanged.
    """
    contents = path.read_text(encoding="utf-8")
    if "artifact security mode:" in contents:
        return

    heading = "真实宿主验收产物 -> 速查清单对应\n\n"
    banner = (
        "artifact security mode: migrated-safe\n"
        "contains raw secrets: no\n"
        "repository-safe: yes\n"
        "\n"
    )
    path.write_text(contents.replace(heading, heading + banner, 1), encoding="utf-8")
def sanitize_header_file(path: pathlib.Path) -> None:
    """Overwrite *path* with the ``sanitize_headers`` rendering of its current text."""
    header_text = path.read_text(encoding="utf-8")
    cleaned = sanitize_headers(header_text)
    path.write_text(cleaned, encoding="utf-8")
def sanitize_json_file(path: pathlib.Path) -> None:
    """Replace the JSON document at *path* with its ``sanitize_nested`` result."""
    original = json.loads(path.read_text(encoding="utf-8"))
    sanitized = sanitize_nested(original)
    write_json(path, sanitized)
def mirror_sensitive(root: pathlib.Path, sensitive_root: pathlib.Path, path: pathlib.Path) -> None:
    """Move *path* to the same relative location under *sensitive_root*.

    The location of *path* relative to *root* is recreated below
    *sensitive_root*, creating missing parent directories, and the file is
    then moved there. ``Path.relative_to`` raises ``ValueError`` when *path*
    is not located under *root*.
    """
    destination = sensitive_root.joinpath(path.relative_to(root))
    destination.parent.mkdir(parents=True, exist_ok=True)
    shutil.move(str(path), str(destination))
def walk_artifact_dirs(root: pathlib.Path) -> Iterable[pathlib.Path]:
    """Yield the immediate subdirectories of *root* in sorted path order.

    Entries of *root* that are not directories are skipped. The directory
    listing is taken when iteration starts.
    """
    yield from (entry for entry in sorted(root.iterdir()) if entry.is_dir())
def should_sanitize_json(path: pathlib.Path) -> bool:
    """Tell whether *path* is a JSON file for the generic nested sanitizer.

    Returns ``False`` for anything without a ``.json`` suffix and for the
    four file names that main() handles with a dedicated migrator. Returns
    ``True`` only for names listed in ``ROOT_SENSITIVE_JSON_NAMES`` or in the
    batch-detail / access-status set below.
    """
    # Names with their own migration routine; never sent to the generic pass.
    dedicated_names = {
        "00-local-key-source.json",
        "01-runtime-context.json",
        "00-context.json",
        "08-subscription-group-state.json",
    }
    detail_names = {
        "05a-batch-detail-pre-access.json",
        "07-access-status.json",
        "10-batch-detail.json",
    }

    file_name = path.name
    if path.suffix != ".json" or file_name in dedicated_names:
        return False
    return file_name in ROOT_SENSITIVE_JSON_NAMES or file_name in detail_names
def should_mirror_sensitive_text(path: pathlib.Path) -> bool:
    """Tell whether *path* is a ``.txt`` file whose name marks it as sensitive.

    The file name is lower-cased and searched for each entry of
    ``SENSITIVE_TEXT_PATTERNS``; one hit is enough. Files with any other
    suffix are never reported.
    """
    if path.suffix == ".txt":
        lowered_name = path.name.lower()
        for token in SENSITIVE_TEXT_PATTERNS:
            if token in lowered_name:
                return True
    return False
def _migrate_file(root: pathlib.Path, sensitive_root: pathlib.Path, path: pathlib.Path) -> None:
    """Apply the first matching migration rule to one artifact file.

    Rules are checked in a fixed order and at most one of them runs; a file
    matching no rule is left untouched.
    """
    name = path.name
    if name in SENSITIVE_FILE_NAMES:
        # The SQL script gets a redacted summary before it is moved away.
        if name == "05-subscription-access-prep.sql":
            migrate_sql_summary(path)
        mirror_sensitive(root, sensitive_root, path)
    elif should_mirror_sensitive_text(path):
        mirror_sensitive(root, sensitive_root, path)
    elif name == "00-local-key-source.json":
        migrate_key_source(path)
    elif name in {"01-runtime-context.json", "00-context.json"}:
        migrate_runtime_context(path)
    elif name == "07-redis-targeted-invalidation.txt":
        migrate_redis_invalidation(path)
    elif name == "08-subscription-group-state.json":
        migrate_group_state(path)
    elif path.suffix == ".txt" and "headers" in name:
        sanitize_header_file(path)
    elif name == "00-artifact-guide.txt":
        maybe_update_guide(path)
    elif should_sanitize_json(path):
        sanitize_json_file(path)


def main() -> None:
    """Migrate every artifact directory under the root given on the command line.

    Expects exactly one argument, the artifacts root. Sensitive files are
    moved to a sibling directory named ``real-host-acceptance-sensitive``;
    the remaining known files are sanitized in place. A one-line JSON status
    report is printed when the run completes.

    Raises:
        SystemExit: with a usage message when the argument count is wrong.
    """
    arguments = sys.argv[1:]
    if len(arguments) != 1:
        raise SystemExit("usage: migrate_historical_artifacts.py <artifacts-root>")

    root = pathlib.Path(arguments[0]).resolve()
    sensitive_root = root.parent / "real-host-acceptance-sensitive"

    for artifact_dir in walk_artifact_dirs(root):
        # sorted() snapshots the listing, so files created or moved during the
        # migration do not disturb the iteration.
        for path in sorted(artifact_dir.rglob("*")):
            if path.is_file():
                _migrate_file(root, sensitive_root, path)

    report = {
        "root": str(root),
        "sensitive_root": str(sensitive_root),
        "status": "ok",
    }
    print(json.dumps(report, ensure_ascii=False))
# Run the migration only when this file is executed as a script.
if __name__ == "__main__":
    main()