Files
lijiaoqiao/sql/postgresql/outbox_pattern_v1.sql

333 lines
9.9 KiB
MySQL
Raw Normal View History

-- P0-006 Outbox模式实现
-- 基于: docs/P0_issues_enhanced_design_v1_2026-04-07.md
-- ==================== Outbox事件表 ====================
CREATE TABLE IF NOT EXISTS outbox_events (
id BIGSERIAL PRIMARY KEY,
aggregate_type VARCHAR(64) NOT NULL, -- 聚合类型: supply_account, package, settlement
aggregate_id VARCHAR(128) NOT NULL, -- 聚合ID
event_type VARCHAR(128) NOT NULL, -- 事件类型: created, updated, revoked
event_id VARCHAR(64) NOT NULL UNIQUE, -- 事件全局唯一ID (UUID)
payload JSONB NOT NULL, -- 事件载荷
status VARCHAR(20) NOT NULL DEFAULT 'pending',
CHECK (status IN ('pending', 'processing', 'completed', 'failed', 'dead_letter')),
retry_count INT NOT NULL DEFAULT 0,
max_retries INT NOT NULL DEFAULT 5,
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
processed_at TIMESTAMPTZ,
next_retry_at TIMESTAMPTZ,
dead_letter_reason TEXT,
version BIGINT NOT NULL DEFAULT 1
);
-- 索引
CREATE INDEX idx_outbox_events_status_next_retry
ON outbox_events (status, next_retry_at)
WHERE status IN ('pending', 'failed');
CREATE INDEX idx_outbox_events_aggregate
ON outbox_events (aggregate_type, aggregate_id);
CREATE INDEX idx_outbox_events_created_at
ON outbox_events (created_at);
CREATE INDEX idx_outbox_events_event_id
ON outbox_events (event_id);
COMMENT ON TABLE outbox_events IS 'Outbox事件表用于可靠消息投递';
COMMENT ON COLUMN outbox_events.aggregate_type IS '聚合类型,如 supply_account, package, settlement';
COMMENT ON COLUMN outbox_events.event_type IS '事件类型,如 created, updated, revoked';
COMMENT ON COLUMN outbox_events.status IS '状态: pending, processing, completed, failed, dead_letter';
COMMENT ON COLUMN outbox_events.max_retries IS '最大重试次数默认5次';
-- ==================== 死信队列表 ====================
CREATE TABLE IF NOT EXISTS outbox_dead_letter (
id BIGSERIAL PRIMARY KEY,
original_event_id VARCHAR(64) NOT NULL,
original_aggregate_type VARCHAR(64) NOT NULL,
original_aggregate_id VARCHAR(128) NOT NULL,
event_type VARCHAR(128) NOT NULL,
payload JSONB NOT NULL,
error_message TEXT,
retry_count INT NOT NULL,
first_failed_at TIMESTAMPTZ NOT NULL,
dead_letter_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
handled BOOLEAN NOT NULL DEFAULT FALSE,
handled_at TIMESTAMPTZ,
handler_notes TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- 索引
CREATE INDEX idx_outbox_dead_letter_unhandled
ON outbox_dead_letter (handled, dead_letter_at)
WHERE handled = FALSE;
CREATE INDEX idx_outbox_dead_letter_original_event
ON outbox_dead_letter (original_event_id);
COMMENT ON TABLE outbox_dead_letter IS 'Outbox死信队列存储超过最大重试次数的事件';
COMMENT ON COLUMN outbox_dead_letter.handled IS '是否已处理';
COMMENT ON COLUMN outbox_dead_letter.handler_notes IS '处理备注';
-- ==================== 补偿记录表 (P0-007) ====================
CREATE TABLE IF NOT EXISTS supply_batch_compensation (
id BIGSERIAL PRIMARY KEY,
batch_id VARCHAR(64) NOT NULL,
operation_type VARCHAR(32) NOT NULL,
item_index INT NOT NULL,
item_payload JSONB NOT NULL,
failure_reason TEXT,
status VARCHAR(20) NOT NULL DEFAULT 'pending',
CHECK (status IN ('pending', 'retrying', 'resolved', 'manual_required', 'abandoned')),
retry_count INT NOT NULL DEFAULT 0,
max_retries INT NOT NULL DEFAULT 3,
resolved_at TIMESTAMPTZ,
resolved_by BIGINT,
resolution_notes TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
created_by BIGINT,
version BIGINT NOT NULL DEFAULT 1
);
-- 索引
CREATE INDEX idx_compensation_batch
ON supply_batch_compensation (batch_id, status);
CREATE INDEX idx_compensation_status
ON supply_batch_compensation (status, created_at);
COMMENT ON TABLE supply_batch_compensation IS '批量操作补偿记录';
COMMENT ON COLUMN supply_batch_compensation.batch_id IS '批量任务ID';
COMMENT ON COLUMN supply_batch_compensation.status IS '状态: pending, retrying, resolved, manual_required, abandoned';
-- ==================== Outbox处理器 ====================
-- 获取待处理事件(带悲观锁)
CREATE OR REPLACE FUNCTION fetch_and_lock_outbox_events(
p_limit INT DEFAULT 100
) RETURNS SETOF outbox_events AS $$
DECLARE
r outbox_events%ROWTYPE;
BEGIN
FOR r IN
SELECT *
FROM outbox_events
WHERE status IN ('pending', 'failed')
AND (next_retry_at IS NULL OR next_retry_at <= CURRENT_TIMESTAMP)
ORDER BY created_at ASC
LIMIT p_limit
FOR UPDATE SKIP LOCKED
LOOP
-- 更新状态为processing
UPDATE outbox_events
SET status = 'processing',
version = version + 1
WHERE id = r.id
AND version = r.version;
IF FOUND THEN
RETURN NEXT r;
END IF;
END LOOP;
RETURN;
END;
$$ LANGUAGE plpgsql;
-- 标记事件完成
CREATE OR REPLACE FUNCTION mark_outbox_completed(
p_event_id VARCHAR(64)
) RETURNS VOID AS $$
BEGIN
UPDATE outbox_events
SET status = 'completed',
processed_at = CURRENT_TIMESTAMP
WHERE event_id = p_event_id;
END;
$$ LANGUAGE plpgsql;
-- 标记事件失败并计算下次重试时间
CREATE OR REPLACE FUNCTION mark_outbox_failed(
p_event_id VARCHAR(64),
p_error_message TEXT
) RETURNS VOID AS $$
DECLARE
v_event outbox_events%ROWTYPE;
v_backoff_seconds INT;
BEGIN
-- 获取事件信息
SELECT * INTO v_event FROM outbox_events WHERE event_id = p_event_id;
IF NOT FOUND THEN
RETURN;
END IF;
-- 计算重试次数
UPDATE outbox_events
SET retry_count = retry_count + 1,
error_message = p_error_message,
version = version + 1
WHERE event_id = p_event_id;
-- 重新获取更新后的事件
SELECT * INTO v_event FROM outbox_events WHERE event_id = p_event_id;
-- 检查是否超过最大重试次数
IF v_event.retry_count >= v_event.max_retries THEN
-- 移入死信队列
INSERT INTO outbox_dead_letter (
original_event_id,
original_aggregate_type,
original_aggregate_id,
event_type,
payload,
error_message,
retry_count,
first_failed_at
) VALUES (
v_event.event_id,
v_event.aggregate_type,
v_event.aggregate_id,
v_event.event_type,
v_event.payload,
p_error_message,
v_event.retry_count,
v_event.created_at
);
-- 更新状态为dead_letter
UPDATE outbox_events
SET status = 'dead_letter',
dead_letter_reason = p_error_message
WHERE event_id = p_event_id;
ELSE
-- 计算指数退避时间
v_backoff_seconds := LEAST(60, POWER(2, v_event.retry_count))::INT;
UPDATE outbox_events
SET status = 'failed',
next_retry_at = CURRENT_TIMESTAMP + (v_backoff_seconds || ' seconds')::INTERVAL
WHERE event_id = p_event_id;
END IF;
END;
$$ LANGUAGE plpgsql;
-- ==================== 补偿记录操作 ====================
-- 创建补偿记录
CREATE OR REPLACE FUNCTION create_compensation(
p_batch_id VARCHAR(64),
p_operation_type VARCHAR(32),
p_item_index INT,
p_item_payload JSONB,
p_failure_reason TEXT
) RETURNS BIGINT AS $$
DECLARE
v_id BIGINT;
BEGIN
INSERT INTO supply_batch_compensation (
batch_id,
operation_type,
item_index,
item_payload,
failure_reason,
status
) VALUES (
p_batch_id,
p_operation_type,
p_item_index,
p_item_payload,
p_failure_reason,
'pending'
)
RETURNING id INTO v_id;
RETURN v_id;
END;
$$ LANGUAGE plpgsql;
-- 重试补偿
CREATE OR REPLACE FUNCTION retry_compensation(
p_id BIGINT
) RETURNS VOID AS $$
BEGIN
UPDATE supply_batch_compensation
SET status = 'retrying',
retry_count = retry_count + 1,
updated_at = CURRENT_TIMESTAMP
WHERE id = p_id;
END;
$$ LANGUAGE plpgsql;
-- 解决补偿
CREATE OR REPLACE FUNCTION resolve_compensation(
p_id BIGINT,
p_resolved_by BIGINT,
p_notes TEXT
) RETURNS VOID AS $$
BEGIN
UPDATE supply_batch_compensation
SET status = 'resolved',
resolved_at = CURRENT_TIMESTAMP,
resolved_by = p_resolved_by,
resolution_notes = p_notes,
updated_at = CURRENT_TIMESTAMP
WHERE id = p_id;
END;
$$ LANGUAGE plpgsql;
-- 标记需要人工介入
CREATE OR REPLACE FUNCTION mark_compensation_manual_required(
p_id BIGINT,
p_reason TEXT
) RETURNS VOID AS $$
BEGIN
UPDATE supply_batch_compensation
SET status = 'manual_required',
failure_reason = COALESCE(failure_reason || '; ', '') || p_reason,
updated_at = CURRENT_TIMESTAMP
WHERE id = p_id;
END;
$$ LANGUAGE plpgsql;
-- ==================== 统计查询 ====================
-- 获取Outbox处理统计
CREATE OR REPLACE FUNCTION get_outbox_stats() RETURNS TABLE(
status VARCHAR(20),
count BIGINT
) AS $$
BEGIN
RETURN QUERY
SELECT
status,
COUNT(*)
FROM outbox_events
GROUP BY status;
END;
$$ LANGUAGE plpgsql;
-- 获取死信队列统计
CREATE OR REPLACE FUNCTION get_dead_letter_stats() RETURNS TABLE(
handled BOOLEAN,
count BIGINT
) AS $$
BEGIN
RETURN QUERY
SELECT
handled,
COUNT(*)
FROM outbox_dead_letter
GROUP BY handled;
END;
$$ LANGUAGE plpgsql;
-- 获取需要人工介入的补偿记录
CREATE OR REPLACE FUNCTION get_pending_manual_compensations()
RETURNS SETOF supply_batch_compensation AS $$
BEGIN
RETURN QUERY
SELECT *
FROM supply_batch_compensation
WHERE status = 'manual_required'
ORDER BY created_at ASC;
END;
$$ LANGUAGE plpgsql;