Files
sub2api-cn-relay-manager/scripts/acceptance/artifact_redaction.py
2026-05-27 09:39:05 +08:00

199 lines
7.0 KiB
Python

#!/usr/bin/env python3
"""Helpers for redacting real-host acceptance artifacts."""
import hashlib
import json
import pathlib
import sys
from typing import Any
# Field names whose values sanitize_nested() replaces with a redact_key()
# summary (presence flag, short prefix/suffix, SHA-256 fingerprint).
KEY_FIELD_NAMES = {
    "api_key",
    "managed_probe_key",
    "raw_key",
    "requested_probe_api_key",
    "subscription_user_key",
}

# Field names holding key prefixes/previews; sanitize_nested() passes these
# through redact_key() as well.
PREFIX_FIELD_NAMES = {
    "gateway_key_prefix",
    "managed_key_prefix",
    "managed_key_preview",
    "managed_probe_key_prefix",
    "subscription_user_key_prefix",
}

# Field names that sanitize_nested() replaces with a "<name>_hash" entry
# produced by redact_identifier().
IDENTIFIER_FIELD_NAMES = {
    "admin_user_id",
    "managed_user_id",
    "raw_user_id",
    "subscription_user_id",
}

# E-mail field names; handled by sanitize_nested() the same way as identifiers
# (emitted under "<name>_hash").
EMAIL_FIELD_NAMES = {
    "managed_user_email",
}

# Field names whose string values sanitize_nested() tries to parse as JSON,
# sanitize recursively, and re-serialize.
JSON_STRING_FIELD_NAMES = {
    "DetailsJSON",
    "details_json",
    "probe_summary_json",
}
def redact_key(value: str) -> dict[str, Any]:
    """Summarize a secret key without storing it verbatim.

    Args:
        value: The raw key. A falsy value (``None``/empty) is treated as an
            empty string; surrounding whitespace is stripped first.

    Returns:
        A dict with ``present`` (whether a non-empty key was given),
        ``prefix`` (first four characters), ``suffix`` (last four characters)
        and ``fingerprint`` (hex SHA-256 of the UTF-8 encoded key). All string
        fields are empty when no key is present.

    NOTE(review): for keys of eight characters or fewer, ``prefix`` and
    ``suffix`` together cover the entire value.
    """
    secret = (value or "").strip()
    present = bool(secret)
    # Slicing an empty or short string is safe: it simply yields what exists.
    fingerprint = hashlib.sha256(secret.encode("utf-8")).hexdigest() if present else ""
    return {
        "present": present,
        "prefix": secret[:4],
        "suffix": secret[-4:],
        "fingerprint": fingerprint,
    }
def redact_identifier(value: str) -> str:
    """Return the hex SHA-256 digest of an identifier.

    Args:
        value: The identifier text. A falsy value is treated as an empty
            string; surrounding whitespace is stripped before hashing.

    Returns:
        The 64-character hex digest of the UTF-8 encoded identifier, or an
        empty string when the identifier is empty after stripping.
    """
    identifier = (value or "").strip()
    return hashlib.sha256(identifier.encode("utf-8")).hexdigest() if identifier else ""
# Lower-cased names of header fields that carry credentials and must never be
# written to an artifact.
_SENSITIVE_HEADER_NAMES = frozenset(
    {
        "authorization",
        "cookie",
        "proxy-authorization",
        "set-cookie",
        "x-api-key",
    }
)


def sanitize_headers(raw: str) -> str:
    """Remove credential-bearing header lines from a raw header dump.

    A line is dropped when the text before its first ``:`` (ignoring
    surrounding whitespace and case) is one of the sensitive header names.
    Matching on the parsed field name rather than the raw line start means
    that leading whitespace or a space before the colon cannot smuggle a
    credential past the filter. Lines that start with a space or tab directly
    after a dropped header are treated as folded continuations of that header
    and dropped as well.

    Args:
        raw: The header text; a falsy value is treated as empty.

    Returns:
        The remaining lines joined with ``"\\n"`` plus a trailing newline, or
        an empty string when nothing remains.
    """
    kept: list[str] = []
    dropping = False
    for line in (raw or "").splitlines():
        # Folded continuation of a header that was just removed.
        if dropping and line[:1] in (" ", "\t"):
            continue
        name, colon, _ = line.partition(":")
        dropping = bool(colon) and name.strip().lower() in _SENSITIVE_HEADER_NAMES
        if not dropping:
            kept.append(line)
    return "\n".join(kept) + ("\n" if kept else "")
