Files
sub2api-cn-relay-manager/docs/2026-05-22-BATCH_AUTO_IMPORT_V2_MIGRATION_DRAFT.md
2026-05-22 14:15:41 +08:00

12 KiB
Raw Blame History

V2 数据库 Migration 草案 — Batch Auto-Import

日期2026-05-22
关联文档:

  • docs/2026-05-21-BATCH_AUTO_IMPORT_SPEC.md
  • docs/2026-05-21-BATCH_AUTO_IMPORT_TDD_PLAN.md
  • docs/2026-05-22-BATCH_AUTO_IMPORT_V2_ARCHITECTURE.md

1. 目标

这份草案定义 V2 在控制面 SQLite 上需要新增的持久化结构,以及从现有 v1 运行态表平滑接入的方式。

V2 的目标不是替换 v1 的执行链,而是新增一套面向长任务、异步确认、结果页的 canonical runtime store。

2. 设计原则

  1. 不破坏现有 v1 表

    • import_batches
    • import_batch_items
    • managed_resources
    • probe_results
    • access_closure_records
  2. V2 新增自己的 canonical state

    • import_runs
    • import_run_items
    • import_run_item_events
  3. 结果页/API 只读 V2 新表

  4. 允许 legacy link

    • item 可以记录 legacy_batch_id
    • item 可以记录 legacy_provider_id
    • 仅用于追溯,不用于投影
  5. 支持重复导入复用

    • 明确记录 api_key_fingerprint
    • 明确记录 provision_reused 与复用来源
    • 支撑“同 URL + 同模型家族”直接复用
  6. 按 SQLite 友好方式设计

    • 不使用复杂 JSON 索引
    • 关键筛选字段保留标量列
    • 复杂结构用 TEXT JSON 保存

3. 迁移命名建议

建议新增两条 migration

  1. 0007_batch_import_runs.sql
  2. 0008_batch_import_run_events.sql

原因:

  • 0007 先建立 run / item 主体
  • 0008 再建立 event trail 与二级索引

这样 rollback/debug 更简单,也便于先实现状态库,再补页面事件流。

4. 0007_batch_import_runs.sql

4.1 import_runs

CREATE TABLE import_runs (
    run_id TEXT PRIMARY KEY,
    mode TEXT NOT NULL,
    access_mode TEXT NOT NULL,
    state TEXT NOT NULL,
    total_items INTEGER NOT NULL DEFAULT 0,
    completed_items INTEGER NOT NULL DEFAULT 0,
    active_items INTEGER NOT NULL DEFAULT 0,
    degraded_items INTEGER NOT NULL DEFAULT 0,
    broken_items INTEGER NOT NULL DEFAULT 0,
    warning_items INTEGER NOT NULL DEFAULT 0,
    started_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    finished_at TEXT NULL,
    CHECK (mode IN ('strict', 'partial')),
    CHECK (access_mode IN ('subscription', 'self_service')),
    CHECK (state IN ('running', 'completed', 'completed_with_warnings', 'failed', 'cancelled'))
);

索引:

CREATE INDEX idx_import_runs_started_at ON import_runs(started_at DESC);
CREATE INDEX idx_import_runs_state ON import_runs(state);
CREATE INDEX idx_import_runs_access_mode ON import_runs(access_mode);

4.2 import_run_items

CREATE TABLE import_run_items (
    item_id TEXT PRIMARY KEY,
    run_id TEXT NOT NULL,
    base_url TEXT NOT NULL,
    provider_id TEXT NOT NULL,
    api_key_fingerprint TEXT NOT NULL,
    requested_models_json TEXT NOT NULL DEFAULT '[]',
    raw_models_json TEXT NOT NULL DEFAULT '[]',
    normalized_models_json TEXT NOT NULL DEFAULT '[]',
    canonical_model_families_json TEXT NOT NULL DEFAULT '[]',
    recommended_models_json TEXT NOT NULL DEFAULT '[]',
    resolved_smoke_model TEXT NULL,
    capability_profile_json TEXT NOT NULL DEFAULT '{}',

    current_stage TEXT NOT NULL,
    confirmation_status TEXT NOT NULL,
    access_status TEXT NOT NULL,
    matched_account_state TEXT NOT NULL DEFAULT 'none',
    account_resolution TEXT NOT NULL DEFAULT 'created',
    provision_reused INTEGER NOT NULL DEFAULT 0,
    reused_from_provider_id TEXT NULL,
    reused_from_account_id INTEGER NULL,

    channel_id INTEGER NULL,
    account_id INTEGER NULL,

    retry_count INTEGER NOT NULL DEFAULT 0,
    confirmation_attempts INTEGER NOT NULL DEFAULT 0,
    last_retry_at TEXT NULL,
    next_retry_at TEXT NULL,

    lease_owner TEXT NULL,
    lease_until TEXT NULL,

    advisory_messages_json TEXT NOT NULL DEFAULT '[]',
    last_error_stage TEXT NULL,
    last_error TEXT NULL,

    legacy_batch_id INTEGER NULL,
    legacy_provider_id TEXT NULL,

    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,

    FOREIGN KEY (run_id) REFERENCES import_runs(run_id) ON DELETE CASCADE,

    CHECK (current_stage IN ('probe', 'provision', 'confirm', 'validate', 'done')),
    CHECK (confirmation_status IN ('pending', 'confirmed', 'advisory', 'failed')),
    CHECK (access_status IN ('unknown', 'active', 'degraded', 'broken'))
);

