docs(review): add remediation plans and readiness artifacts
Add design, review, and production-readiness documents for the April remediation cycle.
Include supporting SQL and supply-api operational design notes so review conclusions and implementation guidance stay versioned together.
This commit is contained in:
332
sql/postgresql/outbox_pattern_v1.sql
Normal file
332
sql/postgresql/outbox_pattern_v1.sql
Normal file
@@ -0,0 +1,332 @@
|
||||
-- P0-006 Outbox模式实现
|
||||
-- 基于: docs/P0_issues_enhanced_design_v1_2026-04-07.md
|
||||
|
||||
-- ==================== Outbox events table ====================
-- One row per domain event awaiting reliable delivery.
CREATE TABLE IF NOT EXISTS outbox_events (
    id                 BIGSERIAL    PRIMARY KEY,
    aggregate_type     VARCHAR(64)  NOT NULL,         -- aggregate type: supply_account, package, settlement
    aggregate_id       VARCHAR(128) NOT NULL,         -- aggregate id
    event_type         VARCHAR(128) NOT NULL,         -- event type: created, updated, revoked
    event_id           VARCHAR(64)  NOT NULL UNIQUE,  -- globally unique event id (UUID)
    payload            JSONB        NOT NULL,         -- event payload
    status             VARCHAR(20)  NOT NULL DEFAULT 'pending',
    retry_count        INTEGER      NOT NULL DEFAULT 0,
    max_retries        INTEGER      NOT NULL DEFAULT 5,
    error_message      TEXT,
    created_at         TIMESTAMPTZ  NOT NULL DEFAULT CURRENT_TIMESTAMP,
    processed_at       TIMESTAMPTZ,
    next_retry_at      TIMESTAMPTZ,
    dead_letter_reason TEXT,
    version            BIGINT       NOT NULL DEFAULT 1,  -- row version counter, starts at 1
    -- Allowed lifecycle states for an event.
    CHECK (status IN ('pending', 'processing', 'completed', 'failed', 'dead_letter'))
);
|
||||
|
||||
-- Indexes
-- IF NOT EXISTS keeps this script re-runnable, in line with the
-- CREATE TABLE IF NOT EXISTS used for outbox_events itself.

-- Partial index limited to rows whose status is 'pending' or 'failed'.
CREATE INDEX IF NOT EXISTS idx_outbox_events_status_next_retry
    ON outbox_events (status, next_retry_at)
    WHERE status IN ('pending', 'failed');

-- Lookup of all events belonging to one aggregate.
CREATE INDEX IF NOT EXISTS idx_outbox_events_aggregate
    ON outbox_events (aggregate_type, aggregate_id);

CREATE INDEX IF NOT EXISTS idx_outbox_events_created_at
    ON outbox_events (created_at);

-- NOTE(review): event_id is declared UNIQUE on the table, which already
-- builds a unique index on this column; this index looks redundant.
-- Confirm nothing references it by name before dropping it.
CREATE INDEX IF NOT EXISTS idx_outbox_events_event_id
    ON outbox_events (event_id);

