news 2026/5/13 3:03:38

事件写进去了但查不到?CQRS 投影层的坑我都替你踩了

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
事件写进去了但查不到?CQRS 投影层的坑我都替你踩了

一、缘起:事件写进去了,列表查不到

事情是这样的。

给订单模块上了事件溯源之后,第一版跑得挺顺利——创建订单、改状态、取消订单,事件流一条条往 PostgreSQL 里写,Decision 模式的测试全绿。

然后测试同事过来说:“你过来看看,创建完订单,点进列表是空的。”

我:不可能吧,事件明明写成功了。

打开数据库一看,event表里OrderCreated事件躺在那里,无事发生。再查orders投影表——空的。过了大概半秒钟,订单突然出现了。

那一刻我才真正理解:事件溯源里,事件写入 != 数据可查。写事件的那一步和更新读模型的那一步,中间隔着一个异步的投影层。

提前声明:本文分享的是个人在 Pico-CRM 项目中的实践踩坑,架构选择有上下文依赖,仅供参考。

二、概念:投影层到底在干什么

先一句话说清投影层是干什么的:

事件存储里只有事件序列,没有"当前订单"这个概念。投影层负责把事件流"折叠"成一行行的查询表,让前端能SELECT * FROM orders

举个例子:一个订单经历了 5 个事件——

OrderCreated → OrderStatusChanged(confirmed) → OrderAssignmentUpdated → OrderStatusChanged(in_service) → OrderCompleted

前端不需要知道这 5 个事件,只需要知道"这个订单现在状态是已完成"。投影层的职责就是消费这 5 个事件,把orders表里那一行的status依次更新成completed

问题就在于:写事件和更新投影是异步的,于是有了开头那个"创完订单查不到"的名场面。

三、架构:双库分离,各管各的

Pico-CRM 的 CQRS 架构很直接:

┌─────────────┐ ┌──────────────┐ │ 命令端 │ │ 查询端 │ │ (写事件) │ │ (查投影表) │ └──────┬──────┘ └──────▲───────┘ │ │ ▼ │ ┌──────────────┐ ┌────────────────┐ │ 事件存储 DB │────▶│ 读模型 DB │ │ (ES_DATABASE │ 投影 │ (DATABASE_URL) │ │ _URL) │ │ │ └──────────────┘ └────────────────┘

两个 PostgreSQL 数据库,通过环境变量区分:

// backend/src/infrastructure/event_store/mod.rspub(crate)asyncfnevent_store_pool()->Result<sqlx::PgPool,String>{EVENT_STORE_POOL.get_or_try_init(||async{letdatabase_url=env::var("ES_DATABASE_URL")// 事件存储.map_err(|e|format!("load ES_DATABASE_URL error: {}",e))?;sqlx::PgPool::connect(&database_url).await.map_err(|e|format!("connect event store sqlx pool error: {}",e))}).await.cloned()}

命令端(Repository)只写事件存储,查询端(Handler)只查读模型。中间的投影器异步消费事件,更新读模型表。这个架构的关键问题是:投影器的可靠性决定数据一致性的上限

四、投影实现:三个核心设计

4.1 event_id 幂等守卫

每个投影表都有一个event_id: i64字段,记录当前行是由哪个事件 ID 更新的。投影器处理事件时,先查现有行,如果model.event_id >= 当前事件 ID,直接跳过。

举个例子,订单状态变更的投影处理:

// backend/src/infrastructure/projections/crm/order_projection.rsOrderEventEnvelope::OrderStatusChanged{merchant_id,order_uuid,status,completed_at,updated_at,..}=>{// 查现有投影行letSome(model)=orders::Entity::find().filter(orders::Column::Uuid.eq(order_uuid)).one(txn).await?else{returnOk(());};// 幂等守卫:已处理过更高版本的事件,跳过ifmodel.event_id>=event_id{returnOk(());}// 记录变更前的快照letbefore=snapshot_order_model(&model);letmutactive=model.into_active_model();active.status=Set(status);active.completed_at=Set(completed_at);active.event_id=Set(event_id);// 更新到最新事件 IDletupdated=active.update(txn).await?;// 写审计日志(before/after)insert_change_log(txn,merchant_uuid,updated.uuid,"status_changed",operator_uuid,Some(before),Some(snapshot_order_model(&updated)),updated_at,).await?;}

这个if model.event_id >= event_id { return Ok(()); }是投影层最后的安全网。就算同一条事件被投影器重复消费(进程重启、重试),第二次直接被拦截,不会把数据写乱。

4.2 审计日志:每次变更存 before/after 快照

注意上面代码里的insert_change_log。每个事件处理完后,投影器会往order_change_logs表里插一条记录,带着变更前后的 JSON 快照:

