Files
sub2api-cn-relay-manager/tests/integration/batch_import_v2_test.go
2026-05-22 15:51:21 +08:00

396 lines
14 KiB
Go

package integration_test
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"sub2api-cn-relay-manager/internal/batch"
"sub2api-cn-relay-manager/internal/host/sub2api"
"sub2api-cn-relay-manager/internal/probe"
"sub2api-cn-relay-manager/internal/store/sqlite"
)
// TestBatchImportV2 drives a six-entry batch import through the harness and
// then verifies, in order: the calls recorded by the provisioner stub, the
// projected summary of the persisted run, and the projections of the new,
// active, deprecated, advisory and retry items.
func TestBatchImportV2(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	h := newBatchImportV2Harness(t)
	defer h.Close(t)

	stub := &batchImportProvisionerStub{}
	runID, err := h.RunBatchImport(ctx, stub)
	if err != nil {
		t.Fatalf("RunBatchImport() error = %v", err)
	}
	if runID == "" {
		t.Fatal("runID = empty, want persisted run")
	}

	// Interactions recorded by the provisioner stub.
	if stub.provisionCalls != 3 {
		t.Fatalf("provision calls = %d, want 3 for new/advisory/retry items", stub.provisionCalls)
	}
	if stub.patchCalls != 1 {
		t.Fatalf("patch calls = %d, want 1 for alias-only reuse flow", stub.patchCalls)
	}
	patchMapping := stub.lastPatch.Contract.ModelMapping
	if patchMapping["Kimi-K2.6"] != "kimi-2.6" {
		t.Fatalf("patch mapping = %#v, want raw alias mapped to canonical family", patchMapping)
	}

	// Run-level summary of the persisted run.
	persistedRun, err := h.store.ImportRuns().GetByRunID(ctx, runID)
	if err != nil {
		t.Fatalf("ImportRuns().GetByRunID() error = %v", err)
	}
	summary := batch.ProjectRunSummary(persistedRun)
	if summary.State != string(batch.RunStateCompleted) {
		t.Fatalf("run state = %q, want completed", summary.State)
	}
	allSix := summary.CompletedItems == 6 && summary.ActiveItems == 6 && summary.TotalItems == 6
	if !allSix {
		t.Fatalf("run view = %+v, want all 6 items completed and active", summary)
	}

	items, err := h.store.ImportRunItems().ListByRunID(ctx, runID)
	if err != nil {
		t.Fatalf("ImportRunItems().ListByRunID() error = %v", err)
	}
	if len(items) != 6 {
		t.Fatalf("len(items) = %d, want 6", len(items))
	}

	// Brand-new provider: exactly one canonical family is expected.
	newItem := findItemByBaseURL(t, items, h.baseURL+"/new")
	families := batch.ProjectItemSummary(newItem).CanonicalModelFamilies
	if !(len(families) == 1 && families[0] == "deepseek-v4-pro") {
		t.Fatalf("new item canonical families = %#v, want [deepseek-v4-pro]", families)
	}

	// Duplicate of an active account: reused as-is.
	activeView := batch.ProjectItemSummary(findItemByBaseURL(t, items, h.baseURL+"/active"))
	activeOK := activeView.MatchedAccountState == string(batch.MatchedAccountStateActive) &&
		activeView.AccountResolution == string(batch.AccountResolutionReused) &&
		activeView.ProvisionReused
	if !activeOK {
		t.Fatalf("active duplicate projection = %+v, want active/reused/provision_reused", activeView)
	}

	// Duplicate of a deprecated account: reactivated.
	deprecatedView := batch.ProjectItemSummary(findItemByBaseURL(t, items, h.baseURL+"/deprecated"))
	deprecatedOK := deprecatedView.MatchedAccountState == string(batch.MatchedAccountStateDeprecated) &&
		deprecatedView.AccountResolution == string(batch.AccountResolutionReactivated) &&
		deprecatedView.ProvisionReused
	if !deprecatedOK {
		t.Fatalf("deprecated duplicate projection = %+v, want deprecated/reactivated/provision_reused", deprecatedView)
	}

	// Advisory item: confirmation is advisory while access is active.
	advisoryItem := findItemByBaseURL(t, items, h.baseURL+"/advisory")
	advisoryEvents, err := h.store.ImportRunEvents().ListByItemID(ctx, advisoryItem.ItemID)
	if err != nil {
		t.Fatalf("ImportRunEvents().ListByItemID(advisory) error = %v", err)
	}
	advisoryDetail, err := batch.ProjectItemDetail(advisoryItem, advisoryEvents)
	if err != nil {
		t.Fatalf("ProjectItemDetail(advisory) error = %v", err)
	}
	advisoryOK := advisoryDetail.ConfirmationStatus == string(batch.ConfirmationAdvisory) &&
		advisoryDetail.AccessStatus == string(batch.AccessStatusActive)
	if !advisoryOK {
		t.Fatalf("advisory detail = %+v, want advisory confirmation and active access", advisoryDetail)
	}
	knownAdvisories := advisoryDetail.CapabilityProfile.TransportProfile.KnownAdvisories
	if !containsString(knownAdvisories, "initial_probe_race_expected") {
		t.Fatalf("advisory capability profile = %+v, want initial_probe_race_expected", knownAdvisories)
	}
	if !containsSubstring(advisoryDetail.AdvisoryMessages, "异步探测尚未稳定") {
		t.Fatalf("advisory messages = %#v, want mapped probe race advisory", advisoryDetail.AdvisoryMessages)
	}
	if !containsEventType(advisoryDetail.Events, "advisory_added") {
		t.Fatalf("advisory events = %+v, want advisory_added event", advisoryDetail.Events)
	}

	// Retry item: one retry recorded, access active afterwards.
	retryItem := findItemByBaseURL(t, items, h.baseURL+"/retry")
	retryEvents, err := h.store.ImportRunEvents().ListByItemID(ctx, retryItem.ItemID)
	if err != nil {
		t.Fatalf("ImportRunEvents().ListByItemID(retry) error = %v", err)
	}
	retryDetail, err := batch.ProjectItemDetail(retryItem, retryEvents)
	if err != nil {
		t.Fatalf("ProjectItemDetail(retry) error = %v", err)
	}
	retryOK := retryDetail.RetryCount == 1 && retryDetail.AccessStatus == string(batch.AccessStatusActive)
	if !retryOK {
		t.Fatalf("retry detail = %+v, want retry_count=1 and active access", retryDetail)
	}
	sawRetryScheduled := containsEventType(retryDetail.Events, "retry_scheduled")
	sawStageTransition := containsEventType(retryDetail.Events, "stage_transition")
	if !(sawRetryScheduled && sawStageTransition) {
		t.Fatalf("retry events = %+v, want retry_scheduled and stage_transition", retryDetail.Events)
	}
}
// batchImportV2Harness bundles the fixtures shared by the batch import v2
// integration test: a store handle and a fake upstream HTTP server.
type batchImportV2Harness struct {
// store is the database handle whose ImportRuns, ImportRunItems and
// ImportRunEvents repositories the test reads and writes.
store *sqlite.DB
// server is the httptest server that serves the fake upstream mux.
server *httptest.Server
// baseURL is server.URL; each entry's base URL is this value plus a path
// suffix such as "/new" or "/retry".
baseURL string
}
// newBatchImportV2Harness opens a test store, starts an httptest server backed
// by the fake upstream mux, and returns both wrapped in a harness. The caller
// is expected to call Close on the returned harness.
func newBatchImportV2Harness(t *testing.T) *batchImportV2Harness {
	t.Helper()

	// The store is opened before the server is started.
	h := &batchImportV2Harness{store: openTestStore(t)}
	h.server = httptest.NewServer(newBatchImportUpstreamMux())
	h.baseURL = h.server.URL
	return h
}
// Close tears the harness down: the fake upstream server is shut down first,
// then the test store is closed via closeTestStore.
func (h *batchImportV2Harness) Close(t *testing.T) {
	t.Helper()

	upstream, db := h.server, h.store
	upstream.Close()
	closeTestStore(t, db)
}
// RunBatchImport executes the import pipeline for the six fixture entries and
// returns the ID of the resulting run.
//
// It performs three phases in order:
//  1. starts a run through batch.BatchImportService with the given provisioner;
//  2. ticks a batch.ConfirmationWorker twice, at a fixed instant and again two
//     seconds later (the worker's RetryDelay is one second);
//  3. validates every item whose current stage is batch.ItemStageValidate.
//
// The first error from any phase is returned together with an empty run ID.
func (h *batchImportV2Harness) RunBatchImport(ctx context.Context, provisioner *batchImportProvisionerStub) (string, error) {
	importer := batch.BatchImportService{
		RunStore:          h.store.ImportRuns(),
		ItemStore:         h.store.ImportRunItems(),
		ProbeModels:       probe.ProviderModels,
		ProbeCapabilities: probe.ProbeCapabilities,
		InspectReuse:      h.inspectReuse,
		Provisioner:       provisioner,
	}

	// One fixture per scenario; the path suffix doubles as the scenario name
	// that inspectReuse and confirm switch on.
	fixtures := []struct {
		path, apiKey, model string
	}{
		{"/new", "sk-new", "DeepSeek V4 Pro"},
		{"/active", "sk-active", "kimi 2.6"},
		{"/deprecated", "sk-deprecated", "kimi 2.6"},
		{"/patch", "sk-patch", "kimi 2.6"},
		{"/advisory", "sk-advisory", "kimi-k2.6"},
		{"/retry", "sk-retry", "kimi-k2.6"},
	}
	entries := make([]batch.BatchImportEntry, 0, len(fixtures))
	for _, f := range fixtures {
		entries = append(entries, batch.BatchImportEntry{
			BaseURL:         h.baseURL + f.path,
			APIKey:          f.apiKey,
			RequestedModels: []string{f.model},
		})
	}

	started, err := importer.StartRun(ctx, batch.BatchImportRunRequest{
		RunID:      "run-v2-int-001",
		HostID:     "host-int-1",
		Mode:       "strict",
		AccessMode: "self_service",
		Entries:    entries,
	})
	if err != nil {
		return "", err
	}

	confirmer := batch.ConfirmationWorker{
		WorkerID:      "worker-int-1",
		ItemStore:     batchImportConfirmationStore{store: h.store, runID: started.RunID},
		EventStore:    h.store.ImportRunEvents(),
		LeaseDuration: time.Minute,
		RetryDelay:    time.Second,
		Confirmer:     h.confirm,
	}
	// Two ticks at deterministic instants; the second falls after RetryDelay
	// has elapsed relative to the first.
	firstTick := time.Date(2026, 5, 22, 13, 0, 0, 0, time.UTC)
	for _, at := range []time.Time{firstTick, firstTick.Add(2 * time.Second)} {
		if err := confirmer.Tick(ctx, at); err != nil {
			return "", err
		}
	}

	items, err := h.store.ImportRunItems().ListByRunID(ctx, started.RunID)
	if err != nil {
		return "", err
	}
	validation := batch.ValidationService{
		ItemStore: h.store.ImportRunItems(),
		RunStore:  h.store.ImportRuns(),
		Validator: h.validate,
	}
	for _, item := range items {
		// Only items that have reached the validate stage are validated.
		if item.CurrentStage == string(batch.ItemStageValidate) {
			if err := validation.ValidateItem(ctx, item); err != nil {
				return "", err
			}
		}
	}
	return started.RunID, nil
}
// inspectReuse is the fake reuse lookup wired into the import service. It
// decides purely on the suffix of input.BaseURL:
//
//   - "/active":     existing provider matched to active account 201
//   - "/deprecated": existing provider matched to deprecated account 301
//   - "/patch":      existing provider matched to active account 401, with an
//     existing model mapping for the lower-case alias
//   - anything else: the zero ReuseLookupResult
//
// It never returns an error.
func (h *batchImportV2Harness) inspectReuse(_ context.Context, input batch.ReuseLookupInput) (batch.ReuseLookupResult, error) {
	var found batch.ReuseLookupResult

	// Fill in what differs per scenario first.
	switch {
	case strings.HasSuffix(input.BaseURL, "/active"):
		found.MatchedAccountID = 201
		found.MatchedAccountState = batch.MatchedAccountStateActive
	case strings.HasSuffix(input.BaseURL, "/deprecated"):
		found.MatchedAccountID = 301
		found.MatchedAccountState = batch.MatchedAccountStateDeprecated
	case strings.HasSuffix(input.BaseURL, "/patch"):
		found.MatchedAccountID = 401
		found.MatchedAccountState = batch.MatchedAccountStateActive
		found.ExistingModelMapping = map[string]string{"kimi-k2.6": "kimi-2.6"}
	default:
		// No existing provider: report nothing to reuse.
		return found, nil
	}

	// Every matched scenario shares the same provider identity, access status
	// and canonical family list.
	found.ExistingProviderID = batch.NormalizeProviderID(input.BaseURL)
	found.ExistingAccessStatus = batch.AccessStatusActive
	found.ExistingCanonicalFamilys = []string{"kimi 2.6"}
	return found, nil
}
// confirm is the fake confirmer handed to the confirmation worker. The outcome
// depends on the item's base URL suffix:
//
//   - "/advisory" always yields 403 "probe race expected";
//   - "/retry" yields 503 "no available accounts" while the item has zero
//     confirmation attempts, and 200 afterwards;
//   - everything else yields 200 "confirmation succeeded".
//
// It never returns an error.
func (h *batchImportV2Harness) confirm(_ context.Context, item sqlite.ImportRunItem) (batch.ConfirmationResult, error) {
	if strings.HasSuffix(item.BaseURL, "/advisory") {
		return batch.ConfirmationResult{StatusCode: http.StatusForbidden, Message: "probe race expected"}, nil
	}
	if strings.HasSuffix(item.BaseURL, "/retry") && item.ConfirmationAttempts == 0 {
		return batch.ConfirmationResult{StatusCode: http.StatusServiceUnavailable, Message: "no available accounts"}, nil
	}
	return batch.ConfirmationResult{StatusCode: http.StatusOK, Message: "confirmation succeeded"}, nil
}
// validate is the fake validator handed to the validation service. It reports
// a successful 200 JSON completion for every item, embedding the item's ID in
// the body preview, and never returns an error.
func (h *batchImportV2Harness) validate(_ context.Context, item sqlite.ImportRunItem) (sub2api.GatewayCompletionResult, error) {
	preview := fmt.Sprintf(`{"item_id":%q,"status":"ok"}`, item.ItemID)

	var completion sub2api.GatewayCompletionResult
	completion.OK = true
	completion.StatusCode = http.StatusOK
	completion.ContentType = "application/json"
	completion.BodyPreview = preview
	return completion, nil
}
// batchImportProvisionerStub is the test double passed to the batch import
// service as its Provisioner. It records how it was called so the test can
// assert on the interactions afterwards.
type batchImportProvisionerStub struct {
// provisionCalls counts Provision invocations.
provisionCalls int
// patchCalls counts Patch invocations.
patchCalls int
// lastPatch holds the request of the most recent Patch call.
lastPatch batch.PatchProvisionRequest
}
// Provision records one provisioning call and returns a synthetic result: the
// legacy batch ID is 800 plus the running call count (801 for the first call),
// and the legacy provider ID echoes req.ProviderID. It never returns an error.
func (p *batchImportProvisionerStub) Provision(_ context.Context, req batch.ProvisionRequest) (batch.ProvisionResult, error) {
	p.provisionCalls++
	batchID := 800 + int64(p.provisionCalls)

	var out batch.ProvisionResult
	out.LegacyBatchID = &batchID
	out.LegacyProviderID = req.ProviderID
	return out, nil
}
// Patch records one patch call and remembers the request for later inspection.
// It always succeeds.
func (p *batchImportProvisionerStub) Patch(_ context.Context, req batch.PatchProvisionRequest) error {
	p.lastPatch = req
	p.patchCalls++
	return nil
}
// batchImportConfirmationStore is the item store handed to the confirmation
// worker. It delegates to the database's ImportRunItems repository and limits
// List to the items of a single run.
type batchImportConfirmationStore struct {
// store is the database whose ImportRunItems() repository is delegated to.
store *sqlite.DB
// runID selects the run whose items List returns.
runID string
}
// List returns the import run items that belong to the store's configured run.
func (s batchImportConfirmationStore) List(ctx context.Context) ([]sqlite.ImportRunItem, error) {
	repo := s.store.ImportRunItems()
	return repo.ListByRunID(ctx, s.runID)
}
// Upsert forwards the item unchanged to the underlying ImportRunItems
// repository and returns its error, if any.
func (s batchImportConfirmationStore) Upsert(ctx context.Context, item sqlite.ImportRunItem) error {
	repo := s.store.ImportRunItems()
	return repo.Upsert(ctx, item)
}
// newBatchImportUpstreamMux builds the fake upstream used by the integration
// test. It registers three routes:
//
//   - /v1/models: lists the models configured for the caller's API key, or
//     answers 401 for an unknown key;
//   - /v1/responses: always answers 403 "responses unsupported";
//   - /v1/chat/completions: answers 401 for an unknown key, 403 for the
//     "sk-advisory" key, and a canned 200 completion otherwise.
//
// Every response, including the 401 ones, is sent with
// Content-Type: application/json so the declared type matches the JSON body
// instead of being content-sniffed by net/http.
func newBatchImportUpstreamMux() http.Handler {
	modelsByToken := map[string][]string{
		"sk-new":        {"deepseek-ai/DeepSeek-V4-Pro"},
		"sk-active":     {"kimi-k2.6"},
		"sk-deprecated": {"kimi-k2.6"},
		"sk-patch":      {"Kimi-K2.6"},
		"sk-advisory":   {"kimi-k2.6"},
		"sk-retry":      {"kimi-k2.6"},
	}

	// bearerToken returns the Authorization header value with a leading
	// "Bearer " prefix removed and surrounding whitespace trimmed.
	bearerToken := func(r *http.Request) string {
		return strings.TrimSpace(strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer "))
	}

	// writeJSON sends body with the given status and a JSON content type.
	writeJSON := func(w http.ResponseWriter, status int, body string) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(status)
		// The write error is deliberately ignored: there is nothing useful a
		// fake upstream handler can do about a failed response write.
		_, _ = w.Write([]byte(body))
	}

	const unauthorizedBody = `{"error":"unauthorized"}`

	mux := http.NewServeMux()

	mux.HandleFunc("/v1/models", func(w http.ResponseWriter, r *http.Request) {
		models, known := modelsByToken[bearerToken(r)]
		if !known {
			writeJSON(w, http.StatusUnauthorized, unauthorizedBody)
			return
		}
		data := make([]map[string]any, 0, len(models))
		for _, model := range models {
			data = append(data, map[string]any{"id": model})
		}
		writeJSON(w, http.StatusOK, mustJSON(data))
	})

	mux.HandleFunc("/v1/responses", func(w http.ResponseWriter, _ *http.Request) {
		writeJSON(w, http.StatusForbidden, `{"error":"responses unsupported"}`)
	})

	mux.HandleFunc("/v1/chat/completions", func(w http.ResponseWriter, r *http.Request) {
		token := bearerToken(r)
		if _, known := modelsByToken[token]; !known {
			writeJSON(w, http.StatusUnauthorized, unauthorizedBody)
			return
		}
		// The advisory key is rejected on chat completions even though it is
		// a known key.
		if token == "sk-advisory" {
			writeJSON(w, http.StatusForbidden, `{"error":"probe race expected"}`)
			return
		}
		writeJSON(w, http.StatusOK, `{"id":"chatcmpl_batch_import_v2","choices":[{"index":0,"message":{"role":"assistant","content":"pong"}}]}`)
	})

	return mux
}
// findItemByBaseURL returns the first item whose BaseURL equals baseURL. If no
// item matches, the test is failed immediately via t.Fatalf.
func findItemByBaseURL(t *testing.T, items []sqlite.ImportRunItem, baseURL string) sqlite.ImportRunItem {
	t.Helper()

	for i := range items {
		if items[i].BaseURL == baseURL {
			return items[i]
		}
	}

	t.Fatalf("item with base_url %q not found in %#v", baseURL, items)
	// Not reached at runtime (Fatalf stops the test); required by the compiler.
	return sqlite.ImportRunItem{}
}
// containsString reports whether any element of values is exactly equal to
// want. A nil or empty slice yields false.
func containsString(values []string, want string) bool {
	for i := 0; i < len(values); i++ {
		if values[i] == want {
			return true
		}
	}
	return false
}
// containsSubstring reports whether fragment occurs inside at least one
// element of values. A nil or empty slice yields false; an empty fragment
// matches any element.
func containsSubstring(values []string, fragment string) bool {
	for i := range values {
		if strings.Index(values[i], fragment) >= 0 {
			return true
		}
	}
	return false
}
// containsEventType reports whether any projected event has an EventType
// exactly equal to want. A nil or empty slice yields false.
func containsEventType(events []batch.EventProjection, want string) bool {
	for i := range events {
		if events[i].EventType == want {
			return true
		}
	}
	return false
}
// mustJSON renders data as a model-list payload of the form
// {"data":[{"id":"..."},...]}. Only the "id" key of each map is used; other
// keys are ignored.
//
// Values are formatted with the %q verb, i.e. Go string quoting rather than a
// JSON encoder, so this is only suitable for plain identifiers like the ones
// used in this test. Despite the name, the function never panics.
func mustJSON(data []map[string]any) string {
	var out strings.Builder
	out.WriteString(`{"data":[`)
	for i, item := range data {
		if i > 0 {
			out.WriteByte(',')
		}
		fmt.Fprintf(&out, `{"id":%q}`, item["id"])
	}
	out.WriteString(`]}`)
	return out.String()
}