-- Catalog comments (stored text is kept exactly as originally written).
COMMENT ON TABLE outbox_events IS 'Outbox事件表,用于可靠消息投递';
COMMENT ON COLUMN outbox_events.aggregate_type IS '聚合类型,如 supply_account, package, settlement';
COMMENT ON COLUMN outbox_events.event_type IS '事件类型,如 created, updated, revoked';
COMMENT ON COLUMN outbox_events.status IS '状态: pending, processing, completed, failed, dead_letter';
COMMENT ON COLUMN outbox_events.max_retries IS '最大重试次数,默认5次';
|
||||
|
||||
-- ==================== Dead-letter queue table ====================
-- Copies of outbox events that were given up on, plus manual-handling state.
CREATE TABLE IF NOT EXISTS outbox_dead_letter (
    id                      BIGSERIAL    PRIMARY KEY,
    -- Snapshot of the originating outbox event.
    original_event_id       VARCHAR(64)  NOT NULL,
    original_aggregate_type VARCHAR(64)  NOT NULL,
    original_aggregate_id   VARCHAR(128) NOT NULL,
    event_type              VARCHAR(128) NOT NULL,
    payload                 JSONB        NOT NULL,
    -- Failure details.
    error_message           TEXT,
    retry_count             INTEGER      NOT NULL,
    first_failed_at         TIMESTAMPTZ  NOT NULL,
    dead_letter_at          TIMESTAMPTZ  NOT NULL DEFAULT CURRENT_TIMESTAMP,
    -- Manual handling bookkeeping.
    handled                 BOOLEAN      NOT NULL DEFAULT FALSE,
    handled_at              TIMESTAMPTZ,
    handler_notes           TEXT,
    created_at              TIMESTAMPTZ  NOT NULL DEFAULT CURRENT_TIMESTAMP
);
|
||||
|
||||
-- Indexes
-- IF NOT EXISTS keeps this script re-runnable, in line with the
-- CREATE TABLE IF NOT EXISTS used for outbox_dead_letter itself.

-- Partial index restricted to entries that have not been handled yet.
CREATE INDEX IF NOT EXISTS idx_outbox_dead_letter_unhandled
    ON outbox_dead_letter (handled, dead_letter_at)
    WHERE handled = FALSE;

-- Lookup of dead-letter entries by the event they came from.
CREATE INDEX IF NOT EXISTS idx_outbox_dead_letter_original_event
    ON outbox_dead_letter (original_event_id);

-- Catalog comments (stored text is kept exactly as originally written).
COMMENT ON TABLE outbox_dead_letter IS 'Outbox死信队列,存储超过最大重试次数的事件';
COMMENT ON COLUMN outbox_dead_letter.handled IS '是否已处理';
COMMENT ON COLUMN outbox_dead_letter.handler_notes IS '处理备注';
|
||||
|
||||
-- ==================== Compensation records table (P0-007) ====================
-- One row per batch item that needs compensation.
CREATE TABLE IF NOT EXISTS supply_batch_compensation (
    id               BIGSERIAL   PRIMARY KEY,
    batch_id         VARCHAR(64) NOT NULL,   -- id of the batch job
    operation_type   VARCHAR(32) NOT NULL,
    item_index       INTEGER     NOT NULL,
    item_payload     JSONB       NOT NULL,
    failure_reason   TEXT,
    status           VARCHAR(20) NOT NULL DEFAULT 'pending',
    retry_count      INTEGER     NOT NULL DEFAULT 0,
    max_retries      INTEGER     NOT NULL DEFAULT 3,
    resolved_at      TIMESTAMPTZ,
    resolved_by      BIGINT,                 -- presumably an operator/user id; no FK is declared here
    resolution_notes TEXT,
    created_at       TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at       TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    created_by       BIGINT,
    version          BIGINT      NOT NULL DEFAULT 1,  -- row version counter, starts at 1
    -- Allowed lifecycle states for a compensation record.
    CHECK (status IN ('pending', 'retrying', 'resolved', 'manual_required', 'abandoned'))
);
|
||||
|
||||
-- Indexes
-- IF NOT EXISTS keeps this script re-runnable, in line with the
-- CREATE TABLE IF NOT EXISTS used for supply_batch_compensation itself.

-- Records of one batch, optionally narrowed by status.
CREATE INDEX IF NOT EXISTS idx_compensation_batch
    ON supply_batch_compensation (batch_id, status);

-- Records in a given status, in creation order.
CREATE INDEX IF NOT EXISTS idx_compensation_status
    ON supply_batch_compensation (status, created_at);

