news 2026/4/16 8:59:11

Sharding分库分表复杂SQL之数据源路由

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Sharding分库分表复杂SQL之数据源路由

Sharding分库分表复杂SQL之数据源路由

  • 一、问题及分析
    • 1. 背景
    • 2. 方案
  • 二、数据源动态切换
    • 1. 配置及代码实现
    • 2. 动态数据源使用
    • 3. `事务拦截器TransactionInterceptor`
    • 4. 数据源动态切换流程图
  • 三、关于事务管理
    • 1. 混合事务
      • 方案一:分布式事务
      • 方案二:拆分事务(妥协方案)

一、问题及分析

1. 背景

数据库中有20张业务表,其中 有两张表(订单表、交易明细表)因为数据量太大进行了分库分表,其余18张表保持单库单表结构。现有系统架构变为:Spring Boot + Spring Cloud + ShardingSphere 的微服务系统。但是,ShardingSphere 对【复杂 SQL】(如多表关联、子查询、窗口函数等)支持度不足 和 性能问题。对此有这样的需求:既需要支持复杂查询,又要对分片表进行有效的水平拆分。


2. 方案

混合数据源方案:在部分表分片、部分表不分片的场景下,混合使用不同数据源(这是一种标准做法)。

具体就是使用Spring的AbstractRoutingDataSource来动态路由数据源(自定义一个路由数据源,继承AbstractRoutingDataSource)。

我们需要在两个数据源之间做路由:一个是ShardingSphereDataSource(负责分片表,也就是分库分表的表),另一个是普通数据源defaultDataSource(负责其他 非分片表)。

但是,又有新的问题如何决定使用哪个数据源

  • 方案一:通过解析sql,判断是否涉及分片表,从而决定要使用哪个数据源。
    • 该方案的缺点:通过解析SQL去判断是否涉及分片表可能比较复杂,而且解析可能不准确。
    • 该方案存在的问题:通常来说,我们会将业务逻辑放在 Service 层,具体要去 操作哪几张表、用哪个数据源,都是业务逻辑决定的。所以说,数据源切换 应该在 “方法开始前” 决定,而不是在 SQL 执行前。方案一不考虑。
  • 方案二通过AOP切面在Service层 根据方法名或注解来切换数据源。当然,像这样的处理方案是需要写入项目的编码规则文档中。
    • 缺点:需要开发人员明确指定每个方法使用哪个数据源。

注意事项

  • ① 如果一条SQL只操作非分片表,就用defaultDataSource;如果一条SQL只操作分片表,就用ShardingDataSource;但是如果一条SQL同时操作了分片表和非分片表,那么就需要同时使用两个数据源,这就会涉及到 【分布式事务】。假设目前没有同时操作两个数据源的业务,所以暂不讨论这种情况。
  • ② 目前没有同时操作两个数据源的情况,也就是说,在业务上能够保证 在同一个事务中不会同时操作两个数据源,所以说,我们就可以使用Spring的事务管理,我们可以配置两个事务管理器,并且指定每个事务管理器对应哪个数据源。这样做方便后期扩展,当然目前还不需要,可以不用配置,直接使用 @Transaction注解即可。
  • ③ Spring事务管理是通过AOP实现的,而我们的数据源动态路由也是通过AOP实现的。所以说,数据源动态路由切面 要在 事务切面 之前执行,否则会导致数据源切换失效。我们可以通过调整切面顺序来解决,通常使用@Order注解。

二、数据源动态切换

  • 可以先看一下第4小节【4. 数据源动态切换流程图】。

1. 配置及代码实现

使用AbstractRoutingDataSource+AOP的方案:

1). 数据源配置

确保两个数据源使用独立的连接池配置

# application.ymlspring:# 默认数据源配置(用于不分片的18张表)datasource:url:jdbc:mysql://localhost:3306/db0username:rootpassword:rootdriver-class-name:com.mysql.cj.jdbc.Driver# ShardingSphere 数据源配置(用于分片的2张表)shardingsphere:datasource:names:ds0,ds1ds0:url:jdbc:mysql://localhost:3306/db0ds1:url:jdbc:mysql://localhost:3307/db1# ... 其他ShardingSphere配置

2). 数据源类型枚举

