分布式锁分享

分布式锁分享

1.分布式锁的解释

1.1什么是分布式

【Distributed Systems Concepts and Design】(分布式系统概念与设计)一书中分布式解释:

分布式系统是一个硬件或软件组件分布在不同的网络计算机上,彼此之间通过消息传来递进行通讯和协调的系统。

分布式的特点:

  • 分布性

    分布式系统中的多台计算机之间在空间位置上可以随意分布,系统中的多台计算机之间没有主、从之分,即没有控制整个系统的主机,也没有受控的从机。

  • 协同性

    系统中的若干台计算机可以互相协作来完成一个共同的任务,或者说一个程序可以分布在几台计算机上并行地运行,提升应用的执行效率。

  • 可扩展性

    企业级应用平台必须要能适应需求的变化,随着并发请求的增加,可以弹性地增加服务器的数量。

  • 通信性

    分布式系统中的任意两台计算机都能通信,进行信息交换。

CAP理论和BASE理论:

CAP :1.一致性(Consistency)、2.可用性(Availablity)、3.分区容忍性(Partition-torlerance)。

BASE: 基本可用(Basically Available)、软状态( Soft State)、最终一致性( Eventual Consistency)。

1.2分布式锁概念

分布式锁出现背景

随着公司业务增长,应对高并发场景不得不使用分布式集群,为了保证数据的一致性,需要一些技术手段实现包括分布式事务和分布式锁。单机情况下的多线程可以共享堆内存,通过在内存中标记变量实现互斥,我们可以使用Java自带的锁来实现互斥,但是在分布式场景下,多个客户端无法共享内存,为了解决数据一致性问题,需要将标记一个变量也就是锁,放在全局的位置来实现互斥,这样就出现了分布式锁。

分布式锁实现要求

  • 可以保证在分布式集群中,同一时刻只有一个客户端获得锁。

  • 这把锁要是一把可重入锁(避免死锁)

  • 这把锁最好是一把阻塞锁

    阻塞锁让线程进入阻塞状态进行等待,当获得相应的信号(唤醒,时间) 时,可以进入就绪状态,就绪状态中所有线程通过竞争,进入运行状态。

  • 这把锁最好是一把公平锁

    公平锁就是线程按照执行顺序依次获取锁,好处是可以公平的让线程获取锁,避免有线程等待时间过长获取不到锁的情况,但是这种方式也有弊端:额外维护排队逻辑,额外损耗性能。

  • 有高性能、高可用的获取锁和释放锁功能

2.数据库实现分布式锁

实现原理

通过向数据表中添加记录表示加锁,删除表中记录表示释放锁

大体实现

加锁:先select 然后执行 insert or update

释放锁:delete from contentMethodLock where resource_name=XXX and client_info= XXX

img

2.1悲观锁实现

每次在拿取数据的时候都会对数据上锁,这样其他线程想用这个数据就会阻塞,直到获取锁的线程释放锁,例如MySQL的行锁和表锁。

可以使用selectfor update来实现,数据库会在查询过程中给数据库表加上排他锁

添加锁

代码块

SQL

select * from contentMethodLock where client_info ='asdfb12345' and resource_name = 'order_in_stock' for update

InnoDB 引擎在加锁的时候,只有通过索引进行检索的时候才会使用行级锁,否则会使用表级锁

释放锁

代码块

SQL

connection.commit() //这种方式需要显示的去提交事务,不然无法释放锁,导致锁一直占用

2.2 乐观锁实现

总是假设最好的情况,每次去拿数据的时候都认为不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间线程有没有去更新这个数据,可以使用版本号机制实现。

实现思想:先在数据表中添加对应的版本号字段,每当数据库中进行数据更新时,给对应的版本号加1。简而言之,每次进行数据更新时,先要比较版本号,然后判定是否需要进行更新操作。

img

代码块

select version from distributed_lock where resource_name = 'order_in_stock';

update distributed_lock set client_info='asdfb12345',version=version+1

where resource_name = 'order_in_stock' and version=${version};

优劣分析

  • 优点

    • 逻辑简单,便于理解
    • 除了mysql,不用借助外部的组件缓存,zk等
  • 不足

    • 性操作数据库所以性能差,并发高了会有诸多问题
    • 满足不了高可用,有单节点问题,即使通过主-备的形式来缓解,由于主从同步比较慢,但是这样会引来数据不一致的问题。
    • 得自己实现锁的阻塞,重入等。

3.基于缓存的分布式锁

3.1原生Redis实现

实现原理

加锁

(1)判断加锁是否超时,如果没有超时执行setnx(key,expireTime),如果加锁成功则执行业务,如果没有没有加锁成功进入下一步。

(2)get(key)获取过期时间expireTime,判断expireTime是否小于当前时间,如果不小于说明锁未过期,则继续返回加锁流程,如果expireTime小于当前时间说明锁已过期进入下一步。

(3)当锁过期说明 可以加锁了,执行getSet(key, 新expireTime)操作得到oldExpireTime。

(4)此时判断oldExpireTime是否等于expireTime返回值,如果不相等说明有其他客户端已经加锁了,此时继续返回加锁流程。如果oldExpireTime等于expireTime返回值 说明其他客户端没有加锁,则获取到锁

释放锁:

