Files
ai-ops/internal/infra/repository/pg_log_repository.go
2026-05-12 17:48:22 +08:00

113 lines
2.9 KiB
Go

package repository
import (
"context"
"fmt"
"strings"
"github.com/company/ai-ops/internal/database"
"github.com/company/ai-ops/internal/domain/model"
)
// PGLogRepository is the PostgreSQL-backed implementation of the log store.
//
// The struct carries no fields; its Query method reaches the database through
// the package-level database.Pool rather than through state held here.
type PGLogRepository struct{}
// NewPGLogRepository returns a pointer to a freshly allocated, zero-valued
// PGLogRepository. It takes no arguments and cannot fail.
func NewPGLogRepository() *PGLogRepository {
	repo := new(PGLogRepository)
	return repo
}
// Query returns one page of request logs that match filter, together with the
// total number of rows matching the same filter (ignoring paging).
//
// Filtering: each filter field that is set (non-nil pointer or non-empty
// string) contributes one predicate; all predicates are combined with AND.
// StartTime and EndTime are inclusive bounds on the timestamp column; the
// remaining fields are exact matches. Every value is passed as a bound
// parameter, never interpolated into the SQL text.
//
// Paging: a Page below 1 is treated as 1. A PageSize below 1 or above 100 is
// replaced by the default of 20 (it is not clamped to 100). Rows are ordered by
// timestamp descending.
//
// The count and the page are fetched with two separate statements on
// database.Pool, so the returned total is the count at the time of the first
// statement.
//
// On any error the returned slice is nil and the total is 0. When no row
// matches, the returned slice is nil and the error is nil.
func (r *PGLogRepository) Query(ctx context.Context, filter model.LogQueryFilter) ([]model.RequestLog, int, error) {
	const (
		defaultPageSize = 20
		maxPageSize     = 100
	)

	// Build the WHERE clause from the filter fields that are set.
	var (
		conditions []string
		args       []any
	)
	// addCond records one predicate and its bound value. The placeholder
	// number is the 1-based position of the value in args, so the SQL text and
	// the argument list cannot drift apart.
	addCond := func(expr string, value any) {
		args = append(args, value)
		conditions = append(conditions, fmt.Sprintf("%s $%d", expr, len(args)))
	}
	if filter.StartTime != nil {
		addCond("timestamp >=", *filter.StartTime)
	}
	if filter.EndTime != nil {
		addCond("timestamp <=", *filter.EndTime)
	}
	if filter.Service != "" {
		addCond("service =", filter.Service)
	}
	if filter.Path != "" {
		addCond("path =", filter.Path)
	}
	if filter.StatusCode != nil {
		addCond("status_code =", *filter.StatusCode)
	}
	if filter.UserID != "" {
		addCond("user_id =", filter.UserID)
	}
	if filter.SupplierID != "" {
		addCond("supplier_id =", filter.SupplierID)
	}

	whereClause := ""
	if len(conditions) > 0 {
		whereClause = "WHERE " + strings.Join(conditions, " AND ")
	}

	// Total number of rows matching the filter, independent of paging.
	var total int
	countQuery := fmt.Sprintf("SELECT COUNT(*) FROM ai_ops_request_logs %s", whereClause)
	if err := database.Pool.QueryRow(ctx, countQuery, args...).Scan(&total); err != nil {
		return nil, 0, fmt.Errorf("count logs: %w", err)
	}

	// Normalize paging input before computing the offset.
	page := filter.Page
	if page < 1 {
		page = 1
	}
	pageSize := filter.PageSize
	if pageSize < 1 || pageSize > maxPageSize {
		pageSize = defaultPageSize
	}
	offset := (page - 1) * pageSize

	// LIMIT and OFFSET take the two placeholders that follow the filter
	// arguments. The count query has already run, so args can be extended in
	// place instead of appending into a second slice that might share its
	// backing array.
	limitIdx := len(args) + 1
	args = append(args, pageSize, offset)
	dataQuery := fmt.Sprintf(`
SELECT id, timestamp, service, path, status_code, latency_ms, user_id, supplier_id, method, error_code
FROM ai_ops_request_logs
%s
ORDER BY timestamp DESC
LIMIT $%d OFFSET $%d
`, whereClause, limitIdx, limitIdx+1)

	rows, err := database.Pool.Query(ctx, dataQuery, args...)
	if err != nil {
		return nil, 0, fmt.Errorf("query logs: %w", err)
	}
	defer rows.Close()

	var logs []model.RequestLog
	for rows.Next() {
		var l model.RequestLog
		if err := rows.Scan(
			&l.ID, &l.Timestamp, &l.Service, &l.Path, &l.StatusCode,
			&l.LatencyMs, &l.UserID, &l.SupplierID, &l.Method, &l.ErrorCode,
		); err != nil {
			return nil, 0, fmt.Errorf("scan log: %w", err)
		}
		logs = append(logs, l)
	}
	// An error that ends iteration early surfaces only through rows.Err. Do not
	// return the partially collected page alongside it.
	if err := rows.Err(); err != nil {
		return nil, 0, fmt.Errorf("iterate logs: %w", err)
	}
	return logs, total, nil
}