-- Catalog comments (stored text is kept exactly as originally written).
COMMENT ON TABLE supply_batch_compensation IS '批量操作补偿记录';
COMMENT ON COLUMN supply_batch_compensation.batch_id IS '批量任务ID';
COMMENT ON COLUMN supply_batch_compensation.status IS '状态: pending, retrying, resolved, manual_required, abandoned';
|
||||
|
||||
-- ==================== Outbox processor ====================

-- Claim a batch of deliverable events for the calling worker.
--
-- Picks up to p_limit rows whose status is 'pending' or 'failed' and whose
-- next_retry_at is NULL or not in the future, oldest first. The rows are
-- locked with FOR UPDATE SKIP LOCKED, so rows already locked by another
-- transaction are skipped instead of waited on. Every picked row is switched
-- to 'processing' and its version is incremented, all in one statement.
--
-- Parameters:
--   p_limit  maximum number of events to claim (default 100)
-- Returns:
--   the claimed rows as stored after the update (status = 'processing',
--   incremented version), ordered by created_at then id.
CREATE OR REPLACE FUNCTION fetch_and_lock_outbox_events(
    p_limit INT DEFAULT 100
) RETURNS SETOF outbox_events AS $$
BEGIN
    RETURN QUERY
    WITH candidates AS (
        SELECT e.id
        FROM outbox_events AS e
        WHERE e.status IN ('pending', 'failed')
          AND (e.next_retry_at IS NULL OR e.next_retry_at <= CURRENT_TIMESTAMP)
        -- created_at defaults to CURRENT_TIMESTAMP, which is identical for all
        -- rows written in one transaction; id breaks those ties so the claim
        -- order is deterministic.
        ORDER BY e.created_at ASC, e.id ASC
        LIMIT p_limit
        FOR UPDATE SKIP LOCKED
    ),
    claimed AS (
        UPDATE outbox_events AS e
        SET status = 'processing',
            version = e.version + 1
        FROM candidates AS c
        WHERE e.id = c.id
        RETURNING e.*
    )
    -- RETURNING gives no ordering guarantee, so sort the claimed rows again.
    SELECT claimed.*
    FROM claimed
    ORDER BY claimed.created_at ASC, claimed.id ASC;
END;
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Mark an event as completed.
--
-- Sets status to 'completed', stamps processed_at with the current
-- transaction time and increments version, matching the other state
-- transitions on outbox_events, which also bump version.
--
-- Parameters:
--   p_event_id  event_id of the row to complete
-- Returns:
--   nothing; an unknown p_event_id matches no row and is a silent no-op.
CREATE OR REPLACE FUNCTION mark_outbox_completed(
    p_event_id VARCHAR(64)
) RETURNS VOID AS $$
BEGIN
    UPDATE outbox_events
    SET status = 'completed',
        processed_at = CURRENT_TIMESTAMP,
        version = version + 1
    WHERE event_id = p_event_id;
END;
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Record a delivery failure and either schedule a retry or dead-letter the event.
--
-- Increments retry_count and version and stores the error message in a single
-- UPDATE ... RETURNING. Then:
--   * if retry_count has reached max_retries, copies the event into
--     outbox_dead_letter and sets status = 'dead_letter';
--   * otherwise sets status = 'failed' and next_retry_at = now + backoff,
--     where backoff = min(60, 2^retry_count) seconds.
--
-- Events already in a terminal state ('completed' or 'dead_letter') are left
-- untouched, so a repeated call cannot insert a second dead-letter row or put a
-- completed event back into the retry queue.
--
-- Parameters:
--   p_event_id       event_id of the failed event
--   p_error_message  error text to store on the event (and on the dead-letter row)
-- Returns:
--   nothing; unknown or terminal events are a silent no-op.
CREATE OR REPLACE FUNCTION mark_outbox_failed(
    p_event_id VARCHAR(64),
    p_error_message TEXT
) RETURNS VOID AS $$
DECLARE
    v_event outbox_events%ROWTYPE;
    v_backoff_seconds INT;
