Files
sub2api-cn-relay-manager/scripts/acceptance/verify_host_protocol_matrix.sh
phamnazage-jpg 492f33a129
Some checks failed
CI / Build & Test (push) Has been cancelled
CI / Lint (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / Docker Build (push) Has been cancelled
CI / Release (push) Has been cancelled
feat(vnext): complete vNext.1 release gate — default chain admission, idempotent init, user key skeleton
- DEFAULT_CHAIN_ADMISSION.md: reviewed and approved, real artifact refs added
- DEFAULT_DATA_IDEMPOTENT_RELEASE_GATE.md: reviewed and approved
- scripts/setup_default_data.sh: idempotent init with --dry-run/--apply/artifact
- scripts/test/test_default_data.sh: 4 test cases all pass
- scripts/acceptance/verify_user_key_self_service.sh: Phase 0 skeleton
- .gitignore: add generated artifact directories
2026-06-05 11:07:50 +08:00

335 lines
12 KiB
Bash

#!/usr/bin/env bash
# Shell prologue: strict error handling plus the default locations and mode
# used by the rest of the script.
set -o errexit -o nounset -o pipefail

# Two directory levels above the directory that contains this script.
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
TIMESTAMP="$(date +%Y%m%d_%H%M%S)"

# Caller-supplied values win; otherwise fall back to a timestamped artifact
# directory and a live (non-dry-run) probe.
: "${ARTIFACT_DIR:=$ROOT_DIR/artifacts/host-capability/$TIMESTAMP}"
: "${DRY_RUN:=0}"
# Print the help text to stdout.
# The heredoc delimiter is quoted ('EOF'), so the text below is emitted
# verbatim: no variable, command, or backslash expansion takes place.
usage() {
cat <<'EOF'
Usage: verify_host_protocol_matrix.sh
Required env:
PROTOCOL_MATRIX_TARGETS_JSON JSON array of probe targets
Optional env:
ARTIFACT_DIR output directory
DRY_RUN=1 emit scaffold summary without network calls
Example:
DRY_RUN=1 \
PROTOCOL_MATRIX_TARGETS_JSON='[{"provider_id":"kimi-a7m","base_url":"https://kimi.example.com/v1","api_key_env":"KIMI_API_KEY","models":["kimi-k2.6"]}]' \
bash ./scripts/acceptance/verify_host_protocol_matrix.sh
EOF
}
# Abort the whole script with exit status 1 when the environment variable
# whose name is given in $1 is unset or empty; otherwise return 0.
require_var() {
  local name="$1"
  # Indirect expansion reads the variable named by $name; the ":-" default
  # keeps `set -u` from aborting on an unset variable before we can report it.
  if [[ -n "${!name:-}" ]]; then
    return 0
  fi
  printf 'missing required env: %s\n' "$name" >&2
  exit 1
}
# Serve --help before any environment validation so it always works.
case "${1:-}" in
  --help)
    usage
    exit 0
    ;;
esac

# The target list is mandatory; the artifact directory is created up front.
require_var PROTOCOL_MATRIX_TARGETS_JSON
mkdir -p "$ARTIFACT_DIR"

# The embedded Python programs read their configuration from the environment.
export ROOT_DIR
export ARTIFACT_DIR
export DRY_RUN
export PROTOCOL_MATRIX_TARGETS_JSON
# Dry-run mode: write a scaffold summary that echoes the configured targets
# (marked "dry_run") without making any network call, then stop.
if [[ "$DRY_RUN" == "1" ]]; then
  python3 > "$ARTIFACT_DIR/protocol-matrix-summary.json" <<'PY'
import json
import os

configured = json.loads(os.environ["PROTOCOL_MATRIX_TARGETS_JSON"])

# One scaffold entry per configured target; string fields are trimmed and the
# probe layer falls back to "upstream" when missing or blank.
scaffold = [
    {
        "provider_id": str(entry.get("provider_id", "")).strip(),
        "base_url": str(entry.get("base_url", "")).strip(),
        "models": entry.get("models", []),
        "probe_layer": str(entry.get("probe_layer", "upstream")).strip() or "upstream",
        "support_level": "dry_run",
    }
    for entry in configured
]

