Files
ai-ops/internal/infra/repository/pg_alert_repository.go
2026-05-12 17:48:22 +08:00

315 lines
11 KiB
Go

package repository
import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"time"
"github.com/company/ai-ops/internal/database"
"github.com/company/ai-ops/internal/domain/model"
"github.com/jackc/pgx/v5"
)
// PGAlertRepository stores alert rules and alert events in PostgreSQL.
// It carries no fields: every method talks to the shared database.Pool.
type PGAlertRepository struct{}

// NewPGAlertRepository returns a ready-to-use PGAlertRepository.
func NewPGAlertRepository() *PGAlertRepository {
	return new(PGAlertRepository)
}
// GetOpenCount reports how many alerts in ai_ops_alerts are not in the
// 'resolved' status: the total in Open plus the P0..P3 breakdown. Alerts at
// any other level contribute to Open only, and rows whose status is NULL are
// not counted at all (status != 'resolved' is NULL for them).
func (r *PGAlertRepository) GetOpenCount(ctx context.Context) (*model.AlertCount, error) {
	const openCountSQL = `
SELECT
COUNT(*) FILTER (WHERE status != 'resolved') AS open_count,
COUNT(*) FILTER (WHERE status != 'resolved' AND level = 'P0') AS p0_count,
COUNT(*) FILTER (WHERE status != 'resolved' AND level = 'P1') AS p1_count,
COUNT(*) FILTER (WHERE status != 'resolved' AND level = 'P2') AS p2_count,
COUNT(*) FILTER (WHERE status != 'resolved' AND level = 'P3') AS p3_count
FROM ai_ops_alerts
`
	result := new(model.AlertCount)
	row := database.Pool.QueryRow(ctx, openCountSQL)
	if scanErr := row.Scan(&result.Open, &result.P0, &result.P1, &result.P2, &result.P3); scanErr != nil {
		return nil, fmt.Errorf("query alert count: %w", scanErr)
	}
	return result, nil
}
// ListRules returns every enabled alert rule (enabled = true), most recently
// created first. Disabled rules are not included.
//
// On success the slice is never nil; an empty result is an empty slice. On a
// query, scan, or row-iteration error the slice is nil, so a truncated list is
// never returned alongside an error.
func (r *PGAlertRepository) ListRules(ctx context.Context) ([]model.AlertRule, error) {
	rows, err := database.Pool.Query(ctx, `
SELECT id, name, metric_source, metric_name, threshold_type, threshold_value,
duration_min, level, channel_ids, healing_action, healing_config,
is_sandboxed, enabled, version, created_by, created_at, updated_at
FROM ai_ops_rules
WHERE enabled = true
ORDER BY created_at DESC
`)
	if err != nil {
		return nil, fmt.Errorf("query rules: %w", err)
	}
	defer rows.Close()

	// Non-nil on purpose; presumably so JSON-encoding callers get [] rather
	// than null for an empty result.
	rules := make([]model.AlertRule, 0)
	for rows.Next() {
		var ru model.AlertRule
		var channelIDs []string
		if err := rows.Scan(
			&ru.ID, &ru.Name, &ru.MetricSource, &ru.MetricName, &ru.ThresholdType, &ru.ThresholdValue,
			&ru.DurationMin, &ru.Level, &channelIDs, &ru.HealingAction, &ru.HealingConfig,
			&ru.IsSandboxed, &ru.Enabled, &ru.Version, &ru.CreatedBy, &ru.CreatedAt, &ru.UpdatedAt,
		); err != nil {
			return nil, fmt.Errorf("scan rule: %w", err)
		}
		ru.ChannelIDs = channelIDs
		rules = append(rules, ru)
	}
	// rows.Next() also returns false when iteration stops because of an
	// error; in that case the collected rules are incomplete, so drop them.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate rules: %w", err)
	}
	return rules, nil
}
// GetRuleByID loads the rule whose id matches, whether or not it is enabled.
//
// When no row matches, the error message is exactly "rule not found". Any
// other database failure is wrapped with the "query rule:" prefix.
func (r *PGAlertRepository) GetRuleByID(ctx context.Context, id string) (*model.AlertRule, error) {
	const ruleByIDSQL = `
SELECT id, name, metric_source, metric_name, threshold_type, threshold_value,
duration_min, level, channel_ids, healing_action, healing_config,
is_sandboxed, enabled, version, created_by, created_at, updated_at
FROM ai_ops_rules WHERE id = $1
`
	rule := new(model.AlertRule)
	var channels []string
	scanErr := database.Pool.QueryRow(ctx, ruleByIDSQL, id).Scan(
		&rule.ID, &rule.Name, &rule.MetricSource, &rule.MetricName, &rule.ThresholdType, &rule.ThresholdValue,
		&rule.DurationMin, &rule.Level, &channels, &rule.HealingAction, &rule.HealingConfig,
		&rule.IsSandboxed, &rule.Enabled, &rule.Version, &rule.CreatedBy, &rule.CreatedAt, &rule.UpdatedAt,
	)
	switch {
	case scanErr == pgx.ErrNoRows:
		return nil, fmt.Errorf("rule not found")
	case scanErr != nil:
		return nil, fmt.Errorf("query rule: %w", scanErr)
	}
	rule.ChannelIDs = channels
	return rule, nil
}
// CreateRule inserts rule as a new ai_ops_rules row.
//
// The id is taken from rule.ID as-is (nothing is generated here).
// created_at and updated_at are both stamped with the database's NOW(), so
// rule.CreatedAt and rule.UpdatedAt are not written.
func (r *PGAlertRepository) CreateRule(ctx context.Context, rule *model.AlertRule) error {
	const insertRuleSQL = `
INSERT INTO ai_ops_rules (id, name, metric_source, metric_name, threshold_type, threshold_value,
duration_min, level, channel_ids, healing_action, healing_config,
is_sandboxed, enabled, version, created_by, created_at, updated_at)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,NOW(),NOW())
`
	params := []any{
		rule.ID, rule.Name, rule.MetricSource, rule.MetricName, rule.ThresholdType, rule.ThresholdValue,
		rule.DurationMin, rule.Level, rule.ChannelIDs, rule.HealingAction, rule.HealingConfig,
		rule.IsSandboxed, rule.Enabled, rule.Version, rule.CreatedBy,
	}
	if _, execErr := database.Pool.Exec(ctx, insertRuleSQL, params...); execErr != nil {
		return fmt.Errorf("insert rule: %w", execErr)
	}
	return nil
}
// UpdateRule writes every editable field of rule back to the row whose id is
// rule.ID and refreshes updated_at with NOW(). id, created_by and created_at
// are never modified.
//
// version is stored exactly as rule.Version (no increment or compare happens
// here). An id that matches no row is silently ignored.
func (r *PGAlertRepository) UpdateRule(ctx context.Context, rule *model.AlertRule) error {
	const updateRuleSQL = `
UPDATE ai_ops_rules SET
name=$2, metric_source=$3, metric_name=$4, threshold_type=$5, threshold_value=$6,
duration_min=$7, level=$8, channel_ids=$9, healing_action=$10, healing_config=$11,
is_sandboxed=$12, enabled=$13, version=$14, updated_at=NOW()
WHERE id=$1
`
	params := []any{
		rule.ID,
		rule.Name, rule.MetricSource, rule.MetricName, rule.ThresholdType, rule.ThresholdValue,
		rule.DurationMin, rule.Level, rule.ChannelIDs, rule.HealingAction, rule.HealingConfig,
		rule.IsSandboxed, rule.Enabled, rule.Version,
	}
	if _, execErr := database.Pool.Exec(ctx, updateRuleSQL, params...); execErr != nil {
		return fmt.Errorf("update rule: %w", execErr)
	}
	return nil
}
// DeleteRule removes the rule with the given id. Deleting an id that matches
// no row is not treated as an error.
func (r *PGAlertRepository) DeleteRule(ctx context.Context, id string) error {
	const deleteRuleSQL = `DELETE FROM ai_ops_rules WHERE id = $1`
	if _, execErr := database.Pool.Exec(ctx, deleteRuleSQL, id); execErr != nil {
		return fmt.Errorf("delete rule: %w", execErr)
	}
	return nil
}
// ListEvents returns one page of alert events, newest started_at first,
// together with the total number of events that match the filter.
//
// status, when non-empty, restricts both the page and the total to events
// with exactly that status. page is 1-based; values below 1 are treated as 1.
// A pageSize outside [1, 100] falls back to 20.
//
// On any error the slice is nil and the total is 0, so a truncated page is
// never returned alongside an error.
func (r *PGAlertRepository) ListEvents(ctx context.Context, status string, page, pageSize int) ([]model.AlertEvent, int, error) {
	if page < 1 {
		page = 1
	}
	if pageSize < 1 || pageSize > 100 {
		pageSize = 20
	}
	offset := (page - 1) * pageSize

	// where only ever holds this fixed placeholder clause; the status value
	// itself always travels as a bind parameter.
	where := ""
	var filterArgs []any
	if status != "" {
		where = "WHERE status = $1"
		filterArgs = append(filterArgs, status)
	}

	var total int
	countQuery := fmt.Sprintf("SELECT COUNT(*) FROM ai_ops_alerts %s", where)
	if err := database.Pool.QueryRow(ctx, countQuery, filterArgs...).Scan(&total); err != nil {
		return nil, 0, fmt.Errorf("count events: %w", err)
	}

	// LIMIT/OFFSET placeholders are numbered after the filter placeholders.
	dataQuery := fmt.Sprintf(`
SELECT id, rule_id, level, resource_type, resource_id, current_value, threshold_value,
status, is_aggregated, aggregated_count, parent_alert_id, started_at, resolved_at
FROM ai_ops_alerts %s
ORDER BY started_at DESC
LIMIT $%d OFFSET $%d
`, where, len(filterArgs)+1, len(filterArgs)+2)
	// Build a fresh slice rather than append(filterArgs, ...) so the query
	// arguments never share a backing array with filterArgs.
	queryArgs := make([]any, 0, len(filterArgs)+2)
	queryArgs = append(queryArgs, filterArgs...)
	queryArgs = append(queryArgs, pageSize, offset)

	rows, err := database.Pool.Query(ctx, dataQuery, queryArgs...)
	if err != nil {
		return nil, 0, fmt.Errorf("query events: %w", err)
	}
	defer rows.Close()

	events := make([]model.AlertEvent, 0, pageSize)
	for rows.Next() {
		var e model.AlertEvent
		if err := rows.Scan(
			&e.ID, &e.RuleID, &e.Level, &e.ResourceType, &e.ResourceID,
			&e.CurrentValue, &e.ThresholdValue, &e.Status, &e.IsAggregated, &e.AggregatedCount,
			&e.ParentAlertID, &e.StartedAt, &e.ResolvedAt,
		); err != nil {
			return nil, 0, fmt.Errorf("scan event: %w", err)
		}
		events = append(events, e)
	}
	// rows.Next() also returns false when iteration stops on an error; the
	// page collected so far is then incomplete, so drop it.
	if err := rows.Err(); err != nil {
		return nil, 0, fmt.Errorf("iterate events: %w", err)
	}
	return events, total, nil
}
// CreateEvent inserts event without any aggregation. It delegates to
// CreateEventWithAggregation with a zero window and threshold, which turns
// aggregation off. The returned event pointer is discarded; only the error
// is propagated. event.StartedAt is filled in place when it was zero.
func (r *PGAlertRepository) CreateEvent(ctx context.Context, event *model.AlertEvent) error {
	if _, aggErr := r.CreateEventWithAggregation(ctx, event, 0, 0); aggErr != nil {
		return aggErr
	}
	return nil
}
// CreateEventWithAggregation inserts event and, when aggregation is enabled,
// folds a burst of alerts on the same resource into one aggregated parent.
//
// If event.StartedAt is zero it is set to time.Now(), and the value used is
// written back to event.StartedAt. Aggregation is disabled when
// window <= 0 or threshold <= 0. In that case the event is simply inserted
// and returned.
//
// With aggregation enabled, the method counts alerts that meet all of these
// conditions:
//   - same (resource_type, resource_id) as event;
//   - not aggregated and not attached to a parent;
//   - started_at >= event.StartedAt-window.
//
// The just-inserted event is included unless it was itself passed in as
// aggregated or with a parent. If the count exceeds threshold, a new alert
// with is_aggregated = true is inserted, every matching alert is re-parented
// onto it, and the aggregated alert is returned instead of event. Otherwise
// event is returned.
//
// All statements run in one transaction; on any error it is rolled back.
func (r *PGAlertRepository) CreateEventWithAggregation(ctx context.Context, event *model.AlertEvent, window time.Duration, threshold int) (*model.AlertEvent, error) {
	tx, err := database.Pool.Begin(ctx)
	if err != nil {
		return nil, fmt.Errorf("begin create event: %w", err)
	}
	// Only matters on the error paths. After a successful Commit, pgx's
	// Rollback just reports that the transaction is closed, and that result
	// is ignored here.
	defer tx.Rollback(ctx)

	startedAt := event.StartedAt
	if startedAt.IsZero() {
		startedAt = time.Now()
	}
	_, err = tx.Exec(ctx, `
INSERT INTO ai_ops_alerts (id, rule_id, level, resource_type, resource_id,
current_value, threshold_value, status, is_aggregated, aggregated_count, parent_alert_id, started_at)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)
`, event.ID, event.RuleID, event.Level, event.ResourceType, event.ResourceID,
		event.CurrentValue, event.ThresholdValue, event.Status, event.IsAggregated,
		event.AggregatedCount, event.ParentAlertID, startedAt)
	if err != nil {
		return nil, fmt.Errorf("insert event: %w", err)
	}
	event.StartedAt = startedAt

	if window <= 0 || threshold <= 0 {
		if err := tx.Commit(ctx); err != nil {
			return nil, fmt.Errorf("commit event: %w", err)
		}
		return event, nil
	}

	// Serialize the count-then-aggregate sequence per resource. Under READ
	// COMMITTED, two concurrent calls for the same resource cannot see each
	// other's uncommitted event. Without this lock they can both stay under
	// the threshold when together they exceed it, or both create an
	// aggregated parent.
	//
	// The lock key is hashed from the row just inserted, so the resource
	// columns are read with their own SQL types. The lock is released at
	// commit or rollback. Every later statement takes a fresh snapshot, so it
	// sees whatever a previous lock holder committed.
	_, err = tx.Exec(ctx, `
SELECT pg_advisory_xact_lock(hashtext(resource_type::text), hashtext(resource_id::text))
FROM ai_ops_alerts
WHERE id = $1
`, event.ID)
	if err != nil {
		return nil, fmt.Errorf("lock aggregation scope: %w", err)
	}

	cutoff := startedAt.Add(-window)
	var count int
	err = tx.QueryRow(ctx, `
SELECT COUNT(*)
FROM ai_ops_alerts
WHERE resource_type = $1
AND resource_id = $2
AND started_at >= $3
AND is_aggregated = false
AND parent_alert_id IS NULL
`, event.ResourceType, event.ResourceID, cutoff).Scan(&count)
	if err != nil {
		return nil, fmt.Errorf("count aggregation candidates: %w", err)
	}
	if count <= threshold {
		if err := tx.Commit(ctx); err != nil {
			return nil, fmt.Errorf("commit event: %w", err)
		}
		return event, nil
	}

	aggregated := &model.AlertEvent{
		ID:              newUUID(),
		RuleID:          event.RuleID,
		Level:           event.Level,
		ResourceType:    event.ResourceType,
		ResourceID:      event.ResourceID,
		CurrentValue:    event.CurrentValue,
		ThresholdValue:  fmt.Sprintf("cluster_count>%d", threshold),
		Status:          event.Status,
		IsAggregated:    true,
		AggregatedCount: count,
		StartedAt:       startedAt,
	}
	_, err = tx.Exec(ctx, `
INSERT INTO ai_ops_alerts (id, rule_id, level, resource_type, resource_id,
current_value, threshold_value, status, is_aggregated, aggregated_count, started_at)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,true,$9,$10)
`, aggregated.ID, aggregated.RuleID, aggregated.Level, aggregated.ResourceType, aggregated.ResourceID,
		aggregated.CurrentValue, aggregated.ThresholdValue, aggregated.Status, aggregated.AggregatedCount, aggregated.StartedAt)
	if err != nil {
		return nil, fmt.Errorf("insert aggregated event: %w", err)
	}

	tag, err := tx.Exec(ctx, `
UPDATE ai_ops_alerts
SET parent_alert_id = $1
WHERE resource_type = $2
AND resource_id = $3
AND started_at >= $4
AND is_aggregated = false
AND parent_alert_id IS NULL
`, aggregated.ID, event.ResourceType, event.ResourceID, cutoff)
	if err != nil {
		return nil, fmt.Errorf("attach aggregated children: %w", err)
	}
	// Plain inserts (window <= 0) do not take the lock above. Such an insert
	// may commit between the count and this update. Keep aggregated_count
	// equal to the number of children actually attached.
	if attached := int(tag.RowsAffected()); attached != aggregated.AggregatedCount {
		aggregated.AggregatedCount = attached
		if _, err := tx.Exec(ctx, `UPDATE ai_ops_alerts SET aggregated_count = $2 WHERE id = $1`,
			aggregated.ID, attached); err != nil {
			return nil, fmt.Errorf("update aggregated count: %w", err)
		}
	}

	if err := tx.Commit(ctx); err != nil {
		return nil, fmt.Errorf("commit aggregated event: %w", err)
	}
	return aggregated, nil
}
// UpdateEventStatus sets the status of the alert identified by id.
//
// resolved_at is set to NOW() when status is "resolved" and cleared to NULL
// for any other status. Updating an id that matches no row is not reported as
// an error.
func (r *PGAlertRepository) UpdateEventStatus(ctx context.Context, id, status string) error {
	// One static statement. Instead of splicing a SQL fragment into the query
	// text, the "is resolved" decision is bound as a boolean ($3). Passing it
	// separately keeps $2 typed by the status column alone.
	_, err := database.Pool.Exec(ctx, `
UPDATE ai_ops_alerts
SET status = $2, resolved_at = CASE WHEN $3 THEN NOW() ELSE NULL END
WHERE id = $1
`, id, status, status == "resolved")
	if err != nil {
		return fmt.Errorf("update event status: %w", err)
	}
	return nil
}
// EscalateEvent overwrites the level of the alert identified by id with
// newLevel. It does not check that the alert exists or that newLevel is
// actually higher than the current level.
func (r *PGAlertRepository) EscalateEvent(ctx context.Context, id, newLevel string) error {
	const escalateSQL = `UPDATE ai_ops_alerts SET level = $2 WHERE id = $1`
	if _, execErr := database.Pool.Exec(ctx, escalateSQL, id, newLevel); execErr != nil {
		return fmt.Errorf("escalate event: %w", execErr)
	}
	return nil
}
// newUUID returns a random version-4 UUID in the canonical lowercase
// 8-4-4-4-12 hex form.
//
// If crypto/rand fails, it falls back to a fixed
// "00000000-0000-4000-8000-" prefix followed by the low 12 decimal digits of
// the current UnixNano. That fallback has far weaker uniqueness than the
// random path.
func newUUID() string {
	var raw [16]byte
	if _, err := rand.Read(raw[:]); err != nil {
		return fmt.Sprintf("00000000-0000-4000-8000-%012d", time.Now().UnixNano()%1_000_000_000_000)
	}
	// Version 4 in the high nibble of byte 6, RFC 4122 variant (10xx) in byte 8.
	raw[6] = raw[6]&0x0f | 0x40
	raw[8] = raw[8]&0x3f | 0x80

	var out [36]byte
	hex.Encode(out[0:8], raw[0:4])
	out[8] = '-'
	hex.Encode(out[9:13], raw[4:6])
	out[13] = '-'
	hex.Encode(out[14:18], raw[6:8])
	out[18] = '-'
	hex.Encode(out[19:23], raw[8:10])
	out[23] = '-'
	hex.Encode(out[24:36], raw[10:16])
	return string(out[:])
}