BEGIN
    -- Bump the counters and read the updated row back in one statement; the
    -- UPDATE also locks the row for the rest of the transaction.
    UPDATE outbox_events
    SET retry_count = retry_count + 1,
        error_message = p_error_message,
        version = version + 1
    WHERE event_id = p_event_id
      AND status NOT IN ('completed', 'dead_letter')
    RETURNING * INTO v_event;

    IF NOT FOUND THEN
        RETURN;
    END IF;

    IF v_event.retry_count >= v_event.max_retries THEN
        -- Retries exhausted: move a copy into the dead-letter queue.
        -- NOTE(review): first_failed_at is filled from the event's created_at
        -- because outbox_events does not record the time of the first failure.
        INSERT INTO outbox_dead_letter (
            original_event_id,
            original_aggregate_type,
            original_aggregate_id,
            event_type,
            payload,
            error_message,
            retry_count,
            first_failed_at
        ) VALUES (
            v_event.event_id,
            v_event.aggregate_type,
            v_event.aggregate_id,
            v_event.event_type,
            v_event.payload,
            p_error_message,
            v_event.retry_count,
            v_event.created_at
        );

        UPDATE outbox_events
        SET status = 'dead_letter',
            dead_letter_reason = p_error_message
        WHERE event_id = p_event_id;
    ELSE
        -- Exponential backoff capped at 60 seconds. The exponent is clamped to 6
        -- (2^6 = 64 already exceeds the cap) so the power stays small however
        -- large retry_count grows.
        v_backoff_seconds := LEAST(60, POWER(2, LEAST(v_event.retry_count, 6)))::INT;

        UPDATE outbox_events
        SET status = 'failed',
            next_retry_at = CURRENT_TIMESTAMP + v_backoff_seconds * INTERVAL '1 second'
        WHERE event_id = p_event_id;
    END IF;
END;
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- ==================== Compensation record operations ====================

-- Insert a new compensation record in status 'pending'.
--
-- Parameters:
--   p_batch_id        batch the failed item belongs to
--   p_operation_type  operation type stored on the record
--   p_item_index      index of the item within the batch
--   p_item_payload    JSONB payload of the item
--   p_failure_reason  failure description (may be NULL)
-- Returns:
--   the id generated for the new supply_batch_compensation row.
CREATE OR REPLACE FUNCTION create_compensation(
    p_batch_id VARCHAR(64),
    p_operation_type VARCHAR(32),
    p_item_index INT,
    p_item_payload JSONB,
    p_failure_reason TEXT
) RETURNS BIGINT AS $$
DECLARE
    v_new_id BIGINT;
BEGIN
    INSERT INTO supply_batch_compensation
        (batch_id, operation_type, item_index, item_payload, failure_reason, status)
    VALUES
        (p_batch_id, p_operation_type, p_item_index, p_item_payload, p_failure_reason, 'pending')
    RETURNING id INTO v_new_id;

    RETURN v_new_id;
END;
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Put a compensation record into retry.
--
-- Sets status to 'retrying', adds one to retry_count and refreshes updated_at.
-- The update is unconditional: neither the current status nor max_retries is
-- consulted here.
--
-- Parameters:
--   p_id  id of the supply_batch_compensation row
-- Returns:
--   nothing; an unknown p_id matches no row and is a silent no-op.
CREATE OR REPLACE FUNCTION retry_compensation(
    p_id BIGINT
) RETURNS VOID AS $$
BEGIN
    UPDATE supply_batch_compensation AS c
    SET updated_at = CURRENT_TIMESTAMP,
        retry_count = c.retry_count + 1,
        status = 'retrying'
    WHERE c.id = p_id;
