Files
llm-intelligence/scripts/discover_intraday_news_candidates.go

450 lines
14 KiB
Go
Raw Permalink Normal View History

//go:build llm_script && !scripts_pkg
package main
import (
"context"
"database/sql"
"encoding/json"
"flag"
"fmt"
"log/slog"
"os"
"sort"
"strings"
"time"
_ "github.com/lib/pq"
)
// intradayNewsCandidate is one discovered news item. Values are assembled by
// candidateFromLLMRecord / normalizeIntradayCandidates and written to the
// intraday_news_candidate table by upsertIntradayCandidates.
type intradayNewsCandidate struct {
// CandidateDate is the discovery date of the run; it is inserted with a ::date cast.
CandidateDate string
// EventType is the event category; normalizeIntradayCandidates passes it
// through normalizeIntradayEventType before the key is built.
EventType string
// ProviderName comes from the LLM record, else from a matching search
// record, else from inferProviderFromTitle.
ProviderName string
// ModelName comes from the LLM record; an empty value is stored as NULL.
ModelName string
// ProviderCountry comes from the LLM record; an empty value is stored as NULL.
ProviderCountry string
// Title comes from the LLM record, falling back to a matching search record's title.
Title string
// Summary comes from the LLM record, falling back to a matching search record's summary.
Summary string
// CandidateURLs holds the URLs that survived the search-record/date filter;
// serialized to JSON for the candidate_urls column.
CandidateURLs []string
// DiscoverySource is "llm_answer" initially and "web_search+llm" once a
// search record matched.
DiscoverySource string
// DiscoveryQuery is filled with the matching search record's Title by
// candidateFromLLMRecord; an empty value is stored as NULL.
DiscoveryQuery string
// DiscoveryEvidence carries the raw inputs under the keys "llm_record" and
// "search_record"; serialized to JSON for the discovery_evidence column.
DiscoveryEvidence map[string]any
// NormalizedKey is produced by buildIntradayNormalizedKey and is the
// ON CONFLICT target of the upsert.
NormalizedKey string
// Status is set to "candidate" when the value is created.
Status string
// VerificationConfidence is set to "candidate" when the value is created.
VerificationConfidence string
// VerificationNotes is not assigned anywhere in this part of the file; an
// empty value is stored as NULL.
VerificationNotes string
}
// intradayDiscoveryConfig is the full run configuration produced by
// loadIntradayDiscoveryConfig from flags and environment variables.
type intradayDiscoveryConfig struct {
// Date is the discovery date (-date flag); the run fails when it is blank.
Date string
// DryRun (-dry-run flag) prints the summary without opening the database.
DryRun bool
// Search configures the search provider (INTRADAY_DISCOVERY_SEARCH_* variables).
Search intradayProviderConfig
// LLM configures the LLM provider (INTRADAY_DISCOVERY_LLM_* variables).
LLM intradayProviderConfig
// DatabaseURL is the DSN handed to sql.Open("postgres", ...); it comes from intradayDefaultDSN.
DatabaseURL string
// Timeout comes from discoveryTimeoutFromEnv and is copied into both provider configs.
Timeout time.Duration
// ProviderLimit (-provider-limit flag, default 10) is passed to
// buildIntradayQueries, where a positive value caps the number of queries.
ProviderLimit int
}
// intradayDiscoverySummary is the run report built by
// summarizeIntradayCandidates and printed as a single JSON line by
// printIntradayDiscoverySummary.
type intradayDiscoverySummary struct {
// CandidateTotal is the number of candidates in the run.
CandidateTotal int `json:"candidate_total"`
// ProviderHitCount is the number of distinct non-empty provider names.
ProviderHitCount int `json:"provider_hit_count"`
// EventTypeCounts maps each event type to how many candidates carry it.
EventTypeCounts map[string]int `json:"event_type_counts"`
// DiscoverySourceSet lists the distinct non-empty discovery sources, sorted.
DiscoverySourceSet []string `json:"discovery_source_set"`
// DryRun echoes whether the run skipped the database write.
DryRun bool `json:"dry_run"`
}
// intradayDiscoveryLogger is the script's structured logger. It is assigned in
// init and emits JSON records to stderr at Info level and above.
var intradayDiscoveryLogger *slog.Logger