索引:

CREATE INDEX idx_import_run_items_run_id ON import_run_items(run_id);
CREATE INDEX idx_import_run_items_provider_id ON import_run_items(provider_id);
CREATE INDEX idx_import_run_items_key_fingerprint ON import_run_items(api_key_fingerprint);
CREATE INDEX idx_import_run_items_current_stage ON import_run_items(current_stage);
CREATE INDEX idx_import_run_items_confirmation_status ON import_run_items(confirmation_status);
CREATE INDEX idx_import_run_items_access_status ON import_run_items(access_status);
CREATE INDEX idx_import_run_items_next_retry_at ON import_run_items(next_retry_at);
CREATE INDEX idx_import_run_items_lease_until ON import_run_items(lease_until);

4.3 为什么这些列是标量

下列字段必须保留标量列,不能只藏在 JSON 里:

  • provider_id
  • api_key_fingerprint
  • current_stage
  • confirmation_status
  • access_status
  • matched_account_state
  • account_resolution
  • next_retry_at
  • lease_until

原因:

  • worker 要按这些字段轮询
  • 结果页列表要按这些字段筛选
  • SQLite 下从 JSON 中筛选成本高、代码复杂度高

4.4 重复导入复用预检查

V2 需要显式支持:

  • 已成功导入的 provider 再次添加时自动复用
  • 同模型不同别名只 patch mapping

因此 migration 必须支撑以下预检查顺序:

  1. host_id + provider_id
  2. host_id + base_url + api_key_fingerprint
  3. canonical_model_families_json 与现有 provider 覆盖关系比较

结果分三类:

  • reused
    • provision_reused=1
    • 写入 reused_from_provider_id
    • 视情况写入 reused_from_account_id
  • patch_only
    • 不重建 provider/account
    • 仅更新 alias/model_mapping/model_pricing
  • replace
    • 原 provider broken 或 key 失效
    • 重新 provision

同时对命中的既有账号,还要额外落两类语义:

  • matched_account_state
    • active | disabled | deprecated | broken
  • account_resolution
    • created | reused | reactivated | replaced

这样结果页才能直接显示:

  • “重复,已启用”
  • “已弃用,已快速启用”
  • “已损坏,已替换”

5. 0008_batch_import_run_events.sql

5.1 import_run_item_events

CREATE TABLE import_run_item_events (
    event_id TEXT PRIMARY KEY,
    run_id TEXT NOT NULL,
    item_id TEXT NOT NULL,
    event_type TEXT NOT NULL,
    stage TEXT NOT NULL,
    attempt INTEGER NOT NULL DEFAULT 0,
    message TEXT NOT NULL,
    payload_json TEXT NOT NULL DEFAULT '{}',
    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,

    FOREIGN KEY (run_id) REFERENCES import_runs(run_id) ON DELETE CASCADE,
    FOREIGN KEY (item_id) REFERENCES import_run_items(item_id) ON DELETE CASCADE
);

索引:

CREATE INDEX idx_import_run_item_events_run_id ON import_run_item_events(run_id);
CREATE INDEX idx_import_run_item_events_item_id ON import_run_item_events(item_id);
CREATE INDEX idx_import_run_item_events_created_at ON import_run_item_events(created_at);
CREATE INDEX idx_import_run_item_events_stage ON import_run_item_events(stage);
CREATE INDEX idx_import_run_item_events_type ON import_run_item_events(event_type);

5.2 事件类型建议

建议第一版固定这些值:

  • stage_transition
  • probe_result
  • provision_result
  • retry_scheduled
  • retry_started
  • advisory_added
  • confirmation_result
  • validation_result

