feat(runtime): harden daily pipeline audit and verification

Tighten real-ingestion success rules, separate scheduled reports from historical rebuilds, and persist source-level runtime audit across daily pipeline runs.

Also add the Phase 5 CI workflow contract plus verification updates and supporting docs so the full uncommitted change set can be validated together.
This commit is contained in:
phamnazage-jpg
2026-05-14 16:17:39 +08:00
parent 618dff33da
commit a8999abcb0
17 changed files with 880 additions and 45 deletions

View File

@@ -64,11 +64,14 @@ type sourceDefinition struct {
}
type runSummary struct {
SelectedSources int
SuccessfulSources int
TotalModels int
DomesticModels int
CurrencyCounts map[string]int
SelectedSources int
SelectedSourceKeys []string
SuccessfulSources int
SuccessfulSourceKeys []string
FailedSourceKeys []string
TotalModels int
DomesticModels int
CurrencyCounts map[string]int
}
type pricingMetadataFields struct {
@@ -256,12 +259,15 @@ func listSourceKeys(apiKey string) []string {
return keys
}
func summarizePrices(selectedSources int, successfulSources int, prices []ModelPricing) runSummary {
func summarizePrices(selectedSourceKeys []string, successfulSourceKeys []string, failedSourceKeys []string, prices []ModelPricing) runSummary {
summary := runSummary{
SelectedSources: selectedSources,
SuccessfulSources: successfulSources,
TotalModels: len(prices),
CurrencyCounts: make(map[string]int),
SelectedSources: len(selectedSourceKeys),
SelectedSourceKeys: append([]string(nil), selectedSourceKeys...),
SuccessfulSources: len(successfulSourceKeys),
SuccessfulSourceKeys: append([]string(nil), successfulSourceKeys...),
FailedSourceKeys: append([]string(nil), failedSourceKeys...),
TotalModels: len(prices),
CurrencyCounts: make(map[string]int),
}
for _, price := range prices {
if strings.EqualFold(price.ProviderCountry, "CN") {
@@ -272,6 +278,21 @@ func summarizePrices(selectedSources int, successfulSources int, prices []ModelP
return summary
}
// sourceKey derives the stable, lowercase key that identifies a data source in
// run summaries from the source's display name.
//
// The name is trimmed and lowercased, and any remaining inner spaces become
// underscores. The well-known sources ("openrouter", "moonshot", "deepseek",
// "openai") contain no spaces, so they map to themselves under this rule.
func sourceKey(src DataSource) string {
	normalized := strings.ToLower(strings.TrimSpace(src.Name()))
	return strings.ReplaceAll(normalized, " ", "_")
}
func formatCountMap(counts map[string]int) string {
if len(counts) == 0 {
return "none"
@@ -289,17 +310,27 @@ func formatCountMap(counts map[string]int) string {
return strings.Join(parts, ",")
}
// formatKeyList renders source keys as a comma-separated list for the summary
// line. An empty or nil slice is rendered as the literal "none" so the field
// is never blank.
func formatKeyList(keys []string) string {
	switch len(keys) {
	case 0:
		return "none"
	default:
		return strings.Join(keys, ",")
	}
}
func printSummary(w io.Writer, summary runSummary) error {
if w == nil {
return nil
}
_, err := fmt.Fprintf(
w,
"sources=%d successful_sources=%d models=%d domestic_models=%d currencies=%s\n",
"sources=%d successful_sources=%d models=%d domestic_models=%d selected_source_keys=%s successful_source_keys=%s failed_source_keys=%s currencies=%s\n",
summary.SelectedSources,
summary.SuccessfulSources,
summary.TotalModels,
summary.DomesticModels,
formatKeyList(summary.SelectedSourceKeys),
formatKeyList(summary.SuccessfulSourceKeys),
formatKeyList(summary.FailedSourceKeys),
formatCountMap(summary.CurrencyCounts),
)
return err
@@ -564,23 +595,29 @@ func defaultDSN() string {
func runCollector(cfg runConfig, sources []DataSource, saveFn func([]ModelPricing) error, out io.Writer) error {
allPrices := make([]ModelPricing, 0)
successfulSources := 0
selectedSourceKeys := make([]string, 0, len(sources))
successfulSourceKeys := make([]string, 0, len(sources))
failedSourceKeys := make([]string, 0)
for _, src := range sources {
key := sourceKey(src)
selectedSourceKeys = append(selectedSourceKeys, key)
prices, err := src.FetchPricing()
if err != nil {
logger.Error("采集失败", "source", src.Name(), "error", err)
failedSourceKeys = append(failedSourceKeys, key)
continue
}
successfulSources++
successfulSourceKeys = append(successfulSourceKeys, key)
allPrices = append(allPrices, prices...)
}
summary := summarizePrices(len(sources), successfulSources, allPrices)
summary := summarizePrices(selectedSourceKeys, successfulSourceKeys, failedSourceKeys, allPrices)
if err := printSummary(out, summary); err != nil {
return err
}
if successfulSources == 0 {
if summary.SuccessfulSources == 0 {
return fmt.Errorf("no data source collected successfully")
}
if cfg.DryRun {
@@ -593,7 +630,7 @@ func runCollector(cfg runConfig, sources []DataSource, saveFn func([]ModelPricin
return err
}
logger.Info("多源采集完成", "total_models", len(allPrices), "sources", successfulSources)
logger.Info("多源采集完成", "total_models", len(allPrices), "sources", summary.SuccessfulSources)
return nil
}

View File

@@ -90,6 +90,49 @@ func TestRunCollectorDryRunSkipsDatabaseWrite(t *testing.T) {
if !bytes.Contains(out.Bytes(), []byte("currencies=CNY:2,USD:1")) {
t.Fatalf("expected currency summary, got %q", output)
}
if !bytes.Contains(out.Bytes(), []byte("selected_source_keys=moonshot,openai")) {
t.Fatalf("expected selected source keys in summary, got %q", output)
}
if !bytes.Contains(out.Bytes(), []byte("successful_source_keys=moonshot,openai")) {
t.Fatalf("expected successful source keys in summary, got %q", output)
}
if !bytes.Contains(out.Bytes(), []byte("failed_source_keys=none")) {
t.Fatalf("expected failed source keys in summary, got %q", output)
}
}
func TestRunCollectorReportsFailedSourceKeys(t *testing.T) {
cfg := runConfig{DryRun: true}
var out bytes.Buffer
err := runCollector(
cfg,
[]DataSource{
fakeSource{
name: "Moonshot",
prices: []ModelPricing{
{ModelID: "kimi-k2.6", ProviderCountry: "CN", Currency: "CNY"},
},
},
fakeSource{
name: "OpenAI",
err: bytes.ErrTooLarge,
},
},
nil,
&out,
)
if err != nil {
t.Fatalf("runCollector returned error: %v", err)
}
output := out.String()
if !bytes.Contains(out.Bytes(), []byte("successful_source_keys=moonshot")) {
t.Fatalf("expected successful source keys in summary, got %q", output)
}
if !bytes.Contains(out.Bytes(), []byte("failed_source_keys=openai")) {
t.Fatalf("expected failed source keys in summary, got %q", output)
}
}
func TestPricingMetadataClassifiesSourceType(t *testing.T) {

View File

@@ -33,6 +33,7 @@ type Config struct {
TimeoutSec int
BatchSize int
DBConn string
StrictReal bool
}
// ModelInfo 模型信息(与 collectors 包兼容)
@@ -99,6 +100,7 @@ func parseArgs() Config {
timeoutSec := flag.Int("timeout", 30, "请求超时(秒)")
batchSize := flag.Int("batch", 100, "批量插入批次大小")
dbConn := flag.String("db", os.Getenv("DATABASE_URL"), "PostgreSQL 连接字符串")
strictReal := flag.Bool("strict-real", false, "严格真实模式:缺少 API Key 或数据库写入失败时返回错误")
flag.Parse()
return Config{
APIKey: *apiKey,
@@ -108,6 +110,7 @@ func parseArgs() Config {
TimeoutSec: *timeoutSec,
BatchSize: *batchSize,
DBConn: *dbConn,
StrictReal: *strictReal,
}
}
@@ -158,6 +161,9 @@ func run(cfg Config) error {
if cfg.DBConn != "" {
if err := summarizeDB(cfg.DBConn, models, cfg.BatchSize); err != nil {
logger.Error("PostgreSQL 写入失败", "error", err)
if cfg.StrictReal {
return fmt.Errorf("PostgreSQL 写入失败: %w", err)
}
logger.Warn("降级为仅写入 JSON")
} else {
logger.Info("PostgreSQL 写入完成", "records", len(models))
@@ -169,6 +175,9 @@ func run(cfg Config) error {
// fetchModels 抓取 OpenRouter 模型列表(集成指数退避重试)
func fetchModels(cfg Config) ([]ModelInfo, error) {
if cfg.APIKey == "" {
if cfg.StrictReal {
return nil, fmt.Errorf("严格真实模式下必须提供 API Key")
}
logger.Warn("未提供 API Key使用模拟数据")
return []ModelInfo{
{ID: "openai/gpt-4o", ContextLength: 128000, Pricing: ModelPricing{Input: 2.5, Output: 10.0}},

View File

@@ -4,6 +4,8 @@ package main
import (
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
@@ -98,3 +100,33 @@ func TestRunNoAPIKey(t *testing.T) {
t.Error("models 为空")
}
}
func TestFetchModelsFailsInStrictRealModeWithoutAPIKey(t *testing.T) {
_, err := fetchModels(Config{StrictReal: true})
if err == nil {
t.Fatal("strict real mode should fail without API key")
}
}
func TestRunFailsInStrictRealModeWhenDBWriteFails(t *testing.T) {
tmpDir := t.TempDir()
outPath := filepath.Join(tmpDir, "models.json")
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"data":[{"id":"openai/gpt-4o","name":"GPT-4o","context_length":128000,"pricing":{"input":2.5,"output":10.0}}]}`))
}))
defer server.Close()
err := run(Config{
APIKey: "test-key",
APIURL: server.URL,
OutPath: outPath,
DBConn: "postgres://invalid@127.0.0.1:1/invalid?sslmode=disable",
BatchSize: 10,
TimeoutSec: 1,
StrictReal: true,
})
if err == nil {
t.Fatal("strict real mode should fail when database write fails")
}
}

View File

@@ -22,6 +22,13 @@ import (
var logger *slog.Logger
// ReportRunContext carries metadata about how a report generation run was
// triggered. It is built by resolveReportRunContext from the REPORT_RUN_KIND,
// REPORT_TRIGGER_SOURCE, REPORT_IS_OFFICIAL_DAILY and REPORT_RUNTIME_AUDIT
// environment variables and passed to saveReportTrackingV3.
type ReportRunContext struct {
// RunKind is the trimmed run kind; resolveReportRunContext defaults it to "manual".
RunKind string
// TriggerSource is the trimmed trigger source; resolveReportRunContext defaults it to "cli".
TriggerSource string
// IsOfficialDaily reports whether this run is treated as the official daily report.
IsOfficialDaily bool
// RuntimeAudit is an optional audit line; composeTrackedSummary prepends it to the tracked summary.
RuntimeAudit string
}
func init() {
logger = slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
}
@@ -89,6 +96,14 @@ func run() error {
if err != nil {
return err
}
runContext := resolveReportRunContext(
date,
time.Now(),
os.Getenv("REPORT_RUN_KIND"),
os.Getenv("REPORT_TRIGGER_SOURCE"),
os.Getenv("REPORT_IS_OFFICIAL_DAILY"),
os.Getenv("REPORT_RUNTIME_AUDIT"),
)
// 1. 获取报告数据(使用新schema)
report, err := generateReportDataV3(db, date)
@@ -122,7 +137,7 @@ func run() error {
}
// 6. 同步写入日报状态与运行轨迹
if err := saveReportTrackingV3(db, report, mdPath); err != nil {
if err := saveReportTrackingV3(db, report, mdPath, runContext); err != nil {
logger.Warn("保存日报记录失败", "error", err)
}
@@ -165,6 +180,43 @@ func resolveReportDate(now time.Time, args []string, envDate string) (string, er
return parsed.Format("2006-01-02"), nil
}
// resolveReportRunContext builds the ReportRunContext for a report run from
// raw environment values.
//
// All string inputs are whitespace-trimmed. An empty run kind becomes
// "manual" and an empty trigger source becomes "cli". When envOfficialDaily is
// non-empty, the run is official only if that value equals "true"
// (case-insensitive). When it is empty, the run is official only if it is a
// "scheduled" run whose reportDate equals now formatted as YYYY-MM-DD.
func resolveReportRunContext(reportDate string, now time.Time, envRunKind, envTriggerSource, envOfficialDaily, envRuntimeAudit string) ReportRunContext {
	resolved := ReportRunContext{
		RunKind:       strings.TrimSpace(envRunKind),
		TriggerSource: strings.TrimSpace(envTriggerSource),
		RuntimeAudit:  strings.TrimSpace(envRuntimeAudit),
	}
	if resolved.RunKind == "" {
		resolved.RunKind = "manual"
	}
	if resolved.TriggerSource == "" {
		resolved.TriggerSource = "cli"
	}

	if official := strings.TrimSpace(envOfficialDaily); official != "" {
		// An explicit setting always wins over the inference below.
		resolved.IsOfficialDaily = strings.EqualFold(official, "true")
	} else {
		// No explicit setting: a scheduled run for today's date is the official daily.
		resolved.IsOfficialDaily = resolved.RunKind == "scheduled" && reportDate == now.Format("2006-01-02")
	}
	return resolved
}
// composeTrackedSummary combines the run's runtime audit line with the report
// summary for persistence. Both parts are whitespace-trimmed; when both are
// non-empty the audit comes first, separated from the summary by a newline.
// If either part is empty, the other is returned on its own.
func composeTrackedSummary(summary string, runContext ReportRunContext) string {
	audit := strings.TrimSpace(runContext.RuntimeAudit)
	body := strings.TrimSpace(summary)

	switch {
	case audit == "":
		return body
	case body == "":
		return audit
	default:
		return audit + "\n" + body
	}
}
// ============ 数据模型 ============
const (
@@ -2869,11 +2921,12 @@ th {
return t.Execute(f, r)
}
func saveReportTrackingV3(db *sql.DB, r *ReportV3, mdPath string) error {
func saveReportTrackingV3(db *sql.DB, r *ReportV3, mdPath string, runContext ReportRunContext) error {
summary := r.HeroSummary
if summary == "" {
summary = fmt.Sprintf("models=%d free=%d intl=%d domestic=%d", r.TotalModels, len(r.FreeModels), len(r.IntlTop5), len(r.DomesticTop10))
}
summary = composeTrackedSummary(summary, runContext)
tx, err := db.Begin()
if err != nil {
return err
@@ -2881,24 +2934,39 @@ func saveReportTrackingV3(db *sql.DB, r *ReportV3, mdPath string) error {
defer tx.Rollback()
if _, err := tx.Exec(`
INSERT INTO daily_report (report_date, status, model_count, new_models, free_models, summary_md, output_path, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
INSERT INTO daily_report (report_date, status, model_count, new_models, free_models, summary_md, output_path, run_kind, trigger_source, is_official_daily, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW())
ON CONFLICT (report_date) DO UPDATE SET
status = EXCLUDED.status,
model_count = EXCLUDED.model_count,
free_models = EXCLUDED.free_models,
summary_md = EXCLUDED.summary_md,
output_path = EXCLUDED.output_path,
run_kind = CASE
WHEN EXCLUDED.is_official_daily THEN EXCLUDED.run_kind
WHEN daily_report.trigger_source = 'legacy_backfill' THEN EXCLUDED.run_kind
ELSE daily_report.run_kind
END,
trigger_source = CASE
WHEN EXCLUDED.is_official_daily THEN EXCLUDED.trigger_source
WHEN daily_report.trigger_source = 'legacy_backfill' THEN EXCLUDED.trigger_source
ELSE daily_report.trigger_source
END,
is_official_daily = CASE
WHEN EXCLUDED.is_official_daily THEN TRUE
WHEN daily_report.trigger_source = 'legacy_backfill' THEN EXCLUDED.is_official_daily
ELSE daily_report.is_official_daily
END,
error_message = NULL,
updated_at = NOW()
`, r.Date, "generated", r.TotalModels, 0, len(r.FreeModels), summary, mdPath); err != nil {
`, r.Date, "generated", r.TotalModels, 0, len(r.FreeModels), summary, mdPath, runContext.RunKind, runContext.TriggerSource, runContext.IsOfficialDaily); err != nil {
return err
}
if _, err := tx.Exec(`
INSERT INTO report_runs (source, report_date, status, summary_md, output_path, error_message)
VALUES ($1, $2, $3, $4, $5, NULL)
`, "generate_daily_report", r.Date, "generated", summary, mdPath); err != nil {
INSERT INTO report_runs (source, report_date, status, summary_md, output_path, error_message, run_kind, trigger_source, is_official_daily)
VALUES ($1, $2, $3, $4, $5, NULL, $6, $7, $8)
`, "generate_daily_report", r.Date, "generated", summary, mdPath, runContext.RunKind, runContext.TriggerSource, runContext.IsOfficialDaily); err != nil {
return err
}

View File

@@ -227,6 +227,61 @@ func TestResolveReportDateRejectsInvalidDate(t *testing.T) {
}
}
// TestResolveReportRunContextDefaultsToManualCLI checks that with every
// environment value empty the context falls back to a manual, CLI-triggered,
// non-official run.
func TestResolveReportRunContextDefaultsToManualCLI(t *testing.T) {
	now := time.Date(2026, 5, 14, 8, 0, 0, 0, time.FixedZone("CST", 8*3600))
	ctx := resolveReportRunContext("2026-05-14", now, "", "", "", "")

	// Only the first mismatch is reported, matching sequential Fatalf checks.
	switch {
	case ctx.RunKind != "manual":
		t.Fatalf("run kind = %q, want manual", ctx.RunKind)
	case ctx.TriggerSource != "cli":
		t.Fatalf("trigger source = %q, want cli", ctx.TriggerSource)
	case ctx.IsOfficialDaily:
		t.Fatalf("manual run should not be official daily: %+v", ctx)
	}
}
// TestResolveReportRunContextHonorsScheduledEnv checks that explicit
// scheduled/cron/true environment values are carried into the context as-is.
func TestResolveReportRunContextHonorsScheduledEnv(t *testing.T) {
	now := time.Date(2026, 5, 14, 8, 0, 0, 0, time.FixedZone("CST", 8*3600))
	ctx := resolveReportRunContext("2026-05-14", now, "scheduled", "cron", "true", "")

	asExpected := ctx.RunKind == "scheduled" && ctx.TriggerSource == "cron" && ctx.IsOfficialDaily
	if !asExpected {
		t.Fatalf("unexpected scheduled context: %+v", ctx)
	}
}
// TestResolveReportRunContextMarksHistoricalRebuildAsNonOfficial checks that a
// rebuild of a past report date keeps its run kind and trigger source and is
// not flagged as the official daily report.
func TestResolveReportRunContextMarksHistoricalRebuildAsNonOfficial(t *testing.T) {
	now := time.Date(2026, 5, 14, 8, 0, 0, 0, time.FixedZone("CST", 8*3600))
	ctx := resolveReportRunContext("2025-08-07", now, "historical_rebuild", "rebuild_script", "false", "")

	// Only the first mismatch is reported, matching sequential Fatalf checks.
	switch {
	case ctx.RunKind != "historical_rebuild":
		t.Fatalf("run kind = %q, want historical_rebuild", ctx.RunKind)
	case ctx.TriggerSource != "rebuild_script":
		t.Fatalf("trigger source = %q, want rebuild_script", ctx.TriggerSource)
	case ctx.IsOfficialDaily:
		t.Fatalf("historical rebuild should not be official daily: %+v", ctx)
	}
}
func TestComposeTrackedSummaryPrependsRuntimeAudit(t *testing.T) {
summary := composeTrackedSummary(
"models=42 free=3 intl=5 domestic=10",
ReportRunContext{
RunKind: "scheduled",
TriggerSource: "cron",
IsOfficialDaily: true,
RuntimeAudit: "runtime_audit stage_set=openrouter,multi_source,official_imports,daily_report selected_source_keys=openrouter,moonshot,deepseek,openai,zhipu,baidu,bytedance failed_source_keys=none",
},
)
if !strings.Contains(summary, "runtime_audit stage_set=openrouter,multi_source,official_imports,daily_report") {
t.Fatalf("expected runtime audit in tracked summary, got %q", summary)
}
if !strings.Contains(summary, "failed_source_keys=none") {
t.Fatalf("expected failed source keys in tracked summary, got %q", summary)
}
if !strings.Contains(summary, "models=42 free=3 intl=5 domestic=10") {
t.Fatalf("expected report summary to be preserved, got %q", summary)
}
}
func TestDecorateReportV1BuildsHotDaySummary(t *testing.T) {
report := sampleReportForV1()
report.ModelEvents = []ModelEvent{

View File

@@ -21,4 +21,8 @@ if [[ -f ".env" ]]; then
source ".env"
fi
REPORT_DATE="$REPORT_DATE" go run -tags llm_script ./scripts/generate_daily_report.go "$@"
REPORT_DATE="$REPORT_DATE" \
REPORT_RUN_KIND="historical_rebuild" \
REPORT_TRIGGER_SOURCE="rebuild_script" \
REPORT_IS_OFFICIAL_DAILY="false" \
go run -tags llm_script ./scripts/generate_daily_report.go "$@"

View File

@@ -55,7 +55,7 @@ archive_report_artifacts() {
}
track_report_state() {
local db_url report_date status model_count summary_md output_path error_message
local db_url report_date status model_count summary_md output_path error_message run_kind trigger_source is_official_daily
db_url="$1"
report_date="$2"
status="$3"
@@ -63,6 +63,9 @@ track_report_state() {
summary_md="${5:-}"
output_path="${6:-}"
error_message="${7:-}"
run_kind="${8:-manual}"
trigger_source="${9:-cli}"
is_official_daily="${10:-false}"
psql "$db_url" \
-v ON_ERROR_STOP=1 \
@@ -71,7 +74,10 @@ track_report_state() {
--set=model_count="$model_count" \
--set=summary_md="$summary_md" \
--set=output_path="$output_path" \
--set=error_message="$error_message" <<'SQL'
--set=error_message="$error_message" \
--set=run_kind="$run_kind" \
--set=trigger_source="$trigger_source" \
--set=is_official_daily="$is_official_daily" <<'SQL'
INSERT INTO daily_report (
report_date,
status,
@@ -79,6 +85,9 @@ INSERT INTO daily_report (
summary_md,
output_path,
error_message,
run_kind,
trigger_source,
is_official_daily,
created_at,
updated_at
)
@@ -89,6 +98,9 @@ VALUES (
NULLIF(:'summary_md', ''),
NULLIF(:'output_path', ''),
NULLIF(:'error_message', ''),
NULLIF(:'run_kind', ''),
NULLIF(:'trigger_source', ''),
NULLIF(:'is_official_daily', '')::BOOLEAN,
NOW(),
NOW()
)
@@ -98,6 +110,21 @@ ON CONFLICT (report_date) DO UPDATE SET
summary_md = COALESCE(EXCLUDED.summary_md, daily_report.summary_md),
output_path = COALESCE(EXCLUDED.output_path, daily_report.output_path),
error_message = EXCLUDED.error_message,
run_kind = CASE
WHEN EXCLUDED.is_official_daily THEN EXCLUDED.run_kind
WHEN daily_report.trigger_source = 'legacy_backfill' THEN EXCLUDED.run_kind
ELSE daily_report.run_kind
END,
trigger_source = CASE
WHEN EXCLUDED.is_official_daily THEN EXCLUDED.trigger_source
WHEN daily_report.trigger_source = 'legacy_backfill' THEN EXCLUDED.trigger_source
ELSE daily_report.trigger_source
END,
is_official_daily = CASE
WHEN EXCLUDED.is_official_daily THEN TRUE
WHEN daily_report.trigger_source = 'legacy_backfill' THEN EXCLUDED.is_official_daily
ELSE daily_report.is_official_daily
END,
updated_at = NOW();
INSERT INTO report_runs (
@@ -106,7 +133,10 @@ INSERT INTO report_runs (
status,
summary_md,
output_path,
error_message
error_message,
run_kind,
trigger_source,
is_official_daily
)
VALUES (
'pipeline',
@@ -114,7 +144,10 @@ VALUES (
:'status',
NULLIF(:'summary_md', ''),
NULLIF(:'output_path', ''),
NULLIF(:'error_message', '')
NULLIF(:'error_message', ''),
NULLIF(:'run_kind', ''),
NULLIF(:'trigger_source', ''),
NULLIF(:'is_official_daily', '')::BOOLEAN
);
SQL
}

View File

@@ -5,27 +5,72 @@ set -euo pipefail
PROJECT_DIR="/home/long/project/llm-intelligence"
. "$PROJECT_DIR/scripts/report_utils.sh"
if [[ -f "$PROJECT_DIR/.env.local" ]]; then
# shellcheck disable=SC1091
source "$PROJECT_DIR/.env.local"
fi
if [[ -f "$PROJECT_DIR/.env" ]]; then
# shellcheck disable=SC1091
source "$PROJECT_DIR/.env"
fi
DB_URL="${DATABASE_URL:-host=/var/run/postgresql dbname=llm_intelligence user=long sslmode=disable}"
REPORT_DATE="$(report_date_value)"
LOG_FILE="/tmp/llm_hub_daily_${REPORT_DATE}.log"
FEISHU_WEBHOOK="${FEISHU_WEBHOOK:-}"
MODEL_COUNT=""
FETCH_OUT="${PROJECT_DIR}/models.json"
FETCH_TOTAL="0"
PIPELINE_STAGE_SET="openrouter,multi_source,official_imports,daily_report"
PIPELINE_SOURCE_SET="openrouter,moonshot,deepseek,openai,zhipu,baidu,bytedance"
PIPELINE_FAILED_SOURCE_SET="none"
MULTI_SOURCE_AUDIT="multi_source_audit=unavailable"
PIPELINE_AUDIT_SUMMARY=""
# Logging helper: prefixes the message ($1) with a local timestamp, prints it
# to stdout and appends the same line to $LOG_FILE.
log() {
  local stamp
  stamp="$(date '+%Y-%m-%d %H:%M:%S')"
  printf '[%s] %s\n' "$stamp" "$1" | tee -a "$LOG_FILE"
}
# Print the contents of the file at $1 as a single line: newlines become
# spaces, runs of whitespace collapse to one space, and one leading/trailing
# space is stripped. Prints nothing (status 0) when the file does not exist.
normalize_summary_file() {
  local path="$1"
  [ -f "$path" ] || return 0
  # Use the POSIX "one or more" form; `\+` in a basic regex is a GNU sed extension.
  tr '\n' ' ' < "$path" | sed 's/[[:space:]][[:space:]]*/ /g; s/^ //; s/ $//'
}
# Print the value of the failed_source_keys=<value> field found in the summary
# text ($1); the value ends at the next space. Prints nothing when the field is
# absent.
extract_failed_source_keys() {
  local summary="$1"
  local extract_expr='s/.*failed_source_keys=\([^ ]*\).*/\1/p'
  sed -n "$extract_expr" <<< "$summary"
}
# Append the comma-separated keys in $1 to the global
# PIPELINE_FAILED_SOURCE_SET. An empty argument or the literal "none" is
# ignored; the placeholder value "none" in the global is replaced rather than
# extended.
merge_failed_source_keys() {
  local keys="$1"
  case "$keys" in
    '' | none) return 0 ;;
  esac
  case "$PIPELINE_FAILED_SOURCE_SET" in
    none) PIPELINE_FAILED_SOURCE_SET="$keys" ;;
    *) PIPELINE_FAILED_SOURCE_SET="${PIPELINE_FAILED_SOURCE_SET},${keys}" ;;
  esac
}
# Rebuild the global PIPELINE_AUDIT_SUMMARY line from the current stage set,
# selected source set, failed source set, OpenRouter fetch total (0 when unset
# or empty) and the multi-source audit fragment.
refresh_pipeline_audit() {
  printf -v PIPELINE_AUDIT_SUMMARY \
    'runtime_audit stage_set=%s selected_source_keys=%s failed_source_keys=%s openrouter_total=%s %s' \
    "${PIPELINE_STAGE_SET}" \
    "${PIPELINE_SOURCE_SET}" \
    "${PIPELINE_FAILED_SOURCE_SET}" \
    "${FETCH_TOTAL:-0}" \
    "${MULTI_SOURCE_AUDIT}"
}
# 错误处理
error_exit() {
local output_path=""
log "❌ 错误: $1"
refresh_pipeline_audit
# 降级:复制昨日报告
fallback_report
if [ -f "$(report_markdown_path "$REPORT_DATE")" ]; then
output_path="$(report_markdown_path "$REPORT_DATE")"
fi
track_report_state "$DB_URL" "$REPORT_DATE" "failed" "${MODEL_COUNT:-}" "" "$output_path" "$1" >> "$LOG_FILE" 2>&1 || true
track_report_state "$DB_URL" "$REPORT_DATE" "failed" "${MODEL_COUNT:-}" "$PIPELINE_AUDIT_SUMMARY" "$output_path" "$1" "scheduled" "cron" "true" >> "$LOG_FILE" 2>&1 || true
# 发送告警
if [ -n "$FEISHU_WEBHOOK" ]; then
send_alert "$1"
@@ -33,6 +78,8 @@ error_exit() {
exit 1
}
refresh_pipeline_audit
# 降级:复制昨日报告
fallback_report() {
local yesterday yesterday_md today_md yesterday_html today_html
@@ -77,11 +124,66 @@ cd "$PROJECT_DIR"
# 1. 数据采集
log "1⃣ 数据采集..."
if ! go run scripts/fetch_openrouter.go >> "$LOG_FILE" 2>&1; then
if ! go run scripts/fetch_openrouter.go -strict-real -out "$FETCH_OUT" >> "$LOG_FILE" 2>&1; then
merge_failed_source_keys "openrouter"
error_exit "数据采集失败"
fi
FETCH_TOTAL=$(python3 - <<'PY' "$FETCH_OUT"
import json, sys
path = sys.argv[1]
with open(path, 'r', encoding='utf-8') as f:
data = json.load(f)
print(int(data.get("total", 0)))
PY
)
if [ "${FETCH_TOTAL:-0}" -lt 10 ]; then
merge_failed_source_keys "openrouter"
error_exit "本次采集结果异常: total=${FETCH_TOTAL:-0} < 10"
fi
refresh_pipeline_audit
log "✅ 数据采集完成"
# 1.5 多源补充同步
log "1 多源补充同步..."
MULTI_SOURCE_OUTPUT="$(mktemp)"
if ! go run scripts/fetch_multi_source.go --sources moonshot,deepseek,openai > "$MULTI_SOURCE_OUTPUT" 2>> "$LOG_FILE"; then
MULTI_SOURCE_SUMMARY="$(normalize_summary_file "$MULTI_SOURCE_OUTPUT")"
if [ -n "$MULTI_SOURCE_SUMMARY" ]; then
MULTI_SOURCE_AUDIT="multi_source_audit=${MULTI_SOURCE_SUMMARY}"
merge_failed_source_keys "$(extract_failed_source_keys "$MULTI_SOURCE_SUMMARY")"
else
MULTI_SOURCE_AUDIT="multi_source_audit=stage_failed"
merge_failed_source_keys "moonshot,deepseek,openai"
fi
cat "$MULTI_SOURCE_OUTPUT" >> "$LOG_FILE"
rm -f "$MULTI_SOURCE_OUTPUT"
error_exit "多源补充同步失败"
fi
MULTI_SOURCE_SUMMARY="$(normalize_summary_file "$MULTI_SOURCE_OUTPUT")"
MULTI_SOURCE_AUDIT="multi_source_audit=${MULTI_SOURCE_SUMMARY:-none}"
merge_failed_source_keys "$(extract_failed_source_keys "$MULTI_SOURCE_SUMMARY")"
refresh_pipeline_audit
cat "$MULTI_SOURCE_OUTPUT" >> "$LOG_FILE"
rm -f "$MULTI_SOURCE_OUTPUT"
if ! go run -tags llm_script scripts/import_zhipu_data.go >> "$LOG_FILE" 2>&1; then
merge_failed_source_keys "zhipu"
error_exit "智谱官方导入失败"
fi
if ! go run -tags llm_script scripts/export_official_seed_json.go >> "$LOG_FILE" 2>&1; then
merge_failed_source_keys "official_seed_export"
error_exit "官方种子导出失败"
fi
if ! go run -tags llm_script scripts/import_phase2_data.go >> "$LOG_FILE" 2>&1; then
merge_failed_source_keys "baidu"
error_exit "百度官方导入失败"
fi
if ! go run -tags llm_script scripts/import_bytedance_data.go >> "$LOG_FILE" 2>&1; then
merge_failed_source_keys "bytedance"
error_exit "字节官方导入失败"
fi
refresh_pipeline_audit
log "✅ 多源补充同步完成"
# 2. 数据质量检查
log "2⃣ 数据质量检查..."
MODEL_COUNT=$(psql "$DB_URL" -t -c "SELECT COUNT(*) FROM models WHERE deleted_at IS NULL" 2>/dev/null | tr -d ' ')
@@ -93,7 +195,7 @@ log "✅ 数据质量检查通过 (模型数: ${MODEL_COUNT})"
# 3. 生成日报
log "3⃣ 生成日报..."
export DATABASE_URL="$DB_URL"
if ! go run scripts/generate_daily_report.go >> "$LOG_FILE" 2>&1; then
if ! REPORT_RUN_KIND="scheduled" REPORT_TRIGGER_SOURCE="cron" REPORT_IS_OFFICIAL_DAILY="true" REPORT_RUNTIME_AUDIT="$PIPELINE_AUDIT_SUMMARY" go run scripts/generate_daily_report.go >> "$LOG_FILE" 2>&1; then
error_exit "日报生成失败"
fi
log "✅ 日报生成完成"

View File

@@ -25,30 +25,133 @@ if [[ -z "${OPENROUTER_API_KEY:-}" ]]; then
fi
REPORT_DATE="$(report_date_value)"
FETCH_OUT="$ROOT_DIR/models.json"
FETCH_TOTAL="0"
PIPELINE_STAGE_SET="openrouter,multi_source,official_imports,daily_report"
PIPELINE_SOURCE_SET="openrouter,moonshot,deepseek,openai,zhipu,baidu,bytedance"
PIPELINE_FAILED_SOURCE_SET="none"
MULTI_SOURCE_AUDIT="multi_source_audit=unavailable"
PIPELINE_AUDIT_SUMMARY=""
# Print the contents of the file at $1 as a single line: newlines become
# spaces, runs of whitespace collapse to one space, and one leading/trailing
# space is stripped. Prints nothing (status 0) when the file does not exist.
normalize_summary_file() {
  local path="$1"
  [[ -f "$path" ]] || return 0
  # Use the POSIX "one or more" form; `\+` in a basic regex is a GNU sed extension.
  tr '\n' ' ' < "$path" | sed 's/[[:space:]][[:space:]]*/ /g; s/^ //; s/ $//'
}
# Print the value of the failed_source_keys=<value> field found in the summary
# text ($1); the value ends at the next space. Prints nothing when the field is
# absent.
extract_failed_source_keys() {
  local summary="$1"
  sed -n 's/.*failed_source_keys=\([^ ]*\).*/\1/p' <<< "$summary"
}
# Append the comma-separated keys in $1 to the global
# PIPELINE_FAILED_SOURCE_SET. An empty argument or the literal "none" is
# ignored; the placeholder value "none" in the global is replaced rather than
# extended.
merge_failed_source_keys() {
  local keys="$1"
  if [[ -n "$keys" && "$keys" != "none" ]]; then
    if [[ "$PIPELINE_FAILED_SOURCE_SET" != "none" ]]; then
      PIPELINE_FAILED_SOURCE_SET="${PIPELINE_FAILED_SOURCE_SET},${keys}"
    else
      PIPELINE_FAILED_SOURCE_SET="$keys"
    fi
  fi
}
# Rebuild the global PIPELINE_AUDIT_SUMMARY line from the current stage set,
# selected source set, failed source set, OpenRouter fetch total (0 when unset
# or empty) and the multi-source audit fragment.
refresh_pipeline_audit() {
  printf -v PIPELINE_AUDIT_SUMMARY \
    'runtime_audit stage_set=%s selected_source_keys=%s failed_source_keys=%s openrouter_total=%s %s' \
    "${PIPELINE_STAGE_SET}" \
    "${PIPELINE_SOURCE_SET}" \
    "${PIPELINE_FAILED_SOURCE_SET}" \
    "${FETCH_TOTAL:-0}" \
    "${MULTI_SOURCE_AUDIT}"
}
record_failure() {
local error_message output_path
error_message="$1"
output_path=""
refresh_pipeline_audit
if [[ -f "$(report_markdown_path "$REPORT_DATE")" ]]; then
output_path="$(report_markdown_path "$REPORT_DATE")"
fi
track_report_state "$DATABASE_URL" "$REPORT_DATE" "failed" "" "" "$output_path" "$error_message" >/dev/null 2>&1 || true
track_report_state "$DATABASE_URL" "$REPORT_DATE" "failed" "" "$PIPELINE_AUDIT_SUMMARY" "$output_path" "$error_message" "manual" "pipeline" "false" >/dev/null 2>&1 || true
}
refresh_pipeline_audit
"$ROOT_DIR/scripts/apply_migration.sh"
if ! go run "./scripts/fetch_openrouter.go" \
-api-key "$OPENROUTER_API_KEY" \
-db "$DATABASE_URL" \
-out "$ROOT_DIR/models.json"; then
-out "$FETCH_OUT" \
-strict-real; then
merge_failed_source_keys "openrouter"
record_failure "真实采集失败"
exit 1
fi
if ! go run "./scripts/generate_daily_report.go"; then
FETCH_TOTAL=$(python3 - <<'PY' "$FETCH_OUT"
import json, sys
path = sys.argv[1]
with open(path, 'r', encoding='utf-8') as f:
data = json.load(f)
print(int(data.get("total", 0)))
PY
)
if [[ "${FETCH_TOTAL:-0}" -lt 10 ]]; then
merge_failed_source_keys "openrouter"
record_failure "本次采集结果异常: total=${FETCH_TOTAL:-0} < 10"
exit 1
fi
refresh_pipeline_audit
MULTI_SOURCE_OUTPUT="$(mktemp)"
if ! go run "./scripts/fetch_multi_source.go" --sources moonshot,deepseek,openai > "$MULTI_SOURCE_OUTPUT"; then
MULTI_SOURCE_SUMMARY="$(normalize_summary_file "$MULTI_SOURCE_OUTPUT")"
if [[ -n "$MULTI_SOURCE_SUMMARY" ]]; then
MULTI_SOURCE_AUDIT="multi_source_audit=${MULTI_SOURCE_SUMMARY}"
merge_failed_source_keys "$(extract_failed_source_keys "$MULTI_SOURCE_SUMMARY")"
else
MULTI_SOURCE_AUDIT="multi_source_audit=stage_failed"
merge_failed_source_keys "moonshot,deepseek,openai"
fi
cat "$MULTI_SOURCE_OUTPUT"
rm -f "$MULTI_SOURCE_OUTPUT"
record_failure "多源补充同步失败"
exit 1
fi
MULTI_SOURCE_SUMMARY="$(normalize_summary_file "$MULTI_SOURCE_OUTPUT")"
MULTI_SOURCE_AUDIT="multi_source_audit=${MULTI_SOURCE_SUMMARY:-none}"
merge_failed_source_keys "$(extract_failed_source_keys "$MULTI_SOURCE_SUMMARY")"
refresh_pipeline_audit
cat "$MULTI_SOURCE_OUTPUT"
rm -f "$MULTI_SOURCE_OUTPUT"
if ! go run -tags llm_script "./scripts/import_zhipu_data.go"; then
merge_failed_source_keys "zhipu"
record_failure "智谱官方导入失败"
exit 1
fi
if ! go run -tags llm_script "./scripts/export_official_seed_json.go"; then
merge_failed_source_keys "official_seed_export"
record_failure "官方种子导出失败"
exit 1
fi
if ! go run -tags llm_script "./scripts/import_phase2_data.go"; then
merge_failed_source_keys "baidu"
record_failure "百度官方导入失败"
exit 1
fi
if ! go run -tags llm_script "./scripts/import_bytedance_data.go"; then
merge_failed_source_keys "bytedance"
record_failure "字节官方导入失败"
exit 1
fi
refresh_pipeline_audit
if ! REPORT_RUN_KIND="manual" REPORT_TRIGGER_SOURCE="pipeline" REPORT_IS_OFFICIAL_DAILY="false" REPORT_RUNTIME_AUDIT="$PIPELINE_AUDIT_SUMMARY" go run "./scripts/generate_daily_report.go"; then
record_failure "日报生成失败"
exit 1
fi

View File

@@ -19,6 +19,10 @@ check_executable "scripts/feishu_alert.sh" "飞书告警脚本可执行"
check_shell "日报生成器可独立构建" "go build -o /dev/null ./scripts/generate_daily_report.go"
check_shell "日报脚本包含降级逻辑" "grep -q 'fallback_report' scripts/run_daily.sh"
check_shell "日报脚本包含飞书告警逻辑" "grep -q 'send_alert' scripts/run_daily.sh"
check_shell "正式调度链启用严格真实采集" "grep -q -- '-strict-real' scripts/run_daily.sh && grep -q -- '-strict-real' scripts/run_real_pipeline.sh"
# Require the count-validation error message in BOTH pipeline scripts. Matching
# only 'total=' in run_real_pipeline.sh is too loose: it is also satisfied by
# the 'openrouter_total=' audit field, so the gate would pass without the check.
check_shell "正式调度链校验本次采集结果数量" "grep -q '本次采集结果异常' scripts/run_daily.sh && grep -q '本次采集结果异常' scripts/run_real_pipeline.sh"
check_shell "每日流水线已纳入多源补充同步" "grep -q 'fetch_multi_source.go --sources moonshot,deepseek,openai' scripts/run_daily.sh && grep -q 'import_zhipu_data.go' scripts/run_daily.sh && grep -q 'import_phase2_data.go' scripts/run_daily.sh && grep -q 'import_bytedance_data.go' scripts/run_daily.sh"
check_shell "每日流水线会把来源级运行审计写入正式日报上下文" "grep -q 'REPORT_RUNTIME_AUDIT' scripts/run_daily.sh && grep -q 'selected_source_keys=' scripts/run_daily.sh && grep -q 'failed_source_keys=' scripts/run_daily.sh"
check_shell "今日日报 Markdown 主产物存在且包含数据质量摘要" "test -f ${TODAY_MARKDOWN_PATH} && grep -q '数据质量摘要' ${TODAY_MARKDOWN_PATH}"
check_shell "今日日报 HTML 主产物存在" "test -f ${TODAY_HTML_PATH}"
check_shell "今日日报归档副本存在Markdown + HTML" "test -f ${TODAY_ARCHIVE_MARKDOWN_PATH} && test -f ${TODAY_ARCHIVE_HTML_PATH}"

View File

@@ -16,6 +16,7 @@ check_executable "scripts/backup.sh" "数据库备份脚本可执行"
check_file "healthcheck.sh" "健康检查脚本存在"
check_file "scripts/restore.sh" "数据库恢复脚本存在"
check_shell "Makefile 暴露真实流水线与总门禁入口" "grep -q '^run-real-pipeline:' Makefile && grep -q '^verify-phase1:' Makefile && grep -q '^verify-phase6:' Makefile && grep -q '^verify-pre-phase6:' Makefile"
check_shell "真实流水线包含多源调度与来源级运行审计" "grep -Eq 'fetch_multi_source\\.go\"? --sources moonshot,deepseek,openai' scripts/run_real_pipeline.sh && grep -q 'REPORT_RUNTIME_AUDIT' scripts/run_real_pipeline.sh && grep -q 'failed_source_keys=' scripts/run_real_pipeline.sh"
check_shell "部署文档覆盖 Docker、前端启动与 cron 配置" "grep -q 'docker build' DEPLOYMENT.md && grep -q 'npm run dev' DEPLOYMENT.md && grep -q 'crontab -e' DEPLOYMENT.md"
check_shell "健康检查脚本覆盖数据库与日报可用性" "grep -q 'psql' healthcheck.sh && grep -q 'reports/daily/daily_report_' healthcheck.sh"
check_shell "备份恢复脚本具备 PostgreSQL 入口" "grep -Eq 'pg_dump|psql' scripts/backup.sh && grep -Eq 'psql|pg_restore' scripts/restore.sh"