Kafka 事务机制 跨分区 + 跨会话 通俗讲解 + 可运行代码示例
一、先白话定义
1. 跨分区写入
一次业务,需要往多个分区 / 多个主题发送多条消息。
事务保证:多条消息要么全部提交消费者可见,要么全部回滚一条都看不见,不会中间成功一半。
2. 跨会话
生产者中途宕机、重启、换实例,新的生产者会话还能接手之前未完成的事务,继续提交 / 回滚,保证原子性不被破坏。
3. 解决的问题
不用事务:可能订单消息发成功,库存消息发失败 → 数据不一致。
用事务:同批次多条消息同生共死。
二、核心前置配置
Kafka 事务必须配置:
- 开启幂等生产者
- 指定
transactional.id - 配置事务超时时间
三、代码示例:跨分区 / 跨主题 事务原子写入
场景:
下单成功,要同时发两条消息:
- 订单主题:order_topic
- 库存主题:stock_topic
要求:两条消息要么都成功,要么都撤回。
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaTransactionDemo { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 1. 开启幂等生产者(事务依赖) props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 2. 设置事务唯一ID props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-id-001"); // 3. 事务超时时间 props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000); Producer<String, String> producer = new KafkaProducer<>(props); try { // 初始化事务 producer.initTransactions(); // 开启事务 producer.beginTransaction(); // ========== 跨主题/跨分区 发送多条消息 ========== // 消息1:订单主题 ProducerRecord<String, String> orderMsg = new ProducerRecord<>("order_topic", "order_001", "创建订单成功"); producer.send(orderMsg); // 消息2:库存主题 ProducerRecord<String, String> stockMsg = new ProducerRecord<>("stock_topic", "stock_001", "扣减商品库存"); producer.send(stockMsg); // 模拟中间业务异常,测试回滚 // int i = 1 / 0; // 正常:提交事务,两条消息同时对消费者可见 producer.commitTransaction(); } catch (Exception e) { // 异常:回滚事务,两条消息全部不可见 producer.abortTransaction(); e.printStackTrace(); } finally { producer.close(); } } }四、代码逻辑解释
initTransactions()初始化事务环境beginTransaction()开启事务- 连续往 ** 不同主题(本质就是跨分区)** 发两条消息
- 无异常:
commitTransaction()→ 两条消息同时生效,消费者都能消费 - 有异常:
abortTransaction()→ 两条消息全部作废,消费者一条都看不到
完美解决:一部分成功、一部分失败的数据不一致问题。
五、跨会话 是什么?代码层面理解
- 上面代码
transactional.id="transaction-id-001"是全局唯一的 - 如果生产者运行中宕机、重启,新生产者配置同一个 transactional.id
- Kafka 会识别这是同一个事务会话,可以查询上一次事务状态,继续提交或回滚
👉 这就叫跨会话事务恢复:
生产者换了进程、重启了,事务还能接着收尾,不会卡住、不会脏数据。
六、面试背诵版(结合代码总结)
Kafka 事务机制支持跨分区、跨主题原子写入,也支持跨生产者会话事务恢复。
代码层面通过配置transactional.id、开启幂等生产者,再通过beginTransaction开启事务,批量向多个主题 / 分区发送消息;正常执行就commitTransaction批量生效,出现异常就abortTransaction全部回滚,保证多条消息要么全部成功、要么全部失败,解决多消息写入的数据一致性问题;生产者宕机重启后,通过相同事务 ID 可跨会话承接上一次事务状态,保证原子性不被破坏。