def sanitize_group_state(payload: Any) -> dict[str, Any]:
    """Build a redacted copy of a group-state payload.

    Only a fixed set of fields is copied from the ``group``, ``subscription``
    and ``key`` sub-objects. The subscription's ``user_id`` is emitted as
    ``user_id_hash`` via ``redact_identifier`` and the key's ``key`` value is
    emitted as ``redacted`` via ``redact_key``.

    Args:
        payload: The decoded payload; anything other than a dict yields ``{}``.

    Returns:
        A new dict with ``group_id``, ``group``, ``subscription`` and ``key``
        entries. Sub-objects that are missing or not dicts are treated as
        empty, so their fields come out as ``None``.
    """
    if not isinstance(payload, dict):
        return {}

    def section(name: str) -> dict[str, Any]:
        # A missing or non-dict section is treated as empty.
        candidate = payload.get(name)
        return candidate if isinstance(candidate, dict) else {}

    group = section("group")
    subscription = section("subscription")
    key = section("key")

    user_id_hash = redact_identifier(str(subscription.get("user_id") or ""))
    redacted_key = redact_key(str(key.get("key") or ""))

    return {
        "group_id": payload.get("group_id"),
        "group": {
            field: group.get(field)
            for field in ("id", "name", "type", "subscription_type")
        },
        "subscription": {
            "id": subscription.get("id"),
            "user_id_hash": user_id_hash,
            "group_id": subscription.get("group_id"),
            "status": subscription.get("status"),
            "starts_at": subscription.get("starts_at"),
            "expires_at": subscription.get("expires_at"),
        },
        "key": {
            "id": key.get("id"),
            "group_id": key.get("group_id"),
            "status": key.get("status"),
            "redacted": redacted_key,
        },
    }
def sanitize_runtime_context(payload: Any) -> dict[str, Any]:
    """Build a redacted copy of a runtime-context payload.

    A fixed set of base/ID fields is copied as-is (``None`` when absent).
    User IDs and the managed user e-mail, when present in ``payload``, are
    emitted under ``<name>_hash`` via ``redact_identifier``. Key fields are
    emitted via ``redact_key``, using the full key if it is truthy and
    otherwise the corresponding ``*_prefix`` value.

    Args:
        payload: The decoded payload; anything other than a dict yields ``{}``.

    Returns:
        A new dict containing only the sanitized fields.
    """
    if not isinstance(payload, dict):
        return {}

    passthrough = (
        "crm_base",
        "host_base",
        "crm_host_base",
        "remote_host_base",
        "provider_id",
        "subscription_group_id",
        "import_group_id",
    )
    # (source field, output field) pairs that are hashed when present.
    hashed = (
        ("subscription_user_id", "subscription_user_id_hash"),
        ("managed_user_id", "managed_user_id_hash"),
        ("admin_user_id", "admin_user_id_hash"),
        ("managed_user_email", "managed_user_email_hash"),
    )
    # (full key field, prefix field) pairs; the full key wins when truthy.
    keyed = (
        ("subscription_user_key", "subscription_user_key_prefix"),
        ("managed_probe_key", "managed_probe_key_prefix"),
    )

    result: dict[str, Any] = {name: payload.get(name) for name in passthrough}

    for source_name, target_name in hashed:
        if source_name in payload:
            result[target_name] = redact_identifier(str(payload.get(source_name) or ""))

    for key_name, prefix_name in keyed:
        if prefix_name in payload or key_name in payload:
            material = str(payload.get(key_name) or payload.get(prefix_name) or "")
            result[key_name] = redact_key(material)

    return result
def _sanitize_embedded_json(text: str) -> str:
    """Sanitize a JSON document stored inside a string field.

    Returns ``text`` unchanged when it cannot be parsed as JSON; otherwise the
    parsed value is sanitized and re-serialized.
    """
    try:
        decoded = json.loads(text)
    except Exception:
        # Best effort: unparseable text is kept verbatim.
        return text
    return json.dumps(sanitize_nested(decoded), ensure_ascii=False)


