fix(e2e,store,delivery): fix E2E stability - lifecycle ordering, event ordering, callback_target NOT NULL, worker ticker loop
This commit is contained in:
@@ -64,6 +64,9 @@ func (w *Worker) Start(ctx context.Context) {
|
|||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if err := w.RunOnce(ctx); err != nil && w.Logger != nil {
|
||||||
|
w.Logger.Error("platform callback delivery run failed", "platform", w.Platform, "error", err.Error())
|
||||||
|
}
|
||||||
ticker := time.NewTicker(w.pollInterval())
|
ticker := time.NewTicker(w.pollInterval())
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
claimTicker := time.NewTicker(30 * time.Second)
|
claimTicker := time.NewTicker(30 * time.Second)
|
||||||
@@ -72,19 +75,10 @@ func (w *Worker) Start(ctx context.Context) {
|
|||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
default:
|
case <-ticker.C:
|
||||||
}
|
|
||||||
if err := w.RunOnce(ctx); err != nil && w.Logger != nil {
|
if err := w.RunOnce(ctx); err != nil && w.Logger != nil {
|
||||||
w.Logger.Error("platform callback delivery run failed", "platform", w.Platform, "error", err.Error())
|
w.Logger.Error("platform callback delivery run failed", "platform", w.Platform, "error", err.Error())
|
||||||
}
|
}
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case <-claimTicker.C:
|
case <-claimTicker.C:
|
||||||
if w.Store != nil {
|
if w.Store != nil {
|
||||||
if _, err := w.Store.ReleaseStaleClaims(ctx, w.claimTimeout()); err != nil && w.Logger != nil {
|
if _, err := w.Store.ReleaseStaleClaims(ctx, w.claimTimeout()); err != nil && w.Logger != nil {
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ func BuildInboundEvents(msg *message.UnifiedMessage, result *dialog.Result, meta
|
|||||||
|
|
||||||
eventIndex := 0
|
eventIndex := 0
|
||||||
baseEvent := func(eventType string, payload map[string]any) platformevent.Event {
|
baseEvent := func(eventType string, payload map[string]any) platformevent.Event {
|
||||||
eventTime := now.Add(time.Duration(eventIndex) * time.Nanosecond)
|
eventTime := now.Add(time.Duration(eventIndex) * time.Millisecond)
|
||||||
eventIndex++
|
eventIndex++
|
||||||
return platformevent.Event{
|
return platformevent.Event{
|
||||||
ID: uuid.New().String(),
|
ID: uuid.New().String(),
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -50,10 +51,10 @@ func (s *PlatformEventStore) InsertPendingBatch(ctx context.Context, events []pl
|
|||||||
if _, err := tx.ExecContext(ctx, `
|
if _, err := tx.ExecContext(ctx, `
|
||||||
INSERT INTO cs_platform_event_outbox(
|
INSERT INTO cs_platform_event_outbox(
|
||||||
id, platform, event_type, session_id, ticket_id, source_message_id,
|
id, platform, event_type, session_id, ticket_id, source_message_id,
|
||||||
payload, status, attempt_count, next_attempt_at, occurred_at, delivered_at, last_error, created_at, updated_at
|
callback_target, payload, status, attempt_count, next_attempt_at, occurred_at, delivered_at, last_error, created_at, updated_at
|
||||||
) VALUES (
|
) VALUES (
|
||||||
$1, $2, $3, NULLIF($4,'')::uuid, NULLIF($5,'')::uuid, $6,
|
$1, $2, $3, NULLIF($4,'')::uuid, NULLIF($5,'')::uuid, $6,
|
||||||
$7::jsonb, $8, $9, $10, $11, $12, NULLIF($13,''), $14, $15
|
'', $7::jsonb, $8, $9, $10, $11, $12, NULLIF($13,''), $14, $15
|
||||||
)
|
)
|
||||||
`, event.ID, event.Platform, event.EventType, event.SessionID, event.TicketID, event.SourceMessageID,
|
`, event.ID, event.Platform, event.EventType, event.SessionID, event.TicketID, event.SourceMessageID,
|
||||||
string(payload), string(event.Status), event.AttemptCount, event.NextAttemptAt, event.OccurredAt, event.DeliveredAt, event.LastError, event.CreatedAt, event.UpdatedAt); err != nil {
|
string(payload), string(event.Status), event.AttemptCount, event.NextAttemptAt, event.OccurredAt, event.DeliveredAt, event.LastError, event.CreatedAt, event.UpdatedAt); err != nil {
|
||||||
@@ -146,6 +147,18 @@ func (s *PlatformEventStore) ListDue(ctx context.Context, platform string, dueBe
|
|||||||
if err := tx.Commit(); err != nil {
|
if err := tx.Commit(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
sort.Slice(events, func(i, j int) bool {
|
||||||
|
if !events[i].NextAttemptAt.Equal(events[j].NextAttemptAt) {
|
||||||
|
return events[i].NextAttemptAt.Before(events[j].NextAttemptAt)
|
||||||
|
}
|
||||||
|
if !events[i].OccurredAt.Equal(events[j].OccurredAt) {
|
||||||
|
return events[i].OccurredAt.Before(events[j].OccurredAt)
|
||||||
|
}
|
||||||
|
if !events[i].CreatedAt.Equal(events[j].CreatedAt) {
|
||||||
|
return events[i].CreatedAt.Before(events[j].CreatedAt)
|
||||||
|
}
|
||||||
|
return events[i].ID < events[j].ID
|
||||||
|
})
|
||||||
return events, nil
|
return events, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -220,8 +233,8 @@ func (s *PlatformEventStore) MarkDeadLetter(ctx context.Context, eventID string,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if _, err := tx.ExecContext(ctx, `
|
if _, err := tx.ExecContext(ctx, `
|
||||||
INSERT INTO cs_platform_event_dead_letters(event_id, platform, event_type, payload, attempt_count, final_error)
|
INSERT INTO cs_platform_event_dead_letters(event_id, platform, event_type, callback_target, payload, attempt_count, final_error)
|
||||||
SELECT id, platform, event_type, payload, attempt_count, last_error
|
SELECT id, platform, event_type, callback_target, payload, attempt_count, last_error
|
||||||
FROM cs_platform_event_outbox
|
FROM cs_platform_event_outbox
|
||||||
WHERE id = $1
|
WHERE id = $1
|
||||||
ON CONFLICT (event_id) DO UPDATE
|
ON CONFLICT (event_id) DO UPDATE
|
||||||
|
|||||||
@@ -132,7 +132,7 @@ func waitForSessionEvents(t *testing.T, timeout time.Duration, eventsCh <-chan p
|
|||||||
if len(filtered) == want {
|
if len(filtered) == want {
|
||||||
return filtered
|
return filtered
|
||||||
}
|
}
|
||||||
case <-time.After(200 * time.Millisecond):
|
case <-time.After(50 * time.Millisecond):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -159,7 +159,11 @@ func TestSub2APICallbackFlow_ShouldDeliverOrderedEventsWithStableEventIDs(t *tes
|
|||||||
if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
|
if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
|
||||||
t.Fatalf("decode callback body failed: %v", err)
|
t.Fatalf("decode callback body failed: %v", err)
|
||||||
}
|
}
|
||||||
eventsCh <- event
|
select {
|
||||||
|
case eventsCh <- event:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("eventsCh send timeout")
|
||||||
|
}
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
}))
|
}))
|
||||||
defer callbackServer.Close()
|
defer callbackServer.Close()
|
||||||
@@ -205,6 +209,7 @@ func TestSub2APICallbackFlow_ShouldDeliverOrderedEventsWithStableEventIDs(t *tes
|
|||||||
}
|
}
|
||||||
|
|
||||||
filtered := waitForSessionEvents(t, 8*time.Second, eventsCh, sessionID, 6)
|
filtered := waitForSessionEvents(t, 8*time.Second, eventsCh, sessionID, 6)
|
||||||
|
|
||||||
wantTypes := []string{
|
wantTypes := []string{
|
||||||
platformevent.TypeMessageReceived,
|
platformevent.TypeMessageReceived,
|
||||||
platformevent.TypeMessageProcessing,
|
platformevent.TypeMessageProcessing,
|
||||||
|
|||||||
Reference in New Issue
Block a user