diff --git a/db/migration/0002_platform_event_outbox.up.sql b/db/migration/0002_platform_event_outbox.up.sql new file mode 100644 index 0000000..2117e61 --- /dev/null +++ b/db/migration/0002_platform_event_outbox.up.sql @@ -0,0 +1,54 @@ +CREATE TABLE IF NOT EXISTS cs_platform_callbacks ( /* One callback endpoint (URL, optional secret, active flag) per (platform, target_name). */ + platform VARCHAR(32) NOT NULL, + target_name VARCHAR(64) NOT NULL, + callback_url TEXT NOT NULL, + callback_secret TEXT NULL, + is_active BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (platform, target_name) +); + +CREATE TABLE IF NOT EXISTS cs_platform_event_outbox ( /* Outbox of platform events together with their delivery state (status, attempt_count, next_attempt_at). */ + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + platform VARCHAR(32) NOT NULL, + event_type VARCHAR(64) NOT NULL, + session_id UUID NULL REFERENCES cs_sessions(id) ON DELETE SET NULL, /* the event row is kept when the session is deleted */ + ticket_id UUID NULL REFERENCES cs_tickets(id) ON DELETE SET NULL, /* the event row is kept when the ticket is deleted */ + source_message_id VARCHAR(128) NULL, + callback_target VARCHAR(64) NOT NULL, + payload JSONB NOT NULL DEFAULT '{}'::jsonb, + status VARCHAR(16) NOT NULL DEFAULT 'pending', + attempt_count INT NOT NULL DEFAULT 0, + next_attempt_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + delivered_at TIMESTAMPTZ NULL, + last_error TEXT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + CONSTRAINT chk_cs_platform_event_outbox_status CHECK (status IN ('pending','retrying','delivered','dead_letter')) /* the only status values the table accepts */ +); +CREATE INDEX IF NOT EXISTS idx_cs_platform_event_outbox_due ON cs_platform_event_outbox(status, next_attempt_at, created_at); /* column order follows the runbook query for pending/retrying events: filter on status, order by next_attempt_at, then created_at */ +CREATE INDEX IF NOT EXISTS idx_cs_platform_event_outbox_platform ON cs_platform_event_outbox(platform, callback_target, created_at DESC); + +CREATE TABLE IF NOT EXISTS cs_platform_event_delivery_attempts ( /* One row per recorded delivery attempt; rows are removed together with their outbox event (ON DELETE CASCADE). */ + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + event_id UUID NOT NULL REFERENCES cs_platform_event_outbox(id) ON DELETE CASCADE, + attempt_no INT NOT NULL, 
+ response_status INT NULL, + response_body TEXT NULL, + error_message TEXT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_cs_platform_event_delivery_attempts_event ON cs_platform_event_delivery_attempts(event_id, created_at DESC); + +CREATE TABLE IF NOT EXISTS cs_platform_event_dead_letters ( /* At most one dead-letter row per outbox event (event_id is the primary key); removed together with the event (ON DELETE CASCADE). */ + event_id UUID PRIMARY KEY REFERENCES cs_platform_event_outbox(id) ON DELETE CASCADE, + platform VARCHAR(32) NOT NULL, + event_type VARCHAR(64) NOT NULL, + callback_target VARCHAR(64) NOT NULL, + payload JSONB NOT NULL DEFAULT '{}'::jsonb, + attempt_count INT NOT NULL DEFAULT 0, + final_error TEXT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); diff --git a/db/migration/0003_platform_channels.up.sql b/db/migration/0003_platform_channels.up.sql new file mode 100644 index 0000000..7b5075e --- /dev/null +++ b/db/migration/0003_platform_channels.up.sql @@ -0,0 +1,5 @@ +ALTER TABLE cs_sessions DROP CONSTRAINT IF EXISTS chk_cs_sessions_channel; /* dropped first so the CHECK below can be re-created under the same name */ + +ALTER TABLE cs_sessions +ADD CONSTRAINT chk_cs_sessions_channel +CHECK (channel IN ('telegram','discord','wechat','widget','sub2api','newapi')); /* accepted channel values, including sub2api and newapi */ diff --git a/docs/CONFIG_CONTRACT_BASELINE.md b/docs/CONFIG_CONTRACT_BASELINE.md index 4f24d73..1d4ed92 100644 --- a/docs/CONFIG_CONTRACT_BASELINE.md +++ b/docs/CONFIG_CONTRACT_BASELINE.md @@ -51,6 +51,24 @@ | `AI_CS_WEBHOOK_SIGNATURE_HEADER` | `X-CS-Signature` | 签名请求头 | 无额外校验 | 可 | | `AI_CS_WEBHOOK_MAX_SKEW_SECONDS` | `300` | 最大时钟偏差(秒) | 必须 > 0 | 需安全确认 | +### 1.4 Platform Adapters + +| 变量名 | 默认值 | 含义 | 当前代码是否校验 | prod 是否应允许默认值 | +|---|---|---|---|---| +| `AI_CS_PLATFORM_ADAPTERS_ENABLED` | `false` | 是否启用平台适配入口 | 解析布尔值 | 视接入计划决定 | +| `AI_CS_PLATFORM_SUB2API_ENABLED` | `false` | 是否启用 `sub2api` 入站适配 | 解析布尔值 | 视接入计划决定 | +| `AI_CS_PLATFORM_SUB2API_INGRESS_SECRET` | 空 | `sub2api` 平台 webhook HMAC secret | 启用 `sub2api` 时必填 | **不允许为空** | +| `AI_CS_PLATFORM_SUB2API_CALLBACK_BASE_URL` | 空 | `sub2api` 回调基地址 | 当前仅解析,不强校验 | 视后续出站回调批次决定 | +| 
`AI_CS_PLATFORM_SUB2API_CALLBACK_SECRET` | 空 | `sub2api` 回调签名 secret | 当前仅解析,不强校验 | 视后续出站回调批次决定 | +| `AI_CS_PLATFORM_SUB2API_CALLBACK_TIMEOUT_MS` | `3000` | `sub2api` 回调超时(毫秒) | 必须 > 0(启用时) | 可 | +| `AI_CS_PLATFORM_SUB2API_CALLBACK_MAX_RETRIES` | `5` | `sub2api` 回调最大重试次数 | 必须 >= 0(启用时) | 可 | +| `AI_CS_PLATFORM_NEWAPI_ENABLED` | `false` | 是否启用 `newapi` 入站适配 | 解析布尔值 | 视接入计划决定 | +| `AI_CS_PLATFORM_NEWAPI_INGRESS_SECRET` | 空 | `newapi` 平台 webhook HMAC secret | 启用 `newapi` 时必填 | **不允许为空** | +| `AI_CS_PLATFORM_NEWAPI_CALLBACK_BASE_URL` | 空 | `newapi` 回调基地址 | 当前仅解析,不强校验 | 视后续出站回调批次决定 | +| `AI_CS_PLATFORM_NEWAPI_CALLBACK_SECRET` | 空 | `newapi` 回调签名 secret | 当前仅解析,不强校验 | 视后续出站回调批次决定 | +| `AI_CS_PLATFORM_NEWAPI_CALLBACK_TIMEOUT_MS` | `3000` | `newapi` 回调超时(毫秒) | 必须 > 0(启用时) | 可 | +| `AI_CS_PLATFORM_NEWAPI_CALLBACK_MAX_RETRIES` | `5` | `newapi` 回调最大重试次数 | 必须 >= 0(启用时) | 可 | + --- ## 2. 当前代码已经执行的校验 @@ -64,6 +82,9 @@ 5. `AI_CS_RUNTIME_ENV` 只允许 `production/development/test` 6. `AI_CS_RUNTIME_ENV=production` 时,`AI_CS_POSTGRES_ENABLED` 必须为 `true` 7. `AI_CS_RUNTIME_ENV=production` 时,`AI_CS_WEBHOOK_SECRET` 不允许为空 +8. `AI_CS_PLATFORM_ADAPTERS_ENABLED=true` 且对应平台 `*_ENABLED=true` 时,`*_INGRESS_SECRET` 不允许为空 +9. `AI_CS_PLATFORM_*_CALLBACK_TIMEOUT_MS` 在对应平台启用时必须为正数 +10. `AI_CS_PLATFORM_*_CALLBACK_MAX_RETRIES` 在对应平台启用时不允许为负数 --- @@ -86,6 +107,7 @@ - `DATABASE_URL` - `POSTGRES_*` - `WEBHOOK_SECRET` +- `AI_CS_PLATFORM_*` - `RATE_LIMIT_*` - `LOG_LEVEL` - `OPENAI_API_KEY` diff --git a/docs/RUNBOOK_PLATFORM_CALLBACKS.md b/docs/RUNBOOK_PLATFORM_CALLBACKS.md new file mode 100644 index 0000000..31c6266 --- /dev/null +++ b/docs/RUNBOOK_PLATFORM_CALLBACKS.md @@ -0,0 +1,107 @@ +# Platform Callback Runbook + +> 适用范围:`sub2api / newapi` 平台适配层的出站 callback 投递 +> 当前实现事实来源:`internal/store/postgres/platform_event_store.go`、`internal/service/platformdelivery/worker.go` + +--- + +## 1. 快速判断 + +平台回调链路分三层状态: + +1. **主链成功,outbox 已入库** + 表:`cs_platform_event_outbox` +2. 
**callback 尝试记录** + 表:`cs_platform_event_delivery_attempts` +3. **重试耗尽进入死信** + 表:`cs_platform_event_dead_letters` + +如果用户反馈“平台没收到回调”,先按这个顺序查,不要直接看应用日志猜。 + +--- + +## 2. 常用查询 + +### 2.1 查看待投递事件 + +```sql +SELECT id, platform, event_type, callback_target, status, attempt_count, next_attempt_at, last_error +FROM cs_platform_event_outbox +WHERE status IN ('pending', 'retrying') +ORDER BY next_attempt_at ASC, created_at ASC +LIMIT 100; +``` + +### 2.2 查看最近投递尝试 + +```sql +SELECT event_id, attempt_no, response_status, error_message, created_at +FROM cs_platform_event_delivery_attempts +ORDER BY created_at DESC +LIMIT 100; +``` + +### 2.3 查看死信事件 + +```sql +SELECT event_id, platform, event_type, callback_target, attempt_count, final_error, created_at +FROM cs_platform_event_dead_letters +ORDER BY created_at DESC +LIMIT 100; +``` + +--- + +## 3. 故障分类 + +### 3.1 平台回调失败 + +表现: +- `cs_platform_event_outbox.status` 为 `retrying` 或 `dead_letter` +- `cs_platform_event_delivery_attempts` 有记录 + +说明: +- 主链已经处理成功 +- 失败点在平台 callback 出站链路 + +### 3.2 主链失败 + +表现: +- 平台入口直接返回 `500` +- `cs_platform_event_outbox` 没有对应事件 + +说明: +- 失败点在 webhook 入站、dialog 主链或 outbox 写入 +- 这不属于 callback worker 故障 + +--- + +## 4. 手动重放 + +当前版本没有单独重放脚本,最小操作方式是把死信或重试事件改回可投递状态: + +```sql +UPDATE cs_platform_event_outbox +SET status = 'pending', + next_attempt_at = NOW(), + last_error = NULL, + updated_at = NOW() +WHERE id = '<event_id>'; +``` + +如果事件已经在 `dead_letters`: + +```sql +DELETE FROM cs_platform_event_dead_letters +WHERE event_id = '<event_id>'; +``` + +再等待 worker 下一轮拉取。 + +--- + +## 5. 处理原则 + +1. 不要手工删除 `outbox` 主记录,除非已经确认平台侧不需要这条事件。 +2. 优先保留 `delivery_attempts` 和 `dead_letters`,它们是排障证据。 +3. 
如果同一平台持续大量 `retrying`,优先检查 callback 地址、签名 secret 和平台上游可用性。 diff --git a/docs/plans/2026-05-06-newapi-sub2api-adapter-design.md b/docs/plans/2026-05-06-newapi-sub2api-adapter-design.md new file mode 100644 index 0000000..11a406b --- /dev/null +++ b/docs/plans/2026-05-06-newapi-sub2api-adapter-design.md @@ -0,0 +1,332 @@ +# NewAPI / Sub2API 适配增强设计 + +> 日期:2026-05-06 +> 状态:设计稿 +> 适用项目:`projects/ai-customer-service` +> 设计边界:**最小接入层、内置适配器、入站 + 异步全事件流回写、Sub2API 优先、准可靠投递** + +--- + +## 1. 目标与边界 + +本设计解决的问题不是“把 `ai-customer-service` 做成另一个 NewAPI/Sub2API”,而是让它能够**稳定挂接在 NewAPI/Sub2API 后面,作为客服能力子系统运行**。当前代码已经具备 webhook、会话、意图、转人工、工单、审计、去重、PostgreSQL 落库、Gate B/Gate C 脚本化验证等底座,缺的是把外部平台原生消息接进来、再把内部处理结果以平台可消费的事件流回推出去的适配层。 + +第一版范围严格限制为: + +1. **Sub2API 优先**,NewAPI 保持同构兼容位,不追求双平台一次做满。 +2. **内置适配器**,不新增外部 shim 作为主路径。 +3. **入站适配**:把平台原生消息转换为 `UnifiedMessage` 并进入现有主链。 +4. **出站回写**:把内部处理结果、工单、错误、回调状态转成异步事件回推给上游平台。 +5. **准可靠投递**:事件持久化、重试、死信/补偿到位,但不追求复杂的跨系统 exactly-once。 + +明确不做的内容: + +1. 完整平台级管理后台 +2. 知识库共享 API 的全量产品化 +3. NewAPI/Sub2API 全量管理协议一比一兼容 +4. 任意平台原生结构透传 + +结论是:**第一版目标是“可稳定接入和可观测回推”,不是“完整兼容替代”。** + +--- + +## 2. 总体架构 + +推荐架构是在现有 HTTP 入口和对话主链之间插入一个**平台适配层(Platform Adapter Layer)**,并在主链处理完成后插入一个**事件出站层(Event Outbox + Delivery Layer)**。这样可以保持当前客服核心逻辑不被平台协议污染,同时把平台差异收口在边缘。 + +逻辑结构如下: + +```text +Sub2API / NewAPI + -> Platform Ingress Handler + -> Adapter Registry + -> Platform Adapter (normalize) + -> UnifiedMessage + -> dialog / intent / handoff / ticket / audit / dedup + -> Internal Domain Events + -> Event Outbox + -> Delivery Worker + -> Platform Callback Endpoint +``` + +核心原则: + +1. **核心主链不感知平台细节** + `dialog.Service` 继续只消费 `UnifiedMessage`,不直接理解 Sub2API/NewAPI 原生字段。 + +2. **适配逻辑边缘化** + 平台差异集中在 adapter 目录中,用接口抽象隔离。 + +3. **事件先落库再投递** + 所有异步回调事件进入 outbox 后再由 worker 重试发送,避免平台短时不可用导致结果丢失。 + +4. **同步 HTTP 只做最小确认** + 入站请求同步返回“收到并入链”的最小响应,不在主请求路径里等待整条回调链路完成。 + +这样做的收益是:现有 webhook 主链、Gate B/Gate C 验证、鉴权、工单状态机都可以复用,不需要重写核心业务。 + +--- + +## 3. 
入站适配设计 + +第一版入站适配增加一个新的入口族,而不是强行把平台原生大包塞进现有 `UnifiedMessage` handler。建议新增: + +```text +POST /api/v1/customer-service/platforms/{platform}/webhook +POST /api/v1/customer-service/platforms/{platform}/webhook/{channel} +``` + +其中 `{platform}` 第一版支持: + +1. `sub2api` +2. `newapi`(保留同构位,可先实现最小 profile) + +当前状态补充: +- `sub2api` 已完成第一版最小接入、outbox、callback worker、dead letter 和 E2E 验证 +- `newapi` 当前仅保留同构 adapter profile,占位返回 `501 profile not implemented` + +新增接口: + +```go +type PlatformAdapter interface { + Platform() string + ParseInbound(*http.Request, []byte, IngressContext) (*message.UnifiedMessage, *PlatformInboundMeta, error) + BuildIngressAck(*dialog.Result, *PlatformInboundMeta) any +} +``` + +设计要点: + +1. **平台原生请求体不再直接喂给现有 webhook handler** + 先在 adapter 里裁剪、校验、映射,再构造 `UnifiedMessage`。 + +2. **保留平台元数据** + `PlatformInboundMeta` 记录: + - platform + - tenant / app / upstream endpoint + - raw event id + - callback target + - callback auth profile + - source user/session ids + +3. **统一进入现有主链** + Adapter 输出只允许是干净的 `UnifiedMessage`,这样 `dialog.Service`、dedup、ticket、audit 无需大改。 + +4. **同步确认最小化** + 入站 HTTP 响应只表达: + - `accepted` + - `event_id` + - `session_id`(如果已生成) + 不承担完整业务结果回写职责。 + +Sub2API 优先意味着第一版先针对 tksea 场景定义一个明确的 inbound profile,而不是试图抽象所有平台差异。 + +--- + +## 4. 出站全事件流设计 + +你明确要求第一版不是只回最终结果,而是做**全事件流异步回调**。这意味着需要在内部定义一个稳定的事件模型,而不是拿日志拼 webhook。 + +建议的事件类型: + +1. `message.received` +2. `message.rejected` +3. `message.deduplicated` +4. `message.processing` +5. `intent.resolved` +6. `handoff.triggered` +7. `ticket.created` +8. `ticket.assigned` +9. `ticket.resolved` +10. `ticket.closed` +11. `reply.generated` +12. `callback.delivered` +13. `callback.failed` + +事件统一结构建议: + +```json +{ + "event_id": "uuid", + "event_type": "reply.generated", + "platform": "sub2api", + "occurred_at": "2026-05-06T12:00:00Z", + "session_id": "uuid", + "ticket_id": "uuid", + "source_message_id": "platform-msg-id", + "attempt": 1, + "payload": {} +} +``` + +关键设计点: + +1. **事件类型稳定、字段尽量固定** +2. 
**事件 payload 面向平台消费,而不是内部 debug** +3. **每条事件必须有 `event_id` 供下游幂等** +4. **reply / handoff / ticket 是关键事件,必须可补偿重放** + +这样第一版虽然不是完整平台集成,但已经具备后续扩展到状态同步、工单联动和运营侧诊断的事件基础。 + +--- + +## 5. 准可靠投递设计 + +你选择的是“准可靠投递”,这决定了我们不能把异步回调只做成 best-effort。推荐实现是**Outbox + Delivery Worker + Retry Policy + Dead Letter**。 + +新增持久化表建议: + +1. `cs_platform_callbacks` + - 配置每个 platform target 的回调地址、签名方式、启停状态 +2. `cs_platform_event_outbox` + - 存放待投递事件 +3. `cs_platform_event_delivery_attempts` + - 存放每次尝试结果 +4. `cs_platform_event_dead_letters` + - 存放超出重试上限的事件 + +投递策略: + +1. 业务主链中先生成事件并落 `outbox` +2. 后台 worker 轮询领取事件 +3. 成功后标记 delivered +4. 失败后指数退避重试 +5. 达到上限后进入 dead letter +6. 提供人工或脚本重放入口 + +推荐默认策略: + +1. 首次立即投递 +2. 之后 `10s / 30s / 60s / 5m / 15m` +3. 最多 5 次 +4. 超过进入 dead letter + +这不是严格 exactly-once,但对第一版已经足够现实: + +- 上游通过 `event_id` 幂等 +- 我们保证“不轻易丢” +- 重试/死信让失败可追踪可恢复 + +--- + +## 6. 配置与安全设计 + +适配层要想落地,配置必须从“单 webhook secret”提升为“平台适配配置”。建议新增: + +```text +AI_CS_PLATFORM_ADAPTERS_ENABLED=true +AI_CS_PLATFORM_SUB2API_ENABLED=true +AI_CS_PLATFORM_SUB2API_INGRESS_SECRET=... +AI_CS_PLATFORM_SUB2API_CALLBACK_BASE_URL=... +AI_CS_PLATFORM_SUB2API_CALLBACK_SECRET=... +AI_CS_PLATFORM_SUB2API_CALLBACK_TIMEOUT_MS=3000 +AI_CS_PLATFORM_SUB2API_CALLBACK_MAX_RETRIES=5 +AI_CS_PLATFORM_NEWAPI_ENABLED=false +``` + +安全要求: + +1. **入站鉴权** + 平台入口不能复用当前通用 webhook 约束的最小集合就草率上线,必须明确平台级 secret/profile。 + +2. **出站签名** + 回调给 Sub2API/NewAPI 的事件也要带时间戳与签名,避免被伪造。 + +3. **最小字段原则** + 只回推平台真正需要的字段,不把完整上下文、敏感用户数据默认外发。 + +4. **审计闭环** + 所有 callback 失败、重试、死信、重放都进入 `audit` 或独立 delivery attempts 表。 + +安全上最重要的一条是: + +> **平台适配层必须是“显式启用、显式配置、显式审计”的能力,不允许默认裸开。** + +--- + +## 7. 测试与门禁设计 + +第一版适配增强必须新增独立测试层,而不能只靠现有 webhook 测试顺带覆盖。 + +建议测试分层: + +1. **Unit** + - 平台原生 payload -> `UnifiedMessage` 映射 + - callback payload 组装 + - 签名算法 + - 重试策略 + +2. **Integration** + - 平台入站请求 -> 主链处理 -> outbox 落库 + - outbox -> callback mock server + - 失败重试 -> dead letter + +3. 
**E2E** + - Sub2API mock 发原生消息 + - `ai-customer-service` 创建 session / ticket / audit + - callback mock 收到全事件流 + +第一版阻断门禁建议至少包含: + +1. `sub2api` 最小接入 happy path +2. `message_id` 去重 path +3. 未知字段/非法签名 path +4. callback 5xx 重试 path +5. callback 最终 dead letter path +6. 回滚后 callback 恢复 path + +这里要特别强调: + +> 当前 `tech/TEST_DESIGN.md` 里 NewAPI/Sub2API 适配验证还是待实现项,第一版增强后必须把它提升为真正可执行的合同测试和联调测试,而不是继续停留在文档层。 + +--- + +## 8. 分阶段实施建议 + +为了不把当前 Phase 1 拖爆,建议按 3 个 implementation batch 执行: + +### Batch 1:Sub2API 入站最小适配 + +1. 新增 `/platforms/sub2api/webhook` +2. 新增 adapter 接口和 `sub2api` profile +3. 原生 payload -> `UnifiedMessage` +4. 复用现有主链 +5. 单测 + 集成测试 + +### Batch 2:事件 outbox 与异步回调 + +1. 设计事件模型 +2. 新增 outbox 表 +3. 新增 worker +4. 新增 callback 签名与投递 +5. 失败重试 + dead letter + +### Batch 3:NewAPI profile 与运维可观测 + +1. 新增 `newapi` adapter profile +2. 新增 delivery metrics / dashboard +3. 新增重放工具与 runbook +4. 补 Gate B / Gate C 适配层联调门禁 + +这个顺序的理由很简单: + +1. 先把 Sub2API 场景跑通 +2. 再把异步事件流做稳 +3. 最后复用同一套抽象支持 NewAPI + +--- + +## 9. 最终建议 + +我推荐按这份设计推进,因为它满足四个约束: + +1. **符合项目规划**:确实开始支持 NewAPI/Sub2API +2. **不破坏当前主链**:平台差异不侵入核心客服逻辑 +3. **可先解决 tksea / Sub2API 的真实问题**:不是空转设计 +4. **可灰度实施**:Batch 1 完成就能先验证最小接入 + +最终建议一句话概括: + +> **把 NewAPI/Sub2API 支持做成“内置适配器 + 事件 outbox”的最小集成层,而不是把 `ai-customer-service` 重做成另一个平台。** + +下一步如果继续,最合理的是直接基于这份设计拆 implementation plan,而不是直接开写代码。 diff --git a/docs/plans/2026-05-06-newapi-sub2api-adapter-implementation-plan.md b/docs/plans/2026-05-06-newapi-sub2api-adapter-implementation-plan.md new file mode 100644 index 0000000..c9f29da --- /dev/null +++ b/docs/plans/2026-05-06-newapi-sub2api-adapter-implementation-plan.md @@ -0,0 +1,754 @@ +# NewAPI / Sub2API Adapter Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. 
+ +**Goal:** 为 `ai-customer-service` 增加面向 `Sub2API` 优先、`NewAPI` 同构兼容的最小平台适配层,支持入站原生消息适配、异步全事件流回写,以及准可靠投递。 + +**Architecture:** 在现有统一 webhook 主链之外新增平台入口 `/platforms/{platform}/webhook`,通过内置 adapter 将平台原生 payload 转换为 `UnifiedMessage`。主链处理后生成内部平台事件,先落库到 outbox,再由后台 worker 进行带重试的异步 callback 投递。 + +**Tech Stack:** Go 1.22, net/http, PostgreSQL, HMAC-SHA256, background worker, Go test, httptest + +--- + +## 0. 实施原则 + +1. **先 Sub2API,后 NewAPI** + 第一批只要求 Sub2API 真正可跑,NewAPI 只保留 profile 插槽和最小合同测试骨架。 + +2. **先入站,后出站,最后可靠性** + 先打通平台入站 -> 主链,再接 outbox + callback,再补 dead letter / replay。 + +3. **适配逻辑边缘化** + 不改 `dialog.Service` 的核心业务语义;平台差异收在 adapter / callback / outbox 层。 + +4. **TDD + 频繁提交** + 每个 Task 都先写失败测试,再写最小实现,再跑验证,再提交。 + +--- + +### Task 1: 搭好平台适配骨架与路由入口 + +**Files:** +- Create: `internal/platformadapter/types.go` +- Create: `internal/platformadapter/registry.go` +- Create: `internal/platformadapter/sub2api_adapter.go` +- Create: `internal/platformadapter/newapi_adapter.go` +- Create: `internal/http/handlers/platform_webhook_handler.go` +- Modify: `internal/http/router.go` +- Test: `internal/platformadapter/registry_test.go` +- Test: `internal/http/handlers/platform_webhook_handler_test.go` + +**Step 1: 写平台注册表失败测试** + +写测试覆盖: + +```go +func TestRegistry_ShouldResolveSub2APIAdapter(t *testing.T) {} +func TestRegistry_ShouldRejectUnknownPlatform(t *testing.T) {} +``` + +**Step 2: 运行测试确认失败** + +Run: + +```bash +go test ./internal/platformadapter ./internal/http/handlers -count=1 +``` + +Expected: +- FAIL,提示 `platformadapter` 包或 handler 不存在 + +**Step 3: 写最小平台类型与注册表** + +新增: + +- `PlatformAdapter` 接口 +- `IngressContext` +- `PlatformInboundMeta` +- `Registry` + +最小接口: + +```go +type PlatformAdapter interface { + Platform() string + ParseInbound(r *http.Request, body []byte, ctx IngressContext) (*message.UnifiedMessage, *PlatformInboundMeta, error) + BuildIngressAck(result *dialog.Result, meta *PlatformInboundMeta) any +} +``` + +**Step 4: 写最小 handler 骨架** + 
+`PlatformWebhookHandler` 先只做: + +1. 路径读取 `{platform}` / `{channel}` +2. 从 registry 取 adapter +3. 读取 body +4. 调 adapter +5. 调现有 `dialog.Service` +6. 返回 adapter ack + +**Step 5: 在 router 增加入口** + +新增: + +- `POST /api/v1/customer-service/platforms/{platform}/webhook` +- `POST /api/v1/customer-service/platforms/{platform}/webhook/{channel}` + +**Step 6: 跑测试确认通过** + +Run: + +```bash +go test ./internal/platformadapter ./internal/http/handlers -count=1 +``` + +Expected: +- PASS + +**Step 7: Commit** + +```bash +git add internal/platformadapter internal/http/handlers/platform_webhook_handler.go internal/http/handlers/platform_webhook_handler_test.go internal/http/router.go +git commit -m "feat(adapter): add platform webhook adapter skeleton" +``` + +--- + +### Task 2: 实现 Sub2API 入站最小适配 + +**Files:** +- Modify: `internal/platformadapter/sub2api_adapter.go` +- Create: `internal/platformadapter/sub2api_types.go` +- Test: `internal/platformadapter/sub2api_adapter_test.go` +- Modify: `internal/http/handlers/platform_webhook_handler_test.go` +- Reference: `docs/SUB2API_MINIMAL_WEBHOOK_MAPPING.md` + +**Step 1: 写 Sub2API payload 失败测试** + +覆盖: + +```go +func TestSub2APIAdapter_ShouldMapMinimalPayload(t *testing.T) {} +func TestSub2APIAdapter_ShouldRejectUnknownEnvelopeFields(t *testing.T) {} +func TestSub2APIAdapter_ShouldUseChannelOverrideWhenPresent(t *testing.T) {} +func TestSub2APIAdapter_ShouldRequireOpenIDAndContent(t *testing.T) {} +``` + +**Step 2: 运行测试确认失败** + +Run: + +```bash +go test ./internal/platformadapter -count=1 +``` + +Expected: +- FAIL,字段映射或校验未实现 + +**Step 3: 定义 Sub2API 最小 payload 结构** + +只实现第一版所需字段: + +```go +type Sub2APIInboundPayload struct { + MessageID string `json:"message_id"` + Channel string `json:"channel"` + OpenID string `json:"open_id"` + UserID string `json:"user_id,omitempty"` + Content string `json:"content"` + Timestamp time.Time `json:"timestamp,omitempty"` + ReplyTo string `json:"reply_to,omitempty"` +} +``` + +不要一次性吞平台原生大包。 + +**Step 4: 
实现最小 ParseInbound** + +规则: + +1. 只接受当前最小字段 +2. 缺 `channel/open_id/content` 返回 `400` +3. `{channel}` path override 优先 +4. 产出 `UnifiedMessage` +5. 记录 `PlatformInboundMeta` + +**Step 5: 实现最小 ingress ack** + +同步响应先返回: + +```json +{ + "accepted": true, + "platform": "sub2api", + "session_id": "...", + "ticket_id": "...", + "event_id": "..." +} +``` + +**Step 6: 跑测试确认通过** + +Run: + +```bash +go test ./internal/platformadapter ./internal/http/handlers -count=1 +``` + +Expected: +- PASS + +**Step 7: Commit** + +```bash +git add internal/platformadapter/sub2api_adapter.go internal/platformadapter/sub2api_types.go internal/platformadapter/sub2api_adapter_test.go internal/http/handlers/platform_webhook_handler_test.go +git commit -m "feat(adapter): add sub2api inbound adapter" +``` + +--- + +### Task 3: 增加平台级入站鉴权配置 + +**Files:** +- Modify: `internal/config/config.go` +- Modify: `internal/config/config_test.go` +- Create: `internal/http/handlers/platform_webhook_security.go` +- Test: `internal/http/handlers/platform_webhook_security_test.go` +- Modify: `internal/http/router.go` +- Modify: `docs/CONFIG_CONTRACT_BASELINE.md` + +**Step 1: 先写配置失败测试** + +覆盖: + +```go +func TestPlatformAdapterConfig_ShouldFailInProdWhenSub2APIEnabledWithoutIngressSecret(t *testing.T) {} +func TestPlatformAdapterConfig_ShouldPassWhenAdaptersDisabled(t *testing.T) {} +``` + +**Step 2: 跑测试确认失败** + +Run: + +```bash +go test ./internal/config ./internal/http/handlers -count=1 +``` + +Expected: +- FAIL + +**Step 3: 增加最小平台适配配置** + +新增配置项: + +- `AI_CS_PLATFORM_ADAPTERS_ENABLED` +- `AI_CS_PLATFORM_SUB2API_ENABLED` +- `AI_CS_PLATFORM_SUB2API_INGRESS_SECRET` +- `AI_CS_PLATFORM_SUB2API_CALLBACK_BASE_URL` +- `AI_CS_PLATFORM_SUB2API_CALLBACK_SECRET` +- `AI_CS_PLATFORM_SUB2API_CALLBACK_TIMEOUT_MS` +- `AI_CS_PLATFORM_SUB2API_CALLBACK_MAX_RETRIES` +- `AI_CS_PLATFORM_NEWAPI_ENABLED` + +**Step 4: 写平台入口安全包装器** + +实现与现有 `WebhookSecurity` 同构的: + +- `PlatformWebhookSecurity` + +但按 platform profile 选择 secret,不要复用通用 webhook 
secret。 + +**Step 5: 在 router 给平台入口接安全包装** + +平台入口独立挂安全中间件,不与现有 `/webhook` 混用 secret。 + +**Step 6: 跑测试确认通过** + +Run: + +```bash +go test ./internal/config ./internal/http/handlers -count=1 +``` + +Expected: +- PASS + +**Step 7: Commit** + +```bash +git add internal/config/config.go internal/config/config_test.go internal/http/handlers/platform_webhook_security.go internal/http/handlers/platform_webhook_security_test.go internal/http/router.go docs/CONFIG_CONTRACT_BASELINE.md +git commit -m "feat(adapter): add platform-specific ingress security config" +``` + +--- + +### Task 4: 定义平台事件模型与 outbox 表结构 + +**Files:** +- Create: `db/migration/0002_platform_event_outbox.up.sql` +- Create: `internal/domain/platformevent/event.go` +- Create: `internal/domain/platformevent/event_test.go` +- Create: `internal/store/postgres/platform_event_store.go` +- Create: `internal/store/postgres/platform_event_store_test.go` +- Reference: `docs/plans/2026-05-06-newapi-sub2api-adapter-design.md` + +**Step 1: 写 store 失败测试** + +覆盖: + +```go +func TestPlatformEventStore_ShouldInsertPendingEvent(t *testing.T) {} +func TestPlatformEventStore_ShouldListPendingEventsInOrder(t *testing.T) {} +``` + +**Step 2: 跑测试确认失败** + +Run: + +```bash +go test ./internal/store/postgres -count=1 +``` + +Expected: +- FAIL + +**Step 3: 定义事件模型** + +新增 `platformevent.Event`: + +- `ID` +- `Platform` +- `EventType` +- `SessionID` +- `TicketID` +- `SourceMessageID` +- `CallbackTarget` +- `Payload` +- `Status` +- `AttemptCount` +- `NextAttemptAt` +- `CreatedAt` + +**Step 4: 补 migration** + +建表至少包括: + +1. `cs_platform_callbacks` +2. `cs_platform_event_outbox` +3. `cs_platform_event_delivery_attempts` +4. `cs_platform_event_dead_letters` + +第一版不做过度 schema 拆分,优先让 outbox 可用。 + +**Step 5: 实现最小 Postgres store** + +支持: + +1. 插入 pending event +2. 拉取 due events +3. 标记 delivered +4. 标记 retry +5. 
标记 dead letter + +**Step 6: 跑测试确认通过** + +Run: + +```bash +go test ./internal/domain/platformevent ./internal/store/postgres -count=1 +``` + +Expected: +- PASS + +**Step 7: Commit** + +```bash +git add db/migration/0002_platform_event_outbox.up.sql internal/domain/platformevent internal/store/postgres/platform_event_store.go internal/store/postgres/platform_event_store_test.go +git commit -m "feat(adapter): add platform event outbox schema and store" +``` + +--- + +### Task 5: 在主链接入平台事件生成 + +**Files:** +- Modify: `internal/service/dialog/service.go` +- Create: `internal/service/platformevents/builder.go` +- Create: `internal/service/platformevents/builder_test.go` +- Modify: `internal/http/handlers/platform_webhook_handler.go` +- Modify: `internal/http/handlers/platform_webhook_handler_test.go` + +**Step 1: 写失败测试** + +覆盖: + +```go +func TestPlatformWebhookHandler_ShouldEnqueueMessageReceivedAndReplyGenerated(t *testing.T) {} +func TestPlatformWebhookHandler_ShouldEnqueueHandoffAndTicketCreatedWhenNeeded(t *testing.T) {} +``` + +**Step 2: 跑测试确认失败** + +Run: + +```bash +go test ./internal/service/... ./internal/http/handlers -count=1 +``` + +Expected: +- FAIL + +**Step 3: 新增事件构建器** + +从 `dialog.Result + PlatformInboundMeta` 构建: + +1. `message.received` +2. `message.processing` +3. `intent.resolved` +4. `handoff.triggered` +5. `ticket.created` +6. `reply.generated` + +**Step 4: 在平台 handler 中落 outbox** + +当前平台入口成功后: + +1. 先调主链 +2. 再构建事件 +3. 批量写入 outbox +4. 返回 ingress ack + +第一版不要把 outbox 失败静默吞掉;应返回 `500` 并记录日志/审计。 + +**Step 5: 跑测试确认通过** + +Run: + +```bash +go test ./internal/service/... 
./internal/http/handlers -count=1 +``` + +Expected: +- PASS + +**Step 6: Commit** + +```bash +git add internal/service/platformevents internal/service/dialog/service.go internal/http/handlers/platform_webhook_handler.go internal/http/handlers/platform_webhook_handler_test.go +git commit -m "feat(adapter): enqueue platform outbox events from inbound flow" +``` + +--- + +### Task 6: 实现 callback 投递 worker + +**Files:** +- Create: `internal/service/platformdelivery/worker.go` +- Create: `internal/service/platformdelivery/signer.go` +- Create: `internal/service/platformdelivery/worker_test.go` +- Create: `internal/service/platformdelivery/signer_test.go` +- Modify: `internal/app/app.go` +- Modify: `internal/config/config.go` + +**Step 1: 写失败测试** + +覆盖: + +```go +func TestWorker_ShouldDeliverPendingEventToCallbackServer(t *testing.T) {} +func TestWorker_ShouldRetryWhenCallbackReturns5xx(t *testing.T) {} +func TestSigner_ShouldProduceStableTimestampAndSignatureHeaders(t *testing.T) {} +``` + +**Step 2: 跑测试确认失败** + +Run: + +```bash +go test ./internal/service/platformdelivery -count=1 +``` + +Expected: +- FAIL + +**Step 3: 实现 callback signer** + +为出站事件添加: + +- `X-CS-Timestamp` +- `X-CS-Signature` + +算法与平台 callback secret 对齐。 + +**Step 4: 实现最小 worker** + +职责: + +1. 拉取 due events +2. 发送 callback +3. 成功标记 delivered +4. 
失败按退避设置 `next_attempt_at` + +**Step 5: 在 app 启动 worker** + +只在: + +- `AI_CS_PLATFORM_ADAPTERS_ENABLED=true` + +时启动。 + +**Step 6: 跑测试确认通过** + +Run: + +```bash +go test ./internal/service/platformdelivery ./internal/app -count=1 +``` + +Expected: +- PASS + +**Step 7: Commit** + +```bash +git add internal/service/platformdelivery internal/app/app.go internal/config/config.go +git commit -m "feat(adapter): add platform callback delivery worker" +``` + +--- + +### Task 7: 增加重试、死信和投递尝试审计 + +**Files:** +- Modify: `internal/store/postgres/platform_event_store.go` +- Modify: `internal/store/postgres/platform_event_store_test.go` +- Modify: `internal/service/platformdelivery/worker.go` +- Modify: `internal/service/platformdelivery/worker_test.go` +- Create: `docs/RUNBOOK_PLATFORM_CALLBACKS.md` + +**Step 1: 写失败测试** + +覆盖: + +```go +func TestWorker_ShouldMoveEventToDeadLetterAfterMaxRetries(t *testing.T) {} +func TestWorker_ShouldPersistDeliveryAttemptAudit(t *testing.T) {} +``` + +**Step 2: 跑测试确认失败** + +Run: + +```bash +go test ./internal/store/postgres ./internal/service/platformdelivery -count=1 +``` + +Expected: +- FAIL + +**Step 3: 实现尝试记录与死信** + +要求: + +1. 每次 callback 尝试都写 `delivery_attempts` +2. 达到最大次数写 `dead_letters` +3. outbox 主记录进入 terminal status + +**Step 4: 补运行手册** + +新增 runbook 说明: + +1. 如何查看 pending / failed / dead letter +2. 如何手动重放 +3. 
如何区分平台回调失败与主链失败 + +**Step 5: 跑测试确认通过** + +Run: + +```bash +go test ./internal/store/postgres ./internal/service/platformdelivery -count=1 +``` + +Expected: +- PASS + +**Step 6: Commit** + +```bash +git add internal/store/postgres/platform_event_store.go internal/store/postgres/platform_event_store_test.go internal/service/platformdelivery/worker.go internal/service/platformdelivery/worker_test.go docs/RUNBOOK_PLATFORM_CALLBACKS.md +git commit -m "feat(adapter): add callback retry audit and dead letter handling" +``` + +--- + +### Task 8: 新增端到端 Sub2API 接入测试 + +**Files:** +- Create: `test/integration/sub2api_webhook_flow_test.go` +- Create: `test/e2e/sub2api_callback_flow_test.go` +- Modify: `tech/TEST_DESIGN.md` +- Modify: `test/QA_GATE_STATUS.md` + +**Step 1: 写端到端失败测试** + +覆盖: + +```go +func TestSub2APIWebhookFlow_ShouldCreateSessionTicketAndOutboxEvents(t *testing.T) {} +func TestSub2APICallbackFlow_ShouldDeliverOrderedEventsWithStableEventIDs(t *testing.T) {} +func TestSub2APICallbackFlow_ShouldDeadLetterAfterMaxRetries(t *testing.T) {} +``` + +**Step 2: 跑测试确认失败** + +Run: + +```bash +go test ./test/integration ./test/e2e -count=1 +``` + +Expected: +- FAIL + +**Step 3: 接通测试依赖** + +1. 使用 mock callback server +2. 使用 Postgres 测试库 +3. 走真实平台入口 `/platforms/sub2api/webhook` +4. 验证 outbox / delivery / dead letter + +**Step 4: 更新测试设计与 QA 文档** + +把原来“NewAPI/Sub2API 适配层验证待实现”改成: + +1. 已有 Sub2API 最小接入联调测试 +2. NewAPI 同构位待实现 + +**Step 5: 跑测试确认通过** + +Run: + +```bash +go test ./test/integration ./test/e2e -count=1 +go test ./... 
-count=1 +``` + +Expected: +- PASS + +**Step 6: Commit** + +```bash +git add test/integration/sub2api_webhook_flow_test.go test/e2e/sub2api_callback_flow_test.go tech/TEST_DESIGN.md test/QA_GATE_STATUS.md +git commit -m "test(adapter): add sub2api end-to-end adapter coverage" +``` + +--- + +### Task 9: 预留 NewAPI profile 与适配扩展点 + +**Files:** +- Modify: `internal/platformadapter/newapi_adapter.go` +- Create: `internal/platformadapter/newapi_adapter_test.go` +- Modify: `docs/plans/2026-05-06-newapi-sub2api-adapter-design.md` + +**Step 1: 写最小失败测试** + +覆盖: + +```go +func TestNewAPIAdapter_ShouldBeRegisteredButDisabledByDefault(t *testing.T) {} +``` + +**Step 2: 跑测试确认失败** + +Run: + +```bash +go test ./internal/platformadapter -count=1 +``` + +Expected: +- FAIL + +**Step 3: 实现同构占位** + +要求: + +1. registry 中可注册 `newapi` +2. 默认不开启 +3. 明确返回“profile not implemented”而不是 silent success + +**Step 4: 跑测试确认通过** + +Run: + +```bash +go test ./internal/platformadapter -count=1 +``` + +Expected: +- PASS + +**Step 5: Commit** + +```bash +git add internal/platformadapter/newapi_adapter.go internal/platformadapter/newapi_adapter_test.go docs/plans/2026-05-06-newapi-sub2api-adapter-design.md +git commit -m "feat(adapter): reserve newapi adapter profile extension point" +``` + +--- + +## 最终整体验证 + +所有 Task 完成后必须执行: + +```bash +go test ./... -count=1 +go test -race ./... +go vet ./... +bash -n scripts/verify_preprod_gate_b.sh +bash -n scripts/verify_gate_c_rollback.sh +``` + +如果新增了平台脚本,再追加: + +```bash +bash scripts/verify_platform_adapter_sub2api.sh +``` + +Expected: +- 全部 PASS + +--- + +## 交付完成判定 + +满足以下条件才算第一版完成: + +1. `sub2api` 平台入口可用 +2. 原生 payload 可映射到 `UnifiedMessage` +3. 成功创建 session / ticket / audit / dedup +4. 全事件流可进入 outbox +5. callback worker 可投递、重试、死信 +6. 端到端测试通过 +7. QA 文档与 runbook 已更新 + +--- + +## 风险提醒 + +1. **不要一次性做完整平台协议** + 第一版只做 Sub2API 优先的最小 profile。 + +2. **不要把平台字段渗透进核心主链** + 平台差异只能留在 adapter/meta/event 边缘层。 + +3. **不要跳过 outbox 直接同步回调** + 你已经要求准可靠投递,不能退回 best-effort。 + +4. 
**不要省掉 dead letter** + 没有 dead letter,就没有真正的可恢复性闭环。 diff --git a/internal/app/app.go b/internal/app/app.go index 589939f..fb5a409 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -14,9 +14,11 @@ import ( "github.com/bridge/ai-customer-service/internal/http/handlers" "github.com/bridge/ai-customer-service/internal/platform/health" "github.com/bridge/ai-customer-service/internal/platform/httpx" + "github.com/bridge/ai-customer-service/internal/platformadapter" "github.com/bridge/ai-customer-service/internal/service/dialog" "github.com/bridge/ai-customer-service/internal/service/handoff" intentservice "github.com/bridge/ai-customer-service/internal/service/intent" + "github.com/bridge/ai-customer-service/internal/service/platformdelivery" "github.com/bridge/ai-customer-service/internal/service/reply" memoryStore "github.com/bridge/ai-customer-service/internal/store/memory" pgstore "github.com/bridge/ai-customer-service/internal/store/postgres" @@ -52,9 +54,11 @@ func New(cfg *config.Config, logger *slog.Logger) (*App, error) { audits dialog.AuditRepository tickets dialog.TicketRepository dedup dialog.DedupRepository + platformEvents *pgstore.PlatformEventStore ticketService handlers.TicketService checkers []health.Checker closers []func() error + workerClosers []func() error ticketListerStore ticketLister sessionStore dialog.SessionRepository ticketStore dialog.TicketRepository @@ -75,6 +79,7 @@ func New(cfg *config.Config, logger *slog.Logger) (*App, error) { auditStore := pgstore.NewAuditStore(db) ticketStore := pgstore.NewTicketStore(db) dedupStore := pgstore.NewDedupStore(db) + platformEvents = pgstore.NewPlatformEventStore(db) sessions = sessionStore audits = auditStore tickets = ticketStore @@ -111,7 +116,74 @@ func New(cfg *config.Config, logger *slog.Logger) (*App, error) { ticketStatsHandler := handlers.NewTicketStatsHandler(ticketListerStore, audits) sessionHandler := handlers.NewSessionHandler(sessionStore, ticketStore, audits) webhookSecurity 
:= handlers.WebhookSecurity{Secret: cfg.Webhook.Secret, TimestampHeader: cfg.Webhook.TimestampHeader, SignatureHeader: cfg.Webhook.SignatureHeader, MaxSkew: time.Duration(cfg.Webhook.MaxSkewSeconds) * time.Second, Audit: audits} - router := httpserver.NewRouter(httpserver.RouterDeps{Health: healthHandler, Webhook: webhookHandler, Tickets: ticketHandler, TicketStats: ticketStatsHandler, Sessions: sessionHandler, WebhookAuth: webhookSecurity, MaxBodyBytes: cfg.HTTP.MaxBodyBytes, RateLimiter: rateLimiter}) + + var ( + platformWebhookHandler *handlers.PlatformWebhookHandler + platformWebhookAuth handlers.PlatformWebhookSecurity + ) + if cfg.PlatformAdapters.Enabled { + var adapters []platformadapter.PlatformAdapter + if cfg.PlatformAdapters.Sub2API.Enabled { + adapters = append(adapters, platformadapter.NewSub2APIAdapter()) + } + if cfg.PlatformAdapters.NewAPI.Enabled { + adapters = append(adapters, platformadapter.NewNewAPIAdapter()) + } + if len(adapters) > 0 { + platformWebhookHandler = handlers.NewPlatformWebhookHandler(dialogSvc, platformadapter.NewRegistry(adapters...), platformEvents) + platformWebhookAuth = handlers.PlatformWebhookSecurity{ + TimestampHeader: cfg.Webhook.TimestampHeader, + SignatureHeader: cfg.Webhook.SignatureHeader, + MaxSkew: time.Duration(cfg.Webhook.MaxSkewSeconds) * time.Second, + Audit: audits, + Sub2APISecret: cfg.PlatformAdapters.Sub2API.IngressSecret, + NewAPISecret: cfg.PlatformAdapters.NewAPI.IngressSecret, + } + } + } + + router := httpserver.NewRouter(httpserver.RouterDeps{ + Health: healthHandler, + Webhook: webhookHandler, + PlatformWebhook: platformWebhookHandler, + PlatformWebhookAuth: platformWebhookAuth, + Tickets: ticketHandler, + TicketStats: ticketStatsHandler, + Sessions: sessionHandler, + WebhookAuth: webhookSecurity, + MaxBodyBytes: cfg.HTTP.MaxBodyBytes, + RateLimiter: rateLimiter, + }) + + if cfg.PlatformAdapters.Enabled && platformEvents != nil { + startWorker := func(platform string, profile 
config.PlatformAdapterProfileConfig) { + if !profile.Enabled || profile.CallbackBaseURL == "" || profile.CallbackSecret == "" { + return + } + workerCtx, cancel := context.WithCancel(context.Background()) + workerClosers = append(workerClosers, func() error { + cancel() + return nil + }) + worker := platformdelivery.NewWorker( + platform, + profile.CallbackBaseURL, + platformEvents, + &http.Client{Timeout: time.Duration(profile.CallbackTimeoutMS) * time.Millisecond}, + platformdelivery.Signer{ + Secret: profile.CallbackSecret, + TimestampHeader: cfg.Webhook.TimestampHeader, + SignatureHeader: cfg.Webhook.SignatureHeader, + }, + profile.CallbackMaxRetries, + ) + worker.Logger = logger + go worker.Start(workerCtx) + } + startWorker("sub2api", cfg.PlatformAdapters.Sub2API) + startWorker("newapi", cfg.PlatformAdapters.NewAPI) + } + closers = append(workerClosers, closers...) return &App{ Server: &http.Server{ diff --git a/internal/app/app_test.go b/internal/app/app_test.go index 48dfe82..af92cc4 100644 --- a/internal/app/app_test.go +++ b/internal/app/app_test.go @@ -104,6 +104,26 @@ func TestNew_AllowsMemoryModeInTestEnv(t *testing.T) { } } +func TestNew_RegistersPlatformWebhookRouteWhenSub2APIEnabled(t *testing.T) { + cfg := minimalHTTPConfig() + cfg.Webhook.Secret = "test-secret" + cfg.PlatformAdapters.Enabled = true + cfg.PlatformAdapters.Sub2API.Enabled = true + cfg.PlatformAdapters.Sub2API.IngressSecret = "sub2api-secret" + + app, err := New(cfg, logging.New()) + if err != nil { + t.Fatalf("New() failed: %v", err) + } + + req := httptest.NewRequest(http.MethodGet, "/api/v1/customer-service/platforms/sub2api/webhook", nil) + rr := httptest.NewRecorder() + app.Server.Handler.ServeHTTP(rr, req) + if rr.Code == http.StatusNotFound { + t.Fatal("platform webhook route returned 404; route should be registered") + } +} + func TestApp_TicketStore(t *testing.T) { cfg := minimalHTTPConfig() cfg.Webhook.Secret = "test-secret" diff --git a/internal/config/config.go 
// PlatformAdaptersConfig groups the settings for the platform adapter
// integrations. Load fills it from the AI_CS_PLATFORM_* environment variables.
// Enabled is the master switch: validatePlatformProfile skips a profile unless
// both this flag and the profile's own Enabled flag are true.
type PlatformAdaptersConfig struct {
	Enabled bool
	Sub2API PlatformAdapterProfileConfig
	NewAPI  PlatformAdapterProfileConfig
}

// PlatformAdapterProfileConfig holds the per-platform settings.
//
// IngressSecret must be non-blank and CallbackTimeoutMS must be positive when
// the profile is active; CallbackMaxRetries must not be negative. Load defaults
// the timeout to 3000 and the retry count to 5. CallbackBaseURL and
// CallbackSecret default to empty strings and are not checked by
// validatePlatformProfile.
type PlatformAdapterProfileConfig struct {
	Enabled            bool
	IngressSecret      string
	CallbackBaseURL    string
	CallbackSecret     string
	CallbackTimeoutMS  int
	CallbackMaxRetries int
}
CallbackTimeoutMS: getEnvInt("AI_CS_PLATFORM_NEWAPI_CALLBACK_TIMEOUT_MS", 3000), + CallbackMaxRetries: getEnvInt("AI_CS_PLATFORM_NEWAPI_CALLBACK_MAX_RETRIES", 5), + }, + }, Runtime: RuntimeConfig{ Env: normalizeRuntimeEnv(getEnv("AI_CS_RUNTIME_ENV", getEnv("AI_CS_ENV", "development"))), }, @@ -85,6 +120,12 @@ func Load() (*Config, error) { if cfg.Webhook.MaxSkewSeconds <= 0 { return nil, fmt.Errorf("AI_CS_WEBHOOK_MAX_SKEW_SECONDS must be positive") } + if err := validatePlatformProfile("sub2api", cfg.PlatformAdapters.Enabled, cfg.PlatformAdapters.Sub2API); err != nil { + return nil, err + } + if err := validatePlatformProfile("newapi", cfg.PlatformAdapters.Enabled, cfg.PlatformAdapters.NewAPI); err != nil { + return nil, err + } if cfg.Runtime.Env != "production" && cfg.Runtime.Env != "development" && cfg.Runtime.Env != "test" { return nil, fmt.Errorf("AI_CS_RUNTIME_ENV must be one of production/development/test, got: %s", cfg.Runtime.Env) } @@ -97,6 +138,23 @@ func Load() (*Config, error) { return cfg, nil } +func validatePlatformProfile(platform string, adaptersEnabled bool, profile PlatformAdapterProfileConfig) error { + if !adaptersEnabled || !profile.Enabled { + return nil + } + upperPlatform := strings.ToUpper(platform) + if strings.TrimSpace(profile.IngressSecret) == "" { + return fmt.Errorf("AI_CS_PLATFORM_%s_INGRESS_SECRET must not be empty when platform ingress is enabled", upperPlatform) + } + if profile.CallbackTimeoutMS <= 0 { + return fmt.Errorf("AI_CS_PLATFORM_%s_CALLBACK_TIMEOUT_MS must be positive", upperPlatform) + } + if profile.CallbackMaxRetries < 0 { + return fmt.Errorf("AI_CS_PLATFORM_%s_CALLBACK_MAX_RETRIES must not be negative", upperPlatform) + } + return nil +} + func normalizeRuntimeEnv(value string) string { switch strings.TrimSpace(strings.ToLower(value)) { case "", "dev", "development": diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 56297bb..02a8660 100644 --- a/internal/config/config_test.go +++ 
b/internal/config/config_test.go @@ -208,3 +208,70 @@ func TestLoad_RejectsProdWhenWebhookSecretMissing(t *testing.T) { t.Fatalf("unexpected error: %v", err) } } + +func TestLoad_PlatformAdaptersDisabled_IgnoresPlatformSecrets(t *testing.T) { + t.Setenv("AI_CS_PLATFORM_ADAPTERS_ENABLED", "false") + t.Setenv("AI_CS_PLATFORM_SUB2API_ENABLED", "true") + t.Setenv("AI_CS_PLATFORM_SUB2API_INGRESS_SECRET", "") + + cfg, err := Load() + if err != nil { + t.Fatalf("Load() error = %v", err) + } + if cfg.PlatformAdapters.Enabled { + t.Fatalf("platform adapters enabled = true, want false") + } + if !cfg.PlatformAdapters.Sub2API.Enabled { + t.Fatalf("sub2api enabled = false, want true") + } +} + +func TestLoad_RejectsEnabledSub2APIWithoutIngressSecret(t *testing.T) { + t.Setenv("AI_CS_PLATFORM_ADAPTERS_ENABLED", "true") + t.Setenv("AI_CS_PLATFORM_SUB2API_ENABLED", "true") + t.Setenv("AI_CS_PLATFORM_SUB2API_INGRESS_SECRET", "") + + _, err := Load() + if err == nil { + t.Fatal("expected error when sub2api ingress secret is missing") + } + if !strings.Contains(err.Error(), "AI_CS_PLATFORM_SUB2API_INGRESS_SECRET") { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestLoad_PlatformAdapterOverrides(t *testing.T) { + t.Setenv("AI_CS_PLATFORM_ADAPTERS_ENABLED", "true") + t.Setenv("AI_CS_PLATFORM_SUB2API_ENABLED", "true") + t.Setenv("AI_CS_PLATFORM_SUB2API_INGRESS_SECRET", "sub2api-secret") + t.Setenv("AI_CS_PLATFORM_SUB2API_CALLBACK_BASE_URL", "https://callback.example.com/sub2api") + t.Setenv("AI_CS_PLATFORM_SUB2API_CALLBACK_SECRET", "cb-secret") + t.Setenv("AI_CS_PLATFORM_SUB2API_CALLBACK_TIMEOUT_MS", "4000") + t.Setenv("AI_CS_PLATFORM_SUB2API_CALLBACK_MAX_RETRIES", "7") + + cfg, err := Load() + if err != nil { + t.Fatalf("Load() error = %v", err) + } + if !cfg.PlatformAdapters.Enabled { + t.Fatalf("platform adapters enabled = false, want true") + } + if !cfg.PlatformAdapters.Sub2API.Enabled { + t.Fatalf("sub2api enabled = false, want true") + } + if 
// Status is the delivery state of an outbox event.
type Status string

// Delivery states accepted by Event.Validate.
const (
	StatusPending    Status = "pending"
	StatusRetrying   Status = "retrying"
	StatusDelivered  Status = "delivered"
	StatusDeadLetter Status = "dead_letter"
)

// Event type identifiers carried in Event.EventType.
const (
	TypeMessageReceived   = "message.received"
	TypeMessageRejected   = "message.rejected"
	TypeMessageDeduped    = "message.deduplicated"
	TypeMessageProcessing = "message.processing"
	TypeIntentResolved    = "intent.resolved"
	TypeHandoffTriggered  = "handoff.triggered"
	TypeTicketCreated     = "ticket.created"
	TypeTicketAssigned    = "ticket.assigned"
	TypeTicketResolved    = "ticket.resolved"
	TypeTicketClosed      = "ticket.closed"
	TypeReplyGenerated    = "reply.generated"
	TypeCallbackDelivered = "callback.delivered"
	TypeCallbackFailed    = "callback.failed"
)

// Event is one platform event together with its delivery bookkeeping
// (status, attempt counter, schedule and last error).
type Event struct {
	ID              string         `json:"event_id"`
	Platform        string         `json:"platform"`
	EventType       string         `json:"event_type"`
	SessionID       string         `json:"session_id,omitempty"`
	TicketID        string         `json:"ticket_id,omitempty"`
	SourceMessageID string         `json:"source_message_id,omitempty"`
	CallbackTarget  string         `json:"callback_target"`
	Payload         map[string]any `json:"payload"`
	Status          Status         `json:"status"`
	AttemptCount    int            `json:"attempt_count"`
	NextAttemptAt   time.Time      `json:"next_attempt_at"`
	OccurredAt      time.Time      `json:"occurred_at"`
	CreatedAt       time.Time      `json:"created_at"`
	UpdatedAt       time.Time      `json:"updated_at"`
	DeliveredAt     *time.Time     `json:"delivered_at,omitempty"`
	LastError       string         `json:"last_error,omitempty"`
}

// Validate reports the first problem found, checking in this order: the
// required text fields (ID, Platform, EventType, CallbackTarget; blank after
// trimming counts as missing), the status value, the attempt counter, and
// finally the two mandatory timestamps. It returns nil for a well-formed event.
func (e Event) Validate() error {
	missing := func(value string) bool { return strings.TrimSpace(value) == "" }
	statusKnown := e.Status == StatusPending ||
		e.Status == StatusRetrying ||
		e.Status == StatusDelivered ||
		e.Status == StatusDeadLetter

	// Cases are evaluated top to bottom, so the first failing rule wins.
	switch {
	case missing(e.ID):
		return fmt.Errorf("event id is required")
	case missing(e.Platform):
		return fmt.Errorf("platform is required")
	case missing(e.EventType):
		return fmt.Errorf("event type is required")
	case missing(e.CallbackTarget):
		return fmt.Errorf("callback target is required")
	case !statusKnown:
		return fmt.Errorf("invalid status: %s", e.Status)
	case e.AttemptCount < 0:
		return fmt.Errorf("attempt count must not be negative")
	case e.NextAttemptAt.IsZero():
		return fmt.Errorf("next attempt at is required")
	case e.OccurredAt.IsZero():
		return fmt.Errorf("occurred at is required")
	}
	return nil
}
AttemptCount: 0, + NextAttemptAt: now, + OccurredAt: now, + } + + if err := event.Validate(); err != nil { + t.Fatalf("Validate() error = %v", err) + } +} + +func TestEvent_ValidateRejectsInvalidStatus(t *testing.T) { + event := Event{ + ID: "evt-1", + Platform: "sub2api", + EventType: TypeReplyGenerated, + CallbackTarget: "default", + Status: Status("invalid"), + NextAttemptAt: time.Now(), + OccurredAt: time.Now(), + } + + err := event.Validate() + if err == nil { + t.Fatal("expected error for invalid status") + } + if !strings.Contains(err.Error(), "invalid status") { + t.Fatalf("unexpected error: %v", err) + } +} diff --git a/internal/http/handlers/platform_webhook_handler.go b/internal/http/handlers/platform_webhook_handler.go new file mode 100644 index 0000000..356efdc --- /dev/null +++ b/internal/http/handlers/platform_webhook_handler.go @@ -0,0 +1,122 @@ +package handlers + +import ( + "context" + "errors" + "io" + "net/http" + "strings" + "time" + + "github.com/bridge/ai-customer-service/internal/domain/error/cserrors" + "github.com/bridge/ai-customer-service/internal/domain/message" + "github.com/bridge/ai-customer-service/internal/domain/platformevent" + "github.com/bridge/ai-customer-service/internal/platformadapter" + "github.com/bridge/ai-customer-service/internal/service/dialog" + "github.com/bridge/ai-customer-service/internal/service/platformevents" +) + +type PlatformDialogProcessor interface { + Process(ctx context.Context, msg *message.UnifiedMessage) (*dialog.Result, error) +} + +type PlatformEventWriter interface { + InsertPendingBatch(ctx context.Context, events []platformevent.Event) error +} + +type PlatformWebhookHandler struct { + dialog PlatformDialogProcessor + registry *platformadapter.Registry + eventWriter PlatformEventWriter + now func() time.Time +} + +func NewPlatformWebhookHandler(dialogProcessor PlatformDialogProcessor, registry *platformadapter.Registry, eventWriter PlatformEventWriter) *PlatformWebhookHandler { + return 
&PlatformWebhookHandler{ + dialog: dialogProcessor, + registry: registry, + eventWriter: eventWriter, + now: time.Now, + } +} + +func (h *PlatformWebhookHandler) Handle(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + writeJSON(w, http.StatusMethodNotAllowed, map[string]any{"error": map[string]any{"code": cserrors.CS_HTTP_405, "message": cserrors.ErrorMsg(cserrors.CS_HTTP_405)}}) + return + } + platform, channel, ok := parsePlatformWebhookPath(r.URL.Path) + if !ok { + writeJSON(w, http.StatusNotFound, map[string]any{"error": map[string]any{"code": "CS_PLATFORM_4040", "message": "platform webhook path not found"}}) + return + } + if platform == "" { + writeJSON(w, http.StatusBadRequest, map[string]any{"error": map[string]any{"code": "CS_PLATFORM_4001", "message": "platform is required"}}) + return + } + if h.registry == nil { + writeJSON(w, http.StatusInternalServerError, map[string]any{"error": map[string]any{"code": cserrors.CS_SYS_5001, "message": cserrors.ErrorMsg(cserrors.CS_SYS_5001)}}) + return + } + now := h.now() + adapter, ok := h.registry.Resolve(platform) + if !ok { + writeJSON(w, http.StatusNotFound, map[string]any{"error": map[string]any{"code": "CS_PLATFORM_4041", "message": "platform adapter not found"}}) + return + } + body, err := io.ReadAll(r.Body) + if err != nil { + writeJSON(w, http.StatusBadRequest, map[string]any{"error": map[string]any{"code": cserrors.CS_REQ_4004, "message": cserrors.ErrorMsg(cserrors.CS_REQ_4004)}}) + return + } + msg, meta, err := adapter.ParseInbound(r, body, platformadapter.IngressContext{ + Platform: platform, + PathChannel: channel, + ReceivedAt: now, + }) + if err != nil { + var reqErr *platformadapter.RequestError + if errors.As(err, &reqErr) { + writeJSON(w, reqErr.Status, map[string]any{"error": map[string]any{"code": reqErr.Code, "message": reqErr.Message}}) + return + } + writeJSON(w, http.StatusBadRequest, map[string]any{"error": map[string]any{"code": cserrors.CS_REQ_4001, 
"message": cserrors.ErrorMsg(cserrors.CS_REQ_4001)}}) + return + } + result, err := h.dialog.Process(r.Context(), msg) + if err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]any{"error": map[string]any{"code": cserrors.CS_SYS_5001, "message": cserrors.ErrorMsg(cserrors.CS_SYS_5001)}}) + return + } + if h.eventWriter == nil { + writeJSON(w, http.StatusInternalServerError, map[string]any{"error": map[string]any{"code": cserrors.CS_SYS_5001, "message": cserrors.ErrorMsg(cserrors.CS_SYS_5001)}}) + return + } + events, err := platformevents.BuildInboundEvents(msg, result, meta, now) + if err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]any{"error": map[string]any{"code": cserrors.CS_SYS_5001, "message": cserrors.ErrorMsg(cserrors.CS_SYS_5001)}}) + return + } + if err := h.eventWriter.InsertPendingBatch(r.Context(), events); err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]any{"error": map[string]any{"code": cserrors.CS_SYS_5001, "message": cserrors.ErrorMsg(cserrors.CS_SYS_5001)}}) + return + } + writeJSON(w, http.StatusOK, adapter.BuildIngressAck(result, meta)) +} + +func parsePlatformWebhookPath(path string) (platform string, channel string, ok bool) { + const prefix = "/api/v1/customer-service/platforms/" + if !strings.HasPrefix(path, prefix) { + return "", "", false + } + trimmed := strings.Trim(strings.TrimPrefix(path, prefix), "/") + parts := strings.Split(trimmed, "/") + if len(parts) < 2 || parts[1] != "webhook" { + return "", "", false + } + platform = strings.TrimSpace(parts[0]) + if len(parts) > 2 { + channel = strings.TrimSpace(strings.Join(parts[2:], "/")) + } + return platform, channel, true +} diff --git a/internal/http/handlers/platform_webhook_handler_test.go b/internal/http/handlers/platform_webhook_handler_test.go new file mode 100644 index 0000000..79e017f --- /dev/null +++ b/internal/http/handlers/platform_webhook_handler_test.go @@ -0,0 +1,112 @@ +package handlers + +import ( + 
"context" + "net/http" + "net/http/httptest" + "strings" + "testing" + + intentdomain "github.com/bridge/ai-customer-service/internal/domain/intent" + "github.com/bridge/ai-customer-service/internal/domain/message" + "github.com/bridge/ai-customer-service/internal/domain/platformevent" + "github.com/bridge/ai-customer-service/internal/platformadapter" + "github.com/bridge/ai-customer-service/internal/service/dialog" + "github.com/bridge/ai-customer-service/internal/service/handoff" +) + +type stubPlatformDialogProcessor struct { + result *dialog.Result + err error + msg *message.UnifiedMessage +} + +func (s *stubPlatformDialogProcessor) Process(_ context.Context, msg *message.UnifiedMessage) (*dialog.Result, error) { + s.msg = msg + if s.err != nil { + return nil, s.err + } + return s.result, nil +} + +type stubPlatformEventWriter struct { + events []platformevent.Event + err error +} + +func (s *stubPlatformEventWriter) InsertPendingBatch(_ context.Context, events []platformevent.Event) error { + s.events = append(s.events, events...) 
+ return s.err +} + +func TestPlatformWebhookHandler_ShouldEnqueueMessageReceivedAndReplyGenerated(t *testing.T) { + registry := platformadapter.NewRegistry(platformadapter.NewSub2APIAdapter()) + processor := &stubPlatformDialogProcessor{result: &dialog.Result{SessionID: "sess-1", Reply: "好的", Intent: &intentdomain.Result{Intent: intentdomain.IntentRefund, Confidence: 0.9}}} + writer := &stubPlatformEventWriter{} + handler := NewPlatformWebhookHandler(processor, registry, writer) + + body := `{"message_id":"m1","channel":"sub2api","open_id":"u1","content":"我要退款"}` + req := httptest.NewRequest(http.MethodPost, "/api/v1/customer-service/platforms/sub2api/webhook", strings.NewReader(body)) + rr := httptest.NewRecorder() + handler.Handle(rr, req) + + if rr.Code != http.StatusOK { + t.Fatalf("status = %d, want 200; body=%s", rr.Code, rr.Body.String()) + } + if processor.msg == nil || processor.msg.OpenID != "u1" { + t.Fatalf("processor msg = %+v, want mapped message", processor.msg) + } + if !strings.Contains(rr.Body.String(), `"accepted":true`) { + t.Fatalf("response body = %s, want accepted=true", rr.Body.String()) + } + if len(writer.events) != 4 { + t.Fatalf("events len = %d, want 4", len(writer.events)) + } + if writer.events[0].EventType != platformevent.TypeMessageReceived { + t.Fatalf("first event type = %s", writer.events[0].EventType) + } + if writer.events[len(writer.events)-1].EventType != platformevent.TypeReplyGenerated { + t.Fatalf("last event type = %s", writer.events[len(writer.events)-1].EventType) + } +} + +func TestPlatformWebhookHandler_ShouldEnqueueHandoffAndTicketCreatedWhenNeeded(t *testing.T) { + registry := platformadapter.NewRegistry(platformadapter.NewSub2APIAdapter()) + processor := &stubPlatformDialogProcessor{result: &dialog.Result{ + SessionID: "sess-1", + Reply: "已转人工", + Intent: &intentdomain.Result{Intent: intentdomain.IntentHandoff, Confidence: 0.88}, + Handoff: &handoff.Decision{ShouldHandoff: true, Priority: "P1", Reason: 
"complaint"}, + TicketID: "ticket-1", + }} + writer := &stubPlatformEventWriter{} + handler := NewPlatformWebhookHandler(processor, registry, writer) + + body := `{"message_id":"m1","channel":"sub2api","open_id":"u1","content":"我要投诉"}` + req := httptest.NewRequest(http.MethodPost, "/api/v1/customer-service/platforms/sub2api/webhook", strings.NewReader(body)) + rr := httptest.NewRecorder() + handler.Handle(rr, req) + + if rr.Code != http.StatusOK { + t.Fatalf("status = %d, want 200; body=%s", rr.Code, rr.Body.String()) + } + if len(writer.events) != 6 { + t.Fatalf("events len = %d, want 6", len(writer.events)) + } + if writer.events[3].EventType != platformevent.TypeHandoffTriggered { + t.Fatalf("handoff event type = %s", writer.events[3].EventType) + } + if writer.events[4].EventType != platformevent.TypeTicketCreated { + t.Fatalf("ticket event type = %s", writer.events[4].EventType) + } +} + +func TestPlatformWebhookHandler_ShouldRejectUnknownPlatform(t *testing.T) { + handler := NewPlatformWebhookHandler(&stubPlatformDialogProcessor{}, platformadapter.NewRegistry(platformadapter.NewSub2APIAdapter()), &stubPlatformEventWriter{}) + req := httptest.NewRequest(http.MethodPost, "/api/v1/customer-service/platforms/unknown/webhook", strings.NewReader(`{}`)) + rr := httptest.NewRecorder() + handler.Handle(rr, req) + if rr.Code != http.StatusNotFound { + t.Fatalf("status = %d, want 404", rr.Code) + } +} diff --git a/internal/http/handlers/platform_webhook_security.go b/internal/http/handlers/platform_webhook_security.go new file mode 100644 index 0000000..e046e30 --- /dev/null +++ b/internal/http/handlers/platform_webhook_security.go @@ -0,0 +1,61 @@ +package handlers + +import ( + "net/http" + "strings" + "time" +) + +type PlatformWebhookSecurity struct { + TimestampHeader string + SignatureHeader string + MaxSkew time.Duration + Audit AuditRecorder + Sub2APISecret string + NewAPISecret string +} + +func (s PlatformWebhookSecurity) Wrap(next http.Handler) http.Handler { 
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + next.ServeHTTP(w, r) + return + } + platform, _, ok := parsePlatformWebhookPath(r.URL.Path) + if !ok { + next.ServeHTTP(w, r) + return + } + security, enabled := s.securityForPlatform(platform) + if !enabled { + next.ServeHTTP(w, r) + return + } + security.Wrap(next).ServeHTTP(w, r) + }) +} + +func (s PlatformWebhookSecurity) securityForPlatform(platform string) (WebhookSecurity, bool) { + secret := strings.TrimSpace(s.secretForPlatform(platform)) + if secret == "" { + return WebhookSecurity{}, false + } + return WebhookSecurity{ + Secret: secret, + TimestampHeader: s.TimestampHeader, + SignatureHeader: s.SignatureHeader, + MaxSkew: s.MaxSkew, + Audit: s.Audit, + }, true +} + +func (s PlatformWebhookSecurity) secretForPlatform(platform string) string { + switch strings.ToLower(strings.TrimSpace(platform)) { + case "sub2api": + return s.Sub2APISecret + case "newapi": + return s.NewAPISecret + default: + return "" + } +} diff --git a/internal/http/handlers/platform_webhook_security_test.go b/internal/http/handlers/platform_webhook_security_test.go new file mode 100644 index 0000000..67d4822 --- /dev/null +++ b/internal/http/handlers/platform_webhook_security_test.go @@ -0,0 +1,86 @@ +package handlers + +import ( + "bytes" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestPlatformWebhookSecurity_ShouldAcceptSignedSub2APIRequest(t *testing.T) { + secured := PlatformWebhookSecurity{ + Sub2APISecret: "sub2api-secret", + TimestampHeader: "X-CS-Timestamp", + SignatureHeader: "X-CS-Signature", + MaxSkew: 5 * time.Minute, + } + handler := secured.Wrap(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + + body := []byte(`{"message_id":"m1","channel":"sub2api","open_id":"u1","content":"hello"}`) + timestampStr := formatUnix(time.Now().Unix()) + signature := signBody("sub2api-secret", timestampStr, 
body) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/customer-service/platforms/sub2api/webhook", bytes.NewReader(body)) + req.Header.Set("X-CS-Timestamp", timestampStr) + req.Header.Set("X-CS-Signature", signature) + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) + + if resp.Code != http.StatusOK { + t.Fatalf("status = %d, want 200", resp.Code) + } +} + +func TestPlatformWebhookSecurity_ShouldRejectInvalidSignatureForConfiguredPlatform(t *testing.T) { + auditRecorder := &stubAuditRecorder{} + secured := PlatformWebhookSecurity{ + Sub2APISecret: "sub2api-secret", + TimestampHeader: "X-CS-Timestamp", + SignatureHeader: "X-CS-Signature", + MaxSkew: 5 * time.Minute, + Audit: auditRecorder, + } + handler := secured.Wrap(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + + body := []byte(`{"message_id":"m1","channel":"sub2api","open_id":"u1","content":"hello"}`) + req := httptest.NewRequest(http.MethodPost, "/api/v1/customer-service/platforms/sub2api/webhook", bytes.NewReader(body)) + req.Header.Set("X-CS-Timestamp", formatUnix(time.Now().Unix())) + req.Header.Set("X-CS-Signature", "wrong-signature") + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) + + if resp.Code != http.StatusForbidden { + t.Fatalf("status = %d, want 403", resp.Code) + } + if len(auditRecorder.events) != 1 { + t.Fatalf("audit count = %d, want 1", len(auditRecorder.events)) + } +} + +func TestPlatformWebhookSecurity_ShouldBypassUnknownPlatform(t *testing.T) { + hit := false + secured := PlatformWebhookSecurity{ + Sub2APISecret: "sub2api-secret", + MaxSkew: 5 * time.Minute, + } + handler := secured.Wrap(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + hit = true + w.WriteHeader(http.StatusNotFound) + })) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/customer-service/platforms/unknown/webhook", bytes.NewBufferString(`{}`)) + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) 
// RouterDeps bundles the handlers and middleware settings that NewRouter wires
// into the mux.
//
// PlatformWebhook is optional: when it is nil NewRouter does not register the
// "/api/v1/customer-service/platforms/" route at all. When present, the route
// is wrapped with the MaxBodyBytes body limit, then with RateLimiter (only if
// non-nil), and finally with PlatformWebhookAuth as the outermost layer.
// WebhookAuth plays the same outermost role for the existing
// "/api/v1/customer-service/webhook/" route. The ticket routes are registered
// only when Tickets is non-nil.
type RouterDeps struct {
	Health              *handlers.HealthHandler
	Webhook             *handlers.WebhookHandler
	PlatformWebhook     *handlers.PlatformWebhookHandler
	PlatformWebhookAuth handlers.PlatformWebhookSecurity
	Tickets             *handlers.TicketHandler
	TicketStats         *handlers.TicketStatsHandler
	Sessions            *handlers.SessionHandler
	WebhookAuth         handlers.WebhookSecurity
	MaxBodyBytes        int64
	RateLimiter         *httpx.RateLimiter
}
+1,20 @@ package httpserver import ( + "bytes" + "context" "net/http" "net/http/httptest" + "strconv" "testing" + "time" + "github.com/bridge/ai-customer-service/internal/domain/message" "github.com/bridge/ai-customer-service/internal/http/handlers" "github.com/bridge/ai-customer-service/internal/http/middleware" "github.com/bridge/ai-customer-service/internal/platform/health" + "github.com/bridge/ai-customer-service/internal/platformadapter" + "github.com/bridge/ai-customer-service/internal/service/dialog" ) func TestRouter_HealthEndpoint(t *testing.T) { @@ -258,3 +265,50 @@ func TestRouter_SessionHandoff_RejectsWhenAuthHeadersMissing(t *testing.T) { t.Fatalf("POST /sessions/s1/handoff without auth = %d, want 403", rr.Code) } } + +type stubPlatformRouterProcessor struct{} + +func (s *stubPlatformRouterProcessor) Process(_ context.Context, _ *message.UnifiedMessage) (*dialog.Result, error) { + return &dialog.Result{SessionID: "sess-router"}, nil +} + +func TestRouter_PlatformWebhookRoute_Registered(t *testing.T) { + probe := health.NewProbe() + probe.SetReady(true) + h := handlers.NewHealthHandler(probe) + platformHandler := handlers.NewPlatformWebhookHandler(&stubPlatformRouterProcessor{}, platformadapter.NewRegistry(platformadapter.NewSub2APIAdapter()), nil) + router := NewRouter(RouterDeps{Health: h, PlatformWebhook: platformHandler}) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/customer-service/platforms/sub2api/webhook", nil) + rr := httptest.NewRecorder() + router.ServeHTTP(rr, req) + if rr.Code == http.StatusNotFound { + t.Fatalf("platform webhook route returned 404; route not registered") + } +} + +func TestRouter_PlatformWebhookRoute_RejectsWhenSignatureMissing(t *testing.T) { + probe := health.NewProbe() + probe.SetReady(true) + h := handlers.NewHealthHandler(probe) + platformHandler := handlers.NewPlatformWebhookHandler(&stubPlatformRouterProcessor{}, platformadapter.NewRegistry(platformadapter.NewSub2APIAdapter()), nil) + router := 
NewRouter(RouterDeps{ + Health: h, + PlatformWebhook: platformHandler, + PlatformWebhookAuth: handlers.PlatformWebhookSecurity{ + Sub2APISecret: "sub2api-secret", + TimestampHeader: "X-CS-Timestamp", + SignatureHeader: "X-CS-Signature", + MaxSkew: 5 * time.Minute, + }, + }) + + body := []byte(`{"message_id":"m1","channel":"sub2api","open_id":"u1","content":"hello"}`) + req := httptest.NewRequest(http.MethodPost, "/api/v1/customer-service/platforms/sub2api/webhook", bytes.NewReader(body)) + req.Header.Set("X-CS-Timestamp", strconv.FormatInt(time.Now().Unix(), 10)) + rr := httptest.NewRecorder() + router.ServeHTTP(rr, req) + if rr.Code != http.StatusForbidden { + t.Fatalf("POST /platforms/sub2api/webhook without signature = %d, want 403", rr.Code) + } +} diff --git a/internal/platformadapter/newapi_adapter.go b/internal/platformadapter/newapi_adapter.go new file mode 100644 index 0000000..10b1510 --- /dev/null +++ b/internal/platformadapter/newapi_adapter.go @@ -0,0 +1,33 @@ +package platformadapter + +import ( + "net/http" + + "github.com/bridge/ai-customer-service/internal/domain/message" + "github.com/bridge/ai-customer-service/internal/service/dialog" +) + +type NewAPIAdapter struct{} + +func NewNewAPIAdapter() *NewAPIAdapter { + return &NewAPIAdapter{} +} + +func (a *NewAPIAdapter) Platform() string { + return "newapi" +} + +func (a *NewAPIAdapter) ParseInbound(_ *http.Request, _ []byte, _ IngressContext) (*message.UnifiedMessage, *PlatformInboundMeta, error) { + return nil, nil, NewRequestError(http.StatusNotImplemented, "CS_PLATFORM_5010", "newapi profile is not implemented") +} + +func (a *NewAPIAdapter) BuildIngressAck(_ *dialog.Result, meta *PlatformInboundMeta) any { + resp := map[string]any{ + "accepted": false, + "platform": a.Platform(), + } + if meta != nil { + resp["event_id"] = meta.EventID + } + return resp +} diff --git a/internal/platformadapter/newapi_adapter_test.go b/internal/platformadapter/newapi_adapter_test.go new file mode 100644 index 
0000000..c9064e6 --- /dev/null +++ b/internal/platformadapter/newapi_adapter_test.go @@ -0,0 +1,33 @@ +package platformadapter + +import ( + "net/http" + "testing" + "time" +) + +func TestNewAPIAdapter_ShouldBeRegisteredButDisabledByDefault(t *testing.T) { + registry := NewRegistry(NewNewAPIAdapter()) + adapter, ok := registry.Resolve("newapi") + if !ok { + t.Fatal("expected newapi adapter to resolve") + } + if adapter.Platform() != "newapi" { + t.Fatalf("adapter.Platform() = %s, want newapi", adapter.Platform()) + } + + _, _, err := adapter.ParseInbound(nil, nil, IngressContext{ + Platform: "newapi", + ReceivedAt: time.Now(), + }) + reqErr, ok := err.(*RequestError) + if !ok { + t.Fatalf("expected RequestError, got %T", err) + } + if reqErr.Status != http.StatusNotImplemented { + t.Fatalf("status = %d, want 501", reqErr.Status) + } + if reqErr.Code != "CS_PLATFORM_5010" { + t.Fatalf("code = %s, want CS_PLATFORM_5010", reqErr.Code) + } +} diff --git a/internal/platformadapter/registry.go b/internal/platformadapter/registry.go new file mode 100644 index 0000000..36c13df --- /dev/null +++ b/internal/platformadapter/registry.go @@ -0,0 +1,40 @@ +package platformadapter + +import "strings" + +type Registry struct { + adapters map[string]PlatformAdapter +} + +func NewRegistry(adapters ...PlatformAdapter) *Registry { + r := &Registry{adapters: make(map[string]PlatformAdapter)} + for _, adapter := range adapters { + if adapter == nil { + continue + } + r.Register(adapter) + } + return r +} + +func (r *Registry) Register(adapter PlatformAdapter) { + if r == nil || adapter == nil { + return + } + if r.adapters == nil { + r.adapters = make(map[string]PlatformAdapter) + } + key := strings.TrimSpace(strings.ToLower(adapter.Platform())) + if key == "" { + return + } + r.adapters[key] = adapter +} + +func (r *Registry) Resolve(platform string) (PlatformAdapter, bool) { + if r == nil { + return nil, false + } + adapter, ok := 
r.adapters[strings.TrimSpace(strings.ToLower(platform))] + return adapter, ok +} diff --git a/internal/platformadapter/registry_test.go b/internal/platformadapter/registry_test.go new file mode 100644 index 0000000..bf3af9f --- /dev/null +++ b/internal/platformadapter/registry_test.go @@ -0,0 +1,21 @@ +package platformadapter + +import "testing" + +func TestRegistry_ShouldResolveSub2APIAdapter(t *testing.T) { + registry := NewRegistry(NewSub2APIAdapter(), NewNewAPIAdapter()) + adapter, ok := registry.Resolve("sub2api") + if !ok { + t.Fatal("expected sub2api adapter to resolve") + } + if got := adapter.Platform(); got != "sub2api" { + t.Fatalf("adapter.Platform() = %q, want sub2api", got) + } +} + +func TestRegistry_ShouldRejectUnknownPlatform(t *testing.T) { + registry := NewRegistry(NewSub2APIAdapter()) + if _, ok := registry.Resolve("unknown"); ok { + t.Fatal("expected unknown platform to be rejected") + } +} diff --git a/internal/platformadapter/sub2api_adapter.go b/internal/platformadapter/sub2api_adapter.go new file mode 100644 index 0000000..119167a --- /dev/null +++ b/internal/platformadapter/sub2api_adapter.go @@ -0,0 +1,89 @@ +package platformadapter + +import ( + "encoding/json" + "net/http" + "strings" + "time" + + "github.com/bridge/ai-customer-service/internal/domain/message" + "github.com/bridge/ai-customer-service/internal/service/dialog" +) + +type Sub2APIAdapter struct{} + +func NewSub2APIAdapter() *Sub2APIAdapter { + return &Sub2APIAdapter{} +} + +func (a *Sub2APIAdapter) Platform() string { + return "sub2api" +} + +func (a *Sub2APIAdapter) ParseInbound(_ *http.Request, body []byte, ctx IngressContext) (*message.UnifiedMessage, *PlatformInboundMeta, error) { + var payload Sub2APIInboundPayload + decoder := json.NewDecoder(strings.NewReader(string(body))) + decoder.DisallowUnknownFields() + if err := decoder.Decode(&payload); err != nil { + return nil, nil, NewRequestError(http.StatusBadRequest, "CS_REQ_4001", "invalid request body") + } + + 
payload.Channel = strings.TrimSpace(payload.Channel) + payload.OpenID = strings.TrimSpace(payload.OpenID) + payload.UserID = strings.TrimSpace(payload.UserID) + payload.Content = strings.TrimSpace(payload.Content) + payload.ContentType = strings.TrimSpace(payload.ContentType) + payload.ReplyTo = strings.TrimSpace(payload.ReplyTo) + if ctx.PathChannel != "" { + payload.Channel = strings.TrimSpace(ctx.PathChannel) + } + if payload.Channel == "" || payload.OpenID == "" || payload.Content == "" { + return nil, nil, NewRequestError(http.StatusBadRequest, "CS_REQ_4002", "channel, open_id and content are required") + } + if payload.Timestamp.IsZero() { + payload.Timestamp = ctx.ReceivedAt + } + + msg := &message.UnifiedMessage{ + MessageID: payload.MessageID, + Channel: payload.Channel, + OpenID: payload.OpenID, + UserID: payload.UserID, + Content: payload.Content, + ContentType: payload.ContentType, + Timestamp: payload.Timestamp, + ReplyTo: payload.ReplyTo, + } + meta := &PlatformInboundMeta{ + EventID: buildEventID("sub2api", payload.MessageID, payload.OpenID, ctx.ReceivedAt), + Platform: a.Platform(), + Channel: payload.Channel, + SourceMessageID: payload.MessageID, + SourceUserID: payload.OpenID, + } + return msg, meta, nil +} + +func (a *Sub2APIAdapter) BuildIngressAck(result *dialog.Result, meta *PlatformInboundMeta) any { + resp := map[string]any{ + "accepted": true, + "platform": a.Platform(), + } + if meta != nil { + resp["event_id"] = meta.EventID + } + if result != nil { + resp["session_id"] = result.SessionID + if strings.TrimSpace(result.TicketID) != "" { + resp["ticket_id"] = result.TicketID + } + } + return resp +} + +func buildEventID(platform, messageID, openID string, now time.Time) string { + if strings.TrimSpace(messageID) != "" { + return strings.ToLower(strings.TrimSpace(platform)) + ":" + strings.TrimSpace(messageID) + } + return strings.ToLower(strings.TrimSpace(platform)) + ":" + strings.TrimSpace(openID) + ":" + 
now.UTC().Format(time.RFC3339Nano) +} diff --git a/internal/platformadapter/sub2api_adapter_test.go b/internal/platformadapter/sub2api_adapter_test.go new file mode 100644 index 0000000..751d4a7 --- /dev/null +++ b/internal/platformadapter/sub2api_adapter_test.go @@ -0,0 +1,64 @@ +package platformadapter + +import ( + "net/http/httptest" + "strings" + "testing" + "time" +) + +func TestSub2APIAdapter_ShouldMapMinimalPayload(t *testing.T) { + adapter := NewSub2APIAdapter() + body := `{"message_id":"m1","channel":"sub2api","open_id":"u1","content":"我要退款"}` + req := httptest.NewRequest("POST", "/api/v1/customer-service/platforms/sub2api/webhook", strings.NewReader(body)) + msg, meta, err := adapter.ParseInbound(req, []byte(body), IngressContext{Platform: "sub2api", ReceivedAt: time.Unix(100, 0)}) + if err != nil { + t.Fatalf("ParseInbound() error = %v", err) + } + if msg.Channel != "sub2api" || msg.OpenID != "u1" || msg.Content != "我要退款" { + t.Fatalf("unexpected unified message: %+v", msg) + } + if meta == nil || meta.SourceMessageID != "m1" || meta.Platform != "sub2api" { + t.Fatalf("unexpected meta: %+v", meta) + } +} + +func TestSub2APIAdapter_ShouldRejectUnknownEnvelopeFields(t *testing.T) { + adapter := NewSub2APIAdapter() + body := `{"message_id":"m1","channel":"sub2api","open_id":"u1","content":"hi","unknown":1}` + req := httptest.NewRequest("POST", "/api/v1/customer-service/platforms/sub2api/webhook", strings.NewReader(body)) + _, _, err := adapter.ParseInbound(req, []byte(body), IngressContext{Platform: "sub2api", ReceivedAt: time.Now()}) + if err == nil { + t.Fatal("expected error for unknown fields") + } +} + +func TestSub2APIAdapter_ShouldUseChannelOverrideWhenPresent(t *testing.T) { + adapter := NewSub2APIAdapter() + body := `{"message_id":"m1","channel":"body-channel","open_id":"u1","content":"hi"}` + req := httptest.NewRequest("POST", "/api/v1/customer-service/platforms/sub2api/webhook/widget", strings.NewReader(body)) + msg, _, err := 
adapter.ParseInbound(req, []byte(body), IngressContext{Platform: "sub2api", PathChannel: "widget", ReceivedAt: time.Now()}) + if err != nil { + t.Fatalf("ParseInbound() error = %v", err) + } + if msg.Channel != "widget" { + t.Fatalf("msg.Channel = %q, want widget", msg.Channel) + } +} + +func TestSub2APIAdapter_ShouldRequireOpenIDAndContent(t *testing.T) { + adapter := NewSub2APIAdapter() + body := `{"message_id":"m1","channel":"sub2api","open_id":"","content":""}` + req := httptest.NewRequest("POST", "/api/v1/customer-service/platforms/sub2api/webhook", strings.NewReader(body)) + _, _, err := adapter.ParseInbound(req, []byte(body), IngressContext{Platform: "sub2api", ReceivedAt: time.Now()}) + if err == nil { + t.Fatal("expected validation error") + } + reqErr, ok := err.(*RequestError) + if !ok { + t.Fatalf("error type = %T, want *RequestError", err) + } + if reqErr.Status != 400 { + t.Fatalf("reqErr.Status = %d, want 400", reqErr.Status) + } +} diff --git a/internal/platformadapter/sub2api_types.go b/internal/platformadapter/sub2api_types.go new file mode 100644 index 0000000..2645432 --- /dev/null +++ b/internal/platformadapter/sub2api_types.go @@ -0,0 +1,14 @@ +package platformadapter + +import "time" + +type Sub2APIInboundPayload struct { + MessageID string `json:"message_id"` + Channel string `json:"channel"` + OpenID string `json:"open_id"` + UserID string `json:"user_id,omitempty"` + Content string `json:"content"` + ContentType string `json:"content_type,omitempty"` + Timestamp time.Time `json:"timestamp,omitempty"` + ReplyTo string `json:"reply_to,omitempty"` +} diff --git a/internal/platformadapter/types.go b/internal/platformadapter/types.go new file mode 100644 index 0000000..bedafad --- /dev/null +++ b/internal/platformadapter/types.go @@ -0,0 +1,49 @@ +package platformadapter + +import ( + "fmt" + "net/http" + "strings" + "time" + + "github.com/bridge/ai-customer-service/internal/domain/message" + 
"github.com/bridge/ai-customer-service/internal/service/dialog" +) + +type IngressContext struct { + Platform string + PathChannel string + ReceivedAt time.Time +} + +type PlatformInboundMeta struct { + EventID string + Platform string + Channel string + SourceMessageID string + SourceUserID string + CallbackTarget string +} + +type PlatformAdapter interface { + Platform() string + ParseInbound(r *http.Request, body []byte, ctx IngressContext) (*message.UnifiedMessage, *PlatformInboundMeta, error) + BuildIngressAck(result *dialog.Result, meta *PlatformInboundMeta) any +} + +type RequestError struct { + Status int + Code string + Message string +} + +func (e *RequestError) Error() string { + if e == nil { + return "" + } + return fmt.Sprintf("%s %s", strings.TrimSpace(e.Code), strings.TrimSpace(e.Message)) +} + +func NewRequestError(status int, code, message string) *RequestError { + return &RequestError{Status: status, Code: code, Message: message} +} diff --git a/internal/service/platformdelivery/signer.go b/internal/service/platformdelivery/signer.go new file mode 100644 index 0000000..fde3bc0 --- /dev/null +++ b/internal/service/platformdelivery/signer.go @@ -0,0 +1,59 @@ +package platformdelivery + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/hex" + "fmt" + "net/http" + "strconv" + "strings" + "time" +) + +const ( + DefaultTimestampHeader = "X-CS-Timestamp" + DefaultSignatureHeader = "X-CS-Signature" +) + +type Signer struct { + Secret string + TimestampHeader string + SignatureHeader string +} + +func (s Signer) Headers(body []byte, now time.Time) (http.Header, error) { + if strings.TrimSpace(s.Secret) == "" { + return nil, fmt.Errorf("secret is required") + } + if now.IsZero() { + now = time.Now() + } + timestamp := strconv.FormatInt(now.Unix(), 10) + headers := make(http.Header) + headers.Set(s.timestampHeader(), timestamp) + headers.Set(s.signatureHeader(), computeSignature(s.Secret, timestamp, body)) + return headers, nil +} + +func (s Signer) 
timestampHeader() string { + if strings.TrimSpace(s.TimestampHeader) == "" { + return DefaultTimestampHeader + } + return s.TimestampHeader +} + +func (s Signer) signatureHeader() string { + if strings.TrimSpace(s.SignatureHeader) == "" { + return DefaultSignatureHeader + } + return s.SignatureHeader +} + +func computeSignature(secret, timestamp string, body []byte) string { + mac := hmac.New(sha256.New, []byte(secret)) + _, _ = mac.Write([]byte(timestamp)) + _, _ = mac.Write([]byte(".")) + _, _ = mac.Write(body) + return hex.EncodeToString(mac.Sum(nil)) +} diff --git a/internal/service/platformdelivery/signer_test.go b/internal/service/platformdelivery/signer_test.go new file mode 100644 index 0000000..6b4e352 --- /dev/null +++ b/internal/service/platformdelivery/signer_test.go @@ -0,0 +1,28 @@ +package platformdelivery + +import ( + "testing" + "time" +) + +func TestSigner_ShouldProduceStableTimestampAndSignatureHeaders(t *testing.T) { + signer := Signer{ + Secret: "callback-secret", + TimestampHeader: DefaultTimestampHeader, + SignatureHeader: DefaultSignatureHeader, + } + body := []byte(`{"event_id":"evt-1"}`) + now := time.Unix(1_777_777_777, 0).UTC() + + headers, err := signer.Headers(body, now) + if err != nil { + t.Fatalf("Headers() error = %v", err) + } + if headers.Get(DefaultTimestampHeader) != "1777777777" { + t.Fatalf("timestamp header = %s, want 1777777777", headers.Get(DefaultTimestampHeader)) + } + expectedSignature := computeSignature("callback-secret", "1777777777", body) + if headers.Get(DefaultSignatureHeader) != expectedSignature { + t.Fatalf("signature header = %s, want %s", headers.Get(DefaultSignatureHeader), expectedSignature) + } +} diff --git a/internal/service/platformdelivery/worker.go b/internal/service/platformdelivery/worker.go new file mode 100644 index 0000000..e235064 --- /dev/null +++ b/internal/service/platformdelivery/worker.go @@ -0,0 +1,177 @@ +package platformdelivery + +import ( + "bytes" + "context" + "encoding/json" + 
"fmt" + "io" + "log/slog" + "net/http" + "strings" + "time" + + "github.com/bridge/ai-customer-service/internal/domain/platformevent" +) + +type EventStore interface { + ListDue(ctx context.Context, platform string, dueBefore time.Time, limit int) ([]platformevent.Event, error) + MarkDelivered(ctx context.Context, eventID string, deliveredAt time.Time) error + RecordDeliveryAttempt(ctx context.Context, eventID string, attemptNo int, responseStatus int, responseBody string, errorMessage string) error + MarkRetry(ctx context.Context, eventID string, attemptCount int, nextAttemptAt time.Time, lastError string) error + MarkDeadLetter(ctx context.Context, eventID string, attemptCount int, finalError string) error +} + +type Worker struct { + Platform string + CallbackURL string + Store EventStore + Client *http.Client + Signer Signer + MaxRetries int + BatchSize int + PollInterval time.Duration + RetrySchedule []time.Duration + Now func() time.Time + Logger *slog.Logger +} + +func NewWorker(platform, callbackURL string, store EventStore, client *http.Client, signer Signer, maxRetries int) *Worker { + if client == nil { + client = &http.Client{Timeout: 3 * time.Second} + } + if maxRetries <= 0 { + maxRetries = 5 + } + return &Worker{ + Platform: strings.TrimSpace(platform), + CallbackURL: strings.TrimSpace(callbackURL), + Store: store, + Client: client, + Signer: signer, + MaxRetries: maxRetries, + BatchSize: 20, + PollInterval: 5 * time.Second, + RetrySchedule: []time.Duration{10 * time.Second, 30 * time.Second, 60 * time.Second, 5 * time.Minute, 15 * time.Minute}, + Now: time.Now, + } +} + +func (w *Worker) Start(ctx context.Context) { + if ctx == nil { + return + } + ticker := time.NewTicker(w.pollInterval()) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + default: + } + if err := w.RunOnce(ctx); err != nil && w.Logger != nil { + w.Logger.Error("platform callback delivery run failed", "platform", w.Platform, "error", err.Error()) + } + select 
{ + case <-ctx.Done(): + return + case <-ticker.C: + } + } +} + +func (w *Worker) RunOnce(ctx context.Context) error { + if w.Store == nil { + return fmt.Errorf("event store is required") + } + if w.Platform == "" { + return fmt.Errorf("platform is required") + } + if w.CallbackURL == "" { + return fmt.Errorf("callback url is required") + } + now := w.now() + events, err := w.Store.ListDue(ctx, w.Platform, now, w.batchSize()) + if err != nil { + return err + } + for _, event := range events { + if err := w.deliver(ctx, event, now); err != nil && w.Logger != nil { + w.Logger.Warn("platform callback event delivery failed", "platform", w.Platform, "event_id", event.ID, "error", err.Error()) + } + } + return nil +} + +func (w *Worker) deliver(ctx context.Context, event platformevent.Event, now time.Time) error { + body, err := json.Marshal(event) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, w.CallbackURL, bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + headers, err := w.Signer.Headers(body, now) + if err != nil { + return err + } + for key, values := range headers { + for _, value := range values { + req.Header.Add(key, value) + } + } + resp, err := w.Client.Do(req) + if err != nil { + _ = w.Store.RecordDeliveryAttempt(ctx, event.ID, event.AttemptCount+1, 0, "", err.Error()) + return w.retryOrDeadLetter(ctx, event, fmt.Sprintf("callback request failed: %v", err), now) + } + defer resp.Body.Close() + responseBody, _ := io.ReadAll(resp.Body) + _ = w.Store.RecordDeliveryAttempt(ctx, event.ID, event.AttemptCount+1, resp.StatusCode, string(responseBody), "") + if resp.StatusCode >= http.StatusOK && resp.StatusCode < http.StatusMultipleChoices { + return w.Store.MarkDelivered(ctx, event.ID, now) + } + return w.retryOrDeadLetter(ctx, event, fmt.Sprintf("callback returned status %d", resp.StatusCode), now) +} + +func (w *Worker) retryOrDeadLetter(ctx 
context.Context, event platformevent.Event, lastError string, now time.Time) error { + attemptCount := event.AttemptCount + 1 + if attemptCount >= w.MaxRetries { + return w.Store.MarkDeadLetter(ctx, event.ID, attemptCount, lastError) + } + return w.Store.MarkRetry(ctx, event.ID, attemptCount, now.Add(w.backoffForAttempt(attemptCount)), lastError) +} + +func (w *Worker) backoffForAttempt(attempt int) time.Duration { + if attempt <= 0 || len(w.RetrySchedule) == 0 { + return 10 * time.Second + } + index := attempt - 1 + if index >= len(w.RetrySchedule) { + return w.RetrySchedule[len(w.RetrySchedule)-1] + } + return w.RetrySchedule[index] +} + +func (w *Worker) batchSize() int { + if w.BatchSize <= 0 { + return 20 + } + return w.BatchSize +} + +func (w *Worker) pollInterval() time.Duration { + if w.PollInterval <= 0 { + return 5 * time.Second + } + return w.PollInterval +} + +func (w *Worker) now() time.Time { + if w.Now == nil { + return time.Now() + } + return w.Now() +} diff --git a/internal/service/platformdelivery/worker_test.go b/internal/service/platformdelivery/worker_test.go new file mode 100644 index 0000000..bb82bdb --- /dev/null +++ b/internal/service/platformdelivery/worker_test.go @@ -0,0 +1,218 @@ +package platformdelivery + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/bridge/ai-customer-service/internal/domain/platformevent" +) + +type stubEventStore struct { + events []platformevent.Event + deliveredIDs []string + retriedIDs []string + deadLetterIDs []string + recordedIDs []string + recordedStatus []int + lastRetryAt time.Time + lastRetryError string + lastAttempt int +} + +func (s *stubEventStore) ListDue(_ context.Context, platform string, _ time.Time, _ int) ([]platformevent.Event, error) { + result := make([]platformevent.Event, 0, len(s.events)) + for _, event := range s.events { + if event.Platform == platform { + result = append(result, event) + } + } + return result, nil +} + 
+func (s *stubEventStore) MarkDelivered(_ context.Context, eventID string, _ time.Time) error { + s.deliveredIDs = append(s.deliveredIDs, eventID) + return nil +} + +func (s *stubEventStore) RecordDeliveryAttempt(_ context.Context, eventID string, attemptNo int, responseStatus int, responseBody string, errorMessage string) error { + s.recordedIDs = append(s.recordedIDs, eventID) + s.recordedStatus = append(s.recordedStatus, responseStatus) + s.lastAttempt = attemptNo + if errorMessage != "" { + s.lastRetryError = errorMessage + } + _ = responseBody + return nil +} + +func (s *stubEventStore) MarkRetry(_ context.Context, eventID string, attemptCount int, nextAttemptAt time.Time, lastError string) error { + s.retriedIDs = append(s.retriedIDs, eventID) + s.lastAttempt = attemptCount + s.lastRetryAt = nextAttemptAt + s.lastRetryError = lastError + return nil +} + +func (s *stubEventStore) MarkDeadLetter(_ context.Context, eventID string, attemptCount int, finalError string) error { + s.deadLetterIDs = append(s.deadLetterIDs, eventID) + s.lastAttempt = attemptCount + s.lastRetryError = finalError + return nil +} + +func TestWorker_ShouldDeliverPendingEventToCallbackServer(t *testing.T) { + now := time.Now().UTC().Truncate(time.Second) + store := &stubEventStore{ + events: []platformevent.Event{{ + ID: "evt-1", + Platform: "sub2api", + EventType: platformevent.TypeReplyGenerated, + CallbackTarget: "default", + Payload: map[string]any{"reply": "好的"}, + Status: platformevent.StatusPending, + NextAttemptAt: now, + OccurredAt: now, + CreatedAt: now, + UpdatedAt: now, + }}, + } + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get(DefaultTimestampHeader) == "" { + t.Fatal("timestamp header is missing") + } + if r.Header.Get(DefaultSignatureHeader) == "" { + t.Fatal("signature header is missing") + } + var event platformevent.Event + if err := json.NewDecoder(r.Body).Decode(&event); err != nil { + t.Fatalf("decode 
request body failed: %v", err) + } + if event.ID != "evt-1" { + t.Fatalf("event id = %s, want evt-1", event.ID) + } + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + worker := NewWorker("sub2api", server.URL, store, server.Client(), Signer{Secret: "callback-secret"}, 5) + worker.Now = func() time.Time { return now } + + if err := worker.RunOnce(context.Background()); err != nil { + t.Fatalf("RunOnce() error = %v", err) + } + if len(store.deliveredIDs) != 1 || store.deliveredIDs[0] != "evt-1" { + t.Fatalf("delivered ids = %v, want [evt-1]", store.deliveredIDs) + } +} + +func TestWorker_ShouldRetryWhenCallbackReturns5xx(t *testing.T) { + now := time.Now().UTC().Truncate(time.Second) + store := &stubEventStore{ + events: []platformevent.Event{{ + ID: "evt-1", + Platform: "sub2api", + EventType: platformevent.TypeReplyGenerated, + CallbackTarget: "default", + Payload: map[string]any{"reply": "好的"}, + Status: platformevent.StatusPending, + NextAttemptAt: now, + OccurredAt: now, + CreatedAt: now, + UpdatedAt: now, + }}, + } + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusBadGateway) + })) + defer server.Close() + + worker := NewWorker("sub2api", server.URL, store, server.Client(), Signer{Secret: "callback-secret"}, 5) + worker.Now = func() time.Time { return now } + worker.RetrySchedule = []time.Duration{15 * time.Second} + + if err := worker.RunOnce(context.Background()); err != nil { + t.Fatalf("RunOnce() error = %v", err) + } + if len(store.retriedIDs) != 1 || store.retriedIDs[0] != "evt-1" { + t.Fatalf("retried ids = %v, want [evt-1]", store.retriedIDs) + } + if store.lastAttempt != 1 { + t.Fatalf("attempt count = %d, want 1", store.lastAttempt) + } + if !store.lastRetryAt.Equal(now.Add(15 * time.Second)) { + t.Fatalf("retry at = %s, want %s", store.lastRetryAt, now.Add(15*time.Second)) + } +} + +func TestWorker_ShouldMoveEventToDeadLetterAfterMaxRetries(t *testing.T) { + now := 
time.Now().UTC().Truncate(time.Second) + store := &stubEventStore{ + events: []platformevent.Event{{ + ID: "evt-1", + Platform: "sub2api", + EventType: platformevent.TypeReplyGenerated, + CallbackTarget: "default", + Payload: map[string]any{"reply": "失败"}, + Status: platformevent.StatusRetrying, + AttemptCount: 1, + NextAttemptAt: now, + OccurredAt: now, + CreatedAt: now, + UpdatedAt: now, + }}, + } + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusBadGateway) + })) + defer server.Close() + + worker := NewWorker("sub2api", server.URL, store, server.Client(), Signer{Secret: "callback-secret"}, 2) + worker.Now = func() time.Time { return now } + + if err := worker.RunOnce(context.Background()); err != nil { + t.Fatalf("RunOnce() error = %v", err) + } + if len(store.deadLetterIDs) != 1 || store.deadLetterIDs[0] != "evt-1" { + t.Fatalf("dead letter ids = %v, want [evt-1]", store.deadLetterIDs) + } +} + +func TestWorker_ShouldPersistDeliveryAttemptAudit(t *testing.T) { + now := time.Now().UTC().Truncate(time.Second) + store := &stubEventStore{ + events: []platformevent.Event{{ + ID: "evt-1", + Platform: "sub2api", + EventType: platformevent.TypeReplyGenerated, + CallbackTarget: "default", + Payload: map[string]any{"reply": "失败"}, + Status: platformevent.StatusPending, + NextAttemptAt: now, + OccurredAt: now, + CreatedAt: now, + UpdatedAt: now, + }}, + } + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusBadGateway) + _, _ = w.Write([]byte(`{"error":"upstream"}`)) + })) + defer server.Close() + + worker := NewWorker("sub2api", server.URL, store, server.Client(), Signer{Secret: "callback-secret"}, 5) + worker.Now = func() time.Time { return now } + + if err := worker.RunOnce(context.Background()); err != nil { + t.Fatalf("RunOnce() error = %v", err) + } + if len(store.recordedIDs) != 1 || store.recordedIDs[0] != "evt-1" { + 
t.Fatalf("recorded ids = %v, want [evt-1]", store.recordedIDs) + } + if len(store.recordedStatus) != 1 || store.recordedStatus[0] != http.StatusBadGateway { + t.Fatalf("recorded status = %v, want [%d]", store.recordedStatus, http.StatusBadGateway) + } +} diff --git a/internal/service/platformevents/builder.go b/internal/service/platformevents/builder.go new file mode 100644 index 0000000..1fc269e --- /dev/null +++ b/internal/service/platformevents/builder.go @@ -0,0 +1,92 @@ +package platformevents + +import ( + "fmt" + "time" + + "github.com/google/uuid" + + "github.com/bridge/ai-customer-service/internal/domain/message" + "github.com/bridge/ai-customer-service/internal/domain/platformevent" + "github.com/bridge/ai-customer-service/internal/platformadapter" + "github.com/bridge/ai-customer-service/internal/service/dialog" +) + +const defaultCallbackTarget = "default" + +func BuildInboundEvents(msg *message.UnifiedMessage, result *dialog.Result, meta *platformadapter.PlatformInboundMeta, now time.Time) ([]platformevent.Event, error) { + if msg == nil { + return nil, fmt.Errorf("message is nil") + } + if result == nil { + return nil, fmt.Errorf("result is nil") + } + if meta == nil { + return nil, fmt.Errorf("platform inbound meta is nil") + } + if now.IsZero() { + now = time.Now() + } + + callbackTarget := meta.CallbackTarget + if callbackTarget == "" { + callbackTarget = defaultCallbackTarget + } + eventIndex := 0 + baseEvent := func(eventType string, payload map[string]any) platformevent.Event { + eventTime := now.Add(time.Duration(eventIndex) * time.Nanosecond) + eventIndex++ + return platformevent.Event{ + ID: uuid.New().String(), + Platform: meta.Platform, + EventType: eventType, + SessionID: result.SessionID, + TicketID: result.TicketID, + SourceMessageID: meta.SourceMessageID, + CallbackTarget: callbackTarget, + Payload: payload, + Status: platformevent.StatusPending, + AttemptCount: 0, + NextAttemptAt: eventTime, + OccurredAt: eventTime, + CreatedAt: 
eventTime, + UpdatedAt: eventTime, + } + } + + events := []platformevent.Event{ + baseEvent(platformevent.TypeMessageReceived, map[string]any{ + "channel": meta.Channel, + "open_id": msg.OpenID, + "user_id": msg.UserID, + "content": msg.Content, + "content_type": msg.ContentType, + "reply_to": msg.ReplyTo, + }), + baseEvent(platformevent.TypeMessageProcessing, map[string]any{ + "session_id": result.SessionID, + }), + } + + if result.Intent != nil { + events = append(events, baseEvent(platformevent.TypeIntentResolved, map[string]any{ + "intent": result.Intent.Intent, + "confidence": result.Intent.Confidence, + })) + } + if result.Handoff != nil && result.Handoff.ShouldHandoff { + events = append(events, baseEvent(platformevent.TypeHandoffTriggered, map[string]any{ + "priority": result.Handoff.Priority, + "reason": result.Handoff.Reason, + })) + } + if result.TicketID != "" { + events = append(events, baseEvent(platformevent.TypeTicketCreated, map[string]any{ + "ticket_id": result.TicketID, + })) + } + events = append(events, baseEvent(platformevent.TypeReplyGenerated, map[string]any{ + "reply": result.Reply, + })) + return events, nil +} diff --git a/internal/service/platformevents/builder_test.go b/internal/service/platformevents/builder_test.go new file mode 100644 index 0000000..595c78a --- /dev/null +++ b/internal/service/platformevents/builder_test.go @@ -0,0 +1,78 @@ +package platformevents + +import ( + "testing" + "time" + + intentdomain "github.com/bridge/ai-customer-service/internal/domain/intent" + "github.com/bridge/ai-customer-service/internal/domain/message" + "github.com/bridge/ai-customer-service/internal/platformadapter" + "github.com/bridge/ai-customer-service/internal/service/dialog" + "github.com/bridge/ai-customer-service/internal/service/handoff" +) + +func TestBuildInboundEvents_ShouldBuildReplyFlowEvents(t *testing.T) { + now := time.Now().UTC().Truncate(time.Second) + events, err := BuildInboundEvents( + &message.UnifiedMessage{OpenID: "u1", 
Content: "我要退款", ContentType: "text"}, + &dialog.Result{ + SessionID: "sess-1", + Reply: "好的", + Intent: &intentdomain.Result{Intent: intentdomain.IntentRefund, Confidence: 0.92}, + }, + &platformadapter.PlatformInboundMeta{ + Platform: "sub2api", + Channel: "sub2api", + SourceMessageID: "m1", + CallbackTarget: "default", + }, + now, + ) + if err != nil { + t.Fatalf("BuildInboundEvents() error = %v", err) + } + if len(events) != 4 { + t.Fatalf("events len = %d, want 4", len(events)) + } + if events[0].EventType != "message.received" { + t.Fatalf("first event type = %s", events[0].EventType) + } + if events[len(events)-1].EventType != "reply.generated" { + t.Fatalf("last event type = %s", events[len(events)-1].EventType) + } +} + +func TestBuildInboundEvents_ShouldIncludeHandoffAndTicketCreated(t *testing.T) { + now := time.Now().UTC().Truncate(time.Second) + events, err := BuildInboundEvents( + &message.UnifiedMessage{OpenID: "u1", Content: "我要投诉"}, + &dialog.Result{ + SessionID: "sess-1", + Reply: "已转人工", + Intent: &intentdomain.Result{Intent: intentdomain.IntentHandoff, Confidence: 0.88}, + Handoff: &handoff.Decision{ShouldHandoff: true, Priority: "P1", Reason: "complaint"}, + TicketID: "ticket-1", + }, + &platformadapter.PlatformInboundMeta{ + Platform: "sub2api", + Channel: "sub2api", + SourceMessageID: "m1", + }, + now, + ) + if err != nil { + t.Fatalf("BuildInboundEvents() error = %v", err) + } + if len(events) != 6 { + t.Fatalf("events len = %d, want 6", len(events)) + } + if events[3].EventType != "handoff.triggered" { + t.Fatalf("handoff event type = %s", events[3].EventType) + } + if events[4].EventType != "ticket.created" { + t.Fatalf("ticket event type = %s", events[4].EventType) + } + if events[0].CallbackTarget != "default" { + t.Fatalf("callback target = %s, want default", events[0].CallbackTarget) + } +} diff --git a/internal/store/postgres/migrate.go b/internal/store/postgres/migrate.go index bc1af58..9035207 100644 --- 
a/internal/store/postgres/migrate.go +++ b/internal/store/postgres/migrate.go @@ -52,7 +52,7 @@ func RunMigrations(db *sql.DB, dir string) error { _ = tx.Rollback() return fmt.Errorf("apply migration %s: %w", name, err) } - if _, err := tx.Exec(`INSERT INTO cs_schema_migrations(version) VALUES ($1)`, version); err != nil { + if _, err := tx.Exec(`INSERT INTO cs_schema_migrations(version) VALUES ($1) ON CONFLICT (version) DO NOTHING`, version); err != nil { _ = tx.Rollback() return err } diff --git a/internal/store/postgres/platform_event_store.go b/internal/store/postgres/platform_event_store.go new file mode 100644 index 0000000..8081b0c --- /dev/null +++ b/internal/store/postgres/platform_event_store.go @@ -0,0 +1,196 @@ +package postgres + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/bridge/ai-customer-service/internal/domain/platformevent" +) + +type PlatformEventStore struct { + db *sql.DB +} + +func NewPlatformEventStore(db *sql.DB) *PlatformEventStore { + return &PlatformEventStore{db: db} +} + +func (s *PlatformEventStore) InsertPending(ctx context.Context, event *platformevent.Event) error { + if event == nil { + return fmt.Errorf("event is nil") + } + return s.InsertPendingBatch(ctx, []platformevent.Event{*event}) +} + +func (s *PlatformEventStore) InsertPendingBatch(ctx context.Context, events []platformevent.Event) error { + if s.db == nil { + return fmt.Errorf("db is nil") + } + if len(events) == 0 { + return nil + } + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return err + } + for _, event := range events { + if err := event.Validate(); err != nil { + _ = tx.Rollback() + return err + } + payload, err := json.Marshal(event.Payload) + if err != nil { + _ = tx.Rollback() + return err + } + if _, err := tx.ExecContext(ctx, ` + INSERT INTO cs_platform_event_outbox( + id, platform, event_type, session_id, ticket_id, source_message_id, callback_target, + payload, status, attempt_count, 
next_attempt_at, occurred_at, delivered_at, last_error, created_at, updated_at + ) VALUES ( + $1, $2, $3, NULLIF($4,'')::uuid, NULLIF($5,'')::uuid, $6, $7, + $8::jsonb, $9, $10, $11, $12, $13, NULLIF($14,''), $15, $16 + ) + `, event.ID, event.Platform, event.EventType, event.SessionID, event.TicketID, event.SourceMessageID, event.CallbackTarget, + string(payload), string(event.Status), event.AttemptCount, event.NextAttemptAt, event.OccurredAt, event.DeliveredAt, event.LastError, event.CreatedAt, event.UpdatedAt); err != nil { + _ = tx.Rollback() + return err + } + } + return tx.Commit() +} + +func (s *PlatformEventStore) ListDue(ctx context.Context, platform string, dueBefore time.Time, limit int) ([]platformevent.Event, error) { + if s.db == nil { + return nil, fmt.Errorf("db is nil") + } + if limit <= 0 { + return nil, fmt.Errorf("limit must be positive") + } + platform = strings.TrimSpace(platform) + if platform == "" { + return nil, fmt.Errorf("platform is required") + } + rows, err := s.db.QueryContext(ctx, ` + SELECT id, platform, event_type, COALESCE(session_id::text, ''), COALESCE(ticket_id::text, ''), COALESCE(source_message_id, ''), + callback_target, payload, status, attempt_count, next_attempt_at, occurred_at, created_at, updated_at, + delivered_at, COALESCE(last_error, '') + FROM cs_platform_event_outbox + WHERE platform = $1 AND status IN ('pending', 'retrying') AND next_attempt_at <= $2 + ORDER BY next_attempt_at ASC, created_at ASC + LIMIT $3 + `, platform, dueBefore, limit) + if err != nil { + return nil, err + } + defer rows.Close() + + events := make([]platformevent.Event, 0, limit) + for rows.Next() { + var ( + event platformevent.Event + payloadJSON []byte + status string + ) + if err := rows.Scan( + &event.ID, + &event.Platform, + &event.EventType, + &event.SessionID, + &event.TicketID, + &event.SourceMessageID, + &event.CallbackTarget, + &payloadJSON, + &status, + &event.AttemptCount, + &event.NextAttemptAt, + &event.OccurredAt, + 
&event.CreatedAt, + &event.UpdatedAt, + &event.DeliveredAt, + &event.LastError, + ); err != nil { + return nil, err + } + event.Status = platformevent.Status(status) + if len(payloadJSON) > 0 { + if err := json.Unmarshal(payloadJSON, &event.Payload); err != nil { + return nil, err + } + } + events = append(events, event) + } + if err := rows.Err(); err != nil { + return nil, err + } + return events, nil +} + +func (s *PlatformEventStore) MarkDelivered(ctx context.Context, eventID string, deliveredAt time.Time) error { + if s.db == nil { + return fmt.Errorf("db is nil") + } + _, err := s.db.ExecContext(ctx, ` + UPDATE cs_platform_event_outbox + SET status = 'delivered', delivered_at = $2, updated_at = $2 + WHERE id = $1 + `, eventID, deliveredAt) + return err +} + +func (s *PlatformEventStore) RecordDeliveryAttempt(ctx context.Context, eventID string, attemptNo int, responseStatus int, responseBody string, errorMessage string) error { + if s.db == nil { + return fmt.Errorf("db is nil") + } + _, err := s.db.ExecContext(ctx, ` + INSERT INTO cs_platform_event_delivery_attempts(event_id, attempt_no, response_status, response_body, error_message) + VALUES ($1, $2, NULLIF($3, 0), NULLIF($4, ''), NULLIF($5, '')) + `, eventID, attemptNo, responseStatus, responseBody, errorMessage) + return err +} + +func (s *PlatformEventStore) MarkRetry(ctx context.Context, eventID string, attemptCount int, nextAttemptAt time.Time, lastError string) error { + if s.db == nil { + return fmt.Errorf("db is nil") + } + _, err := s.db.ExecContext(ctx, ` + UPDATE cs_platform_event_outbox + SET status = 'retrying', attempt_count = $2, next_attempt_at = $3, last_error = NULLIF($4,''), updated_at = NOW() + WHERE id = $1 + `, eventID, attemptCount, nextAttemptAt, lastError) + return err +} + +func (s *PlatformEventStore) MarkDeadLetter(ctx context.Context, eventID string, attemptCount int, finalError string) error { + if s.db == nil { + return fmt.Errorf("db is nil") + } + tx, err := 
s.db.BeginTx(ctx, nil) + if err != nil { + return err + } + if _, err := tx.ExecContext(ctx, ` + UPDATE cs_platform_event_outbox + SET status = 'dead_letter', attempt_count = $2, last_error = NULLIF($3,''), updated_at = NOW() + WHERE id = $1 + `, eventID, attemptCount, finalError); err != nil { + _ = tx.Rollback() + return err + } + if _, err := tx.ExecContext(ctx, ` + INSERT INTO cs_platform_event_dead_letters(event_id, platform, event_type, callback_target, payload, attempt_count, final_error) + SELECT id, platform, event_type, callback_target, payload, attempt_count, last_error + FROM cs_platform_event_outbox + WHERE id = $1 + ON CONFLICT (event_id) DO UPDATE + SET attempt_count = EXCLUDED.attempt_count, final_error = EXCLUDED.final_error, payload = EXCLUDED.payload + `, eventID); err != nil { + _ = tx.Rollback() + return err + } + return tx.Commit() +} diff --git a/internal/store/postgres/platform_event_store_test.go b/internal/store/postgres/platform_event_store_test.go new file mode 100644 index 0000000..828dede --- /dev/null +++ b/internal/store/postgres/platform_event_store_test.go @@ -0,0 +1,212 @@ +package postgres + +import ( + "context" + "net/http" + "testing" + "time" + + "github.com/bridge/ai-customer-service/internal/domain/platformevent" +) + +func TestPlatformEventStore_ShouldInsertPendingEvent(t *testing.T) { + db := openDBForTest(t) + defer db.Close() + + store := NewPlatformEventStore(db) + now := time.Now().UTC().Truncate(time.Second) + event := &platformevent.Event{ + ID: uniqueID("evt"), + Platform: "sub2api", + EventType: platformevent.TypeMessageReceived, + CallbackTarget: "default", + Payload: map[string]any{"message": "hello"}, + Status: platformevent.StatusPending, + AttemptCount: 0, + NextAttemptAt: now, + OccurredAt: now, + CreatedAt: now, + UpdatedAt: now, + SourceMessageID: uniqueID("msg"), + } + + if err := store.InsertPending(context.Background(), event); err != nil { + t.Fatalf("InsertPending() error = %v", err) + } + + var ( + 
status string + callbackName string + ) + if err := db.QueryRowContext(context.Background(), ` + SELECT status, callback_target + FROM cs_platform_event_outbox + WHERE id = $1 + `, event.ID).Scan(&status, &callbackName); err != nil { + t.Fatalf("query inserted event failed: %v", err) + } + if status != string(platformevent.StatusPending) { + t.Fatalf("status = %s, want %s", status, platformevent.StatusPending) + } + if callbackName != "default" { + t.Fatalf("callback target = %s, want default", callbackName) + } +} + +func TestPlatformEventStore_ShouldListPendingEventsInOrder(t *testing.T) { + db := openDBForTest(t) + defer db.Close() + + store := NewPlatformEventStore(db) + now := time.Now().UTC().Truncate(time.Second) + firstID := uniqueID("evt") + secondID := uniqueID("evt") + platformName := "s2a-" + firstID[:8] + + first := &platformevent.Event{ + ID: firstID, + Platform: platformName, + EventType: platformevent.TypeMessageProcessing, + CallbackTarget: "default", + Payload: map[string]any{"step": 1}, + Status: platformevent.StatusPending, + NextAttemptAt: now.Add(-2 * time.Minute), + OccurredAt: now.Add(-2 * time.Minute), + CreatedAt: now.Add(-2 * time.Minute), + UpdatedAt: now.Add(-2 * time.Minute), + } + second := &platformevent.Event{ + ID: secondID, + Platform: platformName, + EventType: platformevent.TypeReplyGenerated, + CallbackTarget: "default", + Payload: map[string]any{"step": 2}, + Status: platformevent.StatusPending, + NextAttemptAt: now.Add(-1 * time.Minute), + OccurredAt: now.Add(-1 * time.Minute), + CreatedAt: now.Add(-1 * time.Minute), + UpdatedAt: now.Add(-1 * time.Minute), + } + + if err := store.InsertPending(context.Background(), second); err != nil { + t.Fatalf("InsertPending(second) error = %v", err) + } + if err := store.InsertPending(context.Background(), first); err != nil { + t.Fatalf("InsertPending(first) error = %v", err) + } + + events, err := store.ListDue(context.Background(), platformName, now, 10) + if err != nil { + 
t.Fatalf("ListDue() error = %v", err) + } + if len(events) < 2 { + t.Fatalf("due events count = %d, want at least 2", len(events)) + } + + firstPos := -1 + secondPos := -1 + for i, event := range events { + if event.ID == firstID { + firstPos = i + } + if event.ID == secondID { + secondPos = i + } + } + if firstPos == -1 || secondPos == -1 { + t.Fatalf("did not find inserted events in due list: first=%d second=%d", firstPos, secondPos) + } + if firstPos >= secondPos { + t.Fatalf("event order invalid: firstPos=%d secondPos=%d", firstPos, secondPos) + } +} + +func TestPlatformEventStore_ShouldPersistDeliveryAttemptAudit(t *testing.T) { + db := openDBForTest(t) + defer db.Close() + + store := NewPlatformEventStore(db) + now := time.Now().UTC().Truncate(time.Second) + event := &platformevent.Event{ + ID: uniqueID("evt"), + Platform: "s2a-" + uniqueID("plt")[:8], + EventType: platformevent.TypeReplyGenerated, + CallbackTarget: "default", + Payload: map[string]any{"reply": "好的"}, + Status: platformevent.StatusPending, + NextAttemptAt: now, + OccurredAt: now, + CreatedAt: now, + UpdatedAt: now, + } + + if err := store.InsertPending(context.Background(), event); err != nil { + t.Fatalf("InsertPending() error = %v", err) + } + if err := store.RecordDeliveryAttempt(context.Background(), event.ID, 1, http.StatusBadGateway, `{"error":"upstream"}`, ""); err != nil { + t.Fatalf("RecordDeliveryAttempt() error = %v", err) + } + + var ( + attemptNo int + responseStatus int + ) + if err := db.QueryRowContext(context.Background(), ` + SELECT attempt_no, response_status + FROM cs_platform_event_delivery_attempts + WHERE event_id = $1 + ORDER BY created_at DESC + LIMIT 1 + `, event.ID).Scan(&attemptNo, &responseStatus); err != nil { + t.Fatalf("query delivery attempt failed: %v", err) + } + if attemptNo != 1 { + t.Fatalf("attempt no = %d, want 1", attemptNo) + } + if responseStatus != http.StatusBadGateway { + t.Fatalf("response status = %d, want %d", responseStatus, 
http.StatusBadGateway) + } +} + +func TestPlatformEventStore_ShouldMoveToDeadLetter(t *testing.T) { + db := openDBForTest(t) + defer db.Close() + + store := NewPlatformEventStore(db) + now := time.Now().UTC().Truncate(time.Second) + event := &platformevent.Event{ + ID: uniqueID("evt"), + Platform: "s2a-" + uniqueID("plt")[:8], + EventType: platformevent.TypeReplyGenerated, + CallbackTarget: "default", + Payload: map[string]any{"reply": "失败"}, + Status: platformevent.StatusPending, + NextAttemptAt: now, + OccurredAt: now, + CreatedAt: now, + UpdatedAt: now, + } + + if err := store.InsertPending(context.Background(), event); err != nil { + t.Fatalf("InsertPending() error = %v", err) + } + if err := store.MarkDeadLetter(context.Background(), event.ID, 5, "callback failed"); err != nil { + t.Fatalf("MarkDeadLetter() error = %v", err) + } + + var status string + if err := db.QueryRowContext(context.Background(), `SELECT status FROM cs_platform_event_outbox WHERE id = $1`, event.ID).Scan(&status); err != nil { + t.Fatalf("query outbox status failed: %v", err) + } + if status != string(platformevent.StatusDeadLetter) { + t.Fatalf("status = %s, want %s", status, platformevent.StatusDeadLetter) + } + + var finalError string + if err := db.QueryRowContext(context.Background(), `SELECT final_error FROM cs_platform_event_dead_letters WHERE event_id = $1`, event.ID).Scan(&finalError); err != nil { + t.Fatalf("query dead letter failed: %v", err) + } + if finalError != "callback failed" { + t.Fatalf("final error = %s, want callback failed", finalError) + } +} diff --git a/internal/store/postgres/store_test.go b/internal/store/postgres/store_test.go index 68cc669..7495022 100644 --- a/internal/store/postgres/store_test.go +++ b/internal/store/postgres/store_test.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "database/sql" "encoding/hex" + "path/filepath" "testing" "time" @@ -40,6 +41,10 @@ func openDBForTest(t *testing.T) *sql.DB { if err != nil { t.Fatalf("failed to open DB: %v", 
err) } + if err := RunMigrations(db, filepath.Join("..", "..", "..", "db", "migration")); err != nil { + _ = db.Close() + t.Fatalf("failed to run migrations: %v", err) + } return db } @@ -70,7 +75,7 @@ func TestTicketStore_CreateAndGet(t *testing.T) { AssignedTo: "agent-001", ContextSnapshot: map[string]any{"key": "value"}, CreatedAt: now, - UpdatedAt: now, + UpdatedAt: now, } if err := ticketStore.Create(ctx, tkt); err != nil { diff --git a/tech/TEST_DESIGN.md b/tech/TEST_DESIGN.md index c49afdf..1116759 100644 --- a/tech/TEST_DESIGN.md +++ b/tech/TEST_DESIGN.md @@ -318,6 +318,12 @@ TEC-01 ~ TEC-10(全 10 条) ### 7.4 NewAPI / Sub2API 适配层验证 +当前实现状态: +- **Sub2API 最小接入链路已落地并有自动化覆盖** + - `test/integration/sub2api_webhook_flow_test.go` + - `test/e2e/sub2api_callback_flow_test.go` +- **NewAPI 仍为同构占位,未启用真实 profile** + | 用例 ID | 描述 | 类型 | 验证条件 | |---------|------|------|---------| | TCS-ADP-01 | Webhook 转发适配 | Integration | Given NewAPI/Sub2API 按标准 Webhook 推送消息 When 适配层处理 Then 消息被正确转换为 `UnifiedMessage` 并进入主链路 | @@ -341,6 +347,12 @@ TEC-01 ~ TEC-10(全 10 条) - [ ] Prompt Injection、越权访问、适配层限流/熔断三类高风险测试全部通过 - [ ] 至少一条主路径、一条关键失败路径、一条集成模式链路完成真实验证 +适配层当前已完成的自动化闭环: +- [x] Sub2API 入站签名 + 主链处理 + outbox 入库 +- [x] Sub2API callback 成功投递顺序验证 +- [x] Sub2API callback 死信路径验证 +- [ ] NewAPI profile 实现与验证 + ### 8.2 阶段门控结论 **当前结论:REQUEST_CHANGES** diff --git a/test/QA_GATE_STATUS.md b/test/QA_GATE_STATUS.md index 7e8f8ce..a085ad4 100644 --- a/test/QA_GATE_STATUS.md +++ b/test/QA_GATE_STATUS.md @@ -43,6 +43,7 @@ - `prd/PRODUCTION_CHECKLIST.md` - `docs/CONFIG_CONTRACT_BASELINE.md` - `docs/P0_P1_P2_RECTIFICATION_EXECUTION_BOARD.md` +- `docs/RUNBOOK_PLATFORM_CALLBACKS.md` ### 1.3 本轮已执行验证 ```bash @@ -53,6 +54,10 @@ AI_CS_RUNTIME_ENV=production ... scripts/verify_preprod_gate_b.sh AI_CS_RUNTIME_ENV=production ... 
scripts/verify_gate_c_rollback.sh ``` +适配层新增实测: +- `go test ./test/integration ./test/e2e -count=1` +- 覆盖 `Sub2API` 平台入口、outbox、callback 成功投递、callback 死信路径 + ### 1.4 关键事实校准 - 当前仓库实测结论:**全量 Go 测试与 `go vet` 已通过** - prod fallback / runtime env / readiness 相关代码阻断:**已落地并有测试覆盖** @@ -76,6 +81,7 @@ AI_CS_RUNTIME_ENV=production ... scripts/verify_gate_c_rollback.sh ### 2.1 已通过项 - webhook / dialog / handoff / ticket 主链已落地 +- `Sub2API` 平台适配入口、outbox、callback worker、死信链路已落地并有自动化覆盖 - feedback / handoff / stats 等 Phase 1 核心接口已具备 - Webhook HMAC / timestamp / dedup / body limit / rate limit 已存在 - Postgres 持久化链路已接通 diff --git a/test/e2e/sub2api_callback_flow_test.go b/test/e2e/sub2api_callback_flow_test.go new file mode 100644 index 0000000..1460af8 --- /dev/null +++ b/test/e2e/sub2api_callback_flow_test.go @@ -0,0 +1,266 @@ +package e2e + +import ( + "bytes" + "context" + "database/sql" + "encoding/json" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/bridge/ai-customer-service/internal/app" + "github.com/bridge/ai-customer-service/internal/config" + "github.com/bridge/ai-customer-service/internal/domain/platformevent" + "github.com/bridge/ai-customer-service/internal/http/handlers" + "github.com/bridge/ai-customer-service/internal/platform/logging" + pgstore "github.com/bridge/ai-customer-service/internal/store/postgres" +) + +func e2ePlatformDSN() string { + return "host=localhost port=5434 user=ai_cs password=ai_cs_secret dbname=ai_customer_service sslmode=disable" +} + +func openE2EPlatformDB(t *testing.T) *sql.DB { + t.Helper() + db, err := pgstore.Open(pgstore.Config{ + DSN: e2ePlatformDSN(), + MaxOpenConns: 5, + MaxIdleConns: 2, + ConnMaxLifetime: 30 * time.Second, + }) + if err != nil { + t.Fatalf("open postgres failed: %v", err) + } + if err := pgstore.RunMigrations(db, "../../db/migration"); err != nil { + _ = db.Close() + t.Fatalf("run migrations failed: %v", err) + } + return db +} + +func newSub2APIE2EApp(t *testing.T, callbackURL 
string, callbackSecret string, maxRetries int) *app.App { + t.Helper() + cfg := &config.Config{} + cfg.HTTP.Addr = ":0" + cfg.HTTP.ReadHeaderTimeout = 5 + cfg.HTTP.ReadTimeout = 10 + cfg.HTTP.WriteTimeout = 15 + cfg.HTTP.IdleTimeout = 60 + cfg.HTTP.MaxHeaderBytes = 1 << 20 + cfg.HTTP.MaxBodyBytes = 1 << 20 + cfg.Runtime.Env = "test" + cfg.Postgres.Enabled = true + cfg.Postgres.DSN = e2ePlatformDSN() + cfg.Postgres.MigrationDir = "../../db/migration" + cfg.Postgres.MaxOpenConns = 5 + cfg.Postgres.MaxIdleConns = 2 + cfg.Postgres.ConnMaxLifetime = 30 + cfg.Webhook.Secret = "default-webhook-secret" + cfg.Webhook.TimestampHeader = "X-CS-Timestamp" + cfg.Webhook.SignatureHeader = "X-CS-Signature" + cfg.Webhook.MaxSkewSeconds = 300 + cfg.PlatformAdapters.Enabled = true + cfg.PlatformAdapters.Sub2API.Enabled = true + cfg.PlatformAdapters.Sub2API.IngressSecret = "sub2api-ingress-secret" + cfg.PlatformAdapters.Sub2API.CallbackBaseURL = callbackURL + cfg.PlatformAdapters.Sub2API.CallbackSecret = callbackSecret + cfg.PlatformAdapters.Sub2API.CallbackTimeoutMS = 2000 + cfg.PlatformAdapters.Sub2API.CallbackMaxRetries = maxRetries + + application, err := app.New(cfg, logging.New()) + if err != nil { + t.Fatalf("app.New() error = %v", err) + } + t.Cleanup(func() { + _ = application.Shutdown(context.Background()) + }) + return application +} + +func eventually(t *testing.T, timeout time.Duration, fn func() bool) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if fn() { + return + } + time.Sleep(200 * time.Millisecond) + } + t.Fatal("condition not satisfied before timeout") +} + +func TestSub2APICallbackFlow_ShouldDeliverOrderedEventsWithStableEventIDs(t *testing.T) { + db := openE2EPlatformDB(t) + defer db.Close() + + var ( + mu sync.Mutex + received []platformevent.Event + ) + callbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() + defer mu.Unlock() + var event platformevent.Event 
+ if err := json.NewDecoder(r.Body).Decode(&event); err != nil { + t.Fatalf("decode callback body failed: %v", err) + } + received = append(received, event) + w.WriteHeader(http.StatusOK) + })) + defer callbackServer.Close() + + application := newSub2APIE2EApp(t, callbackServer.URL, "sub2api-callback-secret", 3) + server := httptest.NewServer(application.Server.Handler) + defer server.Close() + + openID := "sub2api-e2e-" + time.Now().UTC().Format("150405.000000000") + payload := map[string]any{ + "message_id": "m-e2e-" + time.Now().UTC().Format("150405.000000000"), + "channel": "sub2api", + "open_id": openID, + "content": "我要退款", + } + body, _ := json.Marshal(payload) + timestamp, signature, err := handlers.SignWebhookRequest("sub2api-ingress-secret", time.Now().Unix(), body) + if err != nil { + t.Fatalf("SignWebhookRequest() error = %v", err) + } + req, err := http.NewRequest(http.MethodPost, server.URL+"/api/v1/customer-service/platforms/sub2api/webhook", bytes.NewReader(body)) + if err != nil { + t.Fatalf("new request error = %v", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-CS-Timestamp", timestamp) + req.Header.Set("X-CS-Signature", signature) + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("do request error = %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("status = %d, want 200", resp.StatusCode) + } + var ack map[string]any + if err := json.NewDecoder(resp.Body).Decode(&ack); err != nil { + t.Fatalf("decode ack failed: %v", err) + } + sessionID, _ := ack["session_id"].(string) + if sessionID == "" { + t.Fatalf("ack session_id = %v, want non-empty", ack["session_id"]) + } + + eventually(t, 8*time.Second, func() bool { + mu.Lock() + defer mu.Unlock() + count := 0 + for _, event := range received { + if event.SessionID == sessionID { + count++ + } + } + return count == 6 + }) + + mu.Lock() + defer mu.Unlock() + filtered := make([]platformevent.Event, 0, 6) + 
for _, event := range received { + if event.SessionID == sessionID { + filtered = append(filtered, event) + } + } + wantTypes := []string{ + platformevent.TypeMessageReceived, + platformevent.TypeMessageProcessing, + platformevent.TypeIntentResolved, + platformevent.TypeHandoffTriggered, + platformevent.TypeTicketCreated, + platformevent.TypeReplyGenerated, + } + seenIDs := make(map[string]struct{}, len(filtered)) + for i, event := range filtered { + if event.EventType != wantTypes[i] { + t.Fatalf("event[%d].type = %s, want %s", i, event.EventType, wantTypes[i]) + } + if event.ID == "" { + t.Fatalf("event[%d] id is empty", i) + } + if _, exists := seenIDs[event.ID]; exists { + t.Fatalf("duplicate event id: %s", event.ID) + } + seenIDs[event.ID] = struct{}{} + } + + var deliveredCount int + if err := db.QueryRowContext(context.Background(), ` + SELECT COUNT(1) + FROM cs_platform_event_outbox + WHERE platform = 'sub2api' AND status = 'delivered' AND session_id IN ( + SELECT id FROM cs_sessions WHERE channel = 'sub2api' AND open_id = $1 + ) + `, openID).Scan(&deliveredCount); err != nil { + t.Fatalf("query delivered count failed: %v", err) + } + if deliveredCount != 6 { + t.Fatalf("delivered count = %d, want 6", deliveredCount) + } +} + +func TestSub2APICallbackFlow_ShouldDeadLetterAfterMaxRetries(t *testing.T) { + db := openE2EPlatformDB(t) + defer db.Close() + + callbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusBadGateway) + _, _ = w.Write([]byte(`{"error":"upstream"}`)) + })) + defer callbackServer.Close() + + application := newSub2APIE2EApp(t, callbackServer.URL, "sub2api-callback-secret", 1) + server := httptest.NewServer(application.Server.Handler) + defer server.Close() + + openID := "sub2api-dead-" + time.Now().UTC().Format("150405.000000000") + payload := map[string]any{ + "message_id": "m-dead-" + time.Now().UTC().Format("150405.000000000"), + "channel": "sub2api", + "open_id": 
openID, + "content": "晚上好", + } + body, _ := json.Marshal(payload) + timestamp, signature, err := handlers.SignWebhookRequest("sub2api-ingress-secret", time.Now().Unix(), body) + if err != nil { + t.Fatalf("SignWebhookRequest() error = %v", err) + } + req, err := http.NewRequest(http.MethodPost, server.URL+"/api/v1/customer-service/platforms/sub2api/webhook", bytes.NewReader(body)) + if err != nil { + t.Fatalf("new request error = %v", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-CS-Timestamp", timestamp) + req.Header.Set("X-CS-Signature", signature) + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("do request error = %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("status = %d, want 200", resp.StatusCode) + } + + eventually(t, 8*time.Second, func() bool { + var deadCount int + err := db.QueryRowContext(context.Background(), ` + SELECT COUNT(1) + FROM cs_platform_event_dead_letters dl + JOIN cs_platform_event_outbox o ON o.id = dl.event_id + WHERE o.platform = 'sub2api' AND o.session_id IN ( + SELECT id FROM cs_sessions WHERE channel = 'sub2api' AND open_id = $1 + ) + `, openID).Scan(&deadCount) + return err == nil && deadCount == 4 + }) +} diff --git a/test/integration/sub2api_webhook_flow_test.go b/test/integration/sub2api_webhook_flow_test.go new file mode 100644 index 0000000..42e2904 --- /dev/null +++ b/test/integration/sub2api_webhook_flow_test.go @@ -0,0 +1,162 @@ +package integration + +import ( + "bytes" + "context" + "database/sql" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/bridge/ai-customer-service/internal/app" + "github.com/bridge/ai-customer-service/internal/config" + "github.com/bridge/ai-customer-service/internal/http/handlers" + "github.com/bridge/ai-customer-service/internal/platform/logging" + pgstore "github.com/bridge/ai-customer-service/internal/store/postgres" +) + +func platformTestDSN() string 
{ + return "host=localhost port=5434 user=ai_cs password=ai_cs_secret dbname=ai_customer_service sslmode=disable" +} + +func openPlatformTestDB(t *testing.T) *sql.DB { + t.Helper() + db, err := pgstore.Open(pgstore.Config{ + DSN: platformTestDSN(), + MaxOpenConns: 5, + MaxIdleConns: 2, + ConnMaxLifetime: 30 * time.Second, + }) + if err != nil { + t.Fatalf("open postgres failed: %v", err) + } + if err := pgstore.RunMigrations(db, "../../db/migration"); err != nil { + _ = db.Close() + t.Fatalf("run migrations failed: %v", err) + } + return db +} + +func newSub2APIIntegrationApp(t *testing.T) *app.App { + t.Helper() + cfg := &config.Config{} + cfg.HTTP.Addr = ":0" + cfg.HTTP.ReadHeaderTimeout = 5 + cfg.HTTP.ReadTimeout = 10 + cfg.HTTP.WriteTimeout = 15 + cfg.HTTP.IdleTimeout = 60 + cfg.HTTP.MaxHeaderBytes = 1 << 20 + cfg.HTTP.MaxBodyBytes = 1 << 20 + cfg.Runtime.Env = "test" + cfg.Postgres.Enabled = true + cfg.Postgres.DSN = platformTestDSN() + cfg.Postgres.MigrationDir = "../../db/migration" + cfg.Postgres.MaxOpenConns = 5 + cfg.Postgres.MaxIdleConns = 2 + cfg.Postgres.ConnMaxLifetime = 30 + cfg.Webhook.Secret = "default-webhook-secret" + cfg.Webhook.TimestampHeader = "X-CS-Timestamp" + cfg.Webhook.SignatureHeader = "X-CS-Signature" + cfg.Webhook.MaxSkewSeconds = 300 + cfg.PlatformAdapters.Enabled = true + cfg.PlatformAdapters.Sub2API.Enabled = true + cfg.PlatformAdapters.Sub2API.IngressSecret = "sub2api-ingress-secret" + + application, err := app.New(cfg, logging.New()) + if err != nil { + t.Fatalf("app.New() error = %v", err) + } + t.Cleanup(func() { + _ = application.Shutdown(context.Background()) + }) + return application +} + +func TestSub2APIWebhookFlow_ShouldCreateSessionTicketAndOutboxEvents(t *testing.T) { + db := openPlatformTestDB(t) + defer db.Close() + + application := newSub2APIIntegrationApp(t) + server := httptest.NewServer(application.Server.Handler) + defer server.Close() + + openID := "sub2api-intg-" + time.Now().UTC().Format("150405.000000000") + 
payload := map[string]any{ + "message_id": "m-intg-" + time.Now().UTC().Format("150405.000000000"), + "channel": "sub2api", + "open_id": openID, + "content": "我要退款", + } + body, _ := json.Marshal(payload) + timestamp, signature, err := handlers.SignWebhookRequest("sub2api-ingress-secret", time.Now().Unix(), body) + if err != nil { + t.Fatalf("SignWebhookRequest() error = %v", err) + } + + req, err := http.NewRequest(http.MethodPost, server.URL+"/api/v1/customer-service/platforms/sub2api/webhook", bytes.NewReader(body)) + if err != nil { + t.Fatalf("new request error = %v", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-CS-Timestamp", timestamp) + req.Header.Set("X-CS-Signature", signature) + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("do request error = %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("status = %d, want 200", resp.StatusCode) + } + + var ack map[string]any + if err := json.NewDecoder(resp.Body).Decode(&ack); err != nil { + t.Fatalf("decode ack error = %v", err) + } + sessionID, _ := ack["session_id"].(string) + ticketID, _ := ack["ticket_id"].(string) + if sessionID == "" || ticketID == "" { + t.Fatalf("ack session_id=%v ticket_id=%v, want both non-empty", ack["session_id"], ack["ticket_id"]) + } + + var storedSessionID string + if err := db.QueryRowContext(context.Background(), ` + SELECT id + FROM cs_sessions + WHERE channel = 'sub2api' AND open_id = $1 + ORDER BY created_at DESC + LIMIT 1 + `, openID).Scan(&storedSessionID); err != nil { + t.Fatalf("query session failed: %v", err) + } + if storedSessionID != sessionID { + t.Fatalf("stored session id = %s, want %s", storedSessionID, sessionID) + } + + var storedTicketID string + if err := db.QueryRowContext(context.Background(), ` + SELECT id + FROM cs_tickets + WHERE id = $1 AND session_id = $2 + `, ticketID, sessionID).Scan(&storedTicketID); err != nil { + t.Fatalf("query ticket failed: %v", 
err) + } + if storedTicketID != ticketID { + t.Fatalf("stored ticket id = %s, want %s", storedTicketID, ticketID) + } + + var outboxCount int + if err := db.QueryRowContext(context.Background(), ` + SELECT COUNT(1) + FROM cs_platform_event_outbox + WHERE session_id = $1 AND platform = 'sub2api' + `, sessionID).Scan(&outboxCount); err != nil { + t.Fatalf("query outbox count failed: %v", err) + } + if outboxCount != 6 { + t.Fatalf("outbox count = %d, want 6", outboxCount) + } +}