def sanitize_nested(value: Any) -> Any:
    """Recursively redact sensitive fields in a decoded JSON structure.

    Lists are sanitized element-wise. For dicts, each entry is handled by the
    first rule that matches its key:

    * key/prefix fields are replaced with a ``redact_key`` summary;
    * identifier/e-mail fields are replaced by a ``<key>_hash`` entry holding
      the ``redact_identifier`` digest (the original key is not emitted);
    * string values of JSON-string fields are parsed, sanitized and
      re-serialized (left unchanged if they do not parse);
    * everything else is sanitized recursively.

    Args:
        value: Any decoded JSON value.

    Returns:
        A sanitized copy for dicts and lists; any other value is returned
        as-is.
    """
    if isinstance(value, list):
        return [sanitize_nested(element) for element in value]
    if not isinstance(value, dict):
        return value

    cleaned: dict[str, Any] = {}
    for name, item in value.items():
        if name in KEY_FIELD_NAMES or name in PREFIX_FIELD_NAMES:
            cleaned[name] = redact_key(str(item or ""))
        elif name in IDENTIFIER_FIELD_NAMES or name in EMAIL_FIELD_NAMES:
            cleaned[f"{name}_hash"] = redact_identifier(str(item or ""))
        elif name in JSON_STRING_FIELD_NAMES and isinstance(item, str):
            cleaned[name] = _sanitize_embedded_json(item)
        else:
            cleaned[name] = sanitize_nested(item)
    return cleaned
def write_json(path: str, payload: Any) -> None:
    """Serialize ``payload`` as indented JSON and write it to ``path``.

    The output is UTF-8 encoded, uses a two-space indent and keeps non-ASCII
    characters unescaped. An existing file at ``path`` is overwritten.

    Args:
        path: Destination file path.
        payload: A JSON-serializable value.
    """
    target = pathlib.Path(path)
    rendered = json.dumps(payload, ensure_ascii=False, indent=2)
    target.write_text(rendered, encoding="utf-8")
def main(argv: list[str]) -> None:
    """Run the command-line interface.

    Supported invocations (``argv[0]`` is the program name):

    * ``redact-key VALUE`` prints the ``redact_key`` summary as JSON.
    * ``redact-id VALUE`` prints the ``redact_identifier`` digest.
    * ``sanitize-headers SRC DST`` writes the sanitized header text.
    * ``sanitize-group-state SRC DST``, ``sanitize-runtime-context SRC DST``
      and ``sanitize-json SRC DST`` read JSON from SRC and write the
      sanitized JSON to DST.

    Args:
        argv: The full argument vector, normally ``sys.argv``.

    Raises:
        SystemExit: With an explanatory message when the mode is missing or
            unsupported, or when a mode is given too few arguments.
    """
    if len(argv) < 2:
        raise SystemExit("usage: artifact_redaction.py MODE [ARGS...]")
    mode, args = argv[1], argv[2:]

    if mode in ("redact-key", "redact-id"):
        if not args:
            raise SystemExit(f"{mode} requires a value argument")
        if mode == "redact-key":
            print(json.dumps(redact_key(args[0]), ensure_ascii=False))
        else:
            print(redact_identifier(args[0]))
        return

    file_modes = (
        "sanitize-headers",
        "sanitize-group-state",
        "sanitize-runtime-context",
        "sanitize-json",
    )
    if mode not in file_modes:
        raise SystemExit(f"unsupported mode: {mode}")
    if len(args) < 2:
        raise SystemExit(f"{mode} requires source and destination paths")

    # Extra trailing arguments are ignored.
    src, dst = args[:2]
    text = pathlib.Path(src).read_text(encoding="utf-8")

    if mode == "sanitize-headers":
        pathlib.Path(dst).write_text(sanitize_headers(text), encoding="utf-8")
        return

    sanitizer = {
        "sanitize-group-state": sanitize_group_state,
        "sanitize-runtime-context": sanitize_runtime_context,
        "sanitize-json": sanitize_nested,
    }[mode]
    write_json(dst, sanitizer(json.loads(text)))


if __name__ == "__main__":
    main(sys.argv)