summary = {"mode": "dry_run", "targets": scaffold}
print(json.dumps(summary, ensure_ascii=False, indent=2))
PY
  echo "protocol matrix summary: $ARTIFACT_DIR/protocol-matrix-summary.json"
  exit 0
fi
python3 - <<'PY'
import json
import os
import pathlib
import shutil
import subprocess
import sys
import time
# Output locations: per-target captures go under <ARTIFACT_DIR>/targets.
artifact_dir = pathlib.Path(os.environ["ARTIFACT_DIR"])
script_dir = artifact_dir / "targets"
script_dir.mkdir(parents=True, exist_ok=True)

# Probe targets supplied by the caller as a JSON array of objects.
targets = json.loads(os.environ["PROTOCOL_MATRIX_TARGETS_JSON"])
# Fail fast with a readable message instead of an AttributeError traceback
# later on, when the probe loop calls .get() on each entry.
if not isinstance(targets, list) or not all(isinstance(entry, dict) for entry in targets):
    print("PROTOCOL_MATRIX_TARGETS_JSON must be a JSON array of objects", file=sys.stderr)
    sys.exit(1)

# curl tuning, passed to --connect-timeout / --max-time / --retry /
# --retry-delay respectively by run_capture.
CONNECT_TIMEOUT = 10
MAX_TIME = 30
RETRY = 1
RETRY_DELAY = 2
def sanitize_header_value(value: str) -> str:
    """Return *value* with any Authorization header replaced by a redacted form.

    A header line is recognised case-insensitively by its ``authorization:``
    prefix; every other value is returned unchanged.
    """
    is_authorization = value.lower().startswith("authorization:")
    return "Authorization: Bearer ***" if is_authorization else value
def read_status(headers_path: pathlib.Path) -> int:
    """Return the HTTP status code of the final response in a curl header dump.

    The dump may hold several header blocks (for example a proxy's reply to
    CONNECT, or an interim 1xx response, followed by the real response).  The
    last status line describes the response whose body was actually captured,
    so that is the one reported.

    Args:
        headers_path: File written by ``curl -D``.

    Returns:
        The status code of the last ``HTTP/...`` status line, or 0 when the
        file is missing or contains no parseable status line.
    """
    if not headers_path.exists():
        return 0
    status = 0
    for line in headers_path.read_text(encoding="utf-8", errors="replace").splitlines():
        line = line.strip()
        if not line.startswith("HTTP/"):
            continue
        parts = line.split()
        # isdecimal (not isdigit) guarantees int() cannot raise on exotic
        # digit-like characters sent by a misbehaving server.
        if len(parts) >= 2 and parts[1].isdecimal():
            status = int(parts[1])
    return status
def read_content_type(headers_path: pathlib.Path) -> str:
    """Return the Content-Type of the final response in a curl header dump.

    The dump may contain more than one header block (proxy CONNECT reply,
    interim 1xx response, then the real response).  Only the block that
    follows the last ``HTTP/...`` status line is considered, so the value
    belongs to the response whose body was captured.

    Args:
        headers_path: File written by ``curl -D``.

    Returns:
        The trimmed header value (first occurrence within the final block),
        or "" when the file is missing or the header is absent.
    """
    if not headers_path.exists():
        return ""
    content_type = ""
    for line in headers_path.read_text(encoding="utf-8", errors="replace").splitlines():
        if line.strip().startswith("HTTP/"):
            # A new response block starts: forget anything seen in earlier ones.
            content_type = ""
            continue
        name, sep, value = line.partition(":")
        if sep and not content_type and name.strip().lower() == "content-type":
            content_type = value.strip()
    return content_type
def body_json(path: pathlib.Path):
    """Best-effort JSON load of a captured response body.

    Returns the decoded JSON value, or None when the file cannot be read as
    UTF-8 text or does not contain valid JSON.
    """
    try:
        raw = path.read_text(encoding="utf-8")
        parsed = json.loads(raw)
    except Exception:
        # Deliberately tolerant: an unreadable or non-JSON body is reported
        # to the caller as "no JSON" rather than raised.
        return None
    return parsed
