news 2026/4/16 12:31:13

缓存与数据库一致性解决方案深度解析

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
缓存与数据库一致性解决方案深度解析

一、业务场景与挑战

1.1 12306余票查询场景

在12306系统中,用户需要实时查询列车不同站点、不同座位类型的余票信息。为提升查询性能,我们将余票信息缓存在Redis中。但在用户下单支付时,需要同时更新数据库和缓存中的余票数据。

核心挑战

  • 高并发场景下的数据一致性

  • 扣减操作的原子性保证

  • 系统的高可用性要求

1.2 数据一致性问题分析

问题类型描述影响程度
缓存穿透查询不存在的数据,绕过缓存直接访问数据库
缓存击穿热点key过期瞬间,大量请求直达数据库
缓存雪崩大量缓存同时过期,数据库压力激增极高
数据不一致缓存与数据库数据不一致中高

二、一致性方案深度对比

2.1 方案对比分析

方案一致性复杂度性能适用场景
先写缓存再写DB对一致性要求不高的读多写少场景
先写DB再写缓存读多写少,可容忍短暂不一致
先删缓存再写DB写多读少场景
缓存双删最终一致对一致性要求较高,有MQ中间件
先写DB再删缓存最终一致大部分业务场景,推荐使用
Binlog异步更新最终一致大型系统,多数据源同步

2.2 各种方案的缺陷分析

方案1:先写缓存再写数据库 ❌

java

// 问题示例代码 public boolean updateStock(String trainId, int seats) { // 1. 先更新缓存 redisTemplate.opsForValue().set(cacheKey, seats); // 2. 再更新数据库(可能出现失败) int result = stockMapper.updateStock(trainId, seats); return result > 0; }

问题:缓存更新成功但数据库更新失败,导致数据永久不一致。

方案2:先写数据库再写缓存 ❌

java