// init wires intradayDiscoveryLogger to a JSON handler on stderr.
func init() {
	opts := slog.HandlerOptions{Level: slog.LevelInfo}
	handler := slog.NewJSONHandler(os.Stderr, &opts)
	intradayDiscoveryLogger = slog.New(handler)
}
// main loads the environment, builds the configuration and runs the discovery.
// Any failure is reported on stderr and the process exits with status 1.
func main() {
	loadIntradayEnv()
	err := runIntradayCandidateDiscovery(loadIntradayDiscoveryConfig())
	if err == nil {
		return
	}
	fmt.Fprintf(os.Stderr, "discover_intraday_news_candidates: %v\n", err)
	os.Exit(1)
}
// loadIntradayDiscoveryConfig assembles the run configuration. It registers
// the -date, -dry-run and -provider-limit flags on the default flag set, calls
// flag.Parse, and then reads the DSN, the timeout and both provider configs
// (INTRADAY_DISCOVERY_SEARCH_* / INTRADAY_DISCOVERY_LLM_*) from the environment.
// Environment values are whitespace-trimmed.
func loadIntradayDiscoveryConfig() intradayDiscoveryConfig {
	// env reads one environment variable and strips surrounding whitespace.
	env := func(key string) string {
		return strings.TrimSpace(os.Getenv(key))
	}

	var cfg intradayDiscoveryConfig
	flag.StringVar(&cfg.Date, "date", intradayDateValue(), "候选发现日期,格式 YYYY-MM-DD")
	flag.BoolVar(&cfg.DryRun, "dry-run", false, "仅输出摘要,不写数据库")
	flag.IntVar(&cfg.ProviderLimit, "provider-limit", 10, "最大 provider 数")
	flag.Parse()

	cfg.DatabaseURL = intradayDefaultDSN()
	cfg.Timeout = discoveryTimeoutFromEnv()

	// Both providers share the same timeout.
	cfg.Search = intradayProviderConfig{
		Mode:    env("INTRADAY_DISCOVERY_SEARCH_PROVIDER"),
		Command: env("INTRADAY_DISCOVERY_SEARCH_COMMAND"),
		URL:     env("INTRADAY_DISCOVERY_SEARCH_URL"),
		Fixture: env("INTRADAY_DISCOVERY_SEARCH_FIXTURE"),
		Timeout: cfg.Timeout,
	}
	cfg.LLM = intradayProviderConfig{
		Mode:    env("INTRADAY_DISCOVERY_LLM_PROVIDER"),
		Command: env("INTRADAY_DISCOVERY_LLM_COMMAND"),
		URL:     env("INTRADAY_DISCOVERY_LLM_URL"),
		Fixture: env("INTRADAY_DISCOVERY_LLM_FIXTURE"),
		Timeout: cfg.Timeout,
	}
	return cfg
}
// runIntradayCandidateDiscovery executes one discovery run: validate the
// configuration, load search and LLM records, normalize them into candidates,
// persist the candidates unless cfg.DryRun is set, and print the JSON summary.
//
// It returns the first error encountered; nothing is printed in that case.
func runIntradayCandidateDiscovery(cfg intradayDiscoveryConfig) error {
	if strings.TrimSpace(cfg.Date) == "" {
		return fmt.Errorf("date 未设置")
	}
	for _, provider := range []struct {
		label string
		cfg   intradayProviderConfig
	}{
		{"search", cfg.Search},
		{"llm", cfg.LLM},
	} {
		if err := validateIntradayProviderConfig(provider.label, provider.cfg); err != nil {
			return err
		}
	}

	searchHits, err := loadIntradaySearchRecords(cfg.Search, cfg.Date, buildIntradayQueries(cfg.Date, cfg.ProviderLimit))
	if err != nil {
		return err
	}
	llmAnswers, err := loadIntradayLLMRecords(cfg.LLM, cfg.Date, searchHits)
	if err != nil {
		return err
	}

	found := normalizeIntradayCandidates(cfg.Date, searchHits, llmAnswers)
	summary := summarizeIntradayCandidates(found, cfg.DryRun)

	// A dry run never touches the database.
	if !cfg.DryRun {
		db, err := sql.Open("postgres", cfg.DatabaseURL)
		if err != nil {
			return fmt.Errorf("open db: %w", err)
		}
		defer db.Close()
		if err := upsertIntradayCandidates(context.Background(), db, found); err != nil {
			return err
		}
	}
	return printIntradayDiscoverySummary(summary)
}
// validateIntradayProviderConfig checks that cfg names a supported mode and
// carries the setting that mode needs: "fixture" needs Fixture, "command_json"
// needs Command and "http_json" needs URL. name ("search", "llm", ...) only
// prefixes the error message.
//
// The mode is compared verbatim; only the emptiness checks ignore whitespace.
func validateIntradayProviderConfig(name string, cfg intradayProviderConfig) error {
	blank := func(s string) bool { return strings.TrimSpace(s) == "" }

	switch mode := cfg.Mode; {
	case blank(mode):
		return fmt.Errorf("%s provider 未设置", name)
	case mode == "fixture" && blank(cfg.Fixture):
		return fmt.Errorf("%s provider fixture 未设置", name)
	case mode == "command_json" && blank(cfg.Command):
		return fmt.Errorf("%s provider command 未设置", name)
	case mode == "http_json" && blank(cfg.URL):
		return fmt.Errorf("%s provider url 未设置", name)
	case mode == "fixture", mode == "command_json", mode == "http_json":
		return nil
	default:
		return fmt.Errorf("%s provider mode 不支持: %s", name, mode)
	}
}
// buildIntradayQueries returns the fixed list of site-scoped search queries.
// A providerLimit between 1 and len(list)-1 truncates the list to that many
// entries; any other value returns the full list. date is currently unused.
func buildIntradayQueries(date string, providerLimit int) []string {
	all := []string{
		"site:platform.deepseek.com DeepSeek pricing",
		"site:api-docs.deepseek.com DeepSeek release news",
		"site:docs.anthropic.com Claude Sonnet 4 announcement",
		"site:openrouter.ai OpenRouter models",
	}
	keep := len(all)
	if providerLimit > 0 && providerLimit < keep {
		keep = providerLimit
	}
	return all[:keep]
}
// normalizeIntradayCandidates turns LLM records into de-duplicated candidates.
//
// Each LLM record is converted with candidateFromLLMRecord; records left
// without any URL are dropped. Survivors get a provider inferred from the
// title when none is set, a normalized event type and a normalized key, and
// are merged by that key. The result is sorted by provider name, then event
// type, then normalized key.
func normalizeIntradayCandidates(date string, searchRecords []intradaySearchRecord, llmRecords []intradayLLMRecord) []intradayNewsCandidate {
	byURL := indexSearchRecordsByURL(searchRecords)
	merged := make(map[string]intradayNewsCandidate)

	for i := range llmRecords {
		c := candidateFromLLMRecord(date, llmRecords[i], byURL)
		if len(c.CandidateURLs) > 0 {
			if c.ProviderName == "" {
				c.ProviderName = inferProviderFromTitle(c.Title)
			}
			c.EventType = normalizeIntradayEventType(c.EventType)
			c.NormalizedKey = buildIntradayNormalizedKey(c)
			mergeIntradayCandidate(merged, c)
		}
	}

	ordered := make([]intradayNewsCandidate, 0, len(merged))
	for key := range merged {
		ordered = append(ordered, merged[key])
	}
	// Map iteration order is random; sorting makes the output deterministic.
	sort.Slice(ordered, func(a, b int) bool {
		left, right := ordered[a], ordered[b]
		switch {
		case left.ProviderName != right.ProviderName:
			return left.ProviderName < right.ProviderName
		case left.EventType != right.EventType:
			return left.EventType < right.EventType
		default:
			return left.NormalizedKey < right.NormalizedKey
		}
	})
	return ordered
}
// candidateFromLLMRecord builds a candidate from one LLM record and keeps only
// the URLs backed by a search record whose publication date matches date.
//
// For every kept URL the discovery source becomes "web_search+llm", the
// discovery query is set to the search record's title, the search record is
// attached as evidence, and empty provider/title/summary fields are filled
// from the search record. When no URL survives, CandidateURLs is nil and the
// source stays "llm_answer".
func candidateFromLLMRecord(date string, record intradayLLMRecord, searchIndex map[string]intradaySearchRecord) intradayNewsCandidate {
	candidate := intradayNewsCandidate{
		CandidateDate:          date,
		EventType:              record.EventType,
		ProviderName:           strings.TrimSpace(record.ProviderName),
		ModelName:              strings.TrimSpace(record.ModelName),
		ProviderCountry:        strings.TrimSpace(record.ProviderCountry),
		Title:                  strings.TrimSpace(record.Title),
		Summary:                strings.TrimSpace(record.Summary),
		CandidateURLs:          dedupeStrings(record.CandidateURLs),
		DiscoverySource:        "llm_answer",
		DiscoveryEvidence:      map[string]any{"llm_record": record},
		Status:                 "candidate",
		VerificationConfidence: "candidate",
	}

	kept := make([]string, 0, len(candidate.CandidateURLs))
	for _, link := range candidate.CandidateURLs {
		hit, found := searchIndex[link]
		if !found || !searchRecordMatchesDate(hit, date) {
			continue
		}
		kept = append(kept, link)

		// Later matches overwrite the query and the evidence entry.
		candidate.DiscoverySource = "web_search+llm"
		candidate.DiscoveryQuery = hit.Title
		candidate.DiscoveryEvidence["search_record"] = hit

		// Only fill fields the LLM left empty.
		if candidate.ProviderName == "" {
			candidate.ProviderName = strings.TrimSpace(hit.Provider)
		}
		if candidate.Title == "" {
			candidate.Title = strings.TrimSpace(hit.Title)
		}
		if candidate.Summary == "" {
			candidate.Summary = strings.TrimSpace(hit.Summary)
		}
	}

	if len(kept) == 0 {
		// No URL is backed by a same-day search hit; the caller drops this candidate.
		candidate.CandidateURLs = nil
		return candidate
	}
	candidate.CandidateURLs = dedupeStrings(kept)
	return candidate
}
// indexSearchRecordsByURL maps each record's whitespace-trimmed URL to the
// record. Records with a blank URL are skipped; when several records share a
// URL the last one wins.
func indexSearchRecordsByURL(records []intradaySearchRecord) map[string]intradaySearchRecord {
	byURL := make(map[string]intradaySearchRecord, len(records))
	for i := range records {
		if key := strings.TrimSpace(records[i].URL); key != "" {
			byURL[key] = records[i]
		}
	}
	return byURL
}
// mergeIntradayCandidate stores candidate in target under its NormalizedKey,
// merging with an existing entry when one is present. Candidates without a
// key are ignored.
//
// On merge the existing entry keeps its own fields; URLs are unioned, empty
// Summary/ProviderCountry are filled from candidate, differing discovery
// sources collapse to "web_search+llm", and candidate's evidence entries
// overwrite the existing ones.
func mergeIntradayCandidate(target map[string]intradayNewsCandidate, candidate intradayNewsCandidate) {
	key := candidate.NormalizedKey
	if key == "" {
		return
	}
	current, seen := target[key]
	if !seen {
		target[key] = candidate
		return
	}

	combinedURLs := append(current.CandidateURLs, candidate.CandidateURLs...)
	current.CandidateURLs = dedupeStrings(combinedURLs)

	if strings.TrimSpace(current.Summary) == "" {
		current.Summary = candidate.Summary
	}
	if strings.TrimSpace(current.ProviderCountry) == "" {
		current.ProviderCountry = candidate.ProviderCountry
	}
	if candidate.DiscoverySource != "" && candidate.DiscoverySource != current.DiscoverySource {
		current.DiscoverySource = "web_search+llm"
	}

	if current.DiscoveryEvidence == nil {
		current.DiscoveryEvidence = map[string]any{}
	}
	for _, evidenceKey := range []string{"llm_record", "search_record"} {
		if value, ok := candidate.DiscoveryEvidence[evidenceKey]; ok {
			current.DiscoveryEvidence[evidenceKey] = value
		}
	}
	target[key] = current
}
// buildIntradayNormalizedKey derives the de-duplication key
// "<date>|<event type>|<provider>|<model>", with every part except the date
// passed through normalizeWord. When the normalized model name is empty the
// normalized title takes its place.
func buildIntradayNormalizedKey(candidate intradayNewsCandidate) string {
	provider := normalizeWord(candidate.ProviderName)
	subject := normalizeWord(candidate.ModelName)
	if subject == "" {
		subject = normalizeWord(candidate.Title)
	}
	eventType := normalizeWord(candidate.EventType)

	parts := []string{candidate.CandidateDate, eventType, provider, subject}
	return strings.Join(parts, "|")
}
// searchRecordMatchesDate reports whether the record was published on date
// (YYYY-MM-DD). A blank PublishedAt never matches. When the timestamp can be
// parsed, its calendar day must equal date; otherwise the raw text merely has
// to contain date.
func searchRecordMatchesDate(record intradaySearchRecord, date string) bool {
	stamp := strings.TrimSpace(record.PublishedAt)
	if stamp == "" {
		return false
	}
	day, parsed := parseSearchPublishedAt(stamp)
	if !parsed {
		return strings.Contains(stamp, date)
	}
	return day == date
}
// parseSearchPublishedAt extracts the calendar day (YYYY-MM-DD) from a
// publication timestamp and reports whether parsing succeeded.
//
// Accepted forms:
//   - RFC 3339 ("2024-12-03T10:00:00Z")
//   - a bare date ("2024-12-03")
//   - RFC 1123 style with a zone abbreviation ("Tue, 03 Dec 2024 10:00:00 GMT")
//   - RFC 1123 style with a numeric offset ("Tue, 03 Dec 2024 10:00:00 +0800")
//   - either RFC 1123 style with Chinese weekday/month names
//     ("周二, 3 12月 2024 10:00:00 +0800")
//
// The day is taken as written in the timestamp; no time-zone conversion is
// applied. On failure it returns ("", false).
func parseSearchPublishedAt(value string) (string, bool) {
	const dayLayout = "2006-01-02"

	// A "2" day field accepts both "3" and "03", so zero-padded variants of
	// these layouts are not needed.
	rfc1123Like := []string{
		"Mon, 2 Jan 2006 15:04:05 MST",
		"Mon, 2 Jan 2006 15:04:05 -0700",
	}

	// tryLayouts returns the day of the first layout that parses input.
	tryLayouts := func(input string, layouts []string) (string, bool) {
		for _, layout := range layouts {
			if ts, err := time.Parse(layout, input); err == nil {
				return ts.Format(dayLayout), true
			}
		}
		return "", false
	}

	if day, ok := tryLayouts(value, []string{time.RFC3339, dayLayout}); ok {
		return day, true
	}
	if day, ok := tryLayouts(value, rfc1123Like); ok {
		return day, true
	}

	// Map Chinese weekday and month names to the English abbreviations that
	// time.Parse understands, then retry the RFC 1123 style layouts.
	localized := strings.NewReplacer(
		"周一", "Mon", "周二", "Tue", "周三", "Wed", "周四", "Thu", "周五", "Fri", "周六", "Sat", "周日", "Sun",
		"1月", "Jan", "2月", "Feb", "3月", "Mar", "4月", "Apr", "5月", "May", "6月", "Jun",
		"7月", "Jul", "8月", "Aug", "9月", "Sep", "10月", "Oct", "11月", "Nov", "12月", "Dec",
	).Replace(value)
	if localized == value {
		// Nothing was translated, so the retry would repeat the failure above.
		return "", false
	}
	return tryLayouts(localized, rfc1123Like)
}
// summarizeIntradayCandidates computes the run report: the candidate count,
// the number of distinct non-empty providers, a per-event-type tally and the
// sorted set of non-empty discovery sources. dryRun is copied into the report.
// The map and the slice in the result are never nil.
func summarizeIntradayCandidates(candidates []intradayNewsCandidate, dryRun bool) intradayDiscoverySummary {
	report := intradayDiscoverySummary{
		CandidateTotal:  len(candidates),
		EventTypeCounts: map[string]int{},
		DryRun:          dryRun,
	}
	providers := make(map[string]bool)
	seenSources := make(map[string]bool)
	sources := make([]string, 0)

	for i := range candidates {
		c := &candidates[i]
		report.EventTypeCounts[c.EventType]++
		if c.ProviderName != "" {
			providers[c.ProviderName] = true
		}
		if c.DiscoverySource != "" && !seenSources[c.DiscoverySource] {
			seenSources[c.DiscoverySource] = true
			sources = append(sources, c.DiscoverySource)
		}
	}

	sort.Strings(sources)
	report.ProviderHitCount = len(providers)
	report.DiscoverySourceSet = sources
	return report
}
// printIntradayDiscoverySummary writes summary to stdout as one line of JSON.
// It returns the marshalling error, if any, and prints nothing in that case.
func printIntradayDiscoverySummary(summary intradayDiscoverySummary) error {
	encoded, err := json.Marshal(summary)
	if err == nil {
		fmt.Println(string(encoded))
	}
	return err
}
// upsertIntradayCandidates writes every candidate to intraday_news_candidate,
// one statement per candidate, inserting new rows and updating existing ones
// that share the same normalized_key.
//
// Empty optional text fields are stored as NULL; on conflict an empty incoming
// summary, discovery_query or provider_country keeps the stored value. The
// function stops at the first failure and returns it wrapped; rows written
// before the failure are not rolled back. A nil db is rejected up front.
func upsertIntradayCandidates(ctx context.Context, db *sql.DB, candidates []intradayNewsCandidate) error {
	if db == nil {
		return fmt.Errorf("db is nil")
	}

	const upsertSQL = `
INSERT INTO intraday_news_candidate (
candidate_date, event_type, provider_name, model_name, provider_country,
title, summary, candidate_urls, discovery_source, discovery_query,
discovery_evidence, normalized_key, status, verification_confidence, verification_notes
) VALUES (
$1::date, $2, $3, NULLIF($4, ''), NULLIF($5, ''),
$6, NULLIF($7, ''), $8::jsonb, $9, NULLIF($10, ''),
$11::jsonb, $12, $13, $14, NULLIF($15, '')
)
ON CONFLICT (normalized_key) DO UPDATE SET
title = EXCLUDED.title,
summary = COALESCE(NULLIF(EXCLUDED.summary, ''), intraday_news_candidate.summary),
candidate_urls = EXCLUDED.candidate_urls,
discovery_source = EXCLUDED.discovery_source,
discovery_query = COALESCE(NULLIF(EXCLUDED.discovery_query, ''), intraday_news_candidate.discovery_query),
discovery_evidence = EXCLUDED.discovery_evidence,
provider_country = COALESCE(NULLIF(EXCLUDED.provider_country, ''), intraday_news_candidate.provider_country),
updated_at = CURRENT_TIMESTAMP`

	for i := range candidates {
		row := &candidates[i]

		// The JSON columns are sent as text and cast to jsonb in the statement.
		urlsJSON, err := json.Marshal(row.CandidateURLs)
		if err != nil {
			return fmt.Errorf("marshal candidate urls: %w", err)
		}
		evidenceJSON, err := json.Marshal(row.DiscoveryEvidence)
		if err != nil {
			return fmt.Errorf("marshal discovery evidence: %w", err)
		}

		// Argument order must follow the $1..$15 placeholders.
		args := []any{
			row.CandidateDate,
			row.EventType,
			row.ProviderName,
			row.ModelName,
			row.ProviderCountry,
			row.Title,
			row.Summary,
			string(urlsJSON),
			row.DiscoverySource,
			row.DiscoveryQuery,
			string(evidenceJSON),
			row.NormalizedKey,
			row.Status,
			row.VerificationConfidence,
			row.VerificationNotes,
		}
		if _, err := db.ExecContext(ctx, upsertSQL, args...); err != nil {
			return fmt.Errorf("upsert intraday candidate %s: %w", row.NormalizedKey, err)
		}
	}
	return nil
}
// inferProviderFromTitle guesses the provider from keywords in title. The
// match is a case-insensitive substring test; the first keyword found, in the
// order listed, decides. It returns "" when no keyword matches.
func inferProviderFromTitle(title string) string {
	// Each entry is {keyword, provider}; order is the match priority.
	keywords := [][2]string{
		{"openai", "OpenAI"},
		{"anthropic", "Anthropic"},
		{"gemini", "Google"},
		{"deepseek", "DeepSeek"},
		{"qwen", "Qwen"},
		{"dashscope", "DashScope"},
		{"xai", "xAI"},
		{"minimax", "MiniMax"},
		{"智谱", "智谱"},
		{"百度", "百度"},
		{"腾讯", "腾讯"},
	}
	haystack := strings.ToLower(title)
	for i := range keywords {
		if strings.Contains(haystack, keywords[i][0]) {
			return keywords[i][1]
		}
	}
	return ""
}