def body_text(path: pathlib.Path) -> str:
    """Return the captured response body as text, or "" when the file is absent.

    Bytes that are not valid UTF-8 are substituted with the replacement
    character instead of raising.
    """
    return path.read_text(encoding="utf-8", errors="replace") if path.exists() else ""
def has_smoke_model(path: pathlib.Path, model: str) -> bool:
    """Report whether a captured model listing contains *model*.

    The body is expected to be a JSON object whose ``data`` member is a list
    of objects carrying an ``id``.  Any other shape (non-JSON body, ``data``
    missing or not a list, entries that are not objects) yields False instead
    of raising, so one malformed upstream reply cannot abort the whole run.

    Args:
        path: File holding the response body of the models endpoint.
        model: Model id to look for (compared after trimming each listed id).

    Returns:
        True when an entry's trimmed ``id`` equals *model*, otherwise False.
    """
    obj = body_json(path)
    if not isinstance(obj, dict):
        return False
    entries = obj.get("data")
    if not isinstance(entries, list):
        return False
    return any(
        isinstance(item, dict) and str(item.get("id", "")).strip() == model
        for item in entries
    )
def classify_endpoint(status: int, body: str, endpoint: str, probe_layer: str) -> str:
    """Map an HTTP status and response body to a coarse result code.

    Args:
        status: HTTP status code (0 when none was parsed).
        body: Response body text; None is treated as empty.
        endpoint: Logical endpoint name ("models", "chat", "responses").
        probe_layer: Probe layer of the target (e.g. "upstream", "user-key").

    Returns:
        One of "chat_ok", "rate_limited", "auth_failed", "region_blocked",
        "cloudflare_blocked", "user_key_binding_failed",
        "host_protocol_mismatch" or "unknown_error".  Checks run in the order
        written below; the first match wins.
    """
    text = (body or "").lower()
    if 200 <= status < 300:
        # Every successful endpoint reports the same code.
        return "chat_ok"
    if status == 429:
        return "rate_limited"
    # "auth" also matches "unauthorized" / "authentication"; the explicit
    # "unauthorized" entry is kept for readability.
    if status in (401, 403) and any(marker in text for marker in ("auth", "invalid", "unauthorized")):
        return "auth_failed"
    if status == 403 and "region" in text:
        return "region_blocked"
    if "1010" in text or "cloudflare" in text:
        return "cloudflare_blocked"
    if endpoint == "chat" and probe_layer == "user-key" and any(
        marker in text for marker in ("group", "binding", "assigned")
    ):
        return "user_key_binding_failed"
    if endpoint == "chat" and status and status not in (401, 403, 429):
        return "host_protocol_mismatch"
    if status == 401:
        # 401 means the credentials were rejected regardless of how the body
        # is worded; without this fallback such replies were "unknown_error".
        return "auth_failed"
    return "unknown_error"
