refactor(platformevent): remove CallbackTarget field and all usages
D-01: callback_target contract drift cleanup. - Remove CallbackTarget from Event struct and Validate - Remove CallbackTarget from PlatformInboundMeta - Remove defaultCallbackTarget and assignment from builder - Remove callback_target column from INSERT/SELECT/dead_letter SQL - Clean up all test literals and assertions DB migration left untouched; column remains empty until a future schema cleanup migration.
This commit is contained in:
@@ -38,7 +38,6 @@ type Event struct {
|
||||
SessionID string `json:"session_id,omitempty"`
|
||||
TicketID string `json:"ticket_id,omitempty"`
|
||||
SourceMessageID string `json:"source_message_id,omitempty"`
|
||||
CallbackTarget string `json:"callback_target"`
|
||||
Payload map[string]any `json:"payload"`
|
||||
Status Status `json:"status"`
|
||||
AttemptCount int `json:"attempt_count"`
|
||||
@@ -60,9 +59,6 @@ func (e Event) Validate() error {
|
||||
if strings.TrimSpace(e.EventType) == "" {
|
||||
return fmt.Errorf("event type is required")
|
||||
}
|
||||
if strings.TrimSpace(e.CallbackTarget) == "" {
|
||||
return fmt.Errorf("callback target is required")
|
||||
}
|
||||
switch e.Status {
|
||||
case StatusPending, StatusRetrying, StatusDelivered, StatusDeadLetter:
|
||||
default:
|
||||
|
||||
@@ -9,14 +9,13 @@ import (
|
||||
func TestEvent_Validate(t *testing.T) {
|
||||
now := time.Now()
|
||||
event := Event{
|
||||
ID: "evt-1",
|
||||
Platform: "sub2api",
|
||||
EventType: TypeReplyGenerated,
|
||||
CallbackTarget: "default",
|
||||
Status: StatusPending,
|
||||
AttemptCount: 0,
|
||||
NextAttemptAt: now,
|
||||
OccurredAt: now,
|
||||
ID: "evt-1",
|
||||
Platform: "sub2api",
|
||||
EventType: TypeReplyGenerated,
|
||||
Status: StatusPending,
|
||||
AttemptCount: 0,
|
||||
NextAttemptAt: now,
|
||||
OccurredAt: now,
|
||||
}
|
||||
|
||||
if err := event.Validate(); err != nil {
|
||||
@@ -26,13 +25,12 @@ func TestEvent_Validate(t *testing.T) {
|
||||
|
||||
func TestEvent_ValidateRejectsInvalidStatus(t *testing.T) {
|
||||
event := Event{
|
||||
ID: "evt-1",
|
||||
Platform: "sub2api",
|
||||
EventType: TypeReplyGenerated,
|
||||
CallbackTarget: "default",
|
||||
Status: Status("invalid"),
|
||||
NextAttemptAt: time.Now(),
|
||||
OccurredAt: time.Now(),
|
||||
ID: "evt-1",
|
||||
Platform: "sub2api",
|
||||
EventType: TypeReplyGenerated,
|
||||
Status: Status("invalid"),
|
||||
NextAttemptAt: time.Now(),
|
||||
OccurredAt: time.Now(),
|
||||
}
|
||||
|
||||
err := event.Validate()
|
||||
|
||||
@@ -22,7 +22,6 @@ type PlatformInboundMeta struct {
|
||||
Channel string
|
||||
SourceMessageID string
|
||||
SourceUserID string
|
||||
CallbackTarget string
|
||||
}
|
||||
|
||||
type PlatformAdapter interface {
|
||||
|
||||
@@ -68,12 +68,11 @@ func TestWorker_ShouldDeliverPendingEventToCallbackServer(t *testing.T) {
|
||||
now := time.Now().UTC().Truncate(time.Second)
|
||||
store := &stubEventStore{
|
||||
events: []platformevent.Event{{
|
||||
ID: "evt-1",
|
||||
Platform: "sub2api",
|
||||
EventType: platformevent.TypeReplyGenerated,
|
||||
CallbackTarget: "default",
|
||||
Payload: map[string]any{"reply": "好的"},
|
||||
Status: platformevent.StatusPending,
|
||||
ID: "evt-1",
|
||||
Platform: "sub2api",
|
||||
EventType: platformevent.TypeReplyGenerated,
|
||||
Payload: map[string]any{"reply": "好的"},
|
||||
Status: platformevent.StatusPending,
|
||||
NextAttemptAt: now,
|
||||
OccurredAt: now,
|
||||
CreatedAt: now,
|
||||
@@ -113,12 +112,11 @@ func TestWorker_ShouldRetryWhenCallbackReturns5xx(t *testing.T) {
|
||||
now := time.Now().UTC().Truncate(time.Second)
|
||||
store := &stubEventStore{
|
||||
events: []platformevent.Event{{
|
||||
ID: "evt-1",
|
||||
Platform: "sub2api",
|
||||
EventType: platformevent.TypeReplyGenerated,
|
||||
CallbackTarget: "default",
|
||||
Payload: map[string]any{"reply": "好的"},
|
||||
Status: platformevent.StatusPending,
|
||||
ID: "evt-1",
|
||||
Platform: "sub2api",
|
||||
EventType: platformevent.TypeReplyGenerated,
|
||||
Payload: map[string]any{"reply": "好的"},
|
||||
Status: platformevent.StatusPending,
|
||||
NextAttemptAt: now,
|
||||
OccurredAt: now,
|
||||
CreatedAt: now,
|
||||
@@ -152,12 +150,11 @@ func TestWorker_ShouldMoveEventToDeadLetterAfterMaxRetries(t *testing.T) {
|
||||
now := time.Now().UTC().Truncate(time.Second)
|
||||
store := &stubEventStore{
|
||||
events: []platformevent.Event{{
|
||||
ID: "evt-1",
|
||||
Platform: "sub2api",
|
||||
EventType: platformevent.TypeReplyGenerated,
|
||||
CallbackTarget: "default",
|
||||
Payload: map[string]any{"reply": "失败"},
|
||||
Status: platformevent.StatusRetrying,
|
||||
ID: "evt-1",
|
||||
Platform: "sub2api",
|
||||
EventType: platformevent.TypeReplyGenerated,
|
||||
Payload: map[string]any{"reply": "失败"},
|
||||
Status: platformevent.StatusRetrying,
|
||||
AttemptCount: 1,
|
||||
NextAttemptAt: now,
|
||||
OccurredAt: now,
|
||||
@@ -185,12 +182,11 @@ func TestWorker_ShouldPersistDeliveryAttemptAudit(t *testing.T) {
|
||||
now := time.Now().UTC().Truncate(time.Second)
|
||||
store := &stubEventStore{
|
||||
events: []platformevent.Event{{
|
||||
ID: "evt-1",
|
||||
Platform: "sub2api",
|
||||
EventType: platformevent.TypeReplyGenerated,
|
||||
CallbackTarget: "default",
|
||||
Payload: map[string]any{"reply": "失败"},
|
||||
Status: platformevent.StatusPending,
|
||||
ID: "evt-1",
|
||||
Platform: "sub2api",
|
||||
EventType: platformevent.TypeReplyGenerated,
|
||||
Payload: map[string]any{"reply": "失败"},
|
||||
Status: platformevent.StatusPending,
|
||||
NextAttemptAt: now,
|
||||
OccurredAt: now,
|
||||
CreatedAt: now,
|
||||
|
||||
@@ -12,8 +12,6 @@ import (
|
||||
"github.com/bridge/ai-customer-service/internal/service/dialog"
|
||||
)
|
||||
|
||||
const defaultCallbackTarget = "default"
|
||||
|
||||
func BuildInboundEvents(msg *message.UnifiedMessage, result *dialog.Result, meta *platformadapter.PlatformInboundMeta, now time.Time) ([]platformevent.Event, error) {
|
||||
if msg == nil {
|
||||
return nil, fmt.Errorf("message is nil")
|
||||
@@ -28,10 +26,6 @@ func BuildInboundEvents(msg *message.UnifiedMessage, result *dialog.Result, meta
|
||||
now = time.Now()
|
||||
}
|
||||
|
||||
callbackTarget := meta.CallbackTarget
|
||||
if callbackTarget == "" {
|
||||
callbackTarget = defaultCallbackTarget
|
||||
}
|
||||
eventIndex := 0
|
||||
baseEvent := func(eventType string, payload map[string]any) platformevent.Event {
|
||||
eventTime := now.Add(time.Duration(eventIndex) * time.Nanosecond)
|
||||
@@ -43,7 +37,6 @@ func BuildInboundEvents(msg *message.UnifiedMessage, result *dialog.Result, meta
|
||||
SessionID: result.SessionID,
|
||||
TicketID: result.TicketID,
|
||||
SourceMessageID: meta.SourceMessageID,
|
||||
CallbackTarget: callbackTarget,
|
||||
Payload: payload,
|
||||
Status: platformevent.StatusPending,
|
||||
AttemptCount: 0,
|
||||
|
||||
@@ -24,7 +24,6 @@ func TestBuildInboundEvents_ShouldBuildReplyFlowEvents(t *testing.T) {
|
||||
Platform: "sub2api",
|
||||
Channel: "sub2api",
|
||||
SourceMessageID: "m1",
|
||||
CallbackTarget: "default",
|
||||
},
|
||||
now,
|
||||
)
|
||||
@@ -72,7 +71,4 @@ func TestBuildInboundEvents_ShouldIncludeHandoffAndTicketCreated(t *testing.T) {
|
||||
if events[4].EventType != "ticket.created" {
|
||||
t.Fatalf("ticket event type = %s", events[4].EventType)
|
||||
}
|
||||
if events[0].CallbackTarget != "default" {
|
||||
t.Fatalf("callback target = %s, want default", events[0].CallbackTarget)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,13 +49,13 @@ func (s *PlatformEventStore) InsertPendingBatch(ctx context.Context, events []pl
|
||||
}
|
||||
if _, err := tx.ExecContext(ctx, `
|
||||
INSERT INTO cs_platform_event_outbox(
|
||||
id, platform, event_type, session_id, ticket_id, source_message_id, callback_target,
|
||||
id, platform, event_type, session_id, ticket_id, source_message_id,
|
||||
payload, status, attempt_count, next_attempt_at, occurred_at, delivered_at, last_error, created_at, updated_at
|
||||
) VALUES (
|
||||
$1, $2, $3, NULLIF($4,'')::uuid, NULLIF($5,'')::uuid, $6, $7,
|
||||
$8::jsonb, $9, $10, $11, $12, $13, NULLIF($14,''), $15, $16
|
||||
$1, $2, $3, NULLIF($4,'')::uuid, NULLIF($5,'')::uuid, $6,
|
||||
$7::jsonb, $8, $9, $10, $11, $12, NULLIF($13,''), $14, $15
|
||||
)
|
||||
`, event.ID, event.Platform, event.EventType, event.SessionID, event.TicketID, event.SourceMessageID, event.CallbackTarget,
|
||||
`, event.ID, event.Platform, event.EventType, event.SessionID, event.TicketID, event.SourceMessageID,
|
||||
string(payload), string(event.Status), event.AttemptCount, event.NextAttemptAt, event.OccurredAt, event.DeliveredAt, event.LastError, event.CreatedAt, event.UpdatedAt); err != nil {
|
||||
_ = tx.Rollback()
|
||||
return err
|
||||
@@ -77,7 +77,7 @@ func (s *PlatformEventStore) ListDue(ctx context.Context, platform string, dueBe
|
||||
}
|
||||
rows, err := s.db.QueryContext(ctx, `
|
||||
SELECT id, platform, event_type, COALESCE(session_id::text, ''), COALESCE(ticket_id::text, ''), COALESCE(source_message_id, ''),
|
||||
callback_target, payload, status, attempt_count, next_attempt_at, occurred_at, created_at, updated_at,
|
||||
payload, status, attempt_count, next_attempt_at, occurred_at, created_at, updated_at,
|
||||
delivered_at, COALESCE(last_error, '')
|
||||
FROM cs_platform_event_outbox
|
||||
WHERE platform = $1 AND status IN ('pending', 'retrying') AND next_attempt_at <= $2
|
||||
@@ -103,7 +103,6 @@ func (s *PlatformEventStore) ListDue(ctx context.Context, platform string, dueBe
|
||||
&event.SessionID,
|
||||
&event.TicketID,
|
||||
&event.SourceMessageID,
|
||||
&event.CallbackTarget,
|
||||
&payloadJSON,
|
||||
&status,
|
||||
&event.AttemptCount,
|
||||
@@ -182,8 +181,8 @@ func (s *PlatformEventStore) MarkDeadLetter(ctx context.Context, eventID string,
|
||||
return err
|
||||
}
|
||||
if _, err := tx.ExecContext(ctx, `
|
||||
INSERT INTO cs_platform_event_dead_letters(event_id, platform, event_type, callback_target, payload, attempt_count, final_error)
|
||||
SELECT id, platform, event_type, callback_target, payload, attempt_count, last_error
|
||||
INSERT INTO cs_platform_event_dead_letters(event_id, platform, event_type, payload, attempt_count, final_error)
|
||||
SELECT id, platform, event_type, payload, attempt_count, last_error
|
||||
FROM cs_platform_event_outbox
|
||||
WHERE id = $1
|
||||
ON CONFLICT (event_id) DO UPDATE
|
||||
|
||||
@@ -19,7 +19,6 @@ func TestPlatformEventStore_ShouldInsertPendingEvent(t *testing.T) {
|
||||
ID: uniqueID("evt"),
|
||||
Platform: "sub2api",
|
||||
EventType: platformevent.TypeMessageReceived,
|
||||
CallbackTarget: "default",
|
||||
Payload: map[string]any{"message": "hello"},
|
||||
Status: platformevent.StatusPending,
|
||||
AttemptCount: 0,
|
||||
@@ -33,24 +32,20 @@ func TestPlatformEventStore_ShouldInsertPendingEvent(t *testing.T) {
|
||||
if err := store.InsertPending(context.Background(), event); err != nil {
|
||||
t.Fatalf("InsertPending() error = %v", err)
|
||||
}
|
||||
|
||||
var (
|
||||
status string
|
||||
callbackName string
|
||||
status string
|
||||
)
|
||||
|
||||
if err := db.QueryRowContext(context.Background(), `
|
||||
SELECT status, callback_target
|
||||
SELECT status
|
||||
FROM cs_platform_event_outbox
|
||||
WHERE id = $1
|
||||
`, event.ID).Scan(&status, &callbackName); err != nil {
|
||||
`, event.ID).Scan(&status); err != nil {
|
||||
t.Fatalf("query inserted event failed: %v", err)
|
||||
}
|
||||
if status != string(platformevent.StatusPending) {
|
||||
t.Fatalf("status = %s, want %s", status, platformevent.StatusPending)
|
||||
}
|
||||
if callbackName != "default" {
|
||||
t.Fatalf("callback target = %s, want default", callbackName)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPlatformEventStore_ShouldListPendingEventsInOrder(t *testing.T) {
|
||||
@@ -62,24 +57,22 @@ func TestPlatformEventStore_ShouldListPendingEventsInOrder(t *testing.T) {
|
||||
platformName := "s2a-" + uniqueID("plt")[:8]
|
||||
|
||||
first := &platformevent.Event{
|
||||
ID: uniqueID("evt"),
|
||||
Platform: platformName,
|
||||
EventType: platformevent.TypeMessageReceived,
|
||||
CallbackTarget: "default",
|
||||
Payload: map[string]any{"step": 1},
|
||||
Status: platformevent.StatusPending,
|
||||
ID: uniqueID("evt"),
|
||||
Platform: platformName,
|
||||
EventType: platformevent.TypeMessageReceived,
|
||||
Payload: map[string]any{"step": 1},
|
||||
Status: platformevent.StatusPending,
|
||||
NextAttemptAt: now,
|
||||
OccurredAt: now.Add(-5 * time.Nanosecond),
|
||||
CreatedAt: now.Add(-5 * time.Nanosecond),
|
||||
UpdatedAt: now.Add(-5 * time.Nanosecond),
|
||||
}
|
||||
second := &platformevent.Event{
|
||||
ID: uniqueID("evt"),
|
||||
Platform: platformName,
|
||||
EventType: platformevent.TypeMessageProcessing,
|
||||
CallbackTarget: "default",
|
||||
Payload: map[string]any{"step": 2},
|
||||
Status: platformevent.StatusPending,
|
||||
ID: uniqueID("evt"),
|
||||
Platform: platformName,
|
||||
EventType: platformevent.TypeMessageProcessing,
|
||||
Payload: map[string]any{"step": 2},
|
||||
Status: platformevent.StatusPending,
|
||||
NextAttemptAt: now,
|
||||
OccurredAt: now.Add(-4 * time.Nanosecond),
|
||||
CreatedAt: now.Add(-4 * time.Nanosecond),
|
||||
@@ -119,12 +112,11 @@ func TestPlatformEventStore_ShouldPersistDeliveryAttemptAudit(t *testing.T) {
|
||||
store := NewPlatformEventStore(db)
|
||||
now := time.Now().UTC().Truncate(time.Second)
|
||||
event := &platformevent.Event{
|
||||
ID: uniqueID("evt"),
|
||||
Platform: "s2a-" + uniqueID("plt")[:8],
|
||||
EventType: platformevent.TypeReplyGenerated,
|
||||
CallbackTarget: "default",
|
||||
Payload: map[string]any{"reply": "好的"},
|
||||
Status: platformevent.StatusPending,
|
||||
ID: uniqueID("evt"),
|
||||
Platform: "s2a-" + uniqueID("plt")[:8],
|
||||
EventType: platformevent.TypeReplyGenerated,
|
||||
Payload: map[string]any{"reply": "好的"},
|
||||
Status: platformevent.StatusPending,
|
||||
NextAttemptAt: now,
|
||||
OccurredAt: now,
|
||||
CreatedAt: now,
|
||||
@@ -166,12 +158,11 @@ func TestPlatformEventStore_ShouldMoveToDeadLetter(t *testing.T) {
|
||||
store := NewPlatformEventStore(db)
|
||||
now := time.Now().UTC().Truncate(time.Second)
|
||||
event := &platformevent.Event{
|
||||
ID: uniqueID("evt"),
|
||||
Platform: "s2a-" + uniqueID("plt")[:8],
|
||||
EventType: platformevent.TypeReplyGenerated,
|
||||
CallbackTarget: "default",
|
||||
Payload: map[string]any{"reply": "失败"},
|
||||
Status: platformevent.StatusPending,
|
||||
ID: uniqueID("evt"),
|
||||
Platform: "s2a-" + uniqueID("plt")[:8],
|
||||
EventType: platformevent.TypeReplyGenerated,
|
||||
Payload: map[string]any{"reply": "失败"},
|
||||
Status: platformevent.StatusPending,
|
||||
NextAttemptAt: now,
|
||||
OccurredAt: now,
|
||||
CreatedAt: now,
|
||||
|
||||
Reference in New Issue
Block a user