在业务执行完后,直接调用Redis中的del()方法。但是需要注意的是加锁和释放锁需要引入一个客户端标识,否则会有误删其他客户端锁情况。具体情况如下:

(1)进程开始获取锁成功,得到锁lockA

(2)A进程在释放锁的时候,执行del()操作之前,锁lockA正好过期了,进程B得到了锁,并设置成功了lockB

(3)A进程在执行del()操作时,会导致删除锁B

优劣分析

  • 优点

    • 对缓存的操作,速度快,性能好
    • 能满足高并发的需求
  • 不足

    • 实现逻辑较复杂,开发需考虑各种问题
    • 不支持可重入,公平锁,当然可以自己实现
    • 如果一个节点或集群有问题,主从复制数据没完成,导致k-v丢失,造成多个客户端获取锁

公司组件

分布式锁-基于redis的squirrel的实现

Cellar分布式锁实现方案

Cerberus-分布式锁

3.2 Redisson实现

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。Redisson在基于NIO的Netty框架上,充分的利用了Redis键值数据库提供的一系列优势,在Java实用工具包中常用接口的基础上,为使用者提供了一系列具有分布式特性的常用工具类。总之,Redisson提供了使用Redis的最简单和最便捷的方法,从而让使用者能够将精力更集中地放在处理业务逻辑上。

通过Redssion实现分布式锁的大致方式如下,使用Redisson方式很简单,因为内部已经封装完善的分布式锁的功能,包扩但不仅限于自动续期机制、锁的可重入、公平锁等机制

代码块

Java

RedissonClient redisson = Redisson.create(config);

RLock lock = redisson.getLock("anyLock");

boolean isLock;

try {

//waitTime 超时时间,leaseTime加锁时间

isLock = lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS);

if (isLock) {

//TODO 业务逻辑

Thread.sleep(3000);

}

} catch (Exception e) {

} finally {

// 无论如何, 最后都要解锁

redLock.unlock();

}

实现原理

实现流程图(有误修改中)

img

加锁逻辑

(1)执行判断操作,判断lock锁key是否存在,不存在直接调用hset存储当前线程信息并且设置过期时间,返回成功告诉客户端直接获取到锁,注意是一段lua脚本是原子操作。

为什么lua脚本 + redis命令是原子操作:因为lua脚本连续执行,再加上redis是单线程,所以lua脚本里执行redis命令具有排他性。

img

代码块

Java

//lock的hash存储结构 guid + 当前线程的ID。后面的value是就和可重入加锁次数

myLock:{

"8743c9c0-0795-4907-87fd-6c719a6b4586:1": 1

}

(2)如果lock锁key存在,获取lock锁key的hash数据结构,判断guid + 当前客户端线程的ID 是否和当前加锁线程能对应上,如果能对应上则将重入次数加1,并重新设置过期时间,返回成功,告诉客户端直接获取到锁。

(3)如果 myLock数据结构和当前线程信息对应不上,客户端2线程会获取到pttl myLock返回的一个数字,这个数字代表了myLock这个锁key的剩余生存时间。比如还剩15000毫秒的生存时间。此时客户端2线程会进入一个while循环,不停的尝试加锁。

解锁逻辑

(1)由于支持可重入,在解锁时将重入次数需要减1,如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:"del myLock"命令,从redis里删除这个key,解锁成功。

(2)如果锁不是被当前线程锁定或者lock锁key不存在,则返回成功。

watch dog自动延期机制

客户端1加锁的锁key默认生存时间才10秒,如果超过了10秒,客户端的业务逻辑还没执行完,怎么办呢?那就用到watch dog自动延期机制

获取锁成功就会开启一个定时任务,也就是watchdog,定时任务会定期检查去续期renewExpirationAsync(threadId).通过源码分析我们知道,默认情况下,加锁的时间是30秒.如果加锁的业务没有执行完,那么到 30-10 = 20秒的时候,就会进行一次续期,把锁重置成30秒。

03 - squirrel下watchdog机制实现

读写锁

代码块

JSON

//读锁结构

anyLock: {

"mode": "read",

"UUID_01:threadId_01": 2,

"UUID_02:threadId_02": 1

}

//写锁结构

anyLock: {

"mode": "write",

"UUID_01:threadId_01:write": 2

}

优劣分析

  • 优点

    • 实现逻辑简单,开发量小
    • 支持锁的种类较多,包括可重入锁、公平锁、 红锁、读写锁和联锁。
  • 不足

    • Redisson底层依赖Redis集群,公司现有的Squirrel(依赖Jedis的api),而Redisson自己实现操作redis底层API,无法集成到Squirrel。(但是没关系Squirrel的也有这些锁的实现,原理基本相同)
    • 在Redis master宕机的时候,可能导致多个客户端同时完成加锁

01 - Squirrel引擎下的multiLock实现

3.3 redlock

实现原理

(1)Client使用相同的key和随机数,按照顺序在每个Master实例中尝试获得锁。在获得锁的过程中,为每一个锁操作设置一个快速失败时间(如果想要获得一个10秒的锁, 那么每一个锁操作的失败时间设为5-50ms)。

(2)客户端计算出与master获得锁操作过程中消耗的时间,当且仅当Client获得锁消耗的时间小于锁的存活时间,并且在N/2+1以上的master节点中获得锁。才认为client成功的获得了锁。

