一、事务消息核心原理
1.1 事务消息解决的问题
在分布式系统中,保证本地事务与消息发送的原子性。
1.2 二阶段提交流程
java
复制
下载
// 事务消息的完整流程 ┌─────────────────┐ 1.发送半消息 ┌─────────────────┐ │ ├───────────────────►│ │ │ 生产者 │ │ RocketMQ │ │ (Producer) │◄───────────────────┤ Broker │ │ │ 2.半消息发送成功 │ │ └─────────┬───────┘ └─────────┬───────┘ │ │ │ 3.执行本地事务 │ ▼ │ ┌─────────────────┐ │ │ 本地事务执行结果 │ │ │ (Commit/Rollback)│ │ └─────────┬───────┘ │ │ │ │ 4.提交或回滚事务状态 │ └──────────────────────────────────────►┌─────────────────┐ │ 事务状态回查 │ │ (如果超时未确认) │ └─────────────────┘
二、核心代码实现
2.1 生产者端实现
java
复制
下载
// 事务消息生产者 public class TransactionProducer { private final TransactionMQProducer producer; private final TransactionListener transactionListener; public TransactionProducer() { // 1. 创建事务消息生产者 producer = new TransactionMQProducer("TransactionProducerGroup"); producer.setNamesrvAddr("localhost:9876"); // 2. 设置事务监听器(核心组件) transactionListener = new LocalTransactionListenerImpl(); producer.setTransactionListener(transactionListener); // 3. 设置事务回查线程池 ExecutorService executorService = new ThreadPoolExecutor( 2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> new Thread(r, "Transaction-Check-Thread") ); producer.setExecutorService(executorService); // 4. 启动生产者 producer.start(); } // 发送事务消息 public SendResult sendTransactionMessage(String topic, String tags, Object businessData) throws Exception { // 1. 构建消息 Message msg = new Message(topic, tags, JSON.toJSONBytes(businessData)); // 2. 设置事务ID(用于关联本地事务) String transactionId = UUID.randomUUID().toString(); msg.putUserProperty("TRANSACTION_ID", transactionId); // 3. 发送半消息(第一阶段) SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.println("半消息发送结果: " + sendResult.getSendStatus()); return sendResult; } } // 事务监听器实现(核心) public class LocalTransactionListenerImpl implements TransactionListener { // 本地事务执行状态存储 private final Map<String, LocalTransactionState> transactionStateMap = new ConcurrentHashMap<>(); private final TransactionService transactionService; public LocalTransactionListenerImpl() { this.transactionService = new TransactionService(); } /** * 第一阶段:执行本地事务 * 当半消息发送成功后,Broker会回调此方法 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { String transactionId = msg.getUserProperty("TRANSACTION_ID"); System.out.println("执行本地事务,事务ID: " + transactionId); try { // 1. 执行本地业务逻辑(如数据库操作) boolean success = transactionService.executeBusiness(msg); if (success) { // 本地事务成功,提交消息 transactionStateMap.put(transactionId, LocalTransactionState.COMMIT_MESSAGE); System.out.println("本地事务执行成功,准备提交消息"); return LocalTransactionState.COMMIT_MESSAGE; } else { // 本地事务失败,回滚消息 transactionStateMap.put(transactionId, LocalTransactionState.ROLLBACK_MESSAGE); System.out.println("本地事务执行失败,准备回滚消息"); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { // 执行异常,标记为未知状态(等待回查) transactionStateMap.put(transactionId, LocalTransactionState.UNKNOW); System.out.println("本地事务执行异常,标记为未知状态: " + e.getMessage()); return LocalTransactionState.UNKNOW; } } /** * 第二阶段:事务状态回查 * 如果第一阶段返回UNKNOW,或者Broker未收到确认,会触发回查 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String transactionId = msg.getUserProperty("TRANSACTION_ID"); System.out.println("事务状态回查,事务ID: " + transactionId); // 1. 从存储中查询事务状态 LocalTransactionState cachedState = transactionStateMap.get(transactionId); if (cachedState != null && cachedState != LocalTransactionState.UNKNOW) { // 状态已明确,直接返回 return cachedState; } // 2. 主动查询本地事务状态 try { boolean isCommitted = transactionService .checkTransactionStatus(transactionId); if (isCommitted) { transactionStateMap.put(transactionId, LocalTransactionState.COMMIT_MESSAGE); return LocalTransactionState.COMMIT_MESSAGE; } else { transactionStateMap.put(transactionId, LocalTransactionState.ROLLBACK_MESSAGE); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { // 查询失败,继续等待下次回查 System.out.println("事务状态回查异常: " + e.getMessage()); return LocalTransactionState.UNKNOW; } } } // 本地事务服务(模拟) public class TransactionService { private final Map<String, Boolean> transactionStatus = new ConcurrentHashMap<>(); // 执行业务逻辑 public boolean executeBusiness(Message msg) { String transactionId = msg.getUserProperty("TRANSACTION_ID"); try { // 模拟数据库操作(这里是关键) System.out.println("开始执行业务逻辑..."); // 1. 业务处理 processBusiness(msg); // 2. 数据库事务提交 commitDatabaseTransaction(transactionId); // 3. 记录事务状态 transactionStatus.put(transactionId, true); System.out.println("业务逻辑执行成功"); return true; } catch (Exception e) { // 发生异常,回滚数据库事务 rollbackDatabaseTransaction(transactionId); transactionStatus.put(transactionId, false); System.out.println("业务逻辑执行失败: " + e.getMessage()); return false; } } // 查询事务状态 public boolean checkTransactionStatus(String transactionId) { // 1. 先查缓存 Boolean status = transactionStatus.get(transactionId); if (status != null) { return status; } // 2. 查询数据库(实际业务中) return queryDatabaseTransactionStatus(transactionId); } private void processBusiness(Message msg) { // 实际的业务处理逻辑 // 例如:订单创建、库存扣减等 } private void commitDatabaseTransaction(String transactionId) { // 数据库事务提交 System.out.println("提交数据库事务: " + transactionId); } private void rollbackDatabaseTransaction(String transactionId) { // 数据库事务回滚 System.out.println("回滚数据库事务: " + transactionId); } private boolean queryDatabaseTransactionStatus(String transactionId) { // 查询数据库中的事务状态 System.out.println("查询数据库事务状态: " + transactionId); return true; // 假设查询成功 } }篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafc
需要全套面试笔记及答案
【点击此处即可/免费获取】
2.2 Broker端实现(核心逻辑)
java
复制
下载
// Broker端的事务消息处理器(简化版) public class TransactionalMessageProcessor { // 半消息存储(OP队列) private final MessageStore halfMessageStore; // 事务状态存储 private final TransactionStateTable stateTable; // 事务状态回查服务 private final TransactionCheckService checkService; /** * 第一阶段:接收半消息 */ public PutMessageResult putHalfMessage(MessageExtBrokerInner message) { // 1. 标记为半消息 message.setTransactionPrepared(true); // 2. 获取事务ID String transactionId = message.getProperty("TRANSACTION_ID"); // 3. 存储到特殊的半消息队列(OP队列) PutMessageResult result = halfMessageStore.putMessage(message); if (result.isOk()) { // 4. 记录事务状态(初始为PREPARED) stateTable.addTransactionState(transactionId, TransactionState.PREPARED, System.currentTimeMillis()); // 5. 开启回查定时任务(如果生产者未及时确认) scheduleCheck(transactionId, message); } return result; } /** * 第二阶段:处理事务确认 */ public void processTransactionResponse(String transactionId, TransactionState state) { // 1. 更新事务状态 stateTable.updateTransactionState(transactionId, state); // 2. 根据状态处理消息 switch (state) { case COMMIT: // 提交:将消息从OP队列移动到真正的Topic commitMessage(transactionId); break; case ROLLBACK: // 回滚:删除OP队列中的消息 rollbackMessage(transactionId); break; default: // 保持PREPARED状态 break; } } // 提交消息 private void commitMessage(String transactionId) { // 1. 从OP队列获取半消息 MessageExt halfMessage = halfMessageStore.getMessage(transactionId); if (halfMessage != null) { // 2. 移除半消息标记 halfMessage.setTransactionPrepared(false); // 3. 存储到真正的Topic队列 messageStore.putMessage(halfMessage); // 4. 从OP队列删除 halfMessageStore.deleteMessage(transactionId); System.out.println("事务消息提交成功: " + transactionId); } } // 回滚消息 private void rollbackMessage(String transactionId) { // 直接从OP队列删除 halfMessageStore.deleteMessage(transactionId); System.out.println("事务消息回滚: " + transactionId); } // 事务状态回查 private void scheduleCheck(String transactionId, MessageExt message) { // 设置回查延时(默认1分钟) long checkDelay = message.getCheckImmunityTime(); if (checkDelay <= 0) { checkDelay = 60000; // 默认60秒 } Timer timer = new Timer("Transaction-Check-Timer"); timer.schedule(new TimerTask() { @Override public void run() { // 检查事务状态 TransactionState currentState = stateTable.getTransactionState(transactionId); if (currentState == TransactionState.PREPARED) { // 状态还是PREPARED,触发回查 triggerCheckBack(transactionId, message); } } }, checkDelay); } // 触发回查 private void triggerCheckBack(String transactionId, MessageExt message) { // 1. 构建回查请求 CheckTransactionStateRequestHeader requestHeader = new CheckTransactionStateRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(message.getCommitLogOffset()); // 2. 发送回查请求给生产者 checkService.sendCheckRequest(transactionId, requestHeader); } } // 事务状态枚举 public enum TransactionState { PREPARED, // 已准备(半消息) COMMIT, // 已提交 ROLLBACK, // 已回滚 UNKNOWN // 未知 } // 事务状态表 public class TransactionStateTable { private final Map<String, TransactionStateEntry> stateMap = new ConcurrentHashMap<>(); // 事务状态条目 static class TransactionStateEntry { private TransactionState state; private long prepareTime; private long commitTime; private int checkTimes; // 回查次数 public TransactionStateEntry(TransactionState state, long prepareTime) { this.state = state; this.prepareTime = prepareTime; this.checkTimes = 0; } } public void addTransactionState(String transactionId, TransactionState state, long timestamp) { stateMap.put(transactionId, new TransactionStateEntry(state, timestamp)); } public void updateTransactionState(String transactionId, TransactionState newState) { TransactionStateEntry entry = stateMap.get(transactionId); if (entry != null) { entry.state = newState; if (newState == TransactionState.COMMIT) { entry.commitTime = System.currentTimeMillis(); } } } public TransactionState getTransactionState(String transactionId) { TransactionStateEntry entry = stateMap.get(transactionId); return entry != null ? entry.state : null; } }三、完整示例:电商订单创建
3.1 电商订单事务消息示例
java
复制
下载
// 订单服务 public class OrderService { private final TransactionProducer transactionProducer; private final OrderDao orderDao; public OrderService() { this.transactionProducer = new TransactionProducer(); this.orderDao = new OrderDao(); } // 创建订单(使用事务消息) public boolean createOrder(OrderDTO orderDTO) { // 1. 生成订单ID String orderId = generateOrderId(); orderDTO.setOrderId(orderId); // 2. 发送事务消息 try { // 构建消息体 OrderMessage orderMessage = new OrderMessage(); orderMessage.setOrderId(orderId); orderMessage.setUserId(orderDTO.getUserId()); orderMessage.setAmount(orderDTO.getAmount()); orderMessage.setItems(orderDTO.getItems()); // 发送事务消息 SendResult result = transactionProducer.sendTransactionMessage( "ORDER_CREATE_TOPIC", "CREATE", orderMessage ); System.out.println("订单创建事务消息发送成功: " + result.getMsgId()); return true; } catch (Exception e) { System.out.println("订单创建失败: " + e.getMessage()); return false; } } } // 订单事务监听器 public class OrderTransactionListener implements TransactionListener { private final OrderDao orderDao; private final InventoryService inventoryService; private final CouponService couponService; // 存储本地事务执行状态 private final Map<String, OrderTransaction> transactionMap = new ConcurrentHashMap<>(); public OrderTransactionListener() { this.orderDao = new OrderDao(); this.inventoryService = new InventoryService(); this.couponService = new CouponService(); } @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { String transactionId = msg.getProperty("TRANSACTION_ID"); String orderId = msg.getProperty("ORDER_ID"); System.out.println("开始执行订单本地事务,订单ID: " + orderId); try { // 解析消息 OrderMessage orderMessage = JSON.parseObject( msg.getBody(), OrderMessage.class); // 开始数据库事务 Connection conn = DatabaseUtil.getConnection(); conn.setAutoCommit(false); try { // 1. 创建订单记录 orderDao.createOrder(conn, orderMessage); // 2. 扣减库存 for (OrderItem item : orderMessage.getItems()) { inventoryService.deductInventory( conn, item.getProductId(), item.getQuantity()); } // 3. 使用优惠券 if (orderMessage.getCouponId() != null) { couponService.useCoupon( conn, orderMessage.getUserId(), orderMessage.getCouponId() ); } // 4. 更新用户积分 updateUserPoints(conn, orderMessage); // 5. 提交数据库事务 conn.commit(); // 记录事务状态 transactionMap.put(transactionId, new OrderTransaction(orderId, true, "SUCCESS")); System.out.println("订单本地事务执行成功,准备提交消息"); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { // 回滚数据库事务 conn.rollback(); // 记录失败状态 transactionMap.put(transactionId, new OrderTransaction(orderId, false, e.getMessage())); System.out.println("订单本地事务执行失败,准备回滚消息"); return LocalTransactionState.ROLLBACK_MESSAGE; } finally { conn.close(); } } catch (Exception e) { // 记录异常状态 transactionMap.put(transactionId, new OrderTransaction(orderId, false, "EXCEPTION")); System.out.println("订单本地事务执行异常,标记为未知状态"); return LocalTransactionState.UNKNOW; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String transactionId = msg.getProperty("TRANSACTION_ID"); String orderId = msg.getProperty("ORDER_ID"); System.out.println("订单事务状态回查,订单ID: " + orderId); // 1. 先查缓存 OrderTransaction cached = transactionMap.get(transactionId); if (cached != null) { if (cached.isSuccess()) { return LocalTransactionState.COMMIT_MESSAGE; } else { return LocalTransactionState.ROLLBACK_MESSAGE; } } // 2. 查询数据库订单状态 try { OrderStatus status = orderDao.queryOrderStatus(orderId); if (status == OrderStatus.CREATED) { // 订单创建成功 transactionMap.put(transactionId, new OrderTransaction(orderId, true, "FOUND_IN_DB")); return LocalTransactionState.COMMIT_MESSAGE; } else if (status == OrderStatus.FAILED) { // 订单创建失败 transactionMap.put(transactionId, new OrderTransaction(orderId, false, "FOUND_IN_DB")); return LocalTransactionState.ROLLBACK_MESSAGE; } else { // 订单不存在或状态未知 return LocalTransactionState.UNKNOW; } } catch (Exception e) { System.out.println("订单状态查询失败: " + e.getMessage()); return LocalTransactionState.UNKNOW; } } private void updateUserPoints(Connection conn, OrderMessage orderMessage) throws SQLException { // 更新用户积分逻辑 int points = calculatePoints(orderMessage.getAmount()); orderDao.updateUserPoints(conn, orderMessage.getUserId(), points); } private int calculatePoints(BigDecimal amount) { // 积分计算规则 return amount.divide(new BigDecimal("10"), 0, RoundingMode.DOWN) .intValue(); } } // 订单事务状态记录 class OrderTransaction { private String orderId; private boolean success; private String message; private long timestamp; public OrderTransaction(String orderId, boolean success, String message) { this.orderId = orderId; this.success = success; this.message = message; this.timestamp = System.currentTimeMillis(); } // getters and setters }3.2 消费者端实现
java
复制
下载
// 订单消息消费者 public class OrderConsumer { private final DefaultMQPushConsumer consumer; public OrderConsumer() { consumer = new DefaultMQPushConsumer("OrderConsumerGroup"); consumer.setNamesrvAddr("localhost:9876"); // 订阅订单创建成功的消息 consumer.subscribe("ORDER_CREATE_TOPIC", "*"); // 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { // 解析消息 OrderMessage orderMessage = JSON.parseObject( msg.getBody(), OrderMessage.class); System.out.println("收到订单创建成功消息: " + orderMessage.getOrderId()); // 执行业务逻辑(如发送通知、更新缓存等) processOrderCreated(orderMessage); } catch (Exception e) { System.out.println("处理订单消息失败: " + e.getMessage()); // 消费失败,稍后重试 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } // 消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("订单消费者启动成功"); } private void processOrderCreated(OrderMessage orderMessage) { // 1. 发送订单创建成功通知 sendNotification(orderMessage); // 2. 更新订单缓存 updateOrderCache(orderMessage); // 3. 触发后续业务流程 triggerNextStep(orderMessage); } private void sendNotification(OrderMessage orderMessage) { // 发送邮件、短信通知 System.out.println("发送订单创建成功通知给用户: " + orderMessage.getUserId()); } private void updateOrderCache(OrderMessage orderMessage) { // 更新Redis缓存 String key = "order:" + orderMessage.getOrderId(); // redisTemplate.opsForValue().set(key, orderMessage); System.out.println("更新订单缓存: " + key); } private void triggerNextStep(OrderMessage orderMessage) { // 触发物流、支付等后续流程 System.out.println("触发订单后续流程: " + orderMessage.getOrderId()); } }四、高级特性与优化
4.1 事务消息的幂等性处理
java
复制
下载
// 幂等性处理器 public class IdempotentProcessor { private final Cache<String, Long> processedMessages; public IdempotentProcessor() { // 使用Guava Cache,自动过期 processedMessages = CacheBuilder.newBuilder() .maximumSize(10000) .expireAfterWrite(10, TimeUnit.MINUTES) .build(); } /** * 检查消息是否已处理 */ public boolean isProcessed(String messageKey) { return processedMessages.getIfPresent(messageKey) != null; } /** * 标记消息已处理 */ public void markProcessed(String messageKey) { processedMessages.put(messageKey, System.currentTimeMillis()); } /** * 幂等性消费 */ public ConsumeConcurrentlyStatus consumeWithIdempotent( MessageExt msg, ConsumerFunction function) { // 生成消息唯一标识 String messageKey = generateMessageKey(msg); // 检查是否已处理 if (isProcessed(messageKey)) { System.out.println("消息已处理,跳过: " + messageKey); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } try { // 执行业务逻辑 function.process(msg); // 标记已处理 markProcessed(messageKey); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { System.out.println("消费失败: " + e.getMessage()); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } private String generateMessageKey(MessageExt msg) { // 使用消息ID + 业务唯一标识 String orderId = msg.getProperty("ORDER_ID"); if (orderId != null) { return msg.getMsgId() + "_" + orderId; } return msg.getMsgId(); } @FunctionalInterface public interface ConsumerFunction { void process(MessageExt msg) throws Exception; } }4.2 事务消息的重试机制
java
复制
下载
// 事务消息重试管理器 public class TransactionRetryManager { private final ScheduledExecutorService scheduler; private final Map<String, RetryTask> retryTasks; public TransactionRetryManager() { this.scheduler = Executors.newScheduledThreadPool(5); this.retryTasks = new ConcurrentHashMap<>(); } /** * 注册重试任务 */ public void registerRetry(String transactionId, RetryStrategy strategy, RetryCallback callback) { RetryTask task = new RetryTask(transactionId, strategy, callback); retryTasks.put(transactionId, task); // 第一次重试延迟 long delay = strategy.getDelay(0); scheduler.schedule(task, delay, TimeUnit.MILLISECONDS); } /** * 重试任务 */ class RetryTask implements Runnable { private final String transactionId; private final RetryStrategy strategy; private final RetryCallback callback; private int retryCount = 0; public RetryTask(String transactionId, RetryStrategy strategy, RetryCallback callback) { this.transactionId = transactionId; this.strategy = strategy; this.callback = callback; } @Override public void run() { try { boolean success = callback.retry(); if (success) { // 重试成功,移除任务 retryTasks.remove(transactionId); } else if (retryCount < strategy.getMaxRetries()) { // 重试失败,继续重试 retryCount++; long delay = strategy.getDelay(retryCount); scheduler.schedule(this, delay, TimeUnit.MILLISECONDS); } else { // 达到最大重试次数,放弃 System.out.println("事务重试失败,达到最大重试次数: " + transactionId); retryTasks.remove(transactionId); } } catch (Exception e) { System.out.println("事务重试异常: " + e.getMessage()); } } } // 重试策略 static class RetryStrategy { private int maxRetries = 3; private long initialDelay = 1000; // 1秒 private long maxDelay = 60000; // 60秒 private double multiplier = 2.0; // 指数退避 public long getDelay(int retryCount) { long delay = (long) (initialDelay * Math.pow(multiplier, retryCount)); return Math.min(delay, maxDelay); } // getters and setters } @FunctionalInterface public interface RetryCallback { boolean retry() throws Exception; } }4.3 事务消息的监控与告警
java
复制
下载
// 事务消息监控 @RestController public class TransactionMonitorController { @Autowired private TransactionStatsCollector statsCollector; @GetMapping("/api/transaction/stats") public TransactionStats getTransactionStats() { return statsCollector.collectStats(); } @GetMapping("/api/transaction/{id}/status") public TransactionStatus getTransactionStatus(@PathVariable String id) { return statsCollector.getTransactionStatus(id); } @GetMapping("/api/transaction/alerts") public List<TransactionAlert> getActiveAlerts() { return statsCollector.getActiveAlerts(); } } // 事务统计收集器 @Component public class TransactionStatsCollector { // 事务计数器 private final AtomicLong totalTransactions = new AtomicLong(0); private final AtomicLong successTransactions = new AtomicLong(0); private final AtomicLong failedTransactions = new AtomicLong(0); private final AtomicLong pendingTransactions = new AtomicLong(0); // 耗时统计 private final Histogram transactionDuration; public TransactionStatsCollector() { this.transactionDuration = new Histogram( TimeUnit.SECONDS.toMillis(1), TimeUnit.MINUTES.toMillis(5), 3 ); } public void recordTransaction(String transactionId, boolean success, long duration) { totalTransactions.incrementAndGet(); if (success) { successTransactions.incrementAndGet(); } else { failedTransactions.incrementAndGet(); } transactionDuration.record(duration); // 记录到日志 logTransaction(transactionId, success, duration); } public TransactionStats collectStats() { TransactionStats stats = new TransactionStats(); stats.setTotalTransactions(totalTransactions.get()); stats.setSuccessTransactions(successTransactions.get()); stats.setFailedTransactions(failedTransactions.get()); stats.setPendingTransactions(pendingTransactions.get()); stats.setSuccessRate(calculateSuccessRate()); stats.setAverageDuration(transactionDuration.getMean()); stats.setP95Duration(transactionDuration.getValue(0.95)); stats.setP99Duration(transactionDuration.getValue(0.99)); return stats; } private double calculateSuccessRate() { long total = totalTransactions.get(); if (total == 0) { return 0.0; } return (double) successTransactions.get() / total * 100; } private void logTransaction(String transactionId, boolean success, long duration) { // 记录到日志系统 String log = String.format( "Transaction: id=%s, success=%s, duration=%dms", transactionId, success, duration ); System.out.println(log); } } // 自定义Histogram实现(简化) class Histogram { private final long[] buckets; private final long[] counts; private long totalCount; private double sum; public Histogram(long min, long max, int bucketCount) { this.buckets = new long[bucketCount + 1]; this.counts = new long[bucketCount]; long step = (max - min) / bucketCount; for (int i = 0; i <= bucketCount; i++) { buckets[i] = min + i * step; } } public void record(long value) { totalCount++; sum += value; for (int i = 0; i < buckets.length - 1; i++) { if (value >= buckets[i] && value < buckets[i + 1]) { counts[i]++; break; } } } public double getMean() { return totalCount > 0 ? sum / totalCount : 0; } public long getValue(double percentile) { if (totalCount == 0) { return 0; } long target = (long) (totalCount * percentile); long accumulated = 0; for (int i = 0; i < counts.length; i++) { accumulated += counts[i]; if (accumulated >= target) { return buckets[i + 1]; } } return buckets[buckets.length - 1]; } }篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafc
需要全套面试笔记及答案
【点击此处即可/免费获取】
五、常见问题与解决方案
5.1 事务消息的最终一致性
java
复制
下载
// 最终一致性保障 public class EventualConsistencyGuarantee { /** * 方法1:消息重试 + 死信队列 */ public void guaranteeByRetryAndDLQ() { // 1. 正常消费 // 2. 消费失败 -> 重试N次 // 3. 仍然失败 -> 进入死信队列 // 4. 人工介入处理死信 } /** * 方法2:本地消息表 */ public void guaranteeByLocalMessageTable() { // 1. 业务操作 + 消息记录存入本地数据库(同一事务) // 2. 定时任务扫描未发送消息 // 3. 发送到MQ // 4. 更新发送状态 } /** * 方法3:最大努力通知 */ public void guaranteeByBestEffort() { // 1. 业务操作完成 // 2. 发送通知消息(允许失败) // 3. 定时重试通知 // 4. 达到最大重试次数后记录日志 } }5.2 事务消息性能优化
java
复制
下载
// 事务消息性能优化 public class TransactionPerformanceOptimizer { // 批量发送半消息 public List<SendResult> sendHalfMessagesInBatch( List<Message> messages) throws Exception { // 1. 批量发送半消息 SendResult batchResult = producer.send(messages); // 2. 批量执行本地事务 List<LocalTransactionState> states = executeLocalTransactionsInBatch(messages); // 3. 批量提交事务状态 batchEndTransaction(batchResult, states); // 4. 返回结果 return extractSendResults(batchResult); } // 异步执行本地事务 public CompletableFuture<LocalTransactionState> executeLocalTransactionAsync(Message msg) { return CompletableFuture.supplyAsync(() -> { try { return transactionListener.executeLocalTransaction(msg, null); } catch (Exception e) { return LocalTransactionState.UNKNOW; } }); } // 状态缓存优化 public class TransactionStateCache { private final Cache<String, LocalTransactionState> cache; public TransactionStateCache() { this.cache = CacheBuilder.newBuilder() .maximumSize(10000) .expireAfterWrite(10, TimeUnit.MINUTES) .recordStats() // 记录统计信息 .build(); } public LocalTransactionState get(String transactionId) { return cache.getIfPresent(transactionId); } public void put(String transactionId, LocalTransactionState state) { cache.put(transactionId, state); } // 获取缓存命中率 public double getHitRate() { CacheStats stats = cache.stats(); return stats.hitRate(); } } }六、总结
6.1 事务消息的核心优势
强一致性:保证本地事务和消息发送的原子性
最终一致性:跨系统数据一致性
解耦:业务系统与消息系统解耦
可靠性:消息不丢失,支持重试
6.2 最佳实践
合理设置回查时间:根据业务耗时设置
保证本地事务幂等性:防止重复执行
监控告警:及时发现处理异常
性能优化:批量、异步、缓存
6.3 注意事项
事务消息会增加系统复杂性
需要处理消息积压问题
考虑网络分区时的容错
做好数据一致性的验证
RocketMQ的事务消息二阶段提交实现,通过半消息、本地事务执行、事务状态回查等机制,实现了分布式系统中的数据一致性保障,是处理分布式事务的有效方案。