news 2026/5/3 20:46:29

Kafka 事务机制 跨分区 + 跨会话 通俗讲解 + 可运行代码示例

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Kafka 事务机制 跨分区 + 跨会话 通俗讲解 + 可运行代码示例

Kafka 事务机制 跨分区 + 跨会话 通俗讲解 + 可运行代码示例

一、先白话定义

1. 跨分区写入

一次业务,需要往多个分区 / 多个主题发送多条消息。

事务保证:多条消息要么全部提交消费者可见,要么全部回滚一条都看不见,不会中间成功一半。

2. 跨会话

生产者中途宕机、重启、换实例,新的生产者会话还能接手之前未完成的事务,继续提交 / 回滚,保证原子性不被破坏。

3. 解决的问题

不用事务:可能订单消息发成功,库存消息发失败 → 数据不一致。

用事务:同批次多条消息同生共死


二、核心前置配置

Kafka 事务必须配置:

  1. 开启幂等生产者
  2. 指定transactional.id
  3. 配置事务超时时间

三、代码示例:跨分区 / 跨主题 事务原子写入

场景:

下单成功,要同时发两条消息:

  • 订单主题:order_topic
  • 库存主题:stock_topic

要求:两条消息要么都成功,要么都撤回

import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaTransactionDemo { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 1. 开启幂等生产者(事务依赖) props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 2. 设置事务唯一ID props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-id-001"); // 3. 事务超时时间 props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000); Producer<String, String> producer = new KafkaProducer<>(props); try { // 初始化事务 producer.initTransactions(); // 开启事务 producer.beginTransaction(); // ========== 跨主题/跨分区 发送多条消息 ========== // 消息1:订单主题 ProducerRecord<String, String> orderMsg = new ProducerRecord<>("order_topic", "order_001", "创建订单成功"); producer.send(orderMsg); // 消息2:库存主题 ProducerRecord<String, String> stockMsg = new ProducerRecord<>("stock_topic", "stock_001", "扣减商品库存"); producer.send(stockMsg); // 模拟中间业务异常,测试回滚 // int i = 1 / 0; // 正常:提交事务,两条消息同时对消费者可见 producer.commitTransaction(); } catch (Exception e) { // 异常:回滚事务,两条消息全部不可见 producer.abortTransaction(); e.printStackTrace(); } finally { producer.close(); } } }

四、代码逻辑解释

  1. initTransactions()初始化事务环境
  2. beginTransaction()开启事务
  3. 连续往 ** 不同主题(本质就是跨分区)** 发两条消息
  4. 无异常:commitTransaction()→ 两条消息同时生效,消费者都能消费
  5. 有异常:abortTransaction()→ 两条消息全部作废,消费者一条都看不到

完美解决:一部分成功、一部分失败的数据不一致问题。


五、跨会话 是什么?代码层面理解

  • 上面代码transactional.id="transaction-id-001"全局唯一
  • 如果生产者运行中宕机、重启,新生产者配置同一个 transactional.id
  • Kafka 会识别这是同一个事务会话,可以查询上一次事务状态,继续提交或回滚

👉 这就叫跨会话事务恢复

生产者换了进程、重启了,事务还能接着收尾,不会卡住、不会脏数据。


六、面试背诵版(结合代码总结)

Kafka 事务机制支持跨分区、跨主题原子写入,也支持跨生产者会话事务恢复

代码层面通过配置transactional.id、开启幂等生产者,再通过beginTransaction开启事务,批量向多个主题 / 分区发送消息;正常执行就commitTransaction批量生效,出现异常就abortTransaction全部回滚,保证多条消息要么全部成功、要么全部失败,解决多消息写入的数据一致性问题;生产者宕机重启后,通过相同事务 ID 可跨会话承接上一次事务状态,保证原子性不被破坏。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/3 20:39:35

5分钟快速解密微信聊天记录:免费开源工具完整指南

5分钟快速解密微信聊天记录&#xff1a;免费开源工具完整指南 【免费下载链接】WechatDecrypt 微信消息解密工具 项目地址: https://gitcode.com/gh_mirrors/we/WechatDecrypt 你是否曾因手机损坏或误删而丢失珍贵的微信聊天记录&#xff1f;那些与家人朋友的温馨对话、…

作者头像 李华
网站建设 2026/5/3 20:38:24

VMware macOS 解锁终极指南:5分钟快速上手Auto-Unlocker

VMware macOS 解锁终极指南&#xff1a;5分钟快速上手Auto-Unlocker 【免费下载链接】auto-unlocker Unlocker for VMWare macOS 项目地址: https://gitcode.com/gh_mirrors/au/auto-unlocker 想在VMware虚拟机中运行macOS系统吗&#xff1f;Auto-Unlocker就是你一直在寻…

作者头像 李华
网站建设 2026/5/3 20:37:27

APK Installer终极指南:Windows平台安卓应用安装的完整解决方案

APK Installer终极指南&#xff1a;Windows平台安卓应用安装的完整解决方案 【免费下载链接】APK-Installer An Android Application Installer for Windows 项目地址: https://gitcode.com/GitHub_Trending/ap/APK-Installer APK Installer是一款专为Windows平台设计的…

作者头像 李华
网站建设 2026/5/3 20:37:25

雀魂牌谱屋完全攻略:如何用数据分析在3个月内提升2个段位

雀魂牌谱屋完全攻略&#xff1a;如何用数据分析在3个月内提升2个段位 【免费下载链接】amae-koromo 雀魂牌谱屋 (See also: https://github.com/SAPikachu/amae-koromo-scripts ) 项目地址: https://gitcode.com/gh_mirrors/am/amae-koromo 还在为雀魂麻将的段位停滞不前…

作者头像 李华