(3)如果Client获得锁的数量不足一半以上,或获得锁的时间超时,那么认为获得锁失败。客户端需要尝试在所有的master节点中释放锁, 即使在第二步中没有成功获得该Master节点中的锁,仍要进行释放操作。

img

优劣分析

  • 优点

    • 完美解决Redis master宕机的时候,可能导致多个客户端同时完成加锁
    • 可用性高,即使有master节点挂了也完全可用
  • 不足

    • Redisson底层依赖Redis集群,公司现有的Squirrel(依赖Jedis的api),而Redisson自己实现操作redis底层API,无法集成到Squirrel。
    • redlock算法对时钟依赖性太强,若N个节点中的发生时间跳跃,导致过期,超时时间计算的问题。

思考:jvm fullgc STW会导致线程执行暂停,有client 1发生STW ,停顿的时间超过了锁的超时时间,然后client 2 获取了锁,此时client 1停顿结束了,还是会接着执行完锁住的那段代码,这样就出现了2个client同时获取了锁。怎么解决这个问题?

4.基于zookeeper的分布式锁

ZooKeeper 是以 Paxos 算法为基础的分布式应用程序协调服务。ZK 的数据节点和文件目录类似,所以我们可以用此特性实现分布式锁。

4.1 原生Zookeeper

实现原理

实现流程

(1)客户端调用create()方法创建名为“Lock/node”的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL(临时排序节点)。

(2)客户端调用getChildren(“Lock”)方法来获取所有已经创建的子节点。

(3)客户端获取到所有子节点路径之后,如果发现自己在步骤1中创建的节点是所有节点中序号最小的,那么就认为这个客户端获得了锁。

(4)如果创建的节点不是所有节点中需要最小的,那么则监视比自己创建节点的序列号小的最大的节点,进入等待。直到下次监视的子节点变更的时候,再进行子节点的获取,判断是否获取锁。

流程图

优劣分析

  • 优点

    • 锁模型健壮,可靠性高,客户端宕机了也没关系,zk感知到那个客户端宕机,会自动删除对应的临时顺序节点,相当于自动释放锁
  • 不足

    • 效率较低,性能较差,通过频繁的创建节点和删除节点来实现加锁和解锁
    • 需要自己手动实现整个加锁和解锁操作,这样要考虑到各种细节,异常处理等。

4.2基于开源Curator客户端

Curator是一个比较完善的ZooKeeper客户端框架,通过封装的一套API 简化了ZooKeeper的操作,解决了很多Zookeeper客户端非常底层的细节开发工作。

实现原理和使用

实现逻辑和原生Zookeeper保持一致,如下第三方库Curator客户端具体实现流程图

可重入锁的实现原理

每个InterProcessMutex实例,都会持有一个ConcurrentMap类型的threadData对象,以线程对象作为Key,以LockData作为Value值。通过判断当前线程threadData是否有值,如果有,则表示线程可以重入该锁,于是将lockData的lockCount进行累加;如果没有,则进行锁的抢夺。

internals.attemptLock方法返回lockPath!=null时,表明了该线程已经成功持有了这把锁,于是乎LockData对象被new了出来,并存放到threadData中。

img

优劣分析

  • 优势

    • 支持锁的种类较多,包括可重入锁,不可重入锁,公平锁, 读写锁,可根据具体业务场景配套使用
  • 不足

    • 底层依赖Zookeeper集群,需要结合公司现有的Zookeeper集群,目前Lion实现就是通过Curator客户端操作Zookeeper

4.3基于公司内部Lion

Lion提供的分布式锁完全基于ZK,并且支持重入锁和读写锁,具体使用 实现见:Lion分布式锁

Lion提供的分布式锁完全基于ZK,ZK的分布式锁不适用于高QPS是,锁的量很大的场景。

如果你的锁QPS超过100或者锁的数量超过50万,不要使用基于ZK的分布式锁,应寻求基于Redis的分布式锁方案。

摘自Lion 使用文档

优劣分析

  • 优势

    • 结合公司现有的能力,快速实现分布式锁
    • 高可用得到保障,有对应的集群
  • 不足

    • 受zookeeper性能约束,不适合高并发场景

5.基于Cerberus

Cerberus是到店部门研发的分布式锁组件,目前提供了三种引擎供大家使用,Tair、zk和Squirrel。如果有需要,除主引擎外,也可配置另外1-2种副引擎,以便必要时进行降级处理。

01 - 接入指南 Cerberus-分布式锁

实现原理

整体架构

img

现状

