一、基于数据库表的方案
- 悲观锁实现(行锁)
-- 1. 创建锁表CREATETABLE`distributed_lock`(`id`int(11)NOTNULLAUTO_INCREMENT,`lock_key`varchar(64)NOTNULLCOMMENT'锁标识',`business_id`varchar(255)DEFAULTNULLCOMMENT'业务标识',`expire_time`datetimeDEFAULTNULLCOMMENT'过期时间',`create_time`timestampNOTNULLDEFAULTCURRENT_TIMESTAMP,PRIMARYKEY(`id`),UNIQUEKEY`uk_lock_key`(`lock_key`))ENGINE=InnoDBDEFAULTCHARSET=utf8;-- 2. 获取锁BEGIN;SELECT*FROMdistributed_lockWHERElock_key='order_lock_123'FORUPDATE;-- 检查锁是否过期-- 如果无记录或已过期,插入或更新INSERTINTOdistributed_lock(lock_key,business_id,expire_time)VALUES('order_lock_123','order_001',DATE_ADD(NOW(),INTERVAL30SECOND))ONDUPLICATEKEYUPDATEbusiness_id=VALUES(business_id),expire_time=VALUES(expire_time);COMMIT;-- 3. 释放锁DELETEFROMdistributed_lockWHERElock_key='order_lock_123'ANDbusiness_id='order_001';- 乐观锁实现(版本号)
-- 创建带版本号的锁表CREATETABLE`optimistic_lock`(`id`int(11)NOTNULLAUTO_INCREMENT,`resource_name`varchar(64)NOTNULLCOMMENT'资源名',`version`int(11)NOTNULLDEFAULT'0'COMMENT'版本号',`owner`varchar(64)DEFAULTNULLCOMMENT'持有者',PRIMARYKEY(`id`),UNIQUEKEY`uk_resource`(`resource_name`))ENGINE=InnoDBDEFAULTCHARSET=utf8;-- 获取锁(CAS操作)UPDATEoptimistic_lockSETversion=version+1,owner='service_A'WHEREresource_name='resource_1'ANDversion=#{oldVersion};二、基于GET_LOCK()函数的方案
-- 1. 获取锁(会话级)SELECTGET_LOCK('my_lock',10);-- 等待10秒-- 2. 检查是否获取成功SELECTIS_FREE_LOCK('my_lock');SELECTIS_USED_LOCK('my_lock');-- 3. 释放锁SELECTRELEASE_LOCK('my_lock');-- 4. 释放所有锁SELECTRELEASE_ALL_LOCKS();三、完整实现示例
Java 实现类
importjavax.sql.DataSource;importjava.sql.Connection;importjava.sql.PreparedStatement;importjava.sql.ResultSet;importjava.sql.SQLException;importjava.util.concurrent.TimeUnit;publicclassMySQLDistributedLock{privateDataSourcedataSource;privateStringlockKey;privatelongexpireMillis=30000;privateStringownerId;publicMySQLDistributedLock(DataSourcedataSource,StringlockKey){this.dataSource=dataSource;this.lockKey=lockKey;this.ownerId=generateOwnerId();}publicbooleantryLock(longwaitTime,TimeUnitunit)throwsSQLException{longstart=System.currentTimeMillis();longtimeout=unit.toMillis(waitTime);while(System.currentTimeMillis()-start<timeout){if(acquireLock()){returntrue;}try{Thread.sleep(100);// 重试间隔}catch(InterruptedExceptione){Thread.currentThread().interrupt();returnfalse;}}returnfalse;}privatebooleanacquireLock()throwsSQLException{Connectionconn=null;try{conn=dataSource.getConnection();conn.setAutoCommit(false);// 使用行级锁StringcheckSql="SELECT id, expire_time FROM distributed_lock "+"WHERE lock_key = ? FOR UPDATE";try(PreparedStatementps=conn.prepareStatement(checkSql)){ps.setString(1,lockKey);ResultSetrs=ps.executeQuery();if(rs.next()){// 检查是否过期java.sql.TimestampexpireTime=rs.getTimestamp("expire_time");if(expireTime!=null&&expireTime.after(newjava.util.Date())){// 锁被其他进程持有且未过期conn.rollback();returnfalse;}}// 获取或更新锁StringupsertSql="INSERT INTO distributed_lock "+"(lock_key, owner_id, expire_time) VALUES (?, ?, ?) "+"ON DUPLICATE KEY UPDATE "+"owner_id = VALUES(owner_id), "+"expire_time = VALUES(expire_time)";try(PreparedStatementupsertPs=conn.prepareStatement(upsertSql)){upsertPs.setString(1,lockKey);upsertPs.setString(2,ownerId);upsertPs.setTimestamp(3,newjava.sql.Timestamp(System.currentTimeMillis()+expireMillis));introws=upsertPs.executeUpdate();conn.commit();returnrows>0;}}}catch(SQLExceptione){if(conn!=null){conn.rollback();}throwe;}finally{if(conn!=null){conn.setAutoCommit(true);conn.close();}}}publicvoidunlock()throwsSQLException{Stringsql="DELETE FROM distributed_lock "+"WHERE lock_key = ? AND owner_id = ?";try(Connectionconn=dataSource.getConnection();PreparedStatementps=conn.prepareStatement(sql)){ps.setString(1,lockKey);ps.setString(2,ownerId);ps.executeUpdate();}}privateStringgenerateOwnerId(){returnThread.currentThread().getName()+"_"+System.currentTimeMillis()+"_"+Math.random();}}Spring Boot 集成
@ComponentpublicclassDistributedLockService{@AutowiredprivateJdbcTemplatejdbcTemplate;public<T>TexecuteWithLock(StringlockKey,longwaitTime,TimeUnitunit,Supplier<T>supplier){MySQLDistributedLocklock=newMySQLDistributedLock(jdbcTemplate.getDataSource(),lockKey);try{if(lock.tryLock(waitTime,unit)){try{returnsupplier.get();}finally{lock.unlock();}}else{thrownewRuntimeException("获取分布式锁失败: "+lockKey);}}catch(SQLExceptione){thrownewRuntimeException("分布式锁异常",e);}}// 使用示例publicvoidprocessOrder(StringorderId){StringlockKey="order_lock_"+orderId;executeWithLock(lockKey,5,TimeUnit.SECONDS,()->{// 业务逻辑updateOrderStatus(orderId);returnnull;});}}四、进阶优化方案
- 可重入锁实现
-- 支持可重入的锁表结构CREATETABLE`reentrant_lock`(`id`int(11)NOTNULLAUTO_INCREMENT,`lock_key`varchar(64)NOTNULL,`owner_id`varchar(128)NOTNULL,`count`int(11)NOTNULLDEFAULT'0',`expire_time`datetimeDEFAULTNULL,PRIMARYKEY(`id`),UNIQUEKEY`uk_lock_key`(`lock_key`));- 锁续期机制
publicclassLockRenewalTaskimplementsRunnable{privateMySQLDistributedLocklock;privatevolatilebooleanrunning=true;@Overridepublicvoidrun(){while(running){try{Thread.sleep(10000);// 每10秒续期一次renewLock();}catch(Exceptione){// 处理异常}}}privatevoidrenewLock()throwsSQLException{Stringsql="UPDATE distributed_lock "+"SET expire_time = DATE_ADD(NOW(), INTERVAL 30 SECOND) "+"WHERE lock_key = ? AND owner_id = ?";// 执行更新}}五、各方案对比
方案 优点 缺点 适用场景
基于行锁 实现简单,强一致 性能较低,可能死锁 低并发,强一致性要求
乐观锁 并发性好 需要重试机制 并发冲突少的场景
GET_LOCK() MySQL原生支持 会话级别,连接断开自动释放 简单同步需求
六、最佳实践建议
- 锁粒度控制:尽量缩小锁的范围,使用细粒度锁
- 超时设置:必须设置锁超时时间,防止死锁
- 锁清理:定期清理过期锁记录
- 监控报警:监控锁等待时间和死锁情况
- 降级方案:考虑锁获取失败时的降级策略
七、注意事项
- 数据库性能:高并发场景下,数据库锁可能成为瓶颈
- 死锁风险:注意事务执行顺序,避免死锁
- 连接池配置:确保数据库连接足够
- 时钟同步:多节点服务器时间需要同步
- 错误处理:妥善处理网络异常和数据库异常
对于高性能要求的场景,建议考虑 Redis 或 ZooKeeper 作为分布式锁的实现方案。MySQL 分布式锁更适合于:
· 已有 MySQL 基础设施
· 并发量不高(QPS < 1000)
· 需要强一致性保证
· 系统简单,不希望引入额外中间件