publicenumDataSourceType{DEFAULT,// 默认数据源(不分片表)SHARDING// ShardingSphere数据源(分片表)}

3). 数据源上下文

publicclassDataSourceContextHolder{// ThreadLocal存储当前要切到哪个数据源privatestaticfinalThreadLocal<DataSourceType>CONTEXT_HOLDER=newThreadLocal<>();publicstaticvoidsetDataSourceType(DataSourceTypedataSourceType){CONTEXT_HOLDER.set(dataSourceType);}publicstaticDataSourceTypegetDataSourceType(){returnCONTEXT_HOLDER.get();}publicstaticvoidclearDataSourceType(){CONTEXT_HOLDER.remove();}publicstaticbooleanisShardingDataSource(){returnDataSourceType.SHARDING.equals(CONTEXT_HOLDER.get());}}

4). 自定义数据源路由

// 继承AbstractRoutingDataSource@ComponentpublicclassDynamicDataSourceextendsAbstractRoutingDataSource{@OverrideprotectedObjectdetermineCurrentLookupKey(){returnDataSourceContextHolder.getDataSourceType();}}

5). 数据源配置类

@ConfigurationpublicclassDataSourceConfig{// ==================== 1. 默认数据源(Spring Boot自动配置的数据源,不分片表使用)@Bean("defaultDataSource")@ConfigurationProperties(prefix="spring.datasource")publicDataSourcedefaultDataSource(){returnDataSourceBuilder.create().build();}// ==================== 2. ShardingSphere数据源配置// ShardingSphere数据源(分片表使用)@Bean("shardingDataSource")publicDataSourceshardingDataSource()throwsSQLException{// 通过 Yaml 的方式创建数据源URLyamlResource=ClassLoader.getSystemClassLoader().getResource("sharding.yaml");returnYamlShardingSphereDataSourceFactory.createDataSource(newFile(yamlResource.toURI()));}// ==================== 3. 动态数据源配置 ==========@Primary@Bean("dynamicDataSource")publicDataSourcedataSource(@Qualifier("defaultDataSource")DataSourcedefaultDataSource,@Qualifier("shardingDataSource")DataSourceshardingDataSource){// 创建DynamicDataSource,绑定两个数据源DynamicDataSourcedynamicDataSource=newDynamicDataSource();// 设置数据源映射Map<Object,Object>targetDataSources=newHashMap<>();targetDataSources.put(DataSourceType.DEFAULT,defaultDataSource);targetDataSources.put(DataSourceType.SHARDING,shardingDataSource);dynamicDataSource.setTargetDataSources(targetDataSources);// 设置默认数据源dynamicDataSource.setDefaultTargetDataSource(defaultDataSource);// 数据源初始化后执行dynamicDataSource.afterPropertiesSet();returndynamicDataSource;}// ==================== 4. 事务管理器配置 ====================@Bean(name="transactionManager")publicPlatformTransactionManagertransactionManager(@Qualifier("dynamicDataSource")DataSourcedynamicDataSource){// 事务管理器 绑定 动态数据源returnnewDataSourceTransactionManager(dynamicDataSource);}}

6). 自动切换数据源

// 1. 自定义注解@Target({ElementType.METHOD,ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)public@interfaceDataSourceAnno{DataSourceTypevalue()defaultDataSourceType.DEFAULT;// 默认用 DEFAULT 数据源}// 2. 动态数据源AOP切面@Aspect@Component@Order(-1)// 值比默认值小,确保先于事务拦截器(或 事务注解)执行(这个注解的默认值为 Integer.MAX_VALUE)publicclassDataSourceAspect{// 方法1:基于自定义注解切换@Before("@annotation(dataSourceAnno)")publicvoidswitchDataSourceByAnnotation(JoinPointjoinPoint,DataSourcedataSource){DataSourceContextHolder.setDataSourceType(dataSource.value());}// 方法2:基于方法名自动识别@Before("execution(* com.sh.service.*.*(..))")publicvoidswitchDataSourceByMethod(JoinPointjoinPoint){StringmethodName=joinPoint.getSignature().getName();Object[]args=joinPoint.getArgs();// 根据方法名判断是否需要分片表操作if(isShardingTableOperation(methodName,args)){DataSourceContextHolder.setDataSourceType(DataSourceType.SHARDING);}else{DataSourceContextHolder.setDataSourceType(DataSourceType.DEFAULT);}}privatebooleanisShardingTableOperation(StringmethodName,Object[]args){// 分片表识别逻辑,比如:我们可以规定使用 ShardingSphereDataSource 的方法名 前缀统一为:shardingDsreturnmethodName.contains("shardingDs");}@After("@annotation(dataSourceAnno)")// 或者 @After("execution(* com.sh.service.*.*(..))")publicvoidrestoreDataSource(JoinPointjoinPoint){// 清理,防止内存泄漏DataSourceContextHolder.clearDataSourceType();}}