fnsnapshot_order_model(model:&orders::Model)->Value{serde_json::json!({"uuid":model.uuid.to_string(),"status":model.status,"amount_cents":model.amount_cents,"paid_amount_cents":model.paid_amount_cents,"settlement_status":model.settlement_status,"completed_at":model.completed_at,// ... 所有字段})}

这带来了一个意外收获:不需要在业务代码里手动写 changelog。每次事件被投影消费,审计日志跟着投影一起生成。出纠纷的时候直接查order_change_logs,订单的每一次变更、变更后什么值、变更前什么值、谁操作的、精确到秒——全在。

4.3 六个事件类型,同一个处理模式

Order 聚合有 6 个事件(Created、DetailsUpdated、StatusChanged、Cancelled、AssignmentUpdated、SettlementUpdated),投影器对每个事件都是一样的处理流程:

  • Created → 插入新行(检查是否已存在)
  • 其他事件 → 查现有行 → event_id 守卫 → 更新字段 → 写 changelog

三个聚合(Order、Schedule、ServiceRequest)各有一个投影器,三个PgEventListener各自跑在独立的 tokio 任务里,互不干扰。

五、踩坑一:到底什么时候投影才完成?

回到开头那个"创完订单查不到"的问题。

根本原因是:用户操作 → 写事件 → HTTP 响应返回 → 投影器轮询到新事件 → 更新投影表,这个链路里有 250ms 的轮询间隔。如果用户手速够快,在轮询间隙点进详情页,就查不到。

有三个层面的应对:

5.1 250ms 轮询 + PG NOTIFY 打断

// backend/src/infrastructure/projections/crm/order_projection.rsPgEventListenerConfig::poller(Duration::from_millis(250))// 每 250ms 轮询.with_notifier()// 同时监听 PG NOTIFY,新事件写入后立即唤醒

poller(250ms)保证最差情况下延迟不超过 250ms。with_notifier()利用disintegrate_postgresLISTEN/NOTIFY机制——事件写入后发一个 NOTIFY,投影器收到后立即醒来处理,不用等满 250ms。实际延迟通常在几十毫秒以内。

5.2 前端乐观更新

对于创建订单这种操作,前端在拿到 HTTP 200 后直接用请求参数在前端列表里补上一行,不等后端投影完成。这个不涉及后端代码,但它是整个一致性体验的关键一环。

5.3 接受最终一致性

说实话,到目前为止没有碰到因为 250ms 延迟导致的业务事故。家政 CRM 不是交易系统,250ms 的短暂不一致在用户体验上完全可以接受(尤其是配合了前端乐观更新之后)。为了这 250ms 去上 Outbox 或者把投影同步阻塞,性价比太低。

六、踩坑二:投影器挂了怎么办?

投影器是后台任务,理论上可能因为各种原因挂掉——数据库连接断开、事件格式异常、panic 等等。挂掉之后的投影堆积会导致读模型越来越旧,甚至跟事件存储彻底脱节。

6.1 指数退避重试

// backend/src/infrastructure/projections/crm/mod.rspub(crate)fnprojection_listener_retry<HE:Debug>(label:&str,err:PgEventListenerError<HE>,attempts:usize,)->RetryAction{letbackoff_ms=(200_u64*2_u64.pow(attempts.min(5)asu32)).min(5_000);ifattempts>=10{eprintln!("{} projection listener aborted after repeated errors: {:?}",label,err);returnRetryAction::Abort;}eprintln!("{} retrying after transient error (attempt {}): {:?}",label,attempts+1,err);RetryAction::Wait{duration:Duration::from_millis(backoff_ms),}}

重试策略:

  • 起始等待 200ms
  • 每次翻倍,最多翻 5 次后封顶在 5000ms
  • 最多重试 10 次,超过后放弃(打日志退出)
  • 进程重启后从listener_progress表恢复,继续从上次处理到的事件 ID 开始

6.2 进程重启自动恢复

disintegrate_postgres内部维护了一张listener_progress表,记录每个投影器处理到的最后一个事件 ID。进程重启后,投影器从这个 ID 继续消费,不会丢事件,也不会重复处理(因为 event_id 幂等守卫兜底)。

这里有个细节值得一提:不需要自己做 checkpointlistener_progress是 disintegrate 框架内部管理的,投影器处理完一批事件后自动更新。这对开发体验很好——不需要关心进度持久化,框架帮你做了。

七、踩坑三:多实例同时跑怎么办?

如果一个应用部署了多个实例(比如负载均衡),每个实例都启动投影器,就会同一事件被多个投影器处理。虽然 event_id 幂等守卫能保证数据不写乱,但浪费数据库连接、增加无意义的竞争。

解决方案:PG Advisory Lock 选主

// backend/src/infrastructure/event_store/mod.rsconstPROJECTION_LEADER_LOCK_KEY:i64=0x5049_434f_4351_5253;// "PICOQRS" 的 ASCIIpubasyncfnhold_projection_leader_lock()->Result<bool,String>{letpool=event_store_pool().await?;letmutconn=pool.acquire().await?;letacquired:bool=sqlx::query_scalar("SELECT pg_try_advisory_lock($1)").bind(PROJECTION_LEADER_LOCK_KEY).fetch_one(&mut*conn).await?;if!acquired{returnOk(false);// 其他实例已持锁,本实例不启动投影器}// 把连接塞进后台任务永久持有,直到进程退出tokio::spawn(asyncmove{let_projection_lock_conn=conn;pending::<()>().await;// 永不返回});Ok(true)}

pg_try_advisory_lock是非阻塞的——拿不到锁直接返回 false,不会排队等。拿到锁的实例成为"投影 Leader",独占投影器;没拿到的实例跳过投影器启动,只提供 HTTP 服务。

如果 Leader 挂了,数据库连接断开,PG 自动释放 Advisory Lock,其他实例重启后就能抢到锁接管投影工作。不需要引入 etcd、consul 或者 Redis 分布式锁——一个 PG Advisory Lock 就够了

bootstrap_cqrs把这个串起来:

// backend/src/infrastructure.rspubasyncfnbootstrap_cqrs(read_model_db:DatabaseConnection)->Result<(),String>{event_store::initialize().await?;// ① 初始化事件表结构if!event_store::hold_projection_leader_lock().await?{// ② 抢锁eprintln!("projection leader lock is already held; skipping listener startup");returnOk(());}projections::spawn_all_listeners(read_model_db).await?;// ③ 启动三个投影器Ok(())}

main.rs里就一行:

bootstrap_cqrs(db.connection.clone()).await.unwrap_or_else(|err|panic!("启动 CQRS 基础设施失败: {}",err));

八、总结

回头看,Pico-CRM 的投影层设计其实就六个要点:

  1. 双库分离:事件存储和读模型各用一个 DB,命令端和查询端独立部署
  2. event_id 幂等守卫if model.event_id >= event_id一行代码挡住所有重复消费
  3. 250ms 轮询 + PG NOTIFY:可预测的最差延迟 + 主动唤醒减少平均延迟
  4. 指数退避重试:200ms 起翻倍,10 次上限,进程重启自动恢复
  5. PG Advisory Lock 选主:零外部依赖的多实例互斥,Leader 挂了自动换人
  6. 审计日志随投影生成:before/after 快照存入order_change_logs,不需要单独维护

CQRS 投影层做的是"翻译"工作——把事件流翻译成业务可读的查询表。翻译得好不好,直接决定用户看到的数据准不准。这几个设计都不算高深,但每一个都对应着一个真实踩过的坑。

如果你也在用事件溯源或在 CQRS 投影层踩过不一样的坑,欢迎在评论区聊聊你的方案。你觉得 PG Advisory Lock 选主靠谱吗?还是你会选 Redis 分布式锁?


项目开源在 GitHub,搜 Pico-CRM 就能找到完整代码。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/13 3:03:12

ImageGlass:Windows平台最强图像浏览器,90+格式全支持

ImageGlass&#xff1a;Windows平台最强图像浏览器&#xff0c;90格式全支持 【免费下载链接】ImageGlass &#x1f3de; A lightweight, versatile image viewer 项目地址: https://gitcode.com/gh_mirrors/im/ImageGlass 你是否曾因Windows自带照片应用无法打开专业RA…

作者头像 李华
网站建设 2026/5/13 3:02:00

抖音下载完整指南:如何免费快速保存无水印视频

抖音下载完整指南&#xff1a;如何免费快速保存无水印视频 【免费下载链接】douyin-downloader A practical Douyin downloader for both single-item and profile batch downloads, with progress display, retries, SQLite deduplication, and browser fallback support. 抖音…

作者头像 李华
网站建设 2026/5/13 3:01:56

多智能体任务网络设计:优先级调度与动态约束实践

1. 多智能体任务网络的核心设计原理任务网络&#xff08;Task Network&#xff09;作为分布式自主系统的中枢神经&#xff0c;其设计直接决定了多智能体协同的效率和可靠性。在CADRE月球探测任务中&#xff0c;我们构建了一套基于优先级调度和动态约束的任务网络系统&#xff0…

作者头像 李华
网站建设 2026/5/13 3:01:24

DelphiOpenAI:原生集成OpenAI API,赋能Delphi开发者构建智能应用

1. 项目概述&#xff1a;DelphiOpenAI&#xff0c;一个为Delphi开发者打造的AI桥梁如果你是一名Delphi开发者&#xff0c;看着Python、JavaScript社区热火朝天地集成各种AI能力&#xff0c;自己却苦于没有成熟、好用的原生库&#xff0c;只能望“AI”兴叹&#xff0c;那么今天介…

作者头像 李华
网站建设 2026/5/13 3:01:03

开源跨平台B站客户端PiliPlus:5分钟掌握全平台免费观影体验

开源跨平台B站客户端PiliPlus&#xff1a;5分钟掌握全平台免费观影体验 【免费下载链接】PiliPlus PiliPlus 项目地址: https://gitcode.com/gh_mirrors/pi/PiliPlus 你是否厌倦了官方B站客户端的广告和功能臃肿&#xff1f;是否希望在电脑、手机、平板等多个设备上获得…

作者头像 李华