不要在第一版做开放式自由文本事件类型,以免前后端难以统一。

6. 回填与迁移策略

6.1 不做历史全量回填

V2 第一版不要求把所有旧 import_batches 回填成 import_runs

原因:

  • 历史数据缺少 retry trail
  • 缺少 capability profile
  • 缺少 confirmation 语义
  • 强行回填会制造伪精度

6.2 从启用 V2 后开始写新表

策略:

  1. v1 老入口继续写旧表
  2. v2 新入口只写新表,并在 item 上记录:
    • legacy_batch_id
    • legacy_provider_id
  3. 页面/API 只展示 V2 产生的新 run

6.3 兼容旧逻辑

如果 V2 内部仍调用现有 provision/import 逻辑:

  • 允许继续产生 import_batches
  • 但 V2 handler 必须在自己的 import_run_items 中写投影

也就是说:

  • 旧表是“执行副产物”
  • 新表是“V2 真相源”

7. Repository 草案

建议新增 repo

internal/store/sqlite/
  import_runs_repo.go
  import_run_items_repo.go
  import_run_item_events_repo.go

7.1 ImportRunsRepo

核心方法:

  • Create(ctx, run ImportRun) error
  • Update(ctx, run ImportRun) error
  • Get(ctx, runID string) (ImportRun, error)
  • List(ctx, filter ListImportRunsFilter) ([]ImportRun, error)

7.2 ImportRunItemsRepo

核心方法:

  • Create(ctx, item ImportRunItem) error
  • Update(ctx, item ImportRunItem) error
  • Get(ctx, itemID string) (ImportRunItem, error)
  • ListByRun(ctx, runID string, filter ListImportRunItemsFilter) ([]ImportRunItem, error)
  • AcquireConfirmLease(ctx, now time.Time, workerID string, leaseFor time.Duration, limit int) ([]ImportRunItem, error)

7.3 ImportRunItemEventsRepo

核心方法:

  • Append(ctx, event ImportRunItemEvent) error
  • ListByItem(ctx, itemID string) ([]ImportRunItemEvent, error)

8. Worker 查询草案

Confirmation worker 轮询建议:

SELECT *
FROM import_run_items
WHERE current_stage = 'confirm'
  AND confirmation_status = 'pending'
  AND (next_retry_at IS NULL OR next_retry_at <= ?)
  AND (lease_until IS NULL OR lease_until < ?)
ORDER BY
  CASE WHEN next_retry_at IS NULL THEN 0 ELSE 1 END,
  next_retry_at ASC,
  updated_at ASC
LIMIT ?;

租约写入建议使用“条件更新”方式,避免并发重复领取。

9. 聚合策略

import_runs 的统计字段不建议每次列表查询时现算,建议:

  • item 更新后同步回写 run summary
  • 或在同一事务里重算该 run 的计数

原因:

  • run 列表页是高频读
  • SQLite 上临时聚合 JSON 和事件表不划算

建议聚合规则:

  • completed_itemscurrent_stage='done'
  • active_itemsaccess_status='active'
  • degraded_itemsaccess_status='degraded'
  • broken_itemsaccess_status='broken'
  • warning_itemsconfirmation_status='advisory' OR access_status='degraded'

10. 非功能约束

  1. 幂等

    • run_iditem_id 由控制面生成,不能依赖数据库自增主键做外部 API 标识
    • api_key_fingerprint + provider_id + canonical_model_families 是重复导入复用的关键判定输入
  2. 可恢复

    • 任何阶段切换前后都要先写 item
    • 任何 retry 调度都要落 event
  3. 可审计

    • 详情页必须能从 event trail 解释 warning/broken
  4. 可删

    • 删 run 时item 和 event 级联删除

11. 实施顺序建议

  1. 先上 0007_batch_import_runs.sql
  2. 实现 ImportRunsRepo / ImportRunItemsRepo
  3. 让 batch service 在 Stage 0~2 先写新表
  4. 再上 0008_batch_import_run_events.sql
  5. 引入 confirmation worker 和 event trail
  6. 最后再接结果页/API

12. 验证点

文档级实现前检查:

  • 新表是否足以支撑结果页所有字段
  • 是否存在必须从 legacy 表实时拼接才能看到的字段
  • worker 是否能只靠新表完成 confirm/resume

实现后建议最小验证:

  1. create run → items 可写入
  2. item 进入 confirm → next_retry_at/lease 可更新
  3. event trail 可回放
  4. 控制面重启后 unfinished item 可重新被 worker 捞起