ZooKeeper Cellar Squirrel
实现方式 基本使用Menagerie源码,有细微改动 利用Cellar的expirelock,仿照Menagerie和Concurrent包开发 基于redis集群
功能 可自主选择引擎 实现 实现 实现
可重入锁 lock() 实现 实现 实现
tryLock() 实现 实现 实现
lockInterruptibly() 实现 实现 实现
unlock() 实现 实现 实现
公平/非公平锁 公平锁 非公平锁 非公平锁
可重入读写锁 实现 未实现 未实现
Condition 实现 未实现 未实现
MultiLock 未实现 未实现 实现
WatchDog续租 未实现 未实现 实现
引擎故障检查 未实现 未实现 未实现
切换引擎 可通过接口手动切换引擎(切换引擎会导致锁丢失)
用户可自定义集群环境 实现 实现 实现
羊群效应
集群 线上 已提供host=cerberus-zk.vip.sankuai.combaseLockPath=/dlm-locks 已提供remote_appkey=com.sankuai.tair.hotel.plat.publicarea=22 已提供clusterName=redis-hotel-cerberus_productcategory=travel_cerberus
线下 已提供host=mobile-zk.test.sankuai.com:2181baseLockPath=/dlm-locks 已提供remote_appkey=com.sankuai.tair.qa.functionarea=74 已提供clusterName=redis-hotel-cerberus_devcategory=travel_cerberus

测试报告:

cerberus综合测试

降级方法

当前使用的集群出现故障时,服务的负责人可在octo的mcc配置页进行手动切换。

将Key:cerberus_engine的值改为tair,点击保存。当前服务的引擎则会切换到tair。

如果改为其他值或者留空,则会按照Cerberus配置的引擎顺序,切换到下一个引擎

切换引擎会导致锁丢失

优劣分析

  • 优势

    • 接入简单
    • 轻量级,java包形式
    • 支持多种底层引擎,根据业务需求来切换
    • 支持分布式锁降级处理
  • 不足

    • 劣势就是各引擎的劣势

6.总结

对比

数据库 缓存 Zookeeper
理解角度 ✨✨✨ ✨✨✨✨ ✨✨✨✨
实现角度 ✨✨✨ ✨✨✨✨✨ ✨✨✨✨
性能角度 ✨✨✨ ✨✨✨✨✨ ✨✨✨✨
安全角度 ✨✨✨ ✨✨✨✨ ✨✨✨✨✨

方案倾向

推荐使用公司的Cerberus作为分布式锁实现

资料参考

01 - Squirrel引擎下的multiLock实现

03 - squirrel下watchdog机制实现

Lion 使用文档

Cerberus-分布式锁

01 - 接入指南

Cerberus-分布式锁

https://km.sankuai.com/page/28252571#id-2.7.分布式锁

http://zhangtielei.com/posts/blog-redlock-reasoning.html

http://zhangtielei.com/posts/blog-redlock-reasoning-part2.html

https://diaozxin007.github.io/2017/10/29/Redis-RedLock-完美的分布式锁么?

基于分布式锁实现秒杀

最近在项目中遇到了类似“秒杀”的业务场景,在本篇博客中,我将用一个非常简单的demo,阐述实现所谓“秒杀”的基本思路。

业务场景

所谓秒杀,从业务角度看,是短时间内多个用户“争抢”资源,这里的资源在大部分秒杀场景里是商品;将业务抽象,技术角度看,秒杀就是多个线程对资源进行操作,所以实现秒杀,就必须控制线程对资源的争抢,既要保证高效并发,也要保证操作的正确。

一些可能的实现

刚才提到过,实现秒杀的关键点是控制线程对资源的争抢,根据基本的线程知识,可以不加思索的想到下面的一些方法: 1、秒杀在技术层面的抽象应该就是一个方法,在这个方法里可能的操作是将商品库存-1,将商品加入用户的购物车等等,在不考虑缓存的情况下应该是要操作数据库的。那么最简单直接的实现就是在这个方法上加上synchronized关键字,通俗的讲就是锁住整个方法; 2、锁住整个方法这个策略简单方便,但是似乎有点粗暴。可以稍微优化一下,只锁住秒杀的代码块,比如写数据库的部分; 3、既然有并发问题,那我就让他“不并发”,将所有的线程用一个队列管理起来,使之变成串行操作,自然不会有并发问题。

上面所述的方法都是有效的,但是都不好。为什么?第一和第二种方法本质上是“加锁”,但是锁粒度依然比较高。什么意思?试想一下,如果两个线程同时执行秒杀方法,这两个线程操作的是不同的商品,从业务上讲应该是可以同时进行的,但是如果采用第一二种方法,这两个线程也会去争抢同一个锁,这其实是不必要的。第三种方法也没有解决上面说的问题。

那么如何将锁控制在更细的粒度上呢?可以考虑为每个商品设置一个互斥锁,以和商品ID相关的字符串为唯一标识,这样就可以做到只有争抢同一件商品的线程互斥,不会导致所有的线程互斥。分布式锁恰好可以帮助我们解决这个问题。

何为分布式锁

分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。

我们来假设一个最简单的秒杀场景:数据库里有一张表,column分别是商品ID,和商品ID对应的库存量,秒杀成功就将此商品库存量-1。现在假设有1000个线程来秒杀两件商品,500个线程秒杀第一个商品,500个线程秒杀第二个商品。我们来根据这个简单的业务场景来解释一下分布式锁。 通常具有秒杀场景的业务系统都比较复杂,承载的业务量非常巨大,并发量也很高。这样的系统往往采用分布式的架构来均衡负载。那么这1000个并发就会是从不同的地方过来,商品库存就是共享的资源,也是这1000个并发争抢的资源,这个时候我们需要将并发互斥管理起来。这就是分布式锁的应用。 而key-value存储系统,如redis,因为其一些特性,是实现分布式锁的重要工具。

具体的实现

