315 lines
11 KiB
Go
315 lines
11 KiB
Go
package repository
|
|
|
|
import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"errors"
	"fmt"
	"time"

	"github.com/company/ai-ops/internal/database"
	"github.com/company/ai-ops/internal/domain/model"
	"github.com/jackc/pgx/v5"
)
|
|
|
|
// PGAlertRepository is the PostgreSQL-backed implementation of alert storage.
//
// The struct carries no fields; its methods reach the database through the
// package-level database.Pool.
type PGAlertRepository struct{}

// NewPGAlertRepository returns a ready-to-use, non-nil *PGAlertRepository.
func NewPGAlertRepository() *PGAlertRepository {
	return new(PGAlertRepository)
}
|
|
|
|
func (r *PGAlertRepository) GetOpenCount(ctx context.Context) (*model.AlertCount, error) {
|
|
var count model.AlertCount
|
|
err := database.Pool.QueryRow(ctx, `
|
|
SELECT
|
|
COUNT(*) FILTER (WHERE status != 'resolved') AS open_count,
|
|
COUNT(*) FILTER (WHERE status != 'resolved' AND level = 'P0') AS p0_count,
|
|
COUNT(*) FILTER (WHERE status != 'resolved' AND level = 'P1') AS p1_count,
|
|
COUNT(*) FILTER (WHERE status != 'resolved' AND level = 'P2') AS p2_count,
|
|
COUNT(*) FILTER (WHERE status != 'resolved' AND level = 'P3') AS p3_count
|
|
FROM ai_ops_alerts
|
|
`).Scan(&count.Open, &count.P0, &count.P1, &count.P2, &count.P3)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query alert count: %w", err)
|
|
}
|
|
return &count, nil
|
|
}
|
|
|
|
func (r *PGAlertRepository) ListRules(ctx context.Context) ([]model.AlertRule, error) {
|
|
rows, err := database.Pool.Query(ctx, `
|
|
SELECT id, name, metric_source, metric_name, threshold_type, threshold_value,
|
|
duration_min, level, channel_ids, healing_action, healing_config,
|
|
is_sandboxed, enabled, version, created_by, created_at, updated_at
|
|
FROM ai_ops_rules
|
|
WHERE enabled = true
|
|
ORDER BY created_at DESC
|
|
`)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query rules: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
rules := make([]model.AlertRule, 0)
|
|
for rows.Next() {
|
|
var ru model.AlertRule
|
|
var channelIDs []string
|
|
if err := rows.Scan(
|
|
&ru.ID, &ru.Name, &ru.MetricSource, &ru.MetricName, &ru.ThresholdType, &ru.ThresholdValue,
|
|
&ru.DurationMin, &ru.Level, &channelIDs, &ru.HealingAction, &ru.HealingConfig,
|
|
&ru.IsSandboxed, &ru.Enabled, &ru.Version, &ru.CreatedBy, &ru.CreatedAt, &ru.UpdatedAt,
|
|
); err != nil {
|
|
return nil, fmt.Errorf("scan rule: %w", err)
|
|
}
|
|
ru.ChannelIDs = channelIDs
|
|
rules = append(rules, ru)
|
|
}
|
|
return rules, rows.Err()
|
|
}
|
|
|
|
func (r *PGAlertRepository) GetRuleByID(ctx context.Context, id string) (*model.AlertRule, error) {
|
|
var ru model.AlertRule
|
|
var channelIDs []string
|
|
err := database.Pool.QueryRow(ctx, `
|
|
SELECT id, name, metric_source, metric_name, threshold_type, threshold_value,
|
|
duration_min, level, channel_ids, healing_action, healing_config,
|
|
is_sandboxed, enabled, version, created_by, created_at, updated_at
|
|
FROM ai_ops_rules WHERE id = $1
|
|
`, id).Scan(
|
|
&ru.ID, &ru.Name, &ru.MetricSource, &ru.MetricName, &ru.ThresholdType, &ru.ThresholdValue,
|
|
&ru.DurationMin, &ru.Level, &channelIDs, &ru.HealingAction, &ru.HealingConfig,
|
|
&ru.IsSandboxed, &ru.Enabled, &ru.Version, &ru.CreatedBy, &ru.CreatedAt, &ru.UpdatedAt,
|
|
)
|
|
if err == pgx.ErrNoRows {
|
|
return nil, fmt.Errorf("rule not found")
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query rule: %w", err)
|
|
}
|
|
ru.ChannelIDs = channelIDs
|
|
return &ru, nil
|
|
}
|
|
|
|
func (r *PGAlertRepository) CreateRule(ctx context.Context, rule *model.AlertRule) error {
|
|
_, err := database.Pool.Exec(ctx, `
|
|
INSERT INTO ai_ops_rules (id, name, metric_source, metric_name, threshold_type, threshold_value,
|
|
duration_min, level, channel_ids, healing_action, healing_config,
|
|
is_sandboxed, enabled, version, created_by, created_at, updated_at)
|
|
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,NOW(),NOW())
|
|
`, rule.ID, rule.Name, rule.MetricSource, rule.MetricName, rule.ThresholdType, rule.ThresholdValue,
|
|
rule.DurationMin, rule.Level, rule.ChannelIDs, rule.HealingAction, rule.HealingConfig,
|
|
rule.IsSandboxed, rule.Enabled, rule.Version, rule.CreatedBy)
|
|
if err != nil {
|
|
return fmt.Errorf("insert rule: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *PGAlertRepository) UpdateRule(ctx context.Context, rule *model.AlertRule) error {
|
|
_, err := database.Pool.Exec(ctx, `
|
|
UPDATE ai_ops_rules SET
|
|
name=$2, metric_source=$3, metric_name=$4, threshold_type=$5, threshold_value=$6,
|
|
duration_min=$7, level=$8, channel_ids=$9, healing_action=$10, healing_config=$11,
|
|
is_sandboxed=$12, enabled=$13, version=$14, updated_at=NOW()
|
|
WHERE id=$1
|
|
`, rule.ID, rule.Name, rule.MetricSource, rule.MetricName, rule.ThresholdType, rule.ThresholdValue,
|
|
rule.DurationMin, rule.Level, rule.ChannelIDs, rule.HealingAction, rule.HealingConfig,
|
|
rule.IsSandboxed, rule.Enabled, rule.Version)
|
|
if err != nil {
|
|
return fmt.Errorf("update rule: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *PGAlertRepository) DeleteRule(ctx context.Context, id string) error {
|
|
_, err := database.Pool.Exec(ctx, `DELETE FROM ai_ops_rules WHERE id = $1`, id)
|
|
if err != nil {
|
|
return fmt.Errorf("delete rule: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *PGAlertRepository) ListEvents(ctx context.Context, status string, page, pageSize int) ([]model.AlertEvent, int, error) {
|
|
where := ""
|
|
args := []any{}
|
|
if status != "" {
|
|
where = "WHERE status = $1"
|
|
args = append(args, status)
|
|
}
|
|
|
|
var total int
|
|
countQuery := fmt.Sprintf("SELECT COUNT(*) FROM ai_ops_alerts %s", where)
|
|
if err := database.Pool.QueryRow(ctx, countQuery, args...).Scan(&total); err != nil {
|
|
return nil, 0, fmt.Errorf("count events: %w", err)
|
|
}
|
|
|
|
if page < 1 {
|
|
page = 1
|
|
}
|
|
if pageSize < 1 || pageSize > 100 {
|
|
pageSize = 20
|
|
}
|
|
offset := (page - 1) * pageSize
|
|
|
|
dataQuery := fmt.Sprintf(`
|
|
SELECT id, rule_id, level, resource_type, resource_id, current_value, threshold_value,
|
|
status, is_aggregated, aggregated_count, parent_alert_id, started_at, resolved_at
|
|
FROM ai_ops_alerts %s
|
|
ORDER BY started_at DESC
|
|
LIMIT $%d OFFSET $%d
|
|
`, where, len(args)+1, len(args)+2)
|
|
queryArgs := append(args, pageSize, offset)
|
|
|
|
rows, err := database.Pool.Query(ctx, dataQuery, queryArgs...)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("query events: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
events := make([]model.AlertEvent, 0)
|
|
for rows.Next() {
|
|
var e model.AlertEvent
|
|
if err := rows.Scan(
|
|
&e.ID, &e.RuleID, &e.Level, &e.ResourceType, &e.ResourceID,
|
|
&e.CurrentValue, &e.ThresholdValue, &e.Status, &e.IsAggregated, &e.AggregatedCount,
|
|
&e.ParentAlertID, &e.StartedAt, &e.ResolvedAt,
|
|
); err != nil {
|
|
return nil, 0, fmt.Errorf("scan event: %w", err)
|
|
}
|
|
events = append(events, e)
|
|
}
|
|
return events, total, rows.Err()
|
|
}
|
|
|
|
func (r *PGAlertRepository) CreateEvent(ctx context.Context, event *model.AlertEvent) error {
|
|
_, err := r.CreateEventWithAggregation(ctx, event, 0, 0)
|
|
return err
|
|
}
|
|
|
|
func (r *PGAlertRepository) CreateEventWithAggregation(ctx context.Context, event *model.AlertEvent, window time.Duration, threshold int) (*model.AlertEvent, error) {
|
|
tx, err := database.Pool.Begin(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("begin create event: %w", err)
|
|
}
|
|
defer tx.Rollback(ctx)
|
|
|
|
startedAt := event.StartedAt
|
|
if startedAt.IsZero() {
|
|
startedAt = time.Now()
|
|
}
|
|
|
|
_, err = tx.Exec(ctx, `
|
|
INSERT INTO ai_ops_alerts (id, rule_id, level, resource_type, resource_id,
|
|
current_value, threshold_value, status, is_aggregated, aggregated_count, parent_alert_id, started_at)
|
|
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)
|
|
`, event.ID, event.RuleID, event.Level, event.ResourceType, event.ResourceID,
|
|
event.CurrentValue, event.ThresholdValue, event.Status, event.IsAggregated,
|
|
event.AggregatedCount, event.ParentAlertID, startedAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("insert event: %w", err)
|
|
}
|
|
event.StartedAt = startedAt
|
|
|
|
if window <= 0 || threshold <= 0 {
|
|
if err := tx.Commit(ctx); err != nil {
|
|
return nil, fmt.Errorf("commit event: %w", err)
|
|
}
|
|
return event, nil
|
|
}
|
|
|
|
var count int
|
|
err = tx.QueryRow(ctx, `
|
|
SELECT COUNT(*)
|
|
FROM ai_ops_alerts
|
|
WHERE resource_type = $1
|
|
AND resource_id = $2
|
|
AND started_at >= $3
|
|
AND is_aggregated = false
|
|
AND parent_alert_id IS NULL
|
|
`, event.ResourceType, event.ResourceID, startedAt.Add(-window)).Scan(&count)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("count aggregation candidates: %w", err)
|
|
}
|
|
|
|
if count <= threshold {
|
|
if err := tx.Commit(ctx); err != nil {
|
|
return nil, fmt.Errorf("commit event: %w", err)
|
|
}
|
|
return event, nil
|
|
}
|
|
|
|
aggregated := &model.AlertEvent{
|
|
ID: newUUID(),
|
|
RuleID: event.RuleID,
|
|
Level: event.Level,
|
|
ResourceType: event.ResourceType,
|
|
ResourceID: event.ResourceID,
|
|
CurrentValue: event.CurrentValue,
|
|
ThresholdValue: fmt.Sprintf("cluster_count>%d", threshold),
|
|
Status: event.Status,
|
|
IsAggregated: true,
|
|
AggregatedCount: count,
|
|
StartedAt: startedAt,
|
|
}
|
|
|
|
_, err = tx.Exec(ctx, `
|
|
INSERT INTO ai_ops_alerts (id, rule_id, level, resource_type, resource_id,
|
|
current_value, threshold_value, status, is_aggregated, aggregated_count, started_at)
|
|
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,true,$9,$10)
|
|
`, aggregated.ID, aggregated.RuleID, aggregated.Level, aggregated.ResourceType, aggregated.ResourceID,
|
|
aggregated.CurrentValue, aggregated.ThresholdValue, aggregated.Status, aggregated.AggregatedCount, aggregated.StartedAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("insert aggregated event: %w", err)
|
|
}
|
|
|
|
_, err = tx.Exec(ctx, `
|
|
UPDATE ai_ops_alerts
|
|
SET parent_alert_id = $1
|
|
WHERE resource_type = $2
|
|
AND resource_id = $3
|
|
AND started_at >= $4
|
|
AND is_aggregated = false
|
|
AND parent_alert_id IS NULL
|
|
`, aggregated.ID, event.ResourceType, event.ResourceID, startedAt.Add(-window))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("attach aggregated children: %w", err)
|
|
}
|
|
|
|
if err := tx.Commit(ctx); err != nil {
|
|
return nil, fmt.Errorf("commit aggregated event: %w", err)
|
|
}
|
|
return aggregated, nil
|
|
}
|
|
|
|
func (r *PGAlertRepository) UpdateEventStatus(ctx context.Context, id, status string) error {
|
|
resolvedAt := "NULL"
|
|
if status == "resolved" {
|
|
resolvedAt = "NOW()"
|
|
}
|
|
_, err := database.Pool.Exec(ctx, fmt.Sprintf(`
|
|
UPDATE ai_ops_alerts SET status = $2, resolved_at = %s WHERE id = $1
|
|
`, resolvedAt), id, status)
|
|
if err != nil {
|
|
return fmt.Errorf("update event status: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *PGAlertRepository) EscalateEvent(ctx context.Context, id, newLevel string) error {
|
|
_, err := database.Pool.Exec(ctx, `UPDATE ai_ops_alerts SET level = $2 WHERE id = $1`, id, newLevel)
|
|
if err != nil {
|
|
return fmt.Errorf("escalate event: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// newUUID returns a 36-character identifier in the canonical 8-4-4-4-12
// lowercase-hex layout, built from 16 bytes read from crypto/rand with the
// version nibble forced to 4 and the top two variant bits forced to 10.
//
// If reading random bytes fails, it falls back to a fixed
// "00000000-0000-4000-8000-" prefix followed by the current Unix time in
// nanoseconds modulo 10^12, zero-padded to 12 decimal digits.
func newUUID() string {
	var raw [16]byte
	if _, err := rand.Read(raw[:]); err != nil {
		suffix := time.Now().UnixNano() % 1_000_000_000_000
		return fmt.Sprintf("00000000-0000-4000-8000-%012d", suffix)
	}

	raw[6] = (raw[6] & 0x0f) | 0x40 // high nibble of byte 6 becomes 4
	raw[8] = (raw[8] & 0x3f) | 0x80 // top two bits of byte 8 become 10

	// Encode once (32 hex characters), then cut at the group boundaries.
	digits := hex.EncodeToString(raw[:])
	return digits[0:8] + "-" + digits[8:12] + "-" + digits[12:16] + "-" + digits[16:20] + "-" + digits[20:32]
}
|