public boolean updateStock(String trainId, int seats) { // 1. 先更新数据库 int result = stockMapper.updateStock(trainId, seats); if (result <= 0) return false; // 2. 并发问题:此时另一个线程可能已经更新了数据库但还没写缓存 redisTemplate.opsForValue().set(cacheKey, seats); return true; }
方案3:先删缓存再写数据库 ❌

java

public boolean updateStock(String trainId, int seats) { // 1. 删除缓存 redisTemplate.delete(cacheKey); // 2. 更新数据库 int result = stockMapper.updateStock(trainId, seats); // 3. 问题:在1和2之间,读请求可能读取旧数据并回填缓存 return result > 0; }

三、推荐方案深度解析

3.1 方案5:先写数据库再删除缓存(推荐)

3.1.1 核心原理

text

写操作流程: 1. 开启事务 2. 更新数据库 3. 提交事务 4. 删除缓存 读操作流程: 1. 查询缓存,命中则返回 2. 未命中则查询数据库 3. 将数据写入缓存
3.1.2 实现代码

java

@Service @Slf4j public class TicketStockService { @Autowired private RedisTemplate<String, Integer> redisTemplate; @Autowired private TicketStockMapper stockMapper; @Autowired private RedissonClient redissonClient; /** * 扣减库存(带分布式锁) */ @Transactional(rollbackFor = Exception.class) public boolean deductStock(String trainId, String seatType, int count) { String lockKey = "lock:stock:" + trainId + ":" + seatType; RLock lock = redissonClient.getLock(lockKey); try { // 1. 获取分布式锁(防止超卖) boolean locked = lock.tryLock(3, 10, TimeUnit.SECONDS); if (!locked) { throw new BusinessException("系统繁忙,请稍后重试"); } // 2. 查询当前库存 Integer currentStock = stockMapper.selectStock(trainId, seatType); if (currentStock == null || currentStock < count) { throw new BusinessException("库存不足"); } // 3. 更新数据库 int rows = stockMapper.updateStock(trainId, seatType, currentStock - count, currentStock); if (rows <= 0) { throw new BusinessException("库存更新失败"); } // 4. 提交事务后,异步删除缓存 TransactionSynchronizationManager.registerSynchronization( new TransactionSynchronizationAdapter() { @Override public void afterCommit() { deleteCacheAsync(trainId, seatType); } } ); return true; } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new BusinessException("系统异常"); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } /** * 异步删除缓存(带重试机制) */ private void deleteCacheAsync(String trainId, String seatType) { String cacheKey = buildCacheKey(trainId, seatType); CompletableFuture.runAsync(() -> { // 重试机制 for (int i = 0; i < 3; i++) { try { Boolean deleted = redisTemplate.delete(cacheKey); if (Boolean.TRUE.equals(deleted)) { log.info("缓存删除成功: {}", cacheKey); break; } } catch (Exception e) { log.error("第{}次删除缓存失败: {}", i + 1, cacheKey, e); if (i < 2) { try { Thread.sleep(100 * (i + 1)); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } } } }); } /** * 查询库存(带防穿透和击穿保护) */ public Integer getStock(String trainId, String seatType) { String cacheKey = buildCacheKey(trainId, seatType); // 1. 尝试从缓存获取 Integer stock = redisTemplate.opsForValue().get(cacheKey); if (stock != null) { // 防缓存穿透:空值标记 if (stock == -1) { return 0; } return stock; } // 2. 获取分布式锁,防止缓存击穿 String lockKey = "lock:query:" + cacheKey; RLock lock = redissonClient.getLock(lockKey); try { // 只允许一个线程查询数据库 boolean locked = lock.tryLock(1, 5, TimeUnit.SECONDS); if (!locked) { // 等待其他线程加载 Thread.sleep(50); return getStock(trainId, seatType); } // 3. 双重检查(Double Check) stock = redisTemplate.opsForValue().get(cacheKey); if (stock != null) { return stock; } // 4. 查询数据库 stock = stockMapper.selectStock(trainId, seatType); // 5. 写入缓存 if (stock == null) { // 防缓存穿透:缓存空值,短暂过期 redisTemplate.opsForValue().set(cacheKey, -1, 60, TimeUnit.SECONDS); return 0; } else { // 设置随机过期时间,防止雪崩 int expireTime = 300 + new Random().nextInt(60); redisTemplate.opsForValue().set(cacheKey, stock, expireTime, TimeUnit.SECONDS); } return stock; } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new BusinessException("查询异常"); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } private String buildCacheKey(String trainId, String seatType) { return String.format("stock:%s:%s", trainId, seatType); } }
3.1.3 一致性分析

text

时间线分析: 1. 写请求更新数据库(10ms) 2. 写请求删除缓存(1ms) 3. 读请求查询缓存(未命中) 4. 读请求查询数据库(10ms) 5. 读请求写入缓存(1ms) 不一致时间窗口 ≈ 22ms(实际业务可接受)

3.2 方案6:Binlog异步更新缓存(高级方案)

3.2.1 架构设计

text

┌─────────┐ ┌─────────┐ ┌───────────┐ ┌─────────┐ │ 应用服务 │───▶│ MySQL │───▶│ Canal │───▶│ MQ │ └─────────┘ └─────────┘ └───────────┘ └─────────┘ │ ▼ ┌─────────┐ │ 消费者 │───▶ Redis └─────────┘
3.2.2 实现方案

java

@Component @Slf4j public class BinlogSyncConsumer { @Autowired private RedisTemplate<String, Integer> redisTemplate; @Autowired private RedissonClient redissonClient; /** * 监听库存变更消息 */ @KafkaListener(topics = "stock.change") public void consumeStockChange(StockChangeMessage message) { String cacheKey = buildCacheKey(message.getTrainId(), message.getSeatType()); // 使用分布式锁保证更新顺序 String lockKey = "lock:binlog:" + cacheKey; RLock lock = redissonClient.getLock(lockKey); try { // 1. 获取锁,防止并发更新 boolean locked = lock.tryLock(3, 30, TimeUnit.SECONDS); if (!locked) { log.warn("获取锁失败,消息将重新入队: {}", message); throw new RuntimeException("获取锁失败"); } // 2. 检查数据版本(防止旧数据覆盖新数据) StockCacheMeta meta = getCacheMeta(cacheKey); if (meta != null && meta.getVersion() >= message.getVersion()) { log.info("跳过旧版本数据: {}", message); return; } // 3. 更新缓存 redisTemplate.opsForValue().set(cacheKey, message.getStock()); // 4. 更新元数据 updateCacheMeta(cacheKey, message.getVersion(), System.currentTimeMillis()); log.info("缓存更新成功: {}", message); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("处理中断"); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } /** * 版本号冲突解决策略 */ private void handleVersionConflict(String cacheKey, StockChangeMessage message) { // 方案1:查询最新数据库数据 Integer latestStock = queryLatestFromDB(message); // 方案2:使用时间戳判断 long cacheTimestamp = getCacheTimestamp(cacheKey); if (message.getTimestamp() > cacheTimestamp) { updateCacheWithRetry(cacheKey, message.getStock()); } } @Data private static class StockCacheMeta { private Integer version; private Long timestamp; } }
3.2.3 消息顺序保证

java

@Component public class BinlogMessageSequencer { /** * 保证相同key的消息顺序消费 */ @Bean public KafkaMessageListenerContainer<String, StockChangeMessage> kafkaMessageListenerContainer() { ContainerProperties containerProps = new ContainerProperties("stock.change"); // 使用ConcurrentMessageListenerContainer实现并发消费 ConcurrentMessageListenerContainer<String, StockChangeMessage> container = new ConcurrentMessageListenerContainer<>(consumerFactory(), containerProps); // 按key分区,保证同一列车座位的消息顺序消费 container.setConcurrency(3); container.getContainerProperties().setMessageListener(new MessageListener<String, StockChangeMessage>() { @Override public void onMessage(ConsumerRecord<String, StockChangeMessage> record) { String key = record.key(); // trainId:seatType processMessage(key, record.value()); } }); return container; } private void processMessage(String key, StockChangeMessage message) { // 使用本地队列保证同一key的顺序处理 MessageQueue queue = getMessageQueue(key); queue.offer(message); // 单线程处理同一key的消息 processQueue(queue); } }

四、12306实际解决方案

4.1 混合方案设计

12306采用"先写数据库再删缓存"为主,"Binlog异步补偿"为辅的混合方案。

java

@Service public class TicketStockServiceImpl implements TicketStockService { /** * 主方案:先写DB再删缓存 */ @Override @Transactional public boolean deductStock(String trainId, String seatType, int count) { // 1. 数据库扣减(带乐观锁) boolean success = deductFromDatabase(trainId, seatType, count); if (success) { // 2. 同步删除缓存 deleteCacheSync(trainId, seatType); // 3. 发送Binlog消息(用于补偿和监控) sendBinlogMessage(trainId, seatType); } return success; } /** * 数据库扣减(乐观锁实现) */ private boolean deductFromDatabase(String trainId, String seatType, int count) { int retryCount = 0; while (retryCount < 3) { // 查询当前库存和版本号 TicketStock stock = stockMapper.selectForUpdate(trainId, seatType); if (stock.getAvailableSeats() < count) { return false; } // 乐观锁更新 int rows = stockMapper.updateWithOptimisticLock( trainId, seatType, stock.getAvailableSeats() - count, stock.getVersion() ); if (rows > 0) { return true; } retryCount++; // 指数退避 sleep((long) Math.pow(2, retryCount) * 10); } return false; } /** * 同步删除缓存(带重试) */ private void deleteCacheSync(String trainId, String seatType) { String cacheKey = buildCacheKey(trainId, seatType); try { // 立即删除 redisTemplate.delete(cacheKey); // 延迟双删(处理极端情况) CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS) .execute(() -> redisTemplate.delete(cacheKey)); } catch (Exception e) { log.error("缓存删除失败,将记录补偿任务", e); recordCompensationTask(cacheKey); } } }

4.2 补偿机制

java

@Component @Slf4j public class CacheCompensationService { @Autowired private RedisTemplate<String, Integer> redisTemplate; @Autowired private TicketStockMapper stockMapper; /** * 定时补偿任务 */ @Scheduled(fixedDelay = 60000) // 每分钟执行一次 public void executeCompensation() { List<CompensationTask> tasks = getPendingTasks(); for (CompensationTask task : tasks) { try { // 1. 查询数据库最新数据 Integer dbStock = stockMapper.selectStock( task.getTrainId(), task.getSeatType()); // 2. 查询缓存当前数据 String cacheKey = task.getCacheKey(); Integer cacheStock = redisTemplate.opsForValue().get(cacheKey); // 3. 比较并修复 if (!Objects.equals(dbStock, cacheStock)) { if (dbStock == null) { redisTemplate.delete(cacheKey); } else { redisTemplate.opsForValue().set(cacheKey, dbStock); } log.info("缓存修复完成: {}", task); } // 4. 标记任务完成 markTaskCompleted(task); } catch (Exception e) { log.error("补偿任务执行失败: {}", task, e); task.setRetryCount(task.getRetryCount() + 1); if (task.getRetryCount() >= 3) { markTaskFailed(task); } } } } }

4.3 监控告警体系

yaml

# 监控指标配置 metrics: cache: hit_rate: "redis.command.statistics.hits / redis.command.statistics.total" miss_rate: "redis.command.statistics.misses / redis.command.statistics.total" inconsistency_rate: "count(compensation_tasks) / count(update_operations)" alert: rules: - name: "缓存不一致率过高" expr: "inconsistency_rate > 0.01" for: "5m" labels: severity: "warning" - name: "缓存命中率过低" expr: "hit_rate < 0.8" for: "10m" labels: severity: "critical"

五、最佳实践总结

5.1 方案选择建议

业务场景推荐方案说明
普通电商库存先写DB再删缓存一致性要求中等,实现简单
金融账户余额缓存双删 + 事务强一致性要求,可接受一定复杂度
大型分布式系统Binlog异步更新多数据源,最终一致性
秒杀场景先写DB再删缓存 + 本地缓存极高并发,需要多层保护

5.2 关键注意事项

  1. 分布式锁使用

    • 选择Redisson或Curator成熟的解决方案

    • 设置合理的锁超时时间

    • 避免锁粒度过大影响性能

  2. 重试机制

    • 实现指数退避算法

    • 设置最大重试次数

    • 记录重试日志便于排查

  3. 监控告警

    • 监控缓存命中率

    • 监控不一致率

    • 设置合理的告警阈值

  4. 降级策略

    • 缓存不可用时降级到数据库

    • 设置本地缓存作为二级缓存

    • 实现熔断机制

5.3 性能优化建议

  1. 缓存Key设计

    java

    // 反例:过于复杂 String key = "stock:" + trainId + ":" + seatType + ":" + date; // 正例:简洁高效 String key = String.format("s:%s:%s", trainId, seatType);
  2. 批量操作

    java

    // 批量删除缓存 public void batchDeleteCache(List<String> keys) { redisTemplate.delete(keys); }
  3. 连接池优化

    yaml

    redis: lettuce: pool: max-active: 200 max-idle: 50 min-idle: 10 max-wait: 1000ms

5.4 故障恢复预案

java

@Component @Slf4j public class CacheDisasterRecovery { /** * 缓存全量重建 */ public void rebuildAllCache() { // 1. 设置维护标志 setMaintenanceFlag(true); try { // 2. 分批次重建 int batchSize = 1000; int offset = 0; while (true) { List<TicketStock> stocks = stockMapper.selectBatch(offset, batchSize); if (stocks.isEmpty()) { break; } // 3. 批量写入缓存 Map<String, Integer> cacheData = new HashMap<>(); for (TicketStock stock : stocks) { String key = buildCacheKey(stock.getTrainId(), stock.getSeatType()); cacheData.put(key, stock.getAvailableSeats()); } redisTemplate.opsForValue().multiSet(cacheData); offset += batchSize; log.info("已重建 {} 条缓存记录", offset); } } finally { // 4. 清除维护标志 setMaintenanceFlag(false); } } }

六、总结

在12306这样的高并发系统中,缓存与数据库的一致性保障是一个复杂的系统工程。通过实践总结:

  1. 没有银弹方案:需要根据具体业务场景选择合适策略

  2. 混合方案更优:主方案+补偿机制+监控告警的组合

  3. 监控是保障:完善的监控体系能及时发现和解决问题

  4. 容错是关键:设计时要考虑各种异常情况的处理

推荐大多数企业采用"先写数据库再删除缓存"作为主方案,配合Binlog异步更新作为补偿机制,建立完善的监控告警体系,这样可以在保证性能的同时,获得较好的数据一致性保障。

本回答由 AI 生成,内容仅供参考,请仔细甄别。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 12:15:59

Adobe acrobat 免费下载、安装图文教程(附安装包,图超详细)

Adobe acrobat 是一款针对 PDF 文件打造的办公工具&#xff0c;能读、能改、能合并、能签名、能加密&#xff0c;还能把纸质文件一键扫成可搜索的 PDF&#xff0c;是个人和企业处理电子文档的标配工具。 Adobe acrobat 主要用于创建、编辑、管理和签署 PDF 文件&#xff0c;支…

作者头像 李华
网站建设 2026/4/16 9:04:39

网络工程师想要转行,有没有啥建议?

网络工程师想要转行&#xff0c;有没有啥建议&#xff1f; 转行不是一时冲动&#xff0c;得先搞清楚动机。作为网络工程师&#xff0c;你可能已经掌握了TCP/IP协议、路由配置、防火墙设置这些硬核技能&#xff0c;但现实往往残酷。行业饱和了&#xff0c;新人涌入&#xff0c;…

作者头像 李华
网站建设 2026/4/16 5:38:28

35、Unix 拼写检查器与进程管理全解析

Unix 拼写检查器与进程管理全解析 1. Unix 拼写检查器发展历程 早期的 Unix 拼写检查器经历了多个版本的演变。最初的 Unix 拼写检查器是一个管道,之后出现了用 C 语言编写的版本。1975 年的 Version 6 Unix 中的 typo 命令,约 350 行 C 代码;1979 年 Version 7 Unix 发…

作者头像 李华
网站建设 2026/4/16 9:03:23

44、Unix文件系统:结构、特性与操作详解

Unix文件系统:结构、特性与操作详解 1. Unix文件系统简介 Unix操作系统通过将文件分组到目录中来管理大量文件,每个目录形成独立的命名空间,避免文件名冲突,同时便于文件管理。目录还能为文件提供默认属性。 Unix文件系统呈树状结构,根目录名为 / (ASCII斜杠)。斜杠…

作者头像 李华