Files
lijiaoqiao/sql/postgresql/outbox_pattern_v1.sql
Your Name aecba5ff27 docs(review): add remediation plans and readiness artifacts
Add design, review, and production-readiness documents for the April remediation cycle. Include supporting SQL and supply-api operational design notes so review conclusions and implementation guidance stay versioned together.
2026-04-13 18:54:45 +08:00

333 lines
9.9 KiB
PL/PgSQL
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
-- P0-006 Outbox模式实现
-- 基于: docs/P0_issues_enhanced_design_v1_2026-04-07.md
-- ==================== Outbox事件表 ====================
-- Outbox event queue: one row per domain event awaiting delivery.
CREATE TABLE IF NOT EXISTS outbox_events (
    id                 BIGSERIAL    PRIMARY KEY,
    aggregate_type     VARCHAR(64)  NOT NULL,   -- aggregate type, e.g. supply_account, package, settlement
    aggregate_id       VARCHAR(128) NOT NULL,   -- identifier of the aggregate instance
    event_type         VARCHAR(128) NOT NULL,   -- event type, e.g. created, updated, revoked
    event_id           VARCHAR(64)  NOT NULL UNIQUE,   -- globally unique event id (UUID)
    payload            JSONB        NOT NULL,   -- event payload
    -- Delivery state; restricted to the five values listed in the CHECK.
    status             VARCHAR(20)  NOT NULL DEFAULT 'pending'
        CHECK (status IN ('pending', 'processing', 'completed', 'failed', 'dead_letter')),
    retry_count        INTEGER      NOT NULL DEFAULT 0,
    max_retries        INTEGER      NOT NULL DEFAULT 5,
    error_message      TEXT,
    created_at         TIMESTAMPTZ  NOT NULL DEFAULT CURRENT_TIMESTAMP,
    processed_at       TIMESTAMPTZ,
    next_retry_at      TIMESTAMPTZ,             -- NULL means "eligible immediately" for the fetch query
    dead_letter_reason TEXT,
    version            BIGINT       NOT NULL DEFAULT 1
);
-- Indexes
-- IF NOT EXISTS keeps the script re-runnable, in line with the
-- CREATE TABLE IF NOT EXISTS used for the table itself.

-- Partial index serving the poller's "pending/failed and due" lookup.
CREATE INDEX IF NOT EXISTS idx_outbox_events_status_next_retry
    ON outbox_events (status, next_retry_at)
    WHERE status IN ('pending', 'failed');

CREATE INDEX IF NOT EXISTS idx_outbox_events_aggregate
    ON outbox_events (aggregate_type, aggregate_id);

CREATE INDEX IF NOT EXISTS idx_outbox_events_created_at
    ON outbox_events (created_at);

-- NOTE(review): event_id is declared UNIQUE, which already creates a unique
-- index on the column, so this index looks redundant. It is kept so that
-- anything referring to it by name keeps working; consider dropping it.
CREATE INDEX IF NOT EXISTS idx_outbox_events_event_id
    ON outbox_events (event_id);

