#!/usr/bin/env python3
"""Redact keys, identifiers and sensitive headers from text and JSON payloads.

Command-line modes (first argument):

    redact-key VALUE                  print a JSON summary of a key
    redact-id VALUE                   print the SHA-256 hex digest of VALUE
    sanitize-headers SRC DST          copy SRC to DST without sensitive headers
    sanitize-group-state SRC DST      rewrite a group-state JSON document
    sanitize-runtime-context SRC DST  rewrite a runtime-context JSON document
    sanitize-json SRC DST             recursively redact a JSON document
"""
import hashlib
import json
import pathlib
import sys
from typing import Any

# Dict keys whose values are replaced by a ``redact_key`` summary.
KEY_FIELD_NAMES = {
    "api_key",
    "requested_probe_api_key",
    "raw_key",
    "subscription_user_key",
    "managed_probe_key",
}

# Key prefixes/previews; these also go through ``redact_key``.
PREFIX_FIELD_NAMES = {
    "gateway_key_prefix",
    "managed_key_prefix",
    "managed_probe_key_prefix",
    "subscription_user_key_prefix",
    "managed_key_preview",
}

# Dict keys that are replaced by "<name>_hash" holding a SHA-256 digest.
IDENTIFIER_FIELD_NAMES = {
    "subscription_user_id",
    "raw_user_id",
    "managed_user_id",
    "admin_user_id",
}

# Handled exactly like identifiers: replaced by "<name>_hash".
EMAIL_FIELD_NAMES = {
    "managed_user_email",
}

# String fields that contain embedded JSON which is sanitized recursively.
JSON_STRING_FIELD_NAMES = {
    "DetailsJSON",
    "details_json",
    "probe_summary_json",
}

# Lower-cased line prefixes of the header lines dropped by sanitize_headers.
_SENSITIVE_HEADER_PREFIXES = (
    "authorization:",
    "cookie:",
    "set-cookie:",
    "x-api-key:",
)

# Runtime-context fields copied through unchanged (missing ones become None).
_RUNTIME_PASSTHROUGH_FIELDS = (
    "crm_base",
    "host_base",
    "crm_host_base",
    "remote_host_base",
    "provider_id",
    "subscription_group_id",
    "import_group_id",
)

# Runtime-context fields emitted as "<name>_hash" when present in the input.
_RUNTIME_HASHED_FIELDS = (
    "subscription_user_id",
    "managed_user_id",
    "admin_user_id",
    "managed_user_email",
)

# Runtime-context key fields; each may also be supplied as "<name>_prefix".
_RUNTIME_KEY_FIELDS = (
    "subscription_user_key",
    "managed_probe_key",
)

_USAGE = (
    "usage: MODE ARGS...\n"
    "  redact-key VALUE\n"
    "  redact-id VALUE\n"
    "  sanitize-headers SRC DST\n"
    "  sanitize-group-state SRC DST\n"
    "  sanitize-runtime-context SRC DST\n"
    "  sanitize-json SRC DST"
)


def redact_key(value: str) -> dict[str, Any]:
    """Summarize a key as presence flag, 4-char prefix/suffix and digest.

    The value is stripped of surrounding whitespace first. An empty (or
    falsy) value yields ``present=False`` with empty strings for the other
    fields. Otherwise ``fingerprint`` is the SHA-256 hex digest of the
    UTF-8 encoded value.
    """
    value = (value or "").strip()
    if not value:
        return {"present": False, "prefix": "", "suffix": "", "fingerprint": ""}
    # NOTE(review): for values of 8 characters or fewer, prefix + suffix
    # together reproduce the whole value. Behavior kept as-is because key
    # prefixes are routed through this function too.
    return {
        "present": True,
        "prefix": value[:4],
        # Slicing already returns the whole string when it is shorter than 4.
        "suffix": value[-4:],
        "fingerprint": hashlib.sha256(value.encode("utf-8")).hexdigest(),
    }


def redact_identifier(value: str) -> str:
    """Return the SHA-256 hex digest of the stripped value, or "" if empty."""
    value = (value or "").strip()
    if not value:
        return ""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()


def sanitize_headers(raw: str) -> str:
    """Drop Authorization, Cookie, Set-Cookie and X-Api-Key header lines.

    Matching is case-insensitive and anchored at the start of each line.
    Remaining lines are joined with "\\n" and terminated by a single "\\n";
    an input with no remaining lines yields "".
    """
    kept = [
        line
        for line in (raw or "").splitlines()
        if not line.lower().startswith(_SENSITIVE_HEADER_PREFIXES)
    ]
    return "\n".join(kept) + ("\n" if kept else "")


