Files
sub2api-cn-relay-manager/internal/batch/confirmation.go

272 lines
7.6 KiB
Go

package batch
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"sub2api-cn-relay-manager/internal/store/sqlite"
)
// ConfirmationResult is the outcome a Confirmer reports for one item.
type ConfirmationResult struct {
// StatusCode selects the branch taken by ConfirmItem: 200-299 counts as
// success, 403 and 503 have dedicated handling, anything else is a failure.
StatusCode int
// Message is the accompanying text; it is stored as the item's last error
// on non-success outcomes and inspected to detect the warm-up 503 case.
Message string
}
// ConfirmationItemStore is the item persistence the confirmation worker needs.
type ConfirmationItemStore interface {
// List returns the stored items; Tick filters them down to confirmation
// candidates itself.
List(ctx context.Context) ([]sqlite.ImportRunItem, error)
// Upsert persists the given item state; ConfirmItem calls it once per
// outcome.
Upsert(ctx context.Context, item sqlite.ImportRunItem) error
}
// ConfirmationLeaseClaimer is an optional capability of the item store. Tick
// detects it with a type assertion and, when present, claims a lease on each
// candidate before confirming it.
type ConfirmationLeaseClaimer interface {
// TryAcquireLease attempts to claim itemID for workerID at time now for
// leaseDuration. When the bool is true Tick continues with the returned
// item in place of the listed one; when it is false Tick skips the item.
TryAcquireLease(ctx context.Context, itemID, workerID string, now time.Time, leaseDuration time.Duration) (sqlite.ImportRunItem, bool, error)
}
// ConfirmationEventStore receives the item events emitted during confirmation.
type ConfirmationEventStore interface {
// Append records one event; ConfirmItem calls it after the item itself has
// been upserted.
Append(ctx context.Context, event sqlite.ImportRunItemEvent) error
}
// ConfirmationWorker drives the confirmation stage of import run items: Tick
// selects the items that are due and ConfirmItem applies the confirmer's
// result to each of them.
type ConfirmationWorker struct {
// WorkerID identifies this worker when a lease is acquired.
WorkerID string
// ItemStore lists and persists items. If it also implements
// ConfirmationLeaseClaimer, Tick claims a lease before confirming.
ItemStore ConfirmationItemStore
// EventStore receives the events emitted by ConfirmItem.
EventStore ConfirmationEventStore
// LeaseDuration is how long a claimed lease should last.
LeaseDuration time.Duration
// RetryDelay is the wait before a warm-up retry; ConfirmItem substitutes
// one second when it is not positive.
RetryDelay time.Duration
// Confirmer is called once per item by ConfirmItem; its result decides the
// item's next state.
Confirmer func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error)
}
// Tick runs one confirmation pass over the items returned by the item store.
//
// Every item that isConfirmationCandidate accepts at time now is handed to
// ConfirmItem. When the item store also implements ConfirmationLeaseClaimer,
// a lease is acquired first and items whose lease cannot be claimed are
// skipped. A non-positive LeaseDuration falls back to one minute so that an
// unconfigured worker never requests a zero-length lease.
//
// Tick stops at the first error (missing dependency, store failure, confirmer
// failure or context cancellation) and returns it unchanged; items after the
// failing one are left for a later pass.
func (w ConfirmationWorker) Tick(ctx context.Context, now time.Time) error {
	if w.ItemStore == nil {
		return fmt.Errorf("item store is required")
	}
	if w.EventStore == nil {
		return fmt.Errorf("event store is required")
	}
	if w.Confirmer == nil {
		return fmt.Errorf("confirmer is required")
	}

	items, err := w.ItemStore.List(ctx)
	if err != nil {
		return err
	}

	leaseDuration := w.LeaseDuration
	if leaseDuration <= 0 {
		leaseDuration = time.Minute
	}

	// Leasing is optional: resolve the capability once instead of once per item.
	claimer, canClaim := w.ItemStore.(ConfirmationLeaseClaimer)

	for _, item := range items {
		if !isConfirmationCandidate(item, now) {
			continue
		}
		// Do not start work on another item once the caller has given up.
		if err := ctx.Err(); err != nil {
			return err
		}
		if canClaim {
			claimedItem, claimed, err := claimer.TryAcquireLease(ctx, item.ItemID, w.WorkerID, now, leaseDuration)
			if err != nil {
				return err
			}
			if !claimed {
				// Another worker holds the lease.
				continue
			}
			// Continue with the store's view of the item after the claim.
			item = claimedItem
		}
		if err := w.ConfirmItem(ctx, item, now); err != nil {
			return err
		}
	}
	return nil
}
// ConfirmItem calls the confirmer for item and persists the resulting state.
//
// The outcome is chosen from the confirmer's result:
//   - status 200-299: the item is marked confirmed and moved to the validate
//     stage, and a "stage_transition" event is appended;
//   - status 403 when the item's capability profile lists the
//     "initial_probe_race_expected" advisory: the item is marked advisory and
//     moved to the validate stage, and an "advisory_added" event is appended;
//   - status 503 with a "no available accounts" message: the item stays in
//     place, a retry is scheduled RetryDelay (default one second) after now,
//     and a "retry_scheduled" event is appended;
//   - anything else: the item is marked failed and moved to the done stage.
//
// In every case the lease fields are cleared before the item is upserted.
// An error from the confirmer, the item store or the event store is returned
// unchanged; on a confirmer error nothing is persisted.
func (w ConfirmationWorker) ConfirmItem(ctx context.Context, item sqlite.ImportRunItem, now time.Time) error {
	// ConfirmItem is exported, so guard against a partially configured worker
	// here as well instead of panicking on a nil dependency.
	if w.ItemStore == nil {
		return fmt.Errorf("item store is required")
	}
	if w.EventStore == nil {
		return fmt.Errorf("event store is required")
	}
	if w.Confirmer == nil {
		return fmt.Errorf("confirmer is required")
	}

	result, err := w.Confirmer(ctx, item)
	if err != nil {
		return err
	}

	// An item without a lease owner was not claimed through TryAcquireLease,
	// so the attempt is counted here.
	// NOTE(review): presumably the claimer counts the attempt itself when it
	// grants a lease; confirm against the store implementation.
	// No lease is recorded for such items: every outcome below clears
	// LeaseOwner and LeaseUntil before the item is persisted.
	if strings.TrimSpace(item.LeaseOwner) == "" {
		item.ConfirmationAttempts++
	}

	// event stays nil for outcomes that do not emit one.
	var event *sqlite.ImportRunItemEvent

	switch {
	case result.StatusCode >= 200 && result.StatusCode < 300:
		item.ConfirmationStatus = string(ConfirmationConfirmed)
		item.CurrentStage = string(ItemStageValidate)
		item.NextRetryAt = ""
		item.LastError = ""
		item.LastErrorStage = ""
		event = &sqlite.ImportRunItemEvent{
			EventID:     confirmationEventID(item.ItemID, "stage_transition", now),
			RunID:       item.RunID,
			ItemID:      item.ItemID,
			EventType:   "stage_transition",
			Stage:       string(ItemStageValidate),
			Attempt:     item.ConfirmationAttempts,
			Message:     "confirmation succeeded",
			PayloadJSON: `{"confirmation_status":"confirmed"}`,
		}

	case result.StatusCode == 403 && supportsProbe403Advisory(item.CapabilityProfileJSON):
		item.ConfirmationStatus = string(ConfirmationAdvisory)
		item.CurrentStage = string(ItemStageValidate)
		item.AdvisoryMessagesJSON = appendAdvisoryJSON(item.AdvisoryMessagesJSON, "initial_probe_race_expected")
		item.LastError = strings.TrimSpace(result.Message)
		item.LastErrorStage = string(ItemStageConfirm)
		item.NextRetryAt = ""
		event = &sqlite.ImportRunItemEvent{
			EventID:     confirmationEventID(item.ItemID, "advisory_added", now),
			RunID:       item.RunID,
			ItemID:      item.ItemID,
			EventType:   "advisory_added",
			Stage:       string(ItemStageConfirm),
			Attempt:     item.ConfirmationAttempts,
			Message:     "initial probe race handled as advisory",
			PayloadJSON: `{"advisory":"initial_probe_race_expected"}`,
		}

	case isWarmupRetryCandidate(result):
		// Stage and status are left untouched so the item is picked up again
		// once NextRetryAt has passed.
		item.RetryCount++
		item.LastRetryAt = now.Format(time.RFC3339)
		item.NextRetryAt = now.Add(defaultDuration(w.RetryDelay, time.Second)).Format(time.RFC3339)
		item.LastError = strings.TrimSpace(result.Message)
		item.LastErrorStage = string(ItemStageConfirm)
		event = &sqlite.ImportRunItemEvent{
			EventID:     confirmationEventID(item.ItemID, "retry_scheduled", now),
			RunID:       item.RunID,
			ItemID:      item.ItemID,
			EventType:   "retry_scheduled",
			Stage:       string(ItemStageConfirm),
			Attempt:     item.ConfirmationAttempts,
			Message:     "initial 503 no available accounts, retry scheduled",
			PayloadJSON: fmt.Sprintf(`{"next_retry_at":%q}`, item.NextRetryAt),
		}

	default:
		// NOTE(review): unlike the other outcomes, a failure records no event.
		item.ConfirmationStatus = string(ConfirmationFailed)
		item.CurrentStage = string(ItemStageDone)
		item.LastError = strings.TrimSpace(result.Message)
		item.LastErrorStage = string(ItemStageConfirm)
		item.NextRetryAt = ""
	}

	// Common tail: release the lease, persist the item, then emit the event.
	item.LeaseOwner = ""
	item.LeaseUntil = ""
	if err := w.ItemStore.Upsert(ctx, item); err != nil {
		return err
	}
	if event == nil {
		return nil
	}
	return w.EventStore.Append(ctx, *event)
}
// isConfirmationCandidate reports whether item should be confirmed at time
// now: it must be in the confirm stage with a pending confirmation status,
// its retry time (if any) must have been reached, and it must not be covered
// by an unexpired lease.
func isConfirmationCandidate(item sqlite.ImportRunItem, now time.Time) bool {
	inConfirmStage := item.CurrentStage == string(ItemStageConfirm)
	stillPending := item.ConfirmationStatus == string(ConfirmationPending)
	return inConfirmStage &&
		stillPending &&
		isRetryDue(item.NextRetryAt, now) &&
		leaseExpired(item.LeaseUntil, now)
}
// isRetryDue reports whether the RFC 3339 timestamp nextRetryAt has been
// reached at time now. A blank or unparsable timestamp counts as due, and a
// timestamp equal to now is due as well.
func isRetryDue(nextRetryAt string, now time.Time) bool {
	if trimmed := strings.TrimSpace(nextRetryAt); trimmed != "" {
		if dueAt, err := time.Parse(time.RFC3339, trimmed); err == nil {
			return !now.Before(dueAt)
		}
	}
	return true
}
// leaseExpired reports whether the RFC 3339 timestamp leaseUntil lies strictly
// before now. A lease ending exactly at now is still considered held.
func leaseExpired(leaseUntil string, now time.Time) bool {
	expiry, err := time.Parse(time.RFC3339, strings.TrimSpace(leaseUntil))
	if err != nil {
		// A blank value fails to parse too, so "no lease" and "unreadable
		// lease" both count as expired.
		return true
	}
	return now.After(expiry)
}
// supportsProbe403Advisory reports whether the capability profile JSON lists
// "initial_probe_race_expected" under transport_profile.known_advisories.
// JSON that cannot be decoded into that shape yields false.
func supportsProbe403Advisory(capabilityProfileJSON string) bool {
	const wanted = "initial_probe_race_expected"

	type transportProfile struct {
		KnownAdvisories []string `json:"known_advisories"`
	}
	var profile struct {
		TransportProfile transportProfile `json:"transport_profile"`
	}

	raw := []byte(strings.TrimSpace(capabilityProfileJSON))
	if json.Unmarshal(raw, &profile) != nil {
		return false
	}

	advisories := profile.TransportProfile.KnownAdvisories
	for i := range advisories {
		if advisories[i] == wanted {
			return true
		}
	}
	return false
}
// isWarmupRetryCandidate reports whether result is a 503 whose message
// contains "no available accounts" (matched case-insensitively), the case
// ConfirmItem answers by scheduling a retry.
func isWarmupRetryCandidate(result ConfirmationResult) bool {
	if result.StatusCode != 503 {
		return false
	}
	normalized := strings.ToLower(strings.TrimSpace(result.Message))
	return strings.Contains(normalized, "no available accounts")
}
// appendAdvisoryJSON adds advisory to the JSON string array held in rawJSON
// and returns the re-encoded array. A blank rawJSON is treated as "[]". A
// blank advisory returns the input unchanged (or "[]" if it was blank), an
// advisory that is already present is not added twice, and rawJSON that does
// not decode into a string array is replaced by an array holding only
// advisory.
func appendAdvisoryJSON(rawJSON, advisory string) string {
	current := defaultJSONString(rawJSON, "[]")

	entry := strings.TrimSpace(advisory)
	if entry == "" {
		return current
	}

	var advisories []string
	if json.Unmarshal([]byte(current), &advisories) != nil {
		// Discard whatever was partially decoded.
		advisories = nil
	}

	alreadyListed := false
	for i := range advisories {
		if advisories[i] == entry {
			alreadyListed = true
			break
		}
	}
	if !alreadyListed {
		advisories = append(advisories, entry)
	}

	encoded, err := json.Marshal(advisories)
	if err != nil {
		// Encoding a []string does not fail in practice; keep the input as is.
		return current
	}
	return string(encoded)
}
// confirmationEventID builds an event identifier of the form
// "<itemID>-<eventType>-<now in Unix nanoseconds>".
func confirmationEventID(itemID, eventType string, now time.Time) string {
	nanos := now.UnixNano()
	return itemID + "-" + eventType + "-" + fmt.Sprint(nanos)
}
// defaultDuration returns value when it is positive and fallback otherwise.
func defaultDuration(value, fallback time.Duration) time.Duration {
	if value <= 0 {
		return fallback
	}
	return value
}
// defaultJSONString returns fallback when value is empty or only whitespace;
// otherwise it returns value exactly as given, without trimming.
func defaultJSONString(value, fallback string) string {
	if blank := len(strings.TrimSpace(value)) == 0; blank {
		return fallback
	}
	return value
}