2. 动态数据源使用

由于目前一次操作只涉及一个数据源(要么全部是分片表,要么全部是非分片表),所以,可以使用Spring的本地事务管理。

// 在 Service 层使用注解@ServicepublicclassOrderService{// 1.操作分表时用 sharding@DataSourceAnno(DataSourceType.SHARDING)@Transactional// 【Spring本地事务】publicvoidprocessOrder(){orderMapper.insertOrderItem(...);// order_item(分表)orderMapper.insertOrderInfo(...);// order_info(分表)}// 2.操作普通表时用 DEFAULT@DataSourceAnno(DataSourceType.DEFAULT)@Transactional// 【Spring本地事务】publicvoidprocessUser(){userMapper.insertUser(...);// user(普通表)productMapper.insertProduct(...);// product(普通表)}}

3.事务拦截器TransactionInterceptor

上面我们在使用 Spirng的本地事务管理时 用的是:@Transactional注解,但是,有时候我们会忘记添加这个注解或者说到处写这个注解太麻烦了。我们就可以使用@Aspect配置式AOP 来装配 事务拦截器(TransactionInterceptor)的方案,完全摆脱@Transactional注解。这是Spring框架内更原生的编程式事务管理方式。

importorg.aspectj.lang.annotation.Aspect;importorg.springframework.aop.Advisor;importorg.springframework.aop.aspectj.AspectJExpressionPointcut;importorg.springframework.aop.support.DefaultBeanFactoryPointcutAdvisor;importorg.springframework.aop.support.DefaultPointcutAdvisor;importorg.springframework.aop.support.NameMatchMethodPointcut;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.transaction.PlatformTransactionManager;importorg.springframework.transaction.TransactionDefinition;importorg.springframework.transaction.interceptor.*;importjava.util.Collections;importjava.util.HashMap;importjava.util.Map;@Aspect@ConfigurationpublicclassTransactionConfig{// 1. 定义切入点(拦截 com.pro.service 包下所有方法)privatestaticfinalStringAOP_POINTCUT_EXPRESSION="execution(* com.sh.service..*.*(..))";// 2. 定义增删改方法前缀(自动加事务)privatestaticfinalString[]REQUIRED_RULE_TRANSACTION={"insert*","create*","add*","save*","update*","modify*","del*","delete*","remove*"};// 3. 定义查询方法前缀(自动加只读事务)privatestaticfinalString[]READ_RULE_TRANSACTION={"select*","get*","query*","search*","count*","find*","list*","page*"};// 4. 注入事务管理器@AutowiredprivatePlatformTransactionManagertransactionManager;// ========== 5. 核心:配置事务拦截器 (TransactionInterceptor)@BeanpublicTransactionInterceptortxAdvice(){// 5.1 创建事务属性源NameMatchTransactionAttributeSourcetas=newNameMatchTransactionAttributeSource();// 5.2 配置增删改事务属性(REQUIRED)RuleBasedTransactionAttributerequiredTx=newRuleBasedTransactionAttribute();requiredTx.setRollbackRules(Collections.singletonList(newRollbackRuleAttribute(Exception.class)));// 发生异常回滚requiredTx.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);// 事务隔离级别:读已提交requiredTx.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);// 事务传播行为:REQUIREDrequiredTx.setTimeout(30);// 超时30秒// 5.3 配置查询事务属性(只读)RuleBasedTransactionAttributereadOnlyTx=newRuleBasedTransactionAttribute();readOnlyTx.setRollbackRules(Collections.singletonList(newRollbackRuleAttribute(Exception.class)));readOnlyTx.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);readOnlyTx.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS);// 支持当前事务,不存在也不新建readOnlyTx.setReadOnly(true);// 关键:设置为只读readOnlyTx.setTimeout(20);// 查询超时可设短些// 5.4 将方法名模式映射到事务属性Map<String,TransactionAttribute>txMap=newHashMap<>();for(StringmethodName:REQUIRED_RULE_TRANSACTION){txMap.put(methodName,requiredTx);}for(StringmethodName:READ_RULE_TRANSACTION){txMap.put(methodName,readOnlyTx);}tas.setNameMap(txMap);// 5.5 创建并返回事务拦截器returnnewTransactionInterceptor(transactionManager,tas);}// ========== 6. 核心:配置切面(Advisor),将切入点和拦截器关联@BeanpublicAdvisortxAdviceAdvisor(){// 6.1 创建切入点(使用AspectJ表达式)AspectJExpressionPointcutpointcut=newAspectJExpressionPointcut();pointcut.setExpression(AOP_POINTCUT_EXPRESSION);// 6.2 创建Advisor(通知器),将切入点和事务通知绑定returnnewDefaultPointcutAdvisor(pointcut,txAdvice());}}

4. 数据源动态切换流程图

before(): 设置数据源Key
到ThreadLocal
1. 根据方法名匹配规则
2. 从事务管理器获取连接
此时ThreadLocal中
已有正确数据源Key
成功
异常
客户端调用 Service 方法
动态数据源代理
数据源切面
DataSourceAspect
事务切面代理
事务拦截器
TransactionInterceptor
获取事务属性
DataSourceTransactionManager
动态数据源
determineCurrentLookupKey()
路由到真实物理数据源
执行SQL
方法执行结束?
TransactionInterceptor
提交事务
TransactionInterceptor
回滚事务
DataSourceAspect.after()
清理ThreadLocal
返回结果给客户端

三、关于事务管理

1. 单数据源事务:如果一次操作只涉及一个数据源(要么全部是分片表,要么全部是非分片表)。就可以使用Spring的本地事务管理,因为动态数据源会路由到同一个物理数据源。

2.混合数据源事务如果需要同时操作分片表和非分片表,则需要引入分布式事务方案(如Seata)或者 【将操作拆分为两个独立的事务


1. 混合事务

问题
当一次业务操作需要同时更新分片表和非分片表时,由于这两个表【可能】位于不同的物理数据库(或同一数据库的不同数据源管理),我们需要考虑事务的一致性。

方案一:分布式事务

分布式事务的内容有点多,这里就先用伪代码:

// 使用 Seata 等分布式事务框架@GlobalTransactional// 全局分布式事务注解publicvoidmixedOperation(){// 操作非分片表(默认数据源)defaultService.updateNormalTable();// 操作分片表(ShardingSphere 数据源)shardingService.updateShardingTable();}

方案二:拆分事务(妥协方案)

将原本在一个事务中的操作,拆分成两个独立的事务,每个事务只操作一个数据源(要么是分片数据源,要么是默认数据源)。这样,每个事务都是本地事务,由各自的数据源事务管理器管理。

假设我们有一个业务方法,需要先更新非分片表A,然后更新分片表B。

原本的设计(问题代码)

// ❌ 错误做法:试图用一个事务控制两个数据源@TransactionalpublicvoidcreateOrderWithUserInfo(Useruser,Orderorder){// 更新用户信息(非分片表,使用默认数据源)userMapper.update(user);// 默认数据源// 创建订单(分片表,使用ShardingSphere数据源)orderMapper.insert(order);// ShardingSphere数据源// 这里会出现事务问题!}

拆分后的设计

@AutowiredprivateTransactionTemplatetransactionTemplate;// ✅ 将业务拆分成两个独立的事务publicvoiddoBusiness(){// 第一个事务,操作非分片表AtransactionTemplate.execute(status->{updateNonShardingTableA();returnnull;});// 第二个事务,操作分片表BtransactionTemplate.execute(status->{updateShardingTableB();returnnull;});}

拆分事务的优缺点

  • 优点:
    • 实现简单:不需要引入复杂的分布式事务框架
    • 性能较好:避免了分布式事务的网络开销和锁竞争
    • 技术栈轻量:减少了系统复杂度
  • 缺点:
    • 数据一致性风险:如果第二个事务失败,第一个事务已经提交,则无法回滚。因此,这种拆分需要根据业务场景来判断是否可接受如果业务要求两个操作要么都成功,要么都失败,那么就需要引入分布式事务
    • 业务逻辑复杂:
      • 需要实现补偿机制。
      • 需要考虑幂等性。
    • 业务限制:
      • 如果两个操作必须同时成功或同时失败,此方案不适用。
      • 如果第二个操作依赖第一个操作的结果,拆分不可行。

补充说明

在拆分事务时,我们通常需要根据业务逻辑考虑是否允许中间状态。如果业务允许(例如,先记录日志,再更新分片表,即使分片表更新失败,日志也可以保留),那么可以拆分。如果不允许,则需要考虑其他方案,如:

  • 使用分布式事务(如Seata)保证两个数据源的事务一致性。
  • 重新设计数据模型,将相关操作放到同一个数据源中。
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/13 6:23:10

Windows下Python安装Stable Diffusion 3.5 FP8踩坑总结

Windows下Python安装Stable Diffusion 3.5 FP8踩坑总结 在AI生成图像技术飞速发展的今天&#xff0c;越来越多的设计师、开发者和内容创作者希望在本地设备上运行高性能的文生图模型。然而&#xff0c;当面对像 Stable Diffusion 3.5 这样参数庞大、显存需求高的模型时&#x…

作者头像 李华
网站建设 2026/4/8 0:09:24

百度指数飙升!Qwen-Image成近期AI热搜词

Qwen-Image&#xff1a;从技术跃迁到产业重塑的AIGC新范式 在广告设计团队还在为一张海报反复修改三天时&#xff0c;某新锐品牌已经用自然语言生成了整套视觉方案——“青绿山水背景&#xff0c;书法字体‘静雅’居中&#xff0c;竹影斑驳”。按下回车&#xff0c;10241024高清…

作者头像 李华
网站建设 2026/4/15 13:47:45

毕业/期刊/职称论文不愁!6款免费AI工具一键极速生成,省时超80%

在学术的道路上&#xff0c;论文写作往往是大学生、研究生和科研人员面临的一大挑战。从选题到定稿&#xff0c;每一个环节都需要耗费大量的时间和精力。不过&#xff0c;随着人工智能技术的发展&#xff0c;一系列AI论文工具应运而生&#xff0c;为我们的论文写作带来了极大的…

作者头像 李华
网站建设 2026/4/15 1:38:53

Stable Diffusion 3.5 FP8 vs 原始版本:显存占用对比实测报告

Stable Diffusion 3.5 FP8 vs 原始版本&#xff1a;显存占用对比实测报告 在生成式 AI 的浪潮中&#xff0c;Stable Diffusion 系列始终扮演着“开源先锋”的角色。从 SD1.x 到如今的 Stable Diffusion 3.5&#xff08;SD3.5&#xff09;&#xff0c;每一次迭代都在图像质量、语…

作者头像 李华
网站建设 2026/4/11 18:13:15

Wan2.2-T2V-5B生成结果如何评估?基于DiskInfo下载官网的数据存储建议

Wan2.2-T2V-5B生成结果如何评估&#xff1f;基于DiskInfo下载官网的数据存储建议 在短视频内容爆炸式增长的今天&#xff0c;用户对“秒级响应、低成本、高质量”视频生成的需求已经从理想变为刚需。无论是社交媒体运营者需要快速产出创意素材&#xff0c;还是开发者希望在本地…

作者头像 李华
网站建设 2026/4/15 8:05:18

OpenSpec生态共建:LLama-Factory贡献者招募计划启动

OpenSpec生态共建&#xff1a;LLama-Factory贡献者招募计划启动 在大模型技术飞速演进的今天&#xff0c;一个现实问题日益凸显&#xff1a;尽管像LLaMA、Qwen这样的预训练语言模型展现出惊人的通用能力&#xff0c;但真正落地到具体行业场景时——无论是金融客服中的合规问答&…

作者头像 李华