def _dict_or_empty(value: Any) -> dict[str, Any]:
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def sanitize_group_state(payload: Any) -> dict[str, Any]:
    """Project a group-state payload onto a fixed set of redacted fields.

    Returns ``{}`` when *payload* is not a dict. The ``group``,
    ``subscription`` and ``key`` members are treated as empty when they are
    missing or not dicts. The subscription's ``user_id`` is replaced by its
    digest and the key's ``key`` value by a ``redact_key`` summary.
    """
    if not isinstance(payload, dict):
        return {}
    group = _dict_or_empty(payload.get("group"))
    subscription = _dict_or_empty(payload.get("subscription"))
    key = _dict_or_empty(payload.get("key"))
    return {
        "group_id": payload.get("group_id"),
        "group": {
            "id": group.get("id"),
            "name": group.get("name"),
            "type": group.get("type"),
            "subscription_type": group.get("subscription_type"),
        },
        "subscription": {
            "id": subscription.get("id"),
            "user_id_hash": redact_identifier(
                str(subscription.get("user_id") or "")
            ),
            "group_id": subscription.get("group_id"),
            "status": subscription.get("status"),
            "starts_at": subscription.get("starts_at"),
            "expires_at": subscription.get("expires_at"),
        },
        "key": {
            "id": key.get("id"),
            "group_id": key.get("group_id"),
            "status": key.get("status"),
            "redacted": redact_key(str(key.get("key") or "")),
        },
    }


def sanitize_runtime_context(payload: Any) -> dict[str, Any]:
    """Project a runtime-context payload onto redacted fields.

    Returns ``{}`` when *payload* is not a dict. Pass-through fields are
    always emitted (``None`` when absent). User-id / e-mail fields are
    emitted as ``<name>_hash`` only when present in the input. Key fields
    are emitted when either the key or its ``<name>_prefix`` is present,
    preferring the full key as the source of the summary.
    """
    if not isinstance(payload, dict):
        return {}
    out: dict[str, Any] = {
        name: payload.get(name) for name in _RUNTIME_PASSTHROUGH_FIELDS
    }
    for name in _RUNTIME_HASHED_FIELDS:
        if name in payload:
            out[f"{name}_hash"] = redact_identifier(str(payload.get(name) or ""))
    for name in _RUNTIME_KEY_FIELDS:
        prefix_name = f"{name}_prefix"
        if prefix_name in payload or name in payload:
            source = str(payload.get(name) or payload.get(prefix_name) or "")
            out[name] = redact_key(source)
    return out


def _sanitize_embedded_json(text: str) -> str:
    """Sanitize a JSON document stored inside a string field.

    NOTE(review): text that does not parse as JSON is returned unchanged,
    i.e. unsanitized. This best-effort behavior is kept from the original.
    """
    try:
        parsed = json.loads(text)
    except (ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError subclass.
        return text
    return json.dumps(sanitize_nested(parsed), ensure_ascii=False)


def sanitize_nested(value: Any) -> Any:
    """Recursively redact known sensitive fields in dicts and lists.

    For each dict entry, by key name:
      * key / prefix fields        -> ``redact_key`` summary under the same key
      * identifier / e-mail fields -> digest stored under ``<key>_hash``
      * embedded-JSON string fields -> parsed, sanitized and re-serialized
      * anything else              -> sanitized recursively
    Lists are sanitized element-wise; other values are returned unchanged.
    """
    if isinstance(value, dict):
        out: dict[str, Any] = {}
        for key, item in value.items():
            if key in KEY_FIELD_NAMES or key in PREFIX_FIELD_NAMES:
                out[key] = redact_key(str(item or ""))
            elif key in IDENTIFIER_FIELD_NAMES or key in EMAIL_FIELD_NAMES:
                out[f"{key}_hash"] = redact_identifier(str(item or ""))
            elif key in JSON_STRING_FIELD_NAMES and isinstance(item, str):
                out[key] = _sanitize_embedded_json(item)
            else:
                out[key] = sanitize_nested(item)
        return out
    if isinstance(value, list):
        return [sanitize_nested(item) for item in value]
    return value


def write_json(path: str, payload: Any) -> None:
    """Write *payload* to *path* as UTF-8 JSON, 2-space indented, non-ASCII kept."""
    pathlib.Path(path).write_text(
        json.dumps(payload, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )


# Modes that read a JSON file, transform it and write JSON back.
_JSON_FILE_SANITIZERS = {
    "sanitize-group-state": sanitize_group_state,
    "sanitize-runtime-context": sanitize_runtime_context,
    "sanitize-json": sanitize_nested,
}


def main(argv: list[str]) -> None:
    """Dispatch a command-line invocation; *argv* has the shape of ``sys.argv``.

    Raises:
        SystemExit: with a usage message when the mode or its arguments are
            missing, or with ``unsupported mode: <mode>`` for unknown modes.
    """
    if len(argv) < 2:
        raise SystemExit(_USAGE)
    mode, args = argv[1], argv[2:]

    if mode in ("redact-key", "redact-id"):
        if not args:
            raise SystemExit(f"{mode}: expected VALUE\n{_USAGE}")
        if mode == "redact-key":
            print(json.dumps(redact_key(args[0]), ensure_ascii=False))
        else:
            print(redact_identifier(args[0]))
        return

    if mode == "sanitize-headers" or mode in _JSON_FILE_SANITIZERS:
        if len(args) < 2:
            raise SystemExit(f"{mode}: expected SRC DST\n{_USAGE}")
        src, dst = args[:2]
        text = pathlib.Path(src).read_text(encoding="utf-8")
        if mode == "sanitize-headers":
            pathlib.Path(dst).write_text(sanitize_headers(text), encoding="utf-8")
        else:
            write_json(dst, _JSON_FILE_SANITIZERS[mode](json.loads(text)))
        return

    raise SystemExit(f"unsupported mode: {mode}")


if __name__ == "__main__":
    main(sys.argv)