先来看看一些redis的基本命令: SETNX key value 如果key不存在,就设置key对应字符串value。在这种情况下,该命令和SET一样。当key已经存在时,就不做任何操作。SETNX是”SET if Not eXists”。 expire KEY seconds 设置key的过期时间。如果key已过期,将会被自动删除。 del KEY 删除key 由于笔者的实现只用到这三个命令,就只介绍这三个命令,更多的命令以及redis的特性和使用,可以参考redis官网。

需要考虑的问题

1、用什么操作redis?幸亏redis已经提供了jedis客户端用于java应用程序,直接调用jedis API即可。 2、怎么实现加锁?“锁”其实是一个抽象的概念,将这个抽象概念变为具体的东西,就是一个存储在redis里的key-value对,key是于商品ID相关的字符串来唯一标识,value其实并不重要,因为只要这个唯一的key-value存在,就表示这个商品已经上锁。 3、如何释放锁?既然key-value对存在就表示上锁,那么释放锁就自然是在redis里删除key-value对。 4、阻塞还是非阻塞?笔者采用了阻塞式的实现,若线程发现已经上锁,会在特定时间内轮询锁。 5、如何处理异常情况?比如一个线程把一个商品上了锁,但是由于各种原因,没有完成操作(在上面的业务场景里就是没有将库存-1写入数据库),自然没有释放锁,这个情况笔者加入了锁超时机制,利用redis的expire命令为key设置超时时长,过了超时时间redis就会将这个key自动删除,即强制释放锁(可以认为超时释放锁是一个异步操作,由redis完成,应用程序只需要根据系统特点设置超时时间即可)。

talk is cheap,show me the code

在代码实现层面,注解有并发的方法和参数,通过动态代理获取注解的方法和参数,在代理中加锁,执行完被代理的方法后释放锁。

几个注解定义: cachelock是方法级的注解,用于注解会产生并发问题的方法:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {
    String lockedPrefix() default "";//redis 锁key的前缀
    long timeOut() default 2000;//轮询锁的时间
    int expireTime() default 1000;//key在redis里存在的时间,1000S
}

lockedObject是参数级的注解,用于注解商品ID等基本类型的参数:

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedObject {
    //不需要值
}

LockedComplexObject也是参数级的注解,用于注解自定义类型的参数:

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedComplexObject {
    String field() default "";//含有成员变量的复杂对象中需要加锁的成员变量,如一个商品对象的商品ID

}

CacheLockInterceptor实现InvocationHandler接口,在invoke方法中获取注解的方法和参数,在执行注解的方法前加锁,执行被注解的方法后释放锁:

public class CacheLockInterceptor implements InvocationHandler{
    public static int ERROR_COUNT  = 0;
    private Object proxied;
public CacheLockInterceptor(Object proxied) {
    this.proxied = proxied;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

    CacheLock cacheLock = method.getAnnotation(CacheLock.class);
    //没有cacheLock注解,pass
    if(null == cacheLock){
        System.out.println("no cacheLock annotation");          
        return method.invoke(proxied, args);
    }
    //获得方法中参数的注解
    Annotation[][] annotations = method.getParameterAnnotations();
    //根据获取到的参数注解和参数列表获得加锁的参数
    Object lockedObject = getLockedObject(annotations,args);
    String objectValue = lockedObject.toString();
    //新建一个锁
    RedisLock lock = new RedisLock(cacheLock.lockedPrefix(), objectValue);
    //加锁
    boolean result = lock.lock(cacheLock.timeOut(), cacheLock.expireTime());
    if(!result){//取锁失败
        ERROR_COUNT += 1;
        throw new CacheLockException("get lock fail");

    }
    try{
        //加锁成功,执行方法
        return method.invoke(proxied, args);
    }finally{
        lock.unlock();//释放锁
    }

}
/**
 * 
 * @param annotations
 * @param args
 * @return
 * @throws CacheLockException
 */
private Object getLockedObject(Annotation[][] annotations,Object[] args) throws CacheLockException{
    if(null == args || args.length == 0){
        throw new CacheLockException("方法参数为空,没有被锁定的对象");
    }

    if(null == annotations || annotations.length == 0){
        throw new CacheLockException("没有被注解的参数");
    }
    //不支持多个参数加锁,只支持第一个注解为lockedObject或者lockedComplexObject的参数
    int index = -1;//标记参数的位置指针
    for(int i = 0;i < annotations.length;i++){
        for(int j = 0;j < annotations[i].length;j++){
            if(annotations[i][j] instanceof LockedComplexObject){//注解为LockedComplexObject
                index = i;
                try {
                    return args[i].getClass().getField(((LockedComplexObject)annotations[i][j]).field());
                } catch (NoSuchFieldException | SecurityException e) {
                    throw new CacheLockException("注解对象中没有该属性" + ((LockedComplexObject)annotations[i][j]).field());
                }
            }

            if(annotations[i][j] instanceof LockedObject){
                index = i;
                break;
            }
        }
        //找到第一个后直接break,不支持多参数加锁
        if(index != -1){
            break;
        }
    }

    if(index == -1){
        throw new CacheLockException("请指定被锁定参数");
    }

    return args[index];
}
 }