END;
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Close a compensation record as resolved.
--
-- Sets status to 'resolved', stores who resolved it and the resolution notes,
-- and stamps both resolved_at and updated_at with the current transaction time.
--
-- Parameters:
--   p_id           id of the supply_batch_compensation row
--   p_resolved_by  value stored in resolved_by
--   p_notes        value stored in resolution_notes
-- Returns:
--   nothing; an unknown p_id matches no row and is a silent no-op.
CREATE OR REPLACE FUNCTION resolve_compensation(
    p_id BIGINT,
    p_resolved_by BIGINT,
    p_notes TEXT
) RETURNS VOID AS $$
BEGIN
    UPDATE supply_batch_compensation AS c
    SET resolved_by = p_resolved_by,
        resolution_notes = p_notes,
        resolved_at = CURRENT_TIMESTAMP,
        updated_at = CURRENT_TIMESTAMP,
        status = 'resolved'
    WHERE c.id = p_id;
END;
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Flag a compensation record as needing manual intervention.
--
-- Sets status to 'manual_required', refreshes updated_at and appends p_reason
-- to failure_reason, separated by '; ' when a previous reason exists.
-- A NULL p_reason leaves failure_reason unchanged instead of wiping it
-- (concatenating with NULL would otherwise yield NULL).
--
-- Parameters:
--   p_id      id of the supply_batch_compensation row
--   p_reason  text to append to failure_reason (may be NULL)
-- Returns:
--   nothing; an unknown p_id matches no row and is a silent no-op.
CREATE OR REPLACE FUNCTION mark_compensation_manual_required(
    p_id BIGINT,
    p_reason TEXT
) RETURNS VOID AS $$
BEGIN
    UPDATE supply_batch_compensation
    SET status = 'manual_required',
        failure_reason = CASE
            WHEN p_reason IS NULL THEN failure_reason
            ELSE COALESCE(failure_reason || '; ', '') || p_reason
        END,
        updated_at = CURRENT_TIMESTAMP
    WHERE id = p_id;
END;
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- ==================== Statistics queries ====================

-- Count outbox events per status.
--
-- Returns:
--   one row per status value present in outbox_events, with the number of
--   events in that status, ordered by status.
CREATE OR REPLACE FUNCTION get_outbox_stats() RETURNS TABLE(
    status VARCHAR(20),
    count BIGINT
) AS $$
BEGIN
    -- The RETURNS TABLE columns are visible as variables inside the body, so an
    -- unqualified "status" is ambiguous between the output column and the table
    -- column; every column reference is therefore qualified with the alias.
    RETURN QUERY
    SELECT
        e.status,
        COUNT(*)
    FROM outbox_events AS e
    GROUP BY e.status
    ORDER BY e.status;
END;
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Count dead-letter entries by handled flag.
--
-- Returns:
--   one row per handled value present in outbox_dead_letter, with the number
--   of entries carrying that value, ordered by handled.
CREATE OR REPLACE FUNCTION get_dead_letter_stats() RETURNS TABLE(
    handled BOOLEAN,
    count BIGINT
) AS $$
BEGIN
    -- The RETURNS TABLE columns are visible as variables inside the body, so an
    -- unqualified "handled" is ambiguous between the output column and the table
    -- column; every column reference is therefore qualified with the alias.
    RETURN QUERY
    SELECT
        d.handled,
        COUNT(*)
    FROM outbox_dead_letter AS d
    GROUP BY d.handled
    ORDER BY d.handled;
END;
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- List compensation records waiting for manual intervention.
--
-- Returns:
--   every supply_batch_compensation row with status = 'manual_required',
--   oldest first.
CREATE OR REPLACE FUNCTION get_pending_manual_compensations()
RETURNS SETOF supply_batch_compensation AS $$
BEGIN
    RETURN QUERY
    SELECT c.*
    FROM supply_batch_compensation AS c
    WHERE c.status = 'manual_required'
    -- created_at defaults to CURRENT_TIMESTAMP, which is identical for all rows
    -- written in one transaction; id breaks those ties so the listing order is
    -- stable between calls.
    ORDER BY c.created_at ASC, c.id ASC;
END;
$$ LANGUAGE plpgsql;
|
||||
Reference in New Issue
Block a user