def run_capture(url: str, api_key: str, method: str, request_headers_path: pathlib.Path, response_headers_path: pathlib.Path, response_body_path: pathlib.Path, payload=None):
    """Run one curl request and capture its response headers and body on disk.

    Args:
        url: Full endpoint URL.
        api_key: Bearer token sent in the Authorization header.
        method: "POST" sends *payload* as a JSON body; anything else issues
            curl's default request with no body.
        request_headers_path: Receives a redacted record of the request
            headers (the real key is never written to disk).
        response_headers_path: Receives curl's ``-D`` header dump.
        response_body_path: Receives the response body (``-o``).
        payload: JSON-serialisable request body, used only for POST.

    Returns:
        A dict with curl's ``exit_code`` plus its captured ``stderr`` and
        ``stdout`` (empty strings when curl printed nothing).
    """
    is_post = method == "POST"

    # Make sure every output location exists before anything is written.
    for out_path in (request_headers_path, response_headers_path, response_body_path):
        out_path.parent.mkdir(parents=True, exist_ok=True)

    # On-disk record of what was sent, with the credential masked.
    request_headers_path.write_text(
        "Authorization: Bearer ***\n"
        + ("Content-Type: application/json\n" if is_post else ""),
        encoding="utf-8",
    )
    # Start from empty files so a failed request cannot leave stale captures.
    response_headers_path.write_text("", encoding="utf-8")
    response_body_path.write_text("", encoding="utf-8")

    cmd = [
        "curl",
        "-sS",
        "-D",
        str(response_headers_path),
        "-o",
        str(response_body_path),
        "--connect-timeout",
        str(CONNECT_TIMEOUT),
        "--max-time",
        str(MAX_TIME),
        "--retry",
        str(RETRY),
        "--retry-delay",
        str(RETRY_DELAY),
        "-H",
        # The real key must go on the wire; sending the masked placeholder
        # made every probe unauthenticated.
        # NOTE(review): argv is visible in the local process list while curl
        # runs; consider feeding headers via `-H @file`/stdin if that matters.
        f"Authorization: Bearer {api_key}",
        "-H",
        # NOTE(review): this sends a local filesystem path to the remote
        # server; confirm something actually consumes it, otherwise drop it.
        f"X-Hermes-Debug-Request-Headers: {request_headers_path}",
    ]
    if is_post:
        cmd += ["-H", "Content-Type: application/json", url, "-d", json.dumps(payload, ensure_ascii=False)]
    else:
        cmd += [url]

    proc = subprocess.run(cmd, capture_output=True, text=True)
    return {
        "exit_code": proc.returncode,
        "stderr": proc.stderr or "",
        "stdout": proc.stdout or "",
    }
