303 lines
12 KiB
Go
303 lines
12 KiB
Go
package app
|
|
|
|
import (
	"context"
	"encoding/json"
	"net/http"
	"reflect"
	"testing"

	"sub2api-cn-relay-manager/internal/batch"
	"sub2api-cn-relay-manager/internal/probe"
)
|
|
|
|
func TestBatchRunsHTTP(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
t.Run("GET runs returns projected summaries", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
handler := NewAPIHandler("secret-token", ActionSet{
|
|
ListBatchImportRuns: func(_ context.Context, req ListBatchImportRunsRequest) (ListBatchImportRunsResponse, error) {
|
|
if req.State != "completed_with_warnings" {
|
|
t.Fatalf("State = %q, want completed_with_warnings", req.State)
|
|
}
|
|
if req.AccessMode != "subscription" {
|
|
t.Fatalf("AccessMode = %q, want subscription", req.AccessMode)
|
|
}
|
|
if req.Query != "kimi" {
|
|
t.Fatalf("Query = %q, want kimi", req.Query)
|
|
}
|
|
if req.Cursor != "cursor-1" {
|
|
t.Fatalf("Cursor = %q, want cursor-1", req.Cursor)
|
|
}
|
|
nextCursor := "cursor-2"
|
|
return ListBatchImportRunsResponse{Runs: []batch.RunSummaryProjection{{
|
|
RunID: "run-1",
|
|
State: "completed_with_warnings",
|
|
Mode: "partial",
|
|
AccessMode: "subscription",
|
|
TotalItems: 2,
|
|
WarningItems: 1,
|
|
}}, NextCursor: &nextCursor}, nil
|
|
},
|
|
})
|
|
|
|
req := httptestRequest(t, http.MethodGet, "/api/batch-import/runs?state=completed_with_warnings&access_mode=subscription&q=kimi&cursor=cursor-1", nil, "secret-token")
|
|
res := httptestRecorder(handler, req)
|
|
assertStatusCode(t, res, http.StatusOK)
|
|
run := decodeJSONArrayObjectAt(t, res.Body().Bytes(), "runs", 0)
|
|
assertJSONObjectValue(t, run, "run_id", "run-1")
|
|
assertJSONObjectValue(t, run, "state", "completed_with_warnings")
|
|
assertJSONObjectValue(t, run, "access_mode", "subscription")
|
|
assertJSONObjectValue(t, run, "warning_items", float64(1))
|
|
assertJSONContains(t, res.Body().Bytes(), "next_cursor", "cursor-2")
|
|
})
|
|
|
|
t.Run("GET run detail returns wrapped projection", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
handler := NewAPIHandler("secret-token", ActionSet{
|
|
GetBatchImportRun: func(_ context.Context, runID string) (batch.RunSummaryProjection, error) {
|
|
if runID != "run-1" {
|
|
t.Fatalf("runID = %q, want run-1", runID)
|
|
}
|
|
return batch.RunSummaryProjection{
|
|
RunID: "run-1",
|
|
State: "completed_with_warnings",
|
|
Mode: "partial",
|
|
AccessMode: "subscription",
|
|
TotalItems: 2,
|
|
CompletedItems: 2,
|
|
ActiveItems: 1,
|
|
DegradedItems: 1,
|
|
WarningItems: 1,
|
|
RecentWarnings: []string{"warning"},
|
|
}, nil
|
|
},
|
|
})
|
|
|
|
req := httptestRequest(t, http.MethodGet, "/api/batch-import/runs/run-1", nil, "secret-token")
|
|
res := httptestRecorder(handler, req)
|
|
assertStatusCode(t, res, http.StatusOK)
|
|
assertJSONContains(t, res.Body().Bytes(), "run.run_id", "run-1")
|
|
assertJSONArrayValueAt(t, res.Body().Bytes(), "recent_warnings", 0, "warning")
|
|
})
|
|
|
|
t.Run("GET items forwards matched account filters", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
handler := NewAPIHandler("secret-token", ActionSet{
|
|
ListBatchImportRunItems: func(_ context.Context, req ListBatchImportRunItemsRequest) (ListBatchImportRunItemsResponse, error) {
|
|
if req.RunID != "run-1" {
|
|
t.Fatalf("RunID = %q, want run-1", req.RunID)
|
|
}
|
|
if req.MatchedAccountState != "active" {
|
|
t.Fatalf("MatchedAccountState = %q, want active", req.MatchedAccountState)
|
|
}
|
|
if req.AccountResolution != "reused" {
|
|
t.Fatalf("AccountResolution = %q, want reused", req.AccountResolution)
|
|
}
|
|
if req.Cursor != "item-cursor-1" {
|
|
t.Fatalf("Cursor = %q, want item-cursor-1", req.Cursor)
|
|
}
|
|
nextCursor := "item-cursor-2"
|
|
return ListBatchImportRunItemsResponse{Items: []batch.ItemSummaryProjection{{
|
|
ItemID: "item-1",
|
|
BaseURL: "https://kimi.example.com/v1",
|
|
ProviderID: "kimi-a7m-1",
|
|
APIKeyFingerprint: "sha256:1234",
|
|
CurrentStage: "done",
|
|
ConfirmationStatus: "advisory",
|
|
AccessStatus: "active",
|
|
MatchedAccountState: "active",
|
|
AccountResolution: "reused",
|
|
ProvisionReused: true,
|
|
}}, NextCursor: &nextCursor}, nil
|
|
},
|
|
})
|
|
|
|
req := httptestRequest(t, http.MethodGet, "/api/batch-import/runs/run-1/items?matched_account_state=active&account_resolution=reused&cursor=item-cursor-1", nil, "secret-token")
|
|
res := httptestRecorder(handler, req)
|
|
assertStatusCode(t, res, http.StatusOK)
|
|
item := decodeJSONArrayObjectAt(t, res.Body().Bytes(), "items", 0)
|
|
assertJSONObjectValue(t, item, "item_id", "item-1")
|
|
assertJSONObjectValue(t, item, "matched_account_state", "active")
|
|
assertJSONObjectValue(t, item, "account_resolution", "reused")
|
|
assertJSONObjectValue(t, item, "provision_reused", true)
|
|
assertJSONContains(t, res.Body().Bytes(), "next_cursor", "item-cursor-2")
|
|
})
|
|
|
|
t.Run("GET item detail returns capability profile and events", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
handler := NewAPIHandler("secret-token", ActionSet{
|
|
GetBatchImportRunItem: func(_ context.Context, req GetBatchImportRunItemRequest) (batch.ItemDetailProjection, error) {
|
|
if req.RunID != "run-1" || req.ItemID != "item-1" {
|
|
t.Fatalf("request = %#v, want run-1/item-1", req)
|
|
}
|
|
return batch.ItemDetailProjection{
|
|
ItemSummaryProjection: batch.ItemSummaryProjection{
|
|
ItemID: "item-1",
|
|
BaseURL: "https://kimi.example.com/v1",
|
|
ProviderID: "kimi-a7m-1",
|
|
APIKeyFingerprint: "sha256:1234",
|
|
CanonicalModelFamilies: []string{"kimi-2.6"},
|
|
CurrentStage: "done",
|
|
ConfirmationStatus: "advisory",
|
|
AccessStatus: "active",
|
|
MatchedAccountState: "deprecated",
|
|
AccountResolution: "reactivated",
|
|
ProvisionReused: true,
|
|
},
|
|
ReusedFromProviderID: "provider-kimi-old",
|
|
CapabilityProfile: batchProbeProfile(),
|
|
Events: []batch.EventProjection{{
|
|
EventID: "evt-1",
|
|
EventType: "retry_scheduled",
|
|
Stage: "confirm",
|
|
Message: "retry queued",
|
|
}},
|
|
}, nil
|
|
},
|
|
})
|
|
|
|
req := httptestRequest(t, http.MethodGet, "/api/batch-import/runs/run-1/items/item-1", nil, "secret-token")
|
|
res := httptestRecorder(handler, req)
|
|
assertStatusCode(t, res, http.StatusOK)
|
|
assertJSONContains(t, res.Body().Bytes(), "item_id", "item-1")
|
|
assertJSONContains(t, res.Body().Bytes(), "reused_from_provider_id", "provider-kimi-old")
|
|
assertJSONContains(t, res.Body().Bytes(), "capability_profile.transport_profile.supports_openai_chat_completions", true)
|
|
event := decodeJSONArrayObjectAt(t, res.Body().Bytes(), "events", 0)
|
|
assertJSONObjectValue(t, event, "event_type", "retry_scheduled")
|
|
assertJSONObjectValue(t, event, "stage", "confirm")
|
|
})
|
|
}
|
|
|
|
func TestBatchRunWrapperFunctions(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
t.Run("handleGetBatchImportRun validates inputs", func(t *testing.T) {
|
|
t.Parallel()
|
|
req := httptestRequest(t, http.MethodGet, "/api/batch-import/runs/run-1", nil, "")
|
|
rec := &responseRecorder{header: map[string][]string{}}
|
|
handleGetBatchImportRun(rec, req, nil)
|
|
assertStatusCode(t, rec, http.StatusInternalServerError)
|
|
|
|
req = httptestRequest(t, http.MethodGet, "/api/batch-import/runs/", nil, "")
|
|
rec = &responseRecorder{header: map[string][]string{}}
|
|
handleGetBatchImportRun(rec, req, func(context.Context, string) (batch.RunSummaryProjection, error) {
|
|
return batch.RunSummaryProjection{}, nil
|
|
})
|
|
assertStatusCode(t, rec, http.StatusBadRequest)
|
|
assertJSONContains(t, rec.Body().Bytes(), "error.message", "run_id is required")
|
|
})
|
|
|
|
t.Run("handleListBatchImportRunItems validates run id and empty result", func(t *testing.T) {
|
|
t.Parallel()
|
|
req := httptestRequest(t, http.MethodGet, "/api/batch-import/runs/run-1/items?has_warning=true", nil, "")
|
|
rec := &responseRecorder{header: map[string][]string{}}
|
|
handleListBatchImportRunItems(rec, req, nil)
|
|
assertStatusCode(t, rec, http.StatusInternalServerError)
|
|
|
|
req = httptestRequest(t, http.MethodGet, "/api/batch-import/runs//items?has_warning=true", nil, "")
|
|
rec = &responseRecorder{header: map[string][]string{}}
|
|
handleListBatchImportRunItems(rec, req, func(context.Context, ListBatchImportRunItemsRequest) (ListBatchImportRunItemsResponse, error) {
|
|
return ListBatchImportRunItemsResponse{}, nil
|
|
})
|
|
assertStatusCode(t, rec, http.StatusBadRequest)
|
|
|
|
req = httptestRequest(t, http.MethodGet, "/api/batch-import/runs/run-1/items?has_warning=true&limit=3", nil, "")
|
|
req.SetPathValue("run_id", "run-1")
|
|
rec = &responseRecorder{header: map[string][]string{}}
|
|
handleListBatchImportRunItems(rec, req, func(_ context.Context, got ListBatchImportRunItemsRequest) (ListBatchImportRunItemsResponse, error) {
|
|
if got.RunID != "run-1" || got.Limit != 3 || got.HasWarning == nil || !*got.HasWarning {
|
|
t.Fatalf("ListBatchImportRunItemsRequest = %+v, want parsed filters", got)
|
|
}
|
|
return ListBatchImportRunItemsResponse{}, nil
|
|
})
|
|
assertStatusCode(t, rec, http.StatusOK)
|
|
items, ok := decodeTopLevelArray(t, rec.Body().Bytes(), "items")
|
|
if !ok || len(items) != 0 {
|
|
t.Fatalf("items = %#v, want empty array", items)
|
|
}
|
|
})
|
|
|
|
t.Run("handleGetBatchImportRunItem validates ids", func(t *testing.T) {
|
|
t.Parallel()
|
|
req := httptestRequest(t, http.MethodGet, "/api/batch-import/runs/run-1/items/item-1", nil, "")
|
|
rec := &responseRecorder{header: map[string][]string{}}
|
|
handleGetBatchImportRunItem(rec, req, nil)
|
|
assertStatusCode(t, rec, http.StatusInternalServerError)
|
|
|
|
req = httptestRequest(t, http.MethodGet, "/api/batch-import/runs//items/", nil, "")
|
|
rec = &responseRecorder{header: map[string][]string{}}
|
|
handleGetBatchImportRunItem(rec, req, func(context.Context, GetBatchImportRunItemRequest) (batch.ItemDetailProjection, error) {
|
|
return batch.ItemDetailProjection{}, nil
|
|
})
|
|
assertStatusCode(t, rec, http.StatusBadRequest)
|
|
assertJSONContains(t, rec.Body().Bytes(), "error.message", "run_id and item_id are required")
|
|
})
|
|
}
|
|
|
|
func batchProbeProfile() probe.CapabilityProfile {
|
|
return probe.CapabilityProfile{
|
|
TransportProfile: probe.TransportProfile{
|
|
SupportsOpenAIChatCompletions: true,
|
|
},
|
|
}
|
|
}
|
|
|
|
func decodeJSONArrayObjectAt(t *testing.T, payload []byte, key string, index int) map[string]any {
|
|
t.Helper()
|
|
|
|
values, ok := decodeTopLevelArray(t, payload, key)
|
|
if !ok {
|
|
t.Fatalf("json key %q is not an array; payload=%s", key, string(payload))
|
|
}
|
|
if index < 0 || index >= len(values) {
|
|
t.Fatalf("json key %q length = %d, want index %d present; payload=%s", key, len(values), index, string(payload))
|
|
}
|
|
|
|
object, ok := values[index].(map[string]any)
|
|
if !ok {
|
|
t.Fatalf("json key %q[%d] is not an object; payload=%s", key, index, string(payload))
|
|
}
|
|
return object
|
|
}
|
|
|
|
func assertJSONArrayValueAt(t *testing.T, payload []byte, key string, index int, want any) {
|
|
t.Helper()
|
|
|
|
values, ok := decodeTopLevelArray(t, payload, key)
|
|
if !ok {
|
|
t.Fatalf("json key %q is not an array; payload=%s", key, string(payload))
|
|
}
|
|
if index < 0 || index >= len(values) {
|
|
t.Fatalf("json key %q length = %d, want index %d present; payload=%s", key, len(values), index, string(payload))
|
|
}
|
|
|
|
if got := values[index]; got != want {
|
|
t.Fatalf("json key %q[%d] = %#v, want %#v; payload=%s", key, index, got, want, string(payload))
|
|
}
|
|
}
|
|
|
|
// assertJSONObjectValue fails the test unless object[key] equals want.
//
// A missing key reads as nil, so it only matches a nil want. want should use
// the types encoding/json produces when decoding into any (string, float64,
// bool, nil, []any, map[string]any).
func assertJSONObjectValue(t *testing.T, object map[string]any, key string, want any) {
	t.Helper()

	// reflect.DeepEqual instead of !=: comparing two interface values that both
	// hold a slice or a map with the comparison operators panics at run time.
	if got := object[key]; !reflect.DeepEqual(got, want) {
		t.Fatalf("json object key %q = %#v, want %#v", key, got, want)
	}
}
|
|
|
|
// decodeTopLevelArray unmarshals payload as a JSON object and returns the
// array stored under key. The boolean is false, and the slice nil, when key is
// absent or its value is not an array. A payload that cannot be unmarshalled
// into an object fails the test immediately.
func decodeTopLevelArray(t *testing.T, payload []byte, key string) ([]any, bool) {
	t.Helper()

	var root map[string]any
	err := json.Unmarshal(payload, &root)
	if err != nil {
		t.Fatalf("json.Unmarshal() error = %v; payload=%s", err, string(payload))
	}

	raw, present := root[key]
	if !present {
		return nil, false
	}
	list, isArray := raw.([]any)
	return list, isArray
}
|