220 lines
6.3 KiB
Go
220 lines
6.3 KiB
Go
package routing
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"path/filepath"
|
|
"testing"
|
|
"time"
|
|
|
|
"sub2api-cn-relay-manager/internal/store/sqlite"
|
|
)
|
|
|
|
func TestSQLiteLogWriterAppendAndFlush(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
dbPath := filepath.Join(t.TempDir(), "route-logs.db")
|
|
dsn := "file:" + filepath.ToSlash(dbPath) + "?_busy_timeout=5000"
|
|
writer, err := NewSQLiteLogWriter(context.Background(), dsn, AsyncLogWriterOptions{
|
|
QueueSize: 4,
|
|
FlushInterval: time.Hour,
|
|
MaxBatchSize: 8,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("NewSQLiteLogWriter() error = %v", err)
|
|
}
|
|
defer writer.Close()
|
|
|
|
if err := writer.AppendDecision(context.Background(), RouteDecisionEvent{
|
|
RequestID: "req-1",
|
|
LogicalGroupID: "gpt-shared",
|
|
PublicModel: "gpt-5.4",
|
|
StickyKey: "sticky-1",
|
|
StickyKeyType: "conversation",
|
|
StickyHit: true,
|
|
SelectedRouteID: "asxs",
|
|
SelectedShadowGroupID: "gpt-shared__asxs",
|
|
UpstreamStatus: 200,
|
|
LatencyMS: 120,
|
|
}); err != nil {
|
|
t.Fatalf("AppendDecision() error = %v", err)
|
|
}
|
|
if err := writer.AppendFailover(context.Background(), RouteFailoverEvent{
|
|
RequestID: "req-2",
|
|
LogicalGroupID: "gpt-shared",
|
|
PublicModel: "gpt-5.4",
|
|
FromRouteID: "asxs",
|
|
ToRouteID: "codex2api",
|
|
Reason: "timeout",
|
|
FailureCount: 2,
|
|
}); err != nil {
|
|
t.Fatalf("AppendFailover() error = %v", err)
|
|
}
|
|
if err := writer.AppendStickyAudit(context.Background(), RouteStickyAuditEvent{
|
|
StickyKey: "sticky-1",
|
|
StickyKeyType: "conversation",
|
|
LogicalGroupID: "gpt-shared",
|
|
PublicModel: "gpt-5.4",
|
|
RouteID: "asxs",
|
|
Action: "bind",
|
|
ExpiresAt: "2026-05-28T18:00:00Z",
|
|
}); err != nil {
|
|
t.Fatalf("AppendStickyAudit() error = %v", err)
|
|
}
|
|
if err := writer.Flush(context.Background()); err != nil {
|
|
t.Fatalf("Flush() error = %v", err)
|
|
}
|
|
|
|
store, err := sqlite.Open(context.Background(), dsn)
|
|
if err != nil {
|
|
t.Fatalf("sqlite.Open() error = %v", err)
|
|
}
|
|
defer store.Close()
|
|
|
|
decisions, err := store.RouteDecisionLogs().ListRecent(context.Background(), sqlite.RouteDecisionLogFilter{Limit: 10})
|
|
if err != nil {
|
|
t.Fatalf("RouteDecisionLogs().ListRecent() error = %v", err)
|
|
}
|
|
if len(decisions) != 1 || decisions[0].SelectedRouteID != "asxs" || !decisions[0].StickyHit {
|
|
t.Fatalf("decision logs = %+v", decisions)
|
|
}
|
|
|
|
failovers, err := store.RouteFailoverEvents().ListRecent(context.Background(), sqlite.RouteFailoverEventFilter{Limit: 10})
|
|
if err != nil {
|
|
t.Fatalf("RouteFailoverEvents().ListRecent() error = %v", err)
|
|
}
|
|
if len(failovers) != 1 || failovers[0].ToRouteID != "codex2api" {
|
|
t.Fatalf("failover events = %+v", failovers)
|
|
}
|
|
|
|
stickyAudit, err := store.RouteStickyAudit().ListRecent(context.Background(), sqlite.RouteStickyAuditFilter{Limit: 10})
|
|
if err != nil {
|
|
t.Fatalf("RouteStickyAudit().ListRecent() error = %v", err)
|
|
}
|
|
if len(stickyAudit) != 1 || stickyAudit[0].Action != "bind" {
|
|
t.Fatalf("sticky audit = %+v", stickyAudit)
|
|
}
|
|
}
|
|
|
|
func TestAsyncLogWriterFlushFailureDoesNotCrash(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
sink := &failingRouteLogSink{}
|
|
writer := NewAsyncLogWriter(sink, AsyncLogWriterOptions{
|
|
QueueSize: 2,
|
|
FlushInterval: time.Hour,
|
|
MaxBatchSize: 2,
|
|
})
|
|
defer writer.Close()
|
|
|
|
if err := writer.AppendDecision(context.Background(), RouteDecisionEvent{
|
|
RequestID: "req-fail",
|
|
LogicalGroupID: "gpt-shared",
|
|
PublicModel: "gpt-5.4",
|
|
SelectedRouteID: "asxs",
|
|
SelectedShadowGroupID: "gpt-shared__asxs",
|
|
}); err != nil {
|
|
t.Fatalf("AppendDecision() error = %v", err)
|
|
}
|
|
if err := writer.Flush(context.Background()); err == nil {
|
|
t.Fatal("Flush() error = nil, want failure")
|
|
}
|
|
if sink.appendCalls == 0 {
|
|
t.Fatal("appendCalls = 0, want at least one attempted write")
|
|
}
|
|
}
|
|
|
|
func TestAsyncLogWriterFlushesQueuedEvents(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
sink := &recordingRouteLogSink{}
|
|
writer := NewAsyncLogWriter(sink, AsyncLogWriterOptions{
|
|
QueueSize: 1,
|
|
FlushInterval: time.Hour,
|
|
MaxBatchSize: 100,
|
|
})
|
|
defer writer.Close()
|
|
|
|
if err := writer.AppendDecision(context.Background(), RouteDecisionEvent{
|
|
RequestID: "req-a",
|
|
LogicalGroupID: "gpt-shared",
|
|
PublicModel: "gpt-5.4",
|
|
SelectedRouteID: "asxs",
|
|
SelectedShadowGroupID: "gpt-shared__asxs",
|
|
}); err != nil {
|
|
t.Fatalf("AppendDecision() first error = %v", err)
|
|
}
|
|
if err := writer.AppendDecision(context.Background(), RouteDecisionEvent{
|
|
RequestID: "req-b",
|
|
LogicalGroupID: "gpt-shared",
|
|
PublicModel: "gpt-5.4",
|
|
SelectedRouteID: "codex2api",
|
|
SelectedShadowGroupID: "gpt-shared__codex2api",
|
|
}); err != nil {
|
|
t.Fatalf("AppendDecision() second error = %v", err)
|
|
}
|
|
if err := writer.Flush(context.Background()); err != nil {
|
|
t.Fatalf("Flush() error = %v", err)
|
|
}
|
|
|
|
if len(sink.decisions) != 2 {
|
|
t.Fatalf("recorded decisions len = %d, want 2", len(sink.decisions))
|
|
}
|
|
}
|
|
|
|
func TestAsyncLogWriterFlushAfterCloseReturnsNil(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
writer := NewAsyncLogWriter(&recordingRouteLogSink{}, AsyncLogWriterOptions{
|
|
QueueSize: 1,
|
|
FlushInterval: time.Hour,
|
|
MaxBatchSize: 1,
|
|
})
|
|
if err := writer.Close(); err != nil {
|
|
t.Fatalf("Close() error = %v", err)
|
|
}
|
|
if err := writer.Flush(context.Background()); err != nil {
|
|
t.Fatalf("Flush() after close error = %v, want nil", err)
|
|
}
|
|
}
|
|
|
|
type failingRouteLogSink struct {
|
|
appendCalls int
|
|
}
|
|
|
|
func (s *failingRouteLogSink) AppendDecision(context.Context, RouteDecisionEvent) error {
|
|
s.appendCalls++
|
|
return errors.New("boom")
|
|
}
|
|
|
|
func (s *failingRouteLogSink) AppendFailover(context.Context, RouteFailoverEvent) error {
|
|
s.appendCalls++
|
|
return errors.New("boom")
|
|
}
|
|
|
|
func (s *failingRouteLogSink) AppendStickyAudit(context.Context, RouteStickyAuditEvent) error {
|
|
s.appendCalls++
|
|
return errors.New("boom")
|
|
}
|
|
|
|
func (s *failingRouteLogSink) Close() error { return nil }
|
|
|
|
type recordingRouteLogSink struct {
|
|
decisions []RouteDecisionEvent
|
|
}
|
|
|
|
func (s *recordingRouteLogSink) AppendDecision(_ context.Context, event RouteDecisionEvent) error {
|
|
s.decisions = append(s.decisions, event)
|
|
return nil
|
|
}
|
|
|
|
func (s *recordingRouteLogSink) AppendFailover(context.Context, RouteFailoverEvent) error {
|
|
return nil
|
|
}
|
|
|
|
func (s *recordingRouteLogSink) AppendStickyAudit(context.Context, RouteStickyAuditEvent) error {
|
|
return nil
|
|
}
|
|
|
|
func (s *recordingRouteLogSink) Close() error { return nil }
|