# Main driver: probe every configured target against three endpoints
# (models, chat/completions, responses), derive a per-target verdict, then
# write and print the JSON summary.
summary = {"mode": "live_probe", "targets": []}
# Set when a target definition is unusable; the loop stops at that target and
# the script exits 1 after the (partial) summary has been written.
script_error = False
for index, target in enumerate(targets, start=1):
# Normalise the target fields; blank or missing probe_layer means "upstream".
provider_id = str(target.get("provider_id", "")).strip()
base_url = str(target.get("base_url", "")).rstrip("/")
api_key_env = str(target.get("api_key_env", "")).strip()
probe_layer = str(target.get("probe_layer", "upstream")).strip() or "upstream"
models = [str(m).strip() for m in target.get("models", []) if str(m).strip()]
# Validation: any missing field aborts the remaining targets.
if not provider_id:
print("provider_id is required in PROTOCOL_MATRIX_TARGETS_JSON", file=sys.stderr)
script_error = True
break
if not base_url:
print(f"base_url is required for {provider_id}", file=sys.stderr)
script_error = True
break
if not api_key_env:
print(f"api_key_env is required for {provider_id}", file=sys.stderr)
script_error = True
break
# The key itself is read from the environment variable named by the target.
api_key = os.environ.get(api_key_env, "").strip()
if not api_key:
print(f"missing required env from target.api_key_env: {api_key_env}", file=sys.stderr)
script_error = True
break
# The first configured model is used for the chat/responses requests; "ping"
# is the placeholder model name when the target lists none.
smoke_model = models[0] if models else "ping"
# NOTE(review): provider_id is used verbatim as a directory name component.
target_dir = script_dir / f"{index:02d}-{provider_id}"
target_dir.mkdir(parents=True, exist_ok=True)
# (endpoint name, HTTP method, URL, JSON payload, artifact file prefix)
endpoints = [
("models", "GET", f"{base_url}/models", None, "01-models"),
("chat", "POST", f"{base_url}/chat/completions", {"model": smoke_model, "messages": [{"role": "user", "content": "ping"}], "max_tokens": 8, "temperature": 0}, "02-chat"),
("responses", "POST", f"{base_url}/responses", {"model": smoke_model, "input": "ping"}, "03-responses"),
]
endpoint_results = {}
target_failed = False
target_error_code = ""
for endpoint_name, method, url, payload, prefix in endpoints:
request_headers_path = target_dir / f"{prefix}.request_headers.txt"
response_headers_path = target_dir / f"{prefix}.response_headers.txt"
response_body_path = target_dir / f"{prefix}.response_body.json"
result = run_capture(url, api_key, method, request_headers_path, response_headers_path, response_body_path, payload)
status = read_status(response_headers_path)
body = body_text(response_body_path)
error_code = ""
# curl exit code 28 is reported as "network_timeout"; any other non-zero
# exit fails the target as "unknown_error", whichever endpoint it hit.
if result["exit_code"] == 28:
error_code = "network_timeout"
target_failed = True
elif result["exit_code"] != 0:
error_code = "unknown_error"
target_failed = True
elif not (200 <= status < 300):
error_code = classify_endpoint(status, body, endpoint_name, probe_layer)
# A non-2xx reply fails the target for models and chat only; a non-2xx
# reply from the responses endpoint is tolerated here.
if endpoint_name == "models":
target_failed = True
# NOTE(review): classify_endpoint, as defined in this file, never returns
# "responses_unsupported", so this condition is always true for chat.
elif endpoint_name == "chat" and error_code not in ("responses_unsupported",):
target_failed = True
endpoint_results[endpoint_name] = {
"status": status,
"content_type": read_content_type(response_headers_path),
"body": body,
"error_code": error_code,
"exit_code": result["exit_code"],
"path_headers": str(response_headers_path),
"path_body": str(response_body_path),
}
# Only a timeout is promoted to the target-level error code here.
if result["exit_code"] == 28 and not target_error_code:
target_error_code = "network_timeout"
models_status = endpoint_results["models"]["status"]
chat_status = endpoint_results["chat"]["status"]
responses_status = endpoint_results["responses"]["status"]
chat_ok = 200 <= chat_status < 300
responses_ok = 200 <= responses_status < 300
models_ok = 200 <= models_status < 300
models_body_path = target_dir / "01-models.response_body.json"
advisories = []
status = "ok"
# Default verdict; it is also what a failed target ends up reporting because
# the failure branch below does not change support_level.
support_level = "unsupported-by-host"
summary_error_code = target_error_code
if target_failed:
status = "failed"
# Prefer the chat error, then models, then responses.
if not summary_error_code:
summary_error_code = endpoint_results["chat"]["error_code"] or endpoint_results["models"]["error_code"] or endpoint_results["responses"]["error_code"] or "unknown_error"
else:
# NOTE(review): reaching this branch requires every curl call to have exited
# 0 and both models and chat to have returned 2xx (anything else sets
# target_failed above), so chat_ok is always true here and the last two
# branches ("upstream-unhealthy" and the final else) appear unreachable.
if chat_ok and responses_ok:
support_level = "supported-direct"
summary_error_code = "chat_ok"
elif chat_ok and not responses_ok:
advisories.append("responses_unsupported_but_chat_ok")
support_level = "supported-with-plugin-adapter"
summary_error_code = "responses_unsupported"
elif models_ok and not chat_ok:
support_level = "upstream-unhealthy"
summary_error_code = endpoint_results["chat"]["error_code"] or "models_only"
else:
support_level = "unsupported-by-host"
summary_error_code = endpoint_results["chat"]["error_code"] or endpoint_results["responses"]["error_code"] or "unknown_error"
status = "failed"
summary["targets"].append({
"provider_id": provider_id,
"base_url": base_url,
"probe_layer": probe_layer,
"models": models,
"smoke_model": smoke_model,
"status": status,
"error_code": summary_error_code,
"models_status": models_status,
"chat_status": chat_status,
"responses_status": responses_status,
"models_has_smoke_model": has_smoke_model(models_body_path, smoke_model),
"chat_content_type": endpoint_results["chat"]["content_type"],
"responses_content_type": endpoint_results["responses"]["content_type"],
"support_level": support_level,
"known_advisories": advisories,
"artifact_dir": str(target_dir),
})
# The summary is written to the artifact directory and echoed to stdout even
# when a target definition was rejected; only then does the script exit 1.
# Targets whose status is "failed" do not change the exit code.
(artifact_dir / "protocol-matrix-summary.json").write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(summary, ensure_ascii=False, indent=2))
if script_error:
sys.exit(1)
PY
# Tell the operator where the summary written by the live probe can be found.
printf 'protocol matrix summary: %s/protocol-matrix-summary.json\n' "$ARTIFACT_DIR"