news 2026/4/16 13:09:36

RocketMQ Hook 实现

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
RocketMQ Hook 实现

1. 前言

在分布式系统中,RocketMQ 不仅仅是一个消息传输管道,它还提供了强大的插件化扩展能力。RocketMQ Hook(钩子)机制类似于 Spring 的 AOP(面向切面编程)或 Servlet Filter。它允许开发者在消息发送前、发送后、消费前、消费后这四个关键时间点插入自定义逻辑。

Hook 的典型应用场景包括:

  • 全链路追踪:集成 SkyWalking 或 OpenTelemetry,在消息头中注入 TraceID。
  • 消息审计与监控:统计消息发送耗时、消费成功率。
  • 数据隔离与上下文透传:在多租户场景下,隐式传递租户身份信息。

本文将重点介绍 RocketMQ 的SendMessageHookConsumeMessageHook接口,并结合 Spring Boot,演示如何实现 SaaS 场景下的租户上下文透传。

2. 核心接口定义

2.1 SendMessageHook (发送方钩子)

该接口位于生产者端,用于拦截消息发送过程。

publicinterfaceSendMessageHook{StringhookName();// 发送消息之前执行// 可以在这里修改消息内容、添加 Header 属性、获取 ThreadLocal 上下文voidsendMessageBefore(SendMessageContextcontext);// 发送消息之后执行// 可以在这里统计耗时、记录发送结果voidsendMessageAfter(SendMessageContextcontext);}

2.2 ConsumeMessageHook (消费方钩子)

该接口位于消费者端,用于拦截消息消费过程。

publicinterfaceConsumeMessageHook{StringhookName();// 消费消息之前执行// 可以在这里从 Header 提取属性、设置 ThreadLocal 上下文voidconsumeMessageBefore(ConsumeMessageContextcontext);// 消费消息之后执行// 务必在这里清理 ThreadLocal,防止线程池污染voidconsumeMessageAfter(ConsumeMessageContextcontext);}

3. 实战案例:SaaS 多租户上下文透传

3.1 业务背景与问题

在 SaaS 架构中,所有租户共享同一套基础设施(应用服务、MQ 集群、数据库)。数据隔离通常依赖于代码中的tenant_id

  • 现状:HTTP 请求进入 Controller 时,拦截器会解析 Token 并将tenant_id存入当前线程的ThreadLocal(例如TenantContextHolder)。
  • 问题:当业务逻辑触发 MQ 消息发送时,消息会经过网络传输给消费者(可能是另一台服务器)。网络传输会导致 ThreadLocal 信息丢失。消费者线程在处理消息时,无法知道这条消息属于哪个租户,从而导致数据库操作缺少租户条件。
  • 解决方案:利用 RocketMQ Hook,在发送前将 ThreadLocal 中的tenant_id注入到消息的 UserProperty(Header)中;在消费前读取 Header 并还原到 ThreadLocal。

3.2 代码实现

(1) 发送方钩子:注入租户标识

实现SendMessageHook,将TenantContextHolder中的 ID 放入消息属性。

publicclassTenantRocketMQSendMessageHookimplementsSendMessageHook{publicstaticfinalStringHEADER_TENANT_ID="tenant-id";@OverridepublicStringhookName(){returngetClass().getSimpleName();}@OverridepublicvoidsendMessageBefore(SendMessageContextsendMessageContext){// 1. 获取当前线程的租户上下文LongtenantId=TenantContextHolder.getTenantId();// 2. 如果不存在租户信息,则跳过if(tenantId==null){return;}// 3. 将租户 ID 放入消息的 UserProperty (Header) 中// 这样该信息就会随网络传输到 BrokersendMessageContext.getMessage().putUserProperty(HEADER_TENANT_ID,tenantId.toString());}@OverridepublicvoidsendMessageAfter(SendMessageContextsendMessageContext){// 发送后无需特殊处理}}
(2) 消费方钩子:还原租户上下文

实现ConsumeMessageHook,从消息属性中提取 ID 并恢复到 ThreadLocal。

publicclassTenantRocketMQConsumeMessageHookimplementsConsumeMessageHook{publicstaticfinalStringHEADER_TENANT_ID="tenant-id";@OverridepublicStringhookName(){returngetClass().getSimpleName();}@OverridepublicvoidconsumeMessageBefore(ConsumeMessageContextcontext){List<MessageExt>messages=context.getMsgList();// 校验:多租户透传场景下,建议设置消费者为单条消费模式// 否则一批消息可能包含不同租户的数据,ThreadLocal 无法处理Assert.isTrue(messages.size()==1,"消息条数({})不正确,仅支持单条消费",messages.size());// 1. 从消息 Header 中读取租户 IDStringtenantIdStr=messages.get(0).getUserProperty(HEADER_TENANT_ID);// 2. 如果 Header 中存在租户信息,则恢复到当前消费者线程的 ThreadLocal 中if(StrUtil.isNotEmpty(tenantIdStr)){TenantContextHolder.setTenantId(Long.parseLong(tenantIdStr));}}@OverridepublicvoidconsumeMessageAfter(ConsumeMessageContextcontext){// 3. 【关键】消费结束后,必须清理 ThreadLocal// RocketMQ 消费者使用的是线程池,如果不清理,该线程复用时会导致租户数据污染TenantContextHolder.clear();}}
(3) 自动装配:注册 Hook 到 RocketMQ

在 Spring Boot 环境中,我们需要通过BeanPostProcessor拦截 RocketMQ 的 Bean 初始化过程,将上述两个 Hook 注册到底层的DefaultMQProducerDefaultMQPushConsumer对象中。

@ComponentpublicclassTenantRocketMQInitializerimplementsBeanPostProcessor{@OverridepublicObjectpostProcessAfterInitialization(Objectbean,StringbeanName)throwsBeansException{// 1. 拦截消费者容器,注册消费钩子if(beaninstanceofDefaultRocketMQListenerContainer){DefaultRocketMQListenerContainercontainer=(DefaultRocketMQListenerContainer)bean;initTenantConsumer(container.getConsumer());}// 2. 拦截生产者模板,注册发送钩子elseif(beaninstanceofRocketMQTemplate){RocketMQTemplatetemplate=(RocketMQTemplate)bean;initTenantProducer(template.getProducer());}returnbean;}privatevoidinitTenantProducer(DefaultMQProducerproducer){if(producer==null||producer.getDefaultMQProducerImpl()==null){return;}// 注册发送钩子producer.getDefaultMQProducerImpl().registerSendMessageHook(newTenantRocketMQSendMessageHook());}privatevoidinitTenantConsumer(DefaultMQPushConsumerconsumer){if(consumer==null||consumer.getDefaultMQPushConsumerImpl()==null){return;}// 注册消费钩子consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(newTenantRocketMQConsumeMessageHook());}}
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 10:57:26

Windows 11、10 电脑关机故障的原因找到了,微软已经开始准备修复补丁

2026 年 1 月对微软和 Windows 更新而言堪称一场灾难&#xff1a;公司几乎每周都会确认新漏洞的出现。其中一些问题确实非常严重&#xff0c;例如系统因“UNMOUNTABLE_BOOT_VOLUME”错误而无法启动&#xff1b;另一些则近乎荒诞&#xff0c;比如电脑无法正常关机或进入休眠状态…

作者头像 李华
网站建设 2026/4/16 8:50:17

YOLO26原创自研 | 自研独家创新BSAM注意力 ,基于CBAM升级

💡💡💡本文原创自研改进:提出新颖的注意力BSAM(BiLevel Spatial Attention Module),创新度极佳,适合科研创新,效果秒杀CBAM,Channel Attention+Spartial Attention升级为新颖的 BiLevel Attention+Spartial Attention 💡💡💡本文改进:1)作为注意力机制…

作者头像 李华
网站建设 2026/4/3 7:56:17

选型不盲目,部署更高效!企业智能预入职软件一体化解决方案

在企业人力资源管理数字化转型过程中&#xff0c;入职环节的效率与体验直接影响人才留存与企业口碑。一体化智能预入职软件通过整合信息采集、流程审批、员工引导等功能&#xff0c;成为优化入职管理的核心工具。但不少企业在选择与部署时&#xff0c;常面临功能匹配度低、流程…

作者头像 李华
网站建设 2026/4/16 12:33:07

fnOS 飞牛云 NAS 本地部署私人影视库 MoonTV 并实现外部访问

MoonTV 是一款影视聚合播放器。这款播放器集成了数十个免费站点资源&#xff0c;让你随时随地都能找到最新的热门电影、美剧、韩剧、动漫等等&#xff0c;更重要的是没有广告&#xff0c;不需要开通会员。本文将详细的介绍如何在 fnOS 飞牛云本地部署 MoonTV 并结合路由侠实现外…

作者头像 李华
网站建设 2026/4/15 19:29:20

【无人机辅助覆盖】搭载无人机的空中基站在19个六边形蜂窝网络的部署方案动态优化,实现信干噪比、吞吐量、用户提升蜂窝网络性能附matlab代码

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;擅长毕业设计辅导、数学建模、数据处理、建模仿真、程序设计、完整代码获取、论文复现及科研仿真。&#x1f34e; 往期回顾关注个人主页&#xff1a;Matlab科研工作室&#x1f447; 关注我领取海量matlab电子书和…

作者头像 李华