Files
ai-ops/internal/infra/repository/pg_metric_repository.go
2026-05-12 17:48:22 +08:00

96 lines
2.5 KiB
Go

package repository
import (
	"context"
	"errors"
	"fmt"

	"github.com/company/ai-ops/internal/database"
	"github.com/company/ai-ops/internal/domain/model"
	"github.com/jackc/pgx/v5"
)
// PGMetricRepository is the PostgreSQL-backed implementation of the metric
// store. It carries no fields of its own; the methods visible here issue their
// queries through the package-level database.Pool.
type PGMetricRepository struct{}
// NewPGMetricRepository returns a pointer to a freshly allocated, zero-valued
// PGMetricRepository.
func NewPGMetricRepository() *PGMetricRepository {
	return new(PGMetricRepository)
}
// GetRealtime returns the most recent value of each realtime metric (qps,
// avg_latency, p99_latency, error_rate) stored in the ai_ops_metrics table.
//
// One query is issued per metric, in the fixed order listed above. A metric
// that has no rows yet is reported as 0 rather than as an error. Any other
// query failure aborts the call and is returned wrapped with the name of the
// metric that was being read.
func (r *PGMetricRepository) GetRealtime(ctx context.Context) (*model.RealtimeMetrics, error) {
	var qps, avgLatency, p99Latency, errorRate float64

	// An ordered slice (rather than a map) keeps the query order, and therefore
	// the first error reported, deterministic across calls.
	targets := []struct {
		name string
		dst  *float64
	}{
		{"qps", &qps},
		{"avg_latency", &avgLatency},
		{"p99_latency", &p99Latency},
		{"error_rate", &errorRate},
	}

	for _, t := range targets {
		err := database.Pool.QueryRow(ctx, `
SELECT value FROM ai_ops_metrics
WHERE metric_name = $1
ORDER BY recorded_at DESC
LIMIT 1
`, t.name).Scan(t.dst)
		// errors.Is also matches a wrapped ErrNoRows; in that case the
		// destination is left at its zero value.
		if err != nil && !errors.Is(err, pgx.ErrNoRows) {
			return nil, fmt.Errorf("query %s: %w", t.name, err)
		}
	}

	return &model.RealtimeMetrics{
		QPS:        qps,
		AvgLatency: avgLatency,
		P99Latency: p99Latency,
		ErrorRate:  errorRate,
	}, nil
}
// Query returns the stored points of metric req.Name whose recorded_at falls
// within the inclusive range [req.StartTime, req.EndTime], newest first.
//
// req.Source is not part of the SQL filter; it is only copied onto every
// returned point. The labels column is exposed as the point's Tags.
//
// A failure to start the query or to scan a row yields a nil slice and a
// wrapped error. An error raised while iterating is returned from rows.Err()
// together with whatever points were collected before it.
func (r *PGMetricRepository) Query(ctx context.Context, req model.MetricQueryRequest) ([]model.MetricPoint, error) {
	const stmt = `
SELECT metric_name, labels, value, recorded_at
FROM ai_ops_metrics
WHERE metric_name = $1
AND recorded_at >= $2
AND recorded_at <= $3
ORDER BY recorded_at DESC
`
	rows, err := database.Pool.Query(ctx, stmt, req.Name, req.StartTime, req.EndTime)
	if err != nil {
		return nil, fmt.Errorf("query metrics: %w", err)
	}
	defer rows.Close()

	var result []model.MetricPoint
	for rows.Next() {
		var (
			point model.MetricPoint
			tags  map[string]string
		)
		scanErr := rows.Scan(&point.Name, &tags, &point.Value, &point.Timestamp)
		if scanErr != nil {
			return nil, fmt.Errorf("scan metric: %w", scanErr)
		}
		point.Source, point.Tags = req.Source, tags
		result = append(result, point)
	}
	return result, rows.Err()
}
// GetLatest fetches the newest ai_ops_metrics row for the metric called name.
//
// source does not take part in the lookup; it is only stamped onto the
// returned point. The labels column is exposed as the point's Tags.
//
// Any Scan failure is returned wrapped as "query latest metric: ...". With
// pgx this includes the no-matching-row case (pgx.ErrNoRows), which callers
// can detect through errors.Is.
func (r *PGMetricRepository) GetLatest(ctx context.Context, source, name string) (*model.MetricPoint, error) {
	const stmt = `
SELECT metric_name, labels, value, recorded_at
FROM ai_ops_metrics
WHERE metric_name = $1
ORDER BY recorded_at DESC
LIMIT 1
`
	var tags map[string]string
	point := new(model.MetricPoint)

	row := database.Pool.QueryRow(ctx, stmt, name)
	if err := row.Scan(&point.Name, &tags, &point.Value, &point.Timestamp); err != nil {
		return nil, fmt.Errorf("query latest metric: %w", err)
	}

	point.Source = source
	point.Tags = tags
	return point, nil
}