270 lines
10 KiB
Go
270 lines
10 KiB
Go
|
|
package repository
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"crypto/rand"
|
||
|
|
"encoding/hex"
|
||
|
|
"fmt"
|
||
|
|
"os"
|
||
|
|
"path/filepath"
|
||
|
|
"sort"
|
||
|
|
"sync"
|
||
|
|
"testing"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"github.com/company/ai-ops/internal/config"
|
||
|
|
"github.com/company/ai-ops/internal/database"
|
||
|
|
"github.com/company/ai-ops/internal/domain/model"
|
||
|
|
"github.com/company/ai-ops/internal/service"
|
||
|
|
)
|
||
|
|
|
||
|
|
// Shared state that lets the integration tests in this file apply the schema
// migrations a single time per test binary.
var (
	// pgMigrationOnce guards the one-off call to applyMigrations made from
	// setupPGIntegration.
	pgMigrationOnce sync.Once
	// pgMigrationErr holds the result of that call so every test that runs
	// afterwards observes the same outcome.
	pgMigrationErr error
)
|
||
|
|
|
||
|
|
func setupPGIntegration(t *testing.T) context.Context {
|
||
|
|
t.Helper()
|
||
|
|
ctx := context.Background()
|
||
|
|
if database.Pool == nil {
|
||
|
|
ports := []int{15432, 5432}
|
||
|
|
var lastErr error
|
||
|
|
for _, port := range ports {
|
||
|
|
lastErr = database.Init(config.DatabaseConfig{Host: "localhost", Port: port, User: "aiops", Password: "aiops123", DBName: "ai_ops", SSLMode: "disable", PoolSize: 4})
|
||
|
|
if lastErr == nil {
|
||
|
|
break
|
||
|
|
}
|
||
|
|
database.Close()
|
||
|
|
database.Pool = nil
|
||
|
|
}
|
||
|
|
if lastErr != nil {
|
||
|
|
t.Skipf("PostgreSQL integration database not available: %v", lastErr)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
pgMigrationOnce.Do(func() {
|
||
|
|
pgMigrationErr = applyMigrations(ctx)
|
||
|
|
})
|
||
|
|
if pgMigrationErr != nil {
|
||
|
|
t.Fatalf("apply migrations: %v", pgMigrationErr)
|
||
|
|
}
|
||
|
|
return ctx
|
||
|
|
}
|
||
|
|
|
||
|
|
func applyMigrations(ctx context.Context) error {
|
||
|
|
if _, err := database.Pool.Exec(ctx, `SELECT pg_advisory_lock(424242001)`); err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
defer database.Pool.Exec(ctx, `SELECT pg_advisory_unlock(424242001)`)
|
||
|
|
|
||
|
|
files, err := filepath.Glob(filepath.Join("..", "..", "..", "tech", "migrations", "*.up.sql"))
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
sort.Strings(files)
|
||
|
|
for _, f := range files {
|
||
|
|
b, err := os.ReadFile(f)
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
if _, err := database.Pool.Exec(ctx, string(b)); err != nil {
|
||
|
|
return fmt.Errorf("%s: %w", f, err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// testUUID returns a freshly generated random identifier in the canonical
// 8-4-4-4-12 lowercase hexadecimal UUID layout, with the version nibble set to
// 4 and the variant bits set to binary 10. The test fails immediately if the
// random source cannot be read.
func testUUID(t *testing.T) string {
	t.Helper()

	var raw [16]byte
	if _, err := rand.Read(raw[:]); err != nil {
		t.Fatal(err)
	}
	raw[6] = (raw[6] & 0x0f) | 0x40 // version 4
	raw[8] = (raw[8] & 0x3f) | 0x80 // variant 10xx

	// Encode once, then cut the 32 hex digits into the five UUID groups.
	digits := hex.EncodeToString(raw[:])
	return digits[:8] + "-" + digits[8:12] + "-" + digits[12:16] + "-" + digits[16:20] + "-" + digits[20:]
}
|
||
|
|
|
||
|
|
func cleanupIDs(t *testing.T, ctx context.Context, ids ...string) {
|
||
|
|
t.Helper()
|
||
|
|
for _, id := range ids {
|
||
|
|
_, _ = database.Pool.Exec(ctx, `DELETE FROM ai_ops_notification_logs WHERE id=$1 OR event_id=$1 OR channel_id=$1`, id)
|
||
|
|
_, _ = database.Pool.Exec(ctx, `DELETE FROM ai_ops_healings WHERE id=$1 OR alert_id=$1`, id)
|
||
|
|
_, _ = database.Pool.Exec(ctx, `DELETE FROM ai_ops_alerts WHERE id=$1 OR rule_id=$1 OR parent_alert_id=$1`, id)
|
||
|
|
_, _ = database.Pool.Exec(ctx, `DELETE FROM ai_ops_rules WHERE id=$1`, id)
|
||
|
|
_, _ = database.Pool.Exec(ctx, `DELETE FROM ai_ops_channels WHERE id=$1`, id)
|
||
|
|
_, _ = database.Pool.Exec(ctx, `DELETE FROM ai_ops_request_logs WHERE id=$1`, id)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestPGChannelRepositoryCRUD(t *testing.T) {
|
||
|
|
ctx := setupPGIntegration(t)
|
||
|
|
repo := NewPGChannelRepository()
|
||
|
|
id := testUUID(t)
|
||
|
|
defer cleanupIDs(t, ctx, id)
|
||
|
|
|
||
|
|
ch := &model.NotificationChannel{ID: id, Name: "test-channel", ChannelType: "webhook", Config: map[string]any{"webhook_url": "http://example.invalid"}, Priority: 7, Enabled: true}
|
||
|
|
if err := repo.Create(ctx, ch); err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
got, err := repo.GetByID(ctx, id)
|
||
|
|
if err != nil || got.ID != id || got.Name != ch.Name {
|
||
|
|
t.Fatalf("get channel = %+v %v", got, err)
|
||
|
|
}
|
||
|
|
list, err := repo.List(ctx)
|
||
|
|
if err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
found := false
|
||
|
|
for _, item := range list {
|
||
|
|
if item.ID == id {
|
||
|
|
found = true
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if !found {
|
||
|
|
t.Fatalf("created channel not found in list: %+v", list)
|
||
|
|
}
|
||
|
|
ch.Name = "updated-channel"
|
||
|
|
ch.Priority = 8
|
||
|
|
if err := repo.Update(ctx, ch); err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
updated, err := repo.GetByID(ctx, id)
|
||
|
|
if err != nil || updated.Name != "updated-channel" || updated.Priority != 8 {
|
||
|
|
t.Fatalf("updated channel = %+v %v", updated, err)
|
||
|
|
}
|
||
|
|
if err := repo.Delete(ctx, id); err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
if _, err := repo.GetByID(ctx, id); err == nil {
|
||
|
|
t.Fatal("expected not found after delete")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestPGAlertRepositoryRulesEventsAndAggregation(t *testing.T) {
|
||
|
|
ctx := setupPGIntegration(t)
|
||
|
|
repo := NewPGAlertRepository()
|
||
|
|
ruleID, eventID, childID := testUUID(t), testUUID(t), testUUID(t)
|
||
|
|
defer cleanupIDs(t, ctx, ruleID, eventID, childID)
|
||
|
|
|
||
|
|
rule := &model.AlertRule{ID: ruleID, Name: "rule-" + ruleID, MetricSource: "prom", MetricName: "p99", ThresholdType: ">", ThresholdValue: "100", DurationMin: 1, Level: "P1", ChannelIDs: []string{}, IsSandboxed: true, Enabled: true, Version: 1, CreatedBy: "test"}
|
||
|
|
if err := repo.CreateRule(ctx, rule); err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
if got, err := repo.GetRuleByID(ctx, ruleID); err != nil || got.ID != ruleID || got.Name != rule.Name {
|
||
|
|
t.Fatalf("get rule = %+v %v", got, err)
|
||
|
|
}
|
||
|
|
rules, err := repo.ListRules(ctx)
|
||
|
|
if err != nil || len(rules) == 0 {
|
||
|
|
t.Fatalf("list rules = %d %v", len(rules), err)
|
||
|
|
}
|
||
|
|
rule.Name = "rule-updated-" + ruleID
|
||
|
|
rule.Version = 2
|
||
|
|
if err := repo.UpdateRule(ctx, rule); err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
|
||
|
|
now := time.Now().UTC()
|
||
|
|
event := &model.AlertEvent{ID: eventID, RuleID: ruleID, Level: "P1", ResourceType: "svc", ResourceID: "res-" + ruleID, CurrentValue: "120", ThresholdValue: "100", Status: "triggered", StartedAt: now}
|
||
|
|
created, err := repo.CreateEventWithAggregation(ctx, event, time.Minute, 10)
|
||
|
|
if err != nil || created.ID != eventID {
|
||
|
|
t.Fatalf("create event = %+v %v", created, err)
|
||
|
|
}
|
||
|
|
directID := testUUID(t)
|
||
|
|
defer cleanupIDs(t, ctx, directID)
|
||
|
|
if err := repo.CreateEvent(ctx, &model.AlertEvent{ID: directID, RuleID: ruleID, Level: "P2", ResourceType: "svc", ResourceID: "direct-" + ruleID, CurrentValue: "101", ThresholdValue: "100", Status: "triggered", StartedAt: now.Add(2 * time.Second)}); err != nil {
|
||
|
|
t.Fatalf("create direct event: %v", err)
|
||
|
|
}
|
||
|
|
if err := repo.UpdateEventStatus(ctx, eventID, "resolved"); err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
if err := repo.EscalateEvent(ctx, eventID, "P0"); err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
|
||
|
|
agg, err := repo.CreateEventWithAggregation(ctx, &model.AlertEvent{ID: childID, RuleID: ruleID, Level: "P1", ResourceType: "svc", ResourceID: "res-" + ruleID, CurrentValue: "130", ThresholdValue: "100", Status: "triggered", StartedAt: now.Add(time.Second)}, time.Minute, 1)
|
||
|
|
if err != nil || !agg.IsAggregated || agg.AggregatedCount < 2 {
|
||
|
|
t.Fatalf("aggregation = %+v %v", agg, err)
|
||
|
|
}
|
||
|
|
defer cleanupIDs(t, ctx, agg.ID)
|
||
|
|
|
||
|
|
events, total, err := repo.ListEvents(ctx, "triggered", 1, 20)
|
||
|
|
if err != nil || total < 1 || len(events) < 1 {
|
||
|
|
t.Fatalf("list events = total=%d len=%d err=%v", total, len(events), err)
|
||
|
|
}
|
||
|
|
count, err := repo.GetOpenCount(ctx)
|
||
|
|
if err != nil || count.Open < 1 {
|
||
|
|
t.Fatalf("open count = %+v %v", count, err)
|
||
|
|
}
|
||
|
|
if err := repo.DeleteRule(ctx, ruleID); err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestPGMetricAndLogRepositories(t *testing.T) {
|
||
|
|
ctx := setupPGIntegration(t)
|
||
|
|
metricRepo := NewPGMetricRepository()
|
||
|
|
logRepo := NewPGLogRepository()
|
||
|
|
logID := testUUID(t)
|
||
|
|
metricName := "test_metric_" + logID
|
||
|
|
defer cleanupIDs(t, ctx, logID)
|
||
|
|
defer database.Pool.Exec(ctx, `DELETE FROM ai_ops_metrics WHERE metric_name=$1`, metricName)
|
||
|
|
|
||
|
|
now := time.Now().UTC()
|
||
|
|
if _, err := database.Pool.Exec(ctx, `INSERT INTO ai_ops_metrics(metric_name, labels, value, recorded_at) VALUES ($1, $2, $3, $4)`, metricName, map[string]string{"source": "test"}, 42.5, now); err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
latest, err := metricRepo.GetLatest(ctx, "unit", metricName)
|
||
|
|
if err != nil || latest.Name != metricName || latest.Source != "unit" || latest.Value != 42.5 {
|
||
|
|
t.Fatalf("latest metric = %+v %v", latest, err)
|
||
|
|
}
|
||
|
|
points, err := metricRepo.Query(ctx, model.MetricQueryRequest{Source: "unit", Name: metricName, StartTime: now.Add(-time.Minute), EndTime: now.Add(time.Minute)})
|
||
|
|
if err != nil || len(points) != 1 {
|
||
|
|
t.Fatalf("query metric = %d %v", len(points), err)
|
||
|
|
}
|
||
|
|
if realtime, err := metricRepo.GetRealtime(ctx); err != nil || realtime == nil {
|
||
|
|
t.Fatalf("realtime metric = %+v %v", realtime, err)
|
||
|
|
}
|
||
|
|
|
||
|
|
if _, err := database.Pool.Exec(ctx, `INSERT INTO ai_ops_request_logs(id, timestamp, service, path, method, status_code, latency_ms, user_id, supplier_id, error_code) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)`, logID, now, "svc-test", "/unit", "GET", 200, 11.2, "u1", "s1", ""); err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
status := 200
|
||
|
|
logs, total, err := logRepo.Query(ctx, model.LogQueryFilter{Service: "svc-test", Path: "/unit", StatusCode: &status, UserID: "u1", SupplierID: "s1", Page: 1, PageSize: 10})
|
||
|
|
if err != nil || total != 1 || len(logs) != 1 || logs[0].ID != logID {
|
||
|
|
t.Fatalf("query logs = total=%d logs=%+v err=%v", total, logs, err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestPGHealingAndNotificationRepositories(t *testing.T) {
|
||
|
|
ctx := setupPGIntegration(t)
|
||
|
|
ruleID, eventID, channelID, healingID, notificationID := testUUID(t), testUUID(t), testUUID(t), testUUID(t), testUUID(t)
|
||
|
|
defer cleanupIDs(t, ctx, ruleID, eventID, channelID, healingID, notificationID)
|
||
|
|
alertRepo := NewPGAlertRepository()
|
||
|
|
channelRepo := NewPGChannelRepository()
|
||
|
|
healingRepo := NewPGHealingRepository()
|
||
|
|
notificationRepo := NewPGNotificationLogRepository()
|
||
|
|
|
||
|
|
if err := alertRepo.CreateRule(ctx, &model.AlertRule{ID: ruleID, Name: "notify-rule-" + ruleID, MetricSource: "prom", MetricName: "qps", ThresholdType: ">", ThresholdValue: "1", DurationMin: 1, Level: "P2", ChannelIDs: []string{}, IsSandboxed: true, Enabled: true, Version: 1, CreatedBy: "test"}); err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
if _, err := alertRepo.CreateEventWithAggregation(ctx, &model.AlertEvent{ID: eventID, RuleID: ruleID, Level: "P2", ResourceType: "svc", ResourceID: "res", CurrentValue: "2", ThresholdValue: "1", Status: "triggered", StartedAt: time.Now().UTC()}, 0, 0); err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
if err := channelRepo.Create(ctx, &model.NotificationChannel{ID: channelID, Name: "notify-channel", ChannelType: "webhook", Config: map[string]any{"webhook_url": "http://example.invalid"}, Priority: 1, Enabled: true}); err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
if err := healingRepo.CreateHealing(ctx, &service.HealingLog{ID: healingID, AlertID: eventID, ActionType: "throttle", Config: map[string]any{"endpoint": "http://example.invalid"}, Status: "pending", DryRun: true, StartedAt: time.Now().UTC()}); err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
if err := healingRepo.UpdateHealingStatus(ctx, healingID, "succeeded", map[string]any{"ok": true}, ""); err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
if err := notificationRepo.CreateLog(ctx, &model.NotificationLog{ID: notificationID, EventID: eventID, ChannelID: channelID, ChannelType: "webhook", Status: "pending"}); err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
if err := notificationRepo.MarkSent(ctx, notificationID); err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
if err := notificationRepo.MarkFailed(ctx, notificationID, 1, "retry failed"); err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
}
|