最关键的RedisLock类中的lock方法和unlock方法:

 * ```java
      */**

       * 加锁
       * 使用方式为:
       * lock();
       * try{
       * executeMethod();
       * }finally{
       * unlock();
       * }
       * @param timeout timeout的时间范围内轮询锁
       * @param expire 设置锁超时时间
       * @return 成功 or 失败
         /
             public boolean lock(long timeout,int expire){
         long nanoTime = System.nanoTime();
         timeout *= MILLI_NANO_TIME;
         try {
             //在timeout的时间范围内不断轮询锁
             while (System.nanoTime() - nanoTime < timeout) {
                 //锁不存在的话,设置锁并设置锁过期时间,即加锁
                 if (this.redisClient.setnx(this.key, LOCKED) == 1) {
                 this.redisClient.expire(key, expire);//设置锁过期时间是为了在没有释放
                 //锁的情况下锁过期后消失,不会造成永久阻塞
                 this.lock = true;
                 return this.lock;
             }
             System.out.println("出现锁等待");
             //短暂休眠,避免可能的活锁
             Thread.sleep(3, RANDOM.nextInt(30));
         } 
     } catch (Exception e) {
         throw new RuntimeException("locking error",e);
     }
     return false;
         }

 public  void unlock() {
    try {
        if(this.lock){
            redisClient.delKey(key);//直接删除
        }
    } catch (Throwable e) {

    }
}
```


​ 上述的代码是框架性的代码,现在来讲解如何使用上面的简单框架来写一个秒杀函数。 ​ 先定义一个接口,接口里定义了一个秒杀方法:

public interface SeckillInterface {
/**
*现在暂时只支持在接口方法上注解
*/
    //cacheLock注解可能产生并发的方法
    @CacheLock(lockedPrefix="TEST_PREFIX")
    public void secKill(String userID,@LockedObject Long commidityID);//最简单的秒杀方法,参数是用户ID和商品ID。可能有多个线程争抢一个商品,所以商品ID加上LockedObject注解
}

上述SeckillInterface接口的实现类,即秒杀的具体实现:

public class SecKillImpl implements SeckillInterface{
    static Map<Long, Long> inventory ;
    static{
        inventory = new HashMap<>();
        inventory.put(10000001L, 10000l);
        inventory.put(10000002L, 10000l);
    }
@Override
public void secKill(String arg1, Long arg2) {
    //最简单的秒杀,这里仅作为demo示例
    reduceInventory(arg2);
}
//模拟秒杀操作,姑且认为一个秒杀就是将库存减一,实际情景要复杂的多
public Long reduceInventory(Long commodityId){
    inventory.put(commodityId,inventory.get(commodityId) - 1);
    return inventory.get(commodityId);
}
}

