Files
lijiaoqiao/supply-api/sql/postgresql/partition_strategy_v1.sql

264 lines
11 KiB
PL/PgSQL
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
-- Partition Strategy Schema v1.0
-- 按月分区的大表分区策略
-- ==================== 1. audit_events 分区 (按月分区保留12个月) ====================
-- 创建父表
CREATE TABLE IF NOT EXISTS audit_events (
id BIGSERIAL,
event_id VARCHAR(100) NOT NULL,
event_name VARCHAR(100) NOT NULL,
event_category VARCHAR(50) NOT NULL DEFAULT '',
event_sub_category VARCHAR(50) NOT NULL DEFAULT '',
timestamp TIMESTAMPTZ NOT NULL,
timestamp_ms BIGINT NOT NULL DEFAULT 0,
request_id VARCHAR(100) NOT NULL DEFAULT '',
trace_id VARCHAR(64) NOT NULL DEFAULT '',
span_id VARCHAR(64) NOT NULL DEFAULT '',
idempotency_key VARCHAR(128) NOT NULL DEFAULT '',
operator_id BIGINT NOT NULL DEFAULT 0,
operator_type VARCHAR(32) NOT NULL DEFAULT '',
operator_role VARCHAR(64) NOT NULL DEFAULT '',
tenant_id BIGINT NOT NULL DEFAULT 0,
tenant_type VARCHAR(32) NOT NULL DEFAULT '',
object_type VARCHAR(100) NOT NULL DEFAULT '',
object_id BIGINT NOT NULL DEFAULT 0,
action VARCHAR(100) NOT NULL,
action_detail TEXT NOT NULL DEFAULT '',
credential_type VARCHAR(64) NOT NULL DEFAULT '',
credential_id VARCHAR(255) NOT NULL DEFAULT '',
credential_fingerprint VARCHAR(255) NOT NULL DEFAULT '',
source_type VARCHAR(32) NOT NULL DEFAULT '',
source_ip VARCHAR(50) NOT NULL DEFAULT '',
source_region VARCHAR(100) NOT NULL DEFAULT '',
user_agent TEXT NOT NULL DEFAULT '',
target_type VARCHAR(32) NOT NULL DEFAULT '',
target_endpoint TEXT NOT NULL DEFAULT '',
target_direct BOOLEAN NOT NULL DEFAULT FALSE,
result_code VARCHAR(50) NOT NULL DEFAULT '',
result_message TEXT NOT NULL DEFAULT '',
success BOOLEAN NOT NULL DEFAULT FALSE,
before_state JSONB,
after_state JSONB,
security_flags JSONB NOT NULL DEFAULT '{}'::jsonb,
risk_score INTEGER NOT NULL DEFAULT 0,
compliance_tags TEXT[] NOT NULL DEFAULT ARRAY[]::TEXT[],
invariant_rule VARCHAR(255) NOT NULL DEFAULT '',
extensions JSONB,
version INTEGER NOT NULL DEFAULT 1,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, timestamp)
) PARTITION BY RANGE (timestamp);
-- 创建月度分区函数
CREATE OR REPLACE FUNCTION create_audit_events_partition(partition_date DATE)
RETURNS VOID AS $$
DECLARE
partition_name TEXT;
start_date DATE;
end_date DATE;
BEGIN
start_date := date_trunc('month', partition_date)::DATE;
end_date := (start_date + INTERVAL '1 month')::DATE;
partition_name := 'audit_events_' || to_char(start_date, 'YYYY_MM');
-- 检查分区是否已存在
IF NOT EXISTS (
SELECT 1 FROM pg_class WHERE relname = partition_name
) THEN
EXECUTE format(
'CREATE TABLE %I PARTITION OF audit_events FOR VALUES FROM (%L) TO (%L)',
partition_name, start_date, end_date
);
RAISE NOTICE 'Created partition: %', partition_name;
ELSE
RAISE NOTICE 'Partition already exists: %', partition_name;
END IF;
END;
$$ LANGUAGE plpgsql;
-- ==================== 2. supply_usage_records 分区 (按月分区保留3个月) ====================
CREATE TABLE IF NOT EXISTS supply_usage_records (
id BIGSERIAL,
order_id BIGINT NOT NULL,
buyer_user_id BIGINT NOT NULL,
supply_account_id BIGINT NOT NULL,
supplier_user_id BIGINT NOT NULL,
request_id VARCHAR(64) NOT NULL,
upstream_request_id VARCHAR(128),
api_key_id BIGINT,
platform VARCHAR(50) NOT NULL,
model VARCHAR(100) NOT NULL,
endpoint VARCHAR(100) NOT NULL,
request_tokens BIGINT,
response_tokens BIGINT,
total_tokens BIGINT,
input_cost NUMERIC(20,6),
output_cost NUMERIC(20,6),
total_cost NUMERIC(20,6) NOT NULL,
unit_price NUMERIC(20,6) NOT NULL,
response_status INT,
latency_ms INT,
error_message TEXT,
success BOOLEAN DEFAULT TRUE,
started_at TIMESTAMPTZ NOT NULL,
completed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, started_at)
) PARTITION BY RANGE (started_at);
CREATE OR REPLACE FUNCTION create_usage_records_partition(partition_date DATE)
RETURNS VOID AS $$
DECLARE
partition_name TEXT;
start_date DATE;
end_date DATE;
BEGIN
start_date := date_trunc('month', partition_date)::DATE;
end_date := (start_date + INTERVAL '1 month')::DATE;
partition_name := 'supply_usage_records_' || to_char(start_date, 'YYYY_MM');
IF NOT EXISTS (
SELECT 1 FROM pg_class WHERE relname = partition_name
) THEN
EXECUTE format(
'CREATE TABLE %I PARTITION OF supply_usage_records FOR VALUES FROM (%L) TO (%L)',
partition_name, start_date, end_date
);
RAISE NOTICE 'Created partition: %', partition_name;
END IF;
END;
$$ LANGUAGE plpgsql;
-- ==================== 3. supply_idempotency_records 幂等表 (非分区保留7天) ====================
CREATE TABLE IF NOT EXISTS supply_idempotency_records (
id BIGSERIAL,
tenant_id BIGINT NOT NULL,
operator_id BIGINT NOT NULL,
api_path VARCHAR(200) NOT NULL,
idempotency_key VARCHAR(128) NOT NULL,
request_id VARCHAR(64) NOT NULL,
payload_hash CHAR(64) NOT NULL,
response_code INT,
response_body JSONB,
status VARCHAR(20) NOT NULL DEFAULT 'processing',
expires_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
CONSTRAINT uq_supply_idempotency_records_key
UNIQUE (tenant_id, operator_id, api_path, idempotency_key)
);
-- 向后兼容保留函数名;幂等表不再分区。
CREATE OR REPLACE FUNCTION create_idempotency_partition(partition_date DATE)
RETURNS VOID AS $$
BEGIN
RAISE NOTICE 'supply_idempotency_records is no longer partitioned; skip partition setup for %', partition_date;
END;
$$ LANGUAGE plpgsql;
-- ==================== 4. 自动创建未来分区 ====================
CREATE OR REPLACE FUNCTION ensure_future_partitions()
RETURNS VOID AS $$
DECLARE
i INT;
future_date DATE;
BEGIN
-- 为未来3个月创建分区
FOR i IN 0..3 LOOP
future_date := (CURRENT_DATE + (i || ' months')::INTERVAL)::DATE;
PERFORM create_audit_events_partition(future_date);
PERFORM create_usage_records_partition(future_date);
PERFORM create_idempotency_partition(future_date);
END LOOP;
END;
$$ LANGUAGE plpgsql;
-- ==================== 5. 清理过期分区 ====================
CREATE OR REPLACE FUNCTION drop_old_audit_partitions(retention_months INT DEFAULT 12)
RETURNS INTEGER AS $$
DECLARE
partition_name TEXT;
cutoff_date DATE;
dropped_count INTEGER := 0;
BEGIN
cutoff_date := (CURRENT_DATE - (retention_months || ' months')::INTERVAL)::DATE;
FOR partition_name IN
SELECT relname
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE relname ~ 'audit_events_20[0-9]{2}_[0-9]{2}'
AND n.nspname = 'public'
LOOP
-- 提取分区日期
IF partition_name < 'audit_events_' || to_char(cutoff_date, 'YYYY_MM') THEN
EXECUTE format('DROP TABLE IF EXISTS %I', partition_name);
dropped_count := dropped_count + 1;
RAISE NOTICE 'Dropped partition: %', partition_name;
END IF;
END LOOP;
RETURN dropped_count;
END;
$$ LANGUAGE plpgsql;
-- ==================== 6. 初始化分区 (首次运行) ====================
-- 创建初始分区过去12个月 + 未来3个月
DO $$
DECLARE
i INT;
target_date DATE;
BEGIN
-- 过去12个月
FOR i IN -12..0 LOOP
target_date := (CURRENT_DATE + (i || ' months')::INTERVAL)::DATE;
PERFORM create_audit_events_partition(target_date);
PERFORM create_usage_records_partition(target_date);
PERFORM create_idempotency_partition(target_date);
END LOOP;
-- 未来3个月
FOR i IN 1..3 LOOP
target_date := (CURRENT_DATE + (i || ' months')::INTERVAL)::DATE;
PERFORM create_audit_events_partition(target_date);
PERFORM create_usage_records_partition(target_date);
PERFORM create_idempotency_partition(target_date);
END LOOP;
END $$;
-- ==================== 7. 索引 ====================
-- 在父表上创建索引(会自动继承到分区)
CREATE INDEX IF NOT EXISTS idx_audit_events_tenant_id ON audit_events(tenant_id);
CREATE INDEX IF NOT EXISTS idx_audit_events_event_id ON audit_events(event_id);
CREATE INDEX IF NOT EXISTS idx_audit_events_event_name ON audit_events(event_name);
CREATE INDEX IF NOT EXISTS idx_audit_events_request_id ON audit_events(request_id);
CREATE INDEX IF NOT EXISTS idx_audit_events_trace_id ON audit_events(trace_id);
CREATE INDEX IF NOT EXISTS idx_audit_events_idempotency_key ON audit_events(idempotency_key);
CREATE INDEX IF NOT EXISTS idx_audit_events_created_at ON audit_events(created_at);
CREATE INDEX IF NOT EXISTS idx_audit_events_object ON audit_events(object_type, object_id);
CREATE INDEX IF NOT EXISTS idx_usage_records_order_id ON supply_usage_records(order_id);
CREATE INDEX IF NOT EXISTS idx_usage_records_started_at ON supply_usage_records(started_at);
CREATE INDEX IF NOT EXISTS idx_idempotency_request_id ON supply_idempotency_records(request_id);
CREATE INDEX IF NOT EXISTS idx_idempotency_expires_at ON supply_idempotency_records(expires_at);
CREATE INDEX IF NOT EXISTS idx_idempotency_status_expires ON supply_idempotency_records(status, expires_at);
-- ==================== 8. 注释 ====================
COMMENT ON TABLE audit_events IS '审计事件表 - 按月分区保留12个月';
COMMENT ON TABLE supply_usage_records IS '使用记录表 - 按月分区保留3个月';
COMMENT ON TABLE supply_idempotency_records IS '幂等记录表 - 非分区唯一表保留7天以上';
-- 创建pg_cron作业定期维护分区需要扩展 pg_cron
-- SELECT cron.schedule('partition-maintenance', '0 0 * * *', 'SELECT ensure_future_partitions()');
-- SELECT cron.schedule('partition-cleanup', '0 1 * * *', 'SELECT drop_old_audit_partitions(12)');