-- Catalog comments (stored text is left exactly as authored).
COMMENT ON TABLE outbox_events IS 'Outbox事件表用于可靠消息投递';
COMMENT ON COLUMN outbox_events.aggregate_type IS '聚合类型,如 supply_account, package, settlement';
COMMENT ON COLUMN outbox_events.event_type IS '事件类型,如 created, updated, revoked';
COMMENT ON COLUMN outbox_events.status IS '状态: pending, processing, completed, failed, dead_letter';
COMMENT ON COLUMN outbox_events.max_retries IS '最大重试次数默认5次';
-- ==================== 死信队列表 ====================
-- Dead-letter store: a copy of each outbox event that was given up on,
-- plus bookkeeping for manual handling.
CREATE TABLE IF NOT EXISTS outbox_dead_letter (
    id                      BIGSERIAL    PRIMARY KEY,
    -- Snapshot of the originating outbox event.
    original_event_id       VARCHAR(64)  NOT NULL,
    original_aggregate_type VARCHAR(64)  NOT NULL,
    original_aggregate_id   VARCHAR(128) NOT NULL,
    event_type              VARCHAR(128) NOT NULL,
    payload                 JSONB        NOT NULL,
    error_message           TEXT,
    retry_count             INTEGER      NOT NULL,
    first_failed_at         TIMESTAMPTZ  NOT NULL,
    dead_letter_at          TIMESTAMPTZ  NOT NULL DEFAULT CURRENT_TIMESTAMP,
    -- Manual-handling bookkeeping.
    handled                 BOOLEAN      NOT NULL DEFAULT FALSE,
    handled_at              TIMESTAMPTZ,
    handler_notes           TEXT,
    created_at              TIMESTAMPTZ  NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Indexes
-- IF NOT EXISTS keeps the script re-runnable, in line with the
-- CREATE TABLE IF NOT EXISTS used for the table itself.

-- Partial index over the rows that still need manual handling.
CREATE INDEX IF NOT EXISTS idx_outbox_dead_letter_unhandled
    ON outbox_dead_letter (handled, dead_letter_at)
    WHERE handled = FALSE;

CREATE INDEX IF NOT EXISTS idx_outbox_dead_letter_original_event
    ON outbox_dead_letter (original_event_id);

-- Catalog comments (stored text is left exactly as authored).
COMMENT ON TABLE outbox_dead_letter IS 'Outbox死信队列存储超过最大重试次数的事件';
COMMENT ON COLUMN outbox_dead_letter.handled IS '是否已处理';
COMMENT ON COLUMN outbox_dead_letter.handler_notes IS '处理备注';
-- ==================== 补偿记录表 (P0-007) ====================
-- Compensation log: one row per batch item that failed and needs follow-up.
CREATE TABLE IF NOT EXISTS supply_batch_compensation (
    id               BIGSERIAL   PRIMARY KEY,
    batch_id         VARCHAR(64) NOT NULL,
    operation_type   VARCHAR(32) NOT NULL,
    item_index       INTEGER     NOT NULL,
    item_payload     JSONB       NOT NULL,
    failure_reason   TEXT,
    -- Follow-up state; restricted to the five values listed in the CHECK.
    status           VARCHAR(20) NOT NULL DEFAULT 'pending'
        CHECK (status IN ('pending', 'retrying', 'resolved', 'manual_required', 'abandoned')),
    retry_count      INTEGER     NOT NULL DEFAULT 0,
    max_retries      INTEGER     NOT NULL DEFAULT 3,
    resolved_at      TIMESTAMPTZ,
    resolved_by      BIGINT,
    resolution_notes TEXT,
    created_at       TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at       TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    created_by       BIGINT,
    version          BIGINT      NOT NULL DEFAULT 1
);
-- Indexes
-- IF NOT EXISTS keeps the script re-runnable, in line with the
-- CREATE TABLE IF NOT EXISTS used for the table itself.
CREATE INDEX IF NOT EXISTS idx_compensation_batch
    ON supply_batch_compensation (batch_id, status);

CREATE INDEX IF NOT EXISTS idx_compensation_status
    ON supply_batch_compensation (status, created_at);

-- Catalog comments (stored text is left exactly as authored).
COMMENT ON TABLE supply_batch_compensation IS '批量操作补偿记录';
COMMENT ON COLUMN supply_batch_compensation.batch_id IS '批量任务ID';
COMMENT ON COLUMN supply_batch_compensation.status IS '状态: pending, retrying, resolved, manual_required, abandoned';
-- ==================== Outbox处理器 ====================
-- Claim up to p_limit deliverable events and return them.
--
-- An event is deliverable when its status is 'pending' or 'failed' and its
-- next_retry_at is NULL or already due. Candidates are taken oldest-first
-- and row-locked with FOR UPDATE SKIP LOCKED, so concurrent callers skip
-- rows another transaction is already claiming instead of blocking on them.
--
-- Each claimed row is switched to status = 'processing' with version + 1,
-- and the rows are returned AS UPDATED: the caller sees the 'processing'
-- status and the new version that are actually stored, not the pre-claim
-- snapshot.
--
-- Parameters:
--   p_limit  maximum number of events to claim (default 100).
-- Returns:
--   the claimed outbox_events rows, ordered by created_at, then id.
--
-- NOTE(review): rows left in 'processing' are never selected here again;
-- recovery of events whose worker died presumably happens elsewhere - confirm.
CREATE OR REPLACE FUNCTION fetch_and_lock_outbox_events(
    p_limit INT DEFAULT 100
) RETURNS SETOF outbox_events AS $$
BEGIN
    RETURN QUERY
    WITH candidates AS (
        -- Pick and lock the due events; id breaks created_at ties deterministically.
        SELECT o.id
        FROM outbox_events AS o
        WHERE o.status IN ('pending', 'failed')
          AND (o.next_retry_at IS NULL OR o.next_retry_at <= CURRENT_TIMESTAMP)
        ORDER BY o.created_at ASC, o.id ASC
        LIMIT p_limit
        FOR UPDATE SKIP LOCKED
    ),
    claimed AS (
        -- The candidate rows are already locked by this transaction, so a
        -- separate version comparison is unnecessary.
        UPDATE outbox_events AS e
        SET status = 'processing',
            version = e.version + 1
        FROM candidates AS c
        WHERE e.id = c.id
        RETURNING e.*
    )
    SELECT claimed.*
    FROM claimed
    ORDER BY claimed.created_at ASC, claimed.id ASC;
END;
$$ LANGUAGE plpgsql;
-- Mark an event as delivered.
--
-- Sets status = 'completed', stamps processed_at with the current
-- transaction time, and bumps version so that this transition is visible to
-- optimistic-lock readers, like the other outbox state transitions in this
-- file. Does nothing if no event has the given event_id.
--
-- Parameters:
--   p_event_id  event_id of the outbox event to complete.
CREATE OR REPLACE FUNCTION mark_outbox_completed(
    p_event_id VARCHAR(64)
) RETURNS VOID AS $$
BEGIN
    UPDATE outbox_events
    SET status = 'completed',
        processed_at = CURRENT_TIMESTAMP,
        version = version + 1
    WHERE event_id = p_event_id;
END;
$$ LANGUAGE plpgsql;
-- Record a failed delivery attempt and schedule the retry or dead-letter the event.
--
-- Steps:
--   1. Increment retry_count, store the error message and bump version in a
--      single UPDATE ... RETURNING, which also row-locks the event for the
--      rest of the transaction.
--   2. If retry_count has reached max_retries, copy the event into
--      outbox_dead_letter and set status = 'dead_letter'.
--   3. Otherwise set status = 'failed' and next_retry_at = now + backoff,
--      where backoff = min(60, 2^retry_count) seconds.
--
-- Does nothing when the event does not exist, or when it is already in
-- 'dead_letter' (a repeated call would otherwise insert a duplicate
-- dead-letter row).
--
-- Parameters:
--   p_event_id       event_id of the outbox event that failed.
--   p_error_message  error text to store on the event (and dead-letter row).
CREATE OR REPLACE FUNCTION mark_outbox_failed(
    p_event_id VARCHAR(64),
    p_error_message TEXT
) RETURNS VOID AS $$
DECLARE
    v_event outbox_events%ROWTYPE;
    v_backoff_seconds INT;
BEGIN
    -- Count the attempt and read back the updated row in one statement.
    UPDATE outbox_events
    SET retry_count = retry_count + 1,
        error_message = p_error_message,
        version = version + 1
    WHERE event_id = p_event_id
      AND status <> 'dead_letter'
    RETURNING * INTO v_event;

    IF NOT FOUND THEN
        RETURN;
    END IF;

    IF v_event.retry_count >= v_event.max_retries THEN
        -- Retry budget exhausted: move to the dead-letter queue.
        -- NOTE(review): first_failed_at is filled with the event's created_at
        -- because no column records the time of the first failure.
        INSERT INTO outbox_dead_letter (
            original_event_id,
            original_aggregate_type,
            original_aggregate_id,
            event_type,
            payload,
            error_message,
            retry_count,
            first_failed_at
        ) VALUES (
            v_event.event_id,
            v_event.aggregate_type,
            v_event.aggregate_id,
            v_event.event_type,
            v_event.payload,
            p_error_message,
            v_event.retry_count,
            v_event.created_at
        );

        UPDATE outbox_events
        SET status = 'dead_letter',
            dead_letter_reason = p_error_message
        WHERE id = v_event.id;
    ELSE
        -- Exponential backoff capped at 60 seconds. The exponent is clamped
        -- to 6 (2^6 = 64 already exceeds the cap), which leaves every result
        -- unchanged while ruling out overflow for very large retry counts.
        v_backoff_seconds := LEAST(60, POWER(2, LEAST(v_event.retry_count, 6)))::INT;

        UPDATE outbox_events
        SET status = 'failed',
            next_retry_at = CURRENT_TIMESTAMP + v_backoff_seconds * INTERVAL '1 second'
        WHERE id = v_event.id;
    END IF;
END;
$$ LANGUAGE plpgsql;
-- ==================== 补偿记录操作 ====================
-- Insert a compensation record for one failed batch item.
--
-- The record starts in status 'pending'; all columns not listed in the
-- INSERT take their table defaults.
--
-- Parameters:
--   p_batch_id        batch the failed item belongs to.
--   p_operation_type  operation that was being performed.
--   p_item_index      index of the item within the batch.
--   p_item_payload    payload of the failed item.
--   p_failure_reason  why the item failed (may be NULL).
-- Returns:
--   the generated id of the new supply_batch_compensation row.
CREATE OR REPLACE FUNCTION create_compensation(
    p_batch_id VARCHAR(64),
    p_operation_type VARCHAR(32),
    p_item_index INT,
    p_item_payload JSONB,
    p_failure_reason TEXT
) RETURNS BIGINT AS $$
DECLARE
    v_new_id BIGINT;
BEGIN
    INSERT INTO supply_batch_compensation
        (batch_id, operation_type, item_index, item_payload, failure_reason, status)
    VALUES
        (p_batch_id, p_operation_type, p_item_index, p_item_payload, p_failure_reason, 'pending')
    RETURNING id INTO v_new_id;

    RETURN v_new_id;
END;
$$ LANGUAGE plpgsql;
-- Register a retry attempt for a compensation record.
--
-- Moves the record to status 'retrying', increments retry_count and
-- refreshes updated_at. Does nothing if no record has the given id.
--
-- NOTE(review): max_retries is not consulted here; presumably the caller
-- enforces the retry limit - confirm.
--
-- Parameters:
--   p_id  id of the supply_batch_compensation row.
CREATE OR REPLACE FUNCTION retry_compensation(
    p_id BIGINT
) RETURNS VOID AS $$
BEGIN
    UPDATE supply_batch_compensation AS c
    SET retry_count = c.retry_count + 1,
        status = 'retrying',
        updated_at = CURRENT_TIMESTAMP
    WHERE c.id = p_id;
END;
$$ LANGUAGE plpgsql;
-- Close a compensation record as resolved.
--
-- Sets status 'resolved', records who resolved it and the accompanying
-- notes, and stamps both resolved_at and updated_at with the current
-- transaction time. Does nothing if no record has the given id.
--
-- Parameters:
--   p_id           id of the supply_batch_compensation row.
--   p_resolved_by  identifier stored in resolved_by.
--   p_notes        free-text notes stored in resolution_notes.
CREATE OR REPLACE FUNCTION resolve_compensation(
    p_id BIGINT,
    p_resolved_by BIGINT,
    p_notes TEXT
) RETURNS VOID AS $$
BEGIN
    UPDATE supply_batch_compensation
    SET (status, resolved_by, resolution_notes, resolved_at, updated_at)
      = ('resolved', p_resolved_by, p_notes, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
    WHERE id = p_id;
END;
$$ LANGUAGE plpgsql;
-- Flag a compensation record as needing manual intervention.
--
-- Sets status 'manual_required', refreshes updated_at and appends p_reason
-- to failure_reason, separated by '; ' when a reason is already stored.
-- A NULL p_reason leaves the stored failure_reason untouched; a plain
-- concatenation with NULL would otherwise wipe the existing text.
-- Does nothing if no record has the given id.
--
-- Parameters:
--   p_id      id of the supply_batch_compensation row.
--   p_reason  text to append to failure_reason (may be NULL).
CREATE OR REPLACE FUNCTION mark_compensation_manual_required(
    p_id BIGINT,
    p_reason TEXT
) RETURNS VOID AS $$
BEGIN
    UPDATE supply_batch_compensation
    SET status = 'manual_required',
        failure_reason = CASE
            WHEN p_reason IS NULL THEN failure_reason
            ELSE COALESCE(failure_reason || '; ', '') || p_reason
        END,
        updated_at = CURRENT_TIMESTAMP
    WHERE id = p_id;
END;
$$ LANGUAGE plpgsql;
-- ==================== 统计查询 ====================
-- Count outbox events per status.
--
-- Returns:
--   one row per status value present in outbox_events, with the number of
--   events in that status.
--
-- The table columns are alias-qualified on purpose: the output column named
-- "status" is also a PL/pgSQL variable inside the body, and an unqualified
-- reference fails with "column reference "status" is ambiguous".
CREATE OR REPLACE FUNCTION get_outbox_stats() RETURNS TABLE(
    status VARCHAR(20),
    count BIGINT
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        e.status,
        COUNT(*)
    FROM outbox_events AS e
    GROUP BY e.status;
END;
$$ LANGUAGE plpgsql;
-- Count dead-letter rows by handled flag.
--
-- Returns:
--   one row per handled value present in outbox_dead_letter, with the
--   number of rows carrying that value.
--
-- The table columns are alias-qualified on purpose: the output column named
-- "handled" is also a PL/pgSQL variable inside the body, and an unqualified
-- reference fails with "column reference "handled" is ambiguous".
CREATE OR REPLACE FUNCTION get_dead_letter_stats() RETURNS TABLE(
    handled BOOLEAN,
    count BIGINT
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        d.handled,
        COUNT(*)
    FROM outbox_dead_letter AS d
    GROUP BY d.handled;
END;
$$ LANGUAGE plpgsql;
-- List the compensation records waiting for manual intervention.
--
-- Returns:
--   every supply_batch_compensation row whose status is 'manual_required',
--   oldest first by created_at.
CREATE OR REPLACE FUNCTION get_pending_manual_compensations()
RETURNS SETOF supply_batch_compensation AS $$
BEGIN
    RETURN QUERY
    SELECT c.*
    FROM supply_batch_compensation AS c
    WHERE c.status = 'manual_required'
    ORDER BY c.created_at;
END;
$$ LANGUAGE plpgsql;