模拟秒杀场景,1000个线程来争抢两个商品:

    @Test
    public void testSecKill(){
        int threadCount = 1000;
        int splitPoint = 500;
        CountDownLatch endCount = new CountDownLatch(threadCount);
        CountDownLatch beginCount = new CountDownLatch(1);
        SecKillImpl testClass = new SecKillImpl();
    Thread[] threads = new Thread[threadCount];
    //起500个线程,秒杀第一个商品
    for(int i= 0;i < splitPoint;i++){
        threads[i] = new Thread(new  Runnable() {
            public void run() {
                try {
                    //等待在一个信号量上,挂起
                    beginCount.await();
                    //用动态代理的方式调用secKill方法
                    SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(), 
                        new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
                    proxy.secKill("test", commidityId1);
                    endCount.countDown();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        threads[i].start();

    }
    //再起500个线程,秒杀第二件商品
    for(int i= splitPoint;i < threadCount;i++){
        threads[i] = new Thread(new  Runnable() {
            public void run() {
                try {
                    //等待在一个信号量上,挂起
                    beginCount.await();
                    //用动态代理的方式调用secKill方法
                    SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(), 
                        new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
                    proxy.secKill("test", commidityId2);
                    //testClass.testFunc("test", 10000001L);
                    endCount.countDown();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        threads[i].start();

    }

  long startTime = System.currentTimeMillis();
    //主线程释放开始信号量,并等待结束信号量,这样做保证1000个线程做到完全同时执行,保证测试的正确性
    beginCount.countDown();

    try {
        //主线程等待结束信号量
        endCount.await();
        //观察秒杀结果是否正确
        System.out.println(SecKillImpl.inventory.get(commidityId1));
        System.out.println(SecKillImpl.inventory.get(commidityId2));
        System.out.println("error count" + CacheLockInterceptor.ERROR_COUNT);
        System.out.println("total cost " + (System.currentTimeMillis() - startTime));
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

在正确的预想下,应该每个商品的库存都减少了500,在多次试验后,实际情况符合预想。如果不采用锁机制,会出现库存减少499,498的情况。 这里采用了动态代理的方法,利用注解和反射机制得到分布式锁ID,进行加锁和释放锁操作。当然也可以直接在方法进行这些操作,采用动态代理也是为了能够将锁操作代码集中在代理中,便于维护。 通常秒杀场景发生在web项目中,可以考虑利用spring的AOP特性将锁操作代码置于切面中,当然AOP本质上也是动态代理。

小结

这篇文章从业务场景出发,从抽象到实现阐述了如何利用redis实现分布式锁,完成简单的秒杀功能,也记录了笔者思考的过程,希望能给阅读到本篇文章的人一些启发。 ———————————————— 版权声明:本文为CSDN博主「lsfire」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/u010359884/article/details/50310387


三种实现分布式锁的方式

一、为什么要使用分布式锁

我们在开发应用的时候,如果需要对某一个共享变量进行多线程同步访问的时候,可以使用我们学到的Java多线程的18般武艺进行处理,并且可以完美的运行,毫无Bug!

注意这是单机应用,也就是所有的请求都会分配到当前服务器的JVM内部,然后映射为操作系统的线程进行处理!而这个共享变量只是在这个JVM内部的一块内存空间!

后来业务发展,需要做集群,一个应用需要部署到几台机器上然后做负载均衡,大致如下图:

这里写图片描述

上图可以看到,变量A存在JVM1、JVM2、JVM3三个JVM内存中(这个变量A主要体现是在一个类中的一个成员变量,是一个有状态的对象,例如:UserController控制器中的一个整形类型的成员变量),如果不加任何控制的话,变量A同时都会在JVM分配一块内存,三个请求发过来同时对这个变量操作,显然结果是不对的!即使不是同时发过来,三个请求分别操作三个不同JVM内存区域的数据,变量A之间不存在共享,也不具有可见性,处理的结果也是不对的!

如果我们业务中确实存在这个场景的话,我们就需要一种方法解决这个问题!

为了保证一个方法或属性在高并发情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用Java并发处理相关的API(如ReentrantLock或Synchronized)进行互斥控制。在单机环境中,Java中提供了很多并发处理相关的API。但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

二、分布式锁应该具备哪些条件

在分析分布式锁的三种实现方式之前,先了解一下分布式锁应该具备哪些条件:

1、在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行; 2、高可用的获取锁与释放锁; 3、高性能的获取锁与释放锁; 4、具备可重入特性; 5、具备锁失效机制,防止死锁; 6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。

三、分布式锁的三种实现方式

目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。”所以,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可。

在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。有的时候,我们需要保证一个方法在同一时间内只能被同一个线程执行。

基于数据库实现分布式锁; 基于缓存(Redis等)实现分布式锁; 基于Zookeeper实现分布式锁;

1.基于数据库实现排他锁

方案1

表结构

img

获取锁

INSERT INTO method_lock (method_name, desc) VALUES ('methodName', 'methodName');

对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功。

方案2

表结构

DROP TABLE IF EXISTS `method_lock`;
CREATE TABLE `method_lock` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `method_name` varchar(64) NOT NULL COMMENT '锁定的方法名',
  `state` tinyint NOT NULL COMMENT '1:未分配;2:已分配',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `version` int NOT NULL COMMENT '版本号',
  `PRIMARY KEY (`id`),
  UNIQUE KEY `uidx_method_name` (`method_name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';

先获取锁的信息

select id, method_name, state,version from method_lock where state=1 and method_name='methodName';

占有锁

update t_resoure set state=2, version=2, update_time=now() where method_name='methodName' and state=1 and version=2;

如果没有更新影响到一行数据,则说明这个资源已经被别人占位了。

缺点:

1、这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
2、这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
3、这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
4、这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。

解决方案:

1、数据库是单点?搞两个数据库,数据之前双向同步。一旦挂掉快速切换到备库上。 2、没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。 3、非阻塞的?搞一个while循环,直到insert成功再返回成功。 4、非重入的?在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。

2.基于redis实现

获取锁使用命令:

SET resource_name my_random_value NX PX 30000

方案:

try{
    lock = redisTemplate.opsForValue().setIfAbsent(lockKey, LOCK);
    logger.info("cancelCouponCode是否获取到锁:"+lock);
    if (lock) {
        // TODO
        redisTemplate.expire(lockKey,1, TimeUnit.MINUTES); //成功设置过期时间
        return res;
    }else {
        logger.info("cancelCouponCode没有获取到锁,不执行任务!");
    }
}finally{
    if(lock){    
        redisTemplate.delete(lockKey);
        logger.info("cancelCouponCode任务结束,释放锁!");        
    }else{
        logger.info("cancelCouponCode没有获取到锁,无需释放锁!");
    }
}

缺点:

在这种场景(主从结构)中存在明显的竞态: 客户端A从master获取到锁, 在master将锁同步到slave之前,master宕掉了。 slave节点被晋级为master节点, 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。安全失效!

3.基于zookeeper实现

让我们来回顾一下Zookeeper节点的概念:

「每日分享」如何用Zookeeper实现分布式锁

Zookeeper的数据存储结构就像一棵树,这棵树由节点组成,这种节点叫做Znode。

Znode分为四种类型:

1.持久节点 (PERSISTENT)

默认的节点类型。创建节点的客户端与zookeeper断开连接后,该节点依旧存在 。

2.持久节点顺序节点(PERSISTENT_SEQUENTIAL)

所谓顺序节点,就是在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号:

「每日分享」如何用Zookeeper实现分布式锁

3.临时节点(EPHEMERAL)

和持久节点相反,当创建节点的客户端与zookeeper断开连接后,临时节点会被删除:

「每日分享」如何用Zookeeper实现分布式锁

「每日分享」如何用Zookeeper实现分布式锁

「每日分享」如何用Zookeeper实现分布式锁

4.临时顺序节点(EPHEMERAL_SEQUENTIAL)

顾名思义,临时顺序节点结合和临时节点和顺序节点的特点:在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号;当创建节点的客户端与zookeeper断开连接后,临时节点会被删除。

Zookeeper分布式锁的原理

Zookeeper分布式锁恰恰应用了临时顺序节点。具体如何实现呢?让我们来看一看详细步骤:

「每日分享」如何用Zookeeper实现分布式锁

获取锁

首先,在Zookeeper当中创建一个持久节点ParentLock。当第一个客户端想要获得锁时,需要在ParentLock这个节点下面创建一个临时顺序节点 Lock1。

之后,Client1查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock1是不是顺序最靠前的一个。如果是第一个节点,则成功获得锁。

「每日分享」如何用Zookeeper实现分布式锁

这时候,如果再有一个客户端 Client2 前来获取锁,则在ParentLock下载再创建一个临时顺序节点Lock2。

「每日分享」如何用Zookeeper实现分布式锁

Client2查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock2是不是顺序最靠前的一个,结果发现节点Lock2并不是最小的。

于是,Client2向排序仅比它靠前的节点Lock1注册Watcher,用于监听Lock1节点是否存在。这意味着Client2抢锁失败,进入了等待状态。

「每日分享」如何用Zookeeper实现分布式锁

这时候,如果又有一个客户端Client3前来获取锁,则在ParentLock下载再创建一个临时顺序节点Lock3。

「每日分享」如何用Zookeeper实现分布式锁

Client3查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock3是不是顺序最靠前的一个,结果同样发现节点Lock3并不是最小的。

于是,Client3向排序仅比它靠前的节点Lock2注册Watcher,用于监听Lock2节点是否存在。这意味着Client3同样抢锁失败,进入了等待状态。

「每日分享」如何用Zookeeper实现分布式锁

这样一来,Client1得到了锁,Client2监听了Lock1,Client3监听了Lock2。这恰恰形成了一个等待队列,很像是Java当中ReentrantLock所依赖的

释放锁

释放锁分为两种情况:

1.任务完成,客户端显示释放

当任务完成时,Client1会显示调用删除节点Lock1的指令。

「每日分享」如何用Zookeeper实现分布式锁

2.任务执行过程中,客户端崩溃

获得锁的Client1在任务执行过程中,如果Duang的一声崩溃,则会断开与Zookeeper服务端的链接。根据临时节点的特性,相关联的节点Lock1会随之自动删除。

「每日分享」如何用Zookeeper实现分布式锁

由于Client2一直监听着Lock1的存在状态,当Lock1节点被删除,Client2会立刻收到通知。这时候Client2会再次查询ParentLock下面的所有节点,确认自己创建的节点Lock2是不是目前最小的节点。如果是最小,则Client2顺理成章获得了锁。

「每日分享」如何用Zookeeper实现分布式锁

同理,如果Client2也因为任务完成或者节点崩溃而删除了节点Lock2,那么Client3就会接到通知。

「每日分享」如何用Zookeeper实现分布式锁

最终,Client3成功得到了锁。

方案:

可以直接使用zookeeper第三方库Curator客户端,这个客户端中封装了一个可重入的锁服务。

img

Curator提供的InterProcessMutex是分布式锁的实现。acquire方法用户获取锁,release方法用于释放锁。

https://github.com/apache/curator/

缺点:

  • 性能上可能并没有缓存服务那么高。因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同不到所有的Follower机器上。

其实,使用Zookeeper也有可能带来并发问题,只是并不常见而已。考虑这样的情况,由于网络抖动,客户端可ZK集群的session连接断了,那么zk以为客户端挂了,就会删除临时节点,这时候其他客户端就可以获取到分布式锁了。就可能产生并发问题。这个问题不常见是因为zk有重试机制,一旦zk集群检测不到客户端的心跳,就会重试,Curator客户端支持多种重试策略。多次重试之后还不行的话才会删除临时节点。(所以,选择一个合适的重试策略也比较重要,要在锁的粒度和并发之间找一个平衡。)

4.总结

下面的表格总结了Zookeeper和Redis分布式锁的优缺点:

img

三种方案的比较

上面几种方式,哪种方式都无法做到完美。就像CAP一样,在复杂性、可靠性、性能等方面无法同时满足,所以,根据不同的应用场景选择最适合自己的才是王道。

从理解的难易程度角度(从低到高)

数据库 > 缓存 > Zookeeper

从实现的复杂性角度(从低到高)

Zookeeper >= 缓存 > 数据库

从性能角度(从高到低)

缓存 > Zookeeper >= 数据库

从可靠性角度(从高到低)

Zookeeper > 缓存 > 数据库

Refrence: https://www.cnblogs.com/austinspark-jessylu/p/8043726.html https://www.toutiao.com/a6558681932786303501/?tt_from=weixin&utm_campaign=client_share×tamp=1528800534&app=news_article&utm_source=weixin&iid=34667892860&utm_medium=toutiao_ios&wxshare_count=1

———————————————— 版权声明:本文为CSDN博主「夏目 "」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/wuzhiwei549/article/details/80692278

results matching ""

    No results matching ""