J.U.C剖析与解读1(Lock的实现) (4)

到这这里,就不禁会有一个疑问。如何判断尝试获取锁的线程想要获得的锁是什么类型的锁。在API调用阶段,我们可以根据API判断。但是放入等待队列后,我们如何判断呢?如果还是如之前那样,等待队列只是保存竞争锁的线程对象,是完全不够的。

所以我们需要新建一个WaitNode的Class,用来保存等待队列中线程对象及相关必要信息。所以,WaitNode会有如下属性:

Thread thread:标识该等待者的线程。

int type:标识该线程对象希望竞争的锁的类型。0表示写锁(独占锁),1表示读锁(共享锁)。

int arg:扩展参数。其实在手写的简易版,看不出来价值。但是实际AQS中的Node就是类似设计。不过AQS中,并不是采用queue保存Node,而是通过一个链表的方式保存Node。

类方法方面:

独占锁:

tryLock:与JarryReentrantLock类似,不过增加了两点。一方面需要考量共享锁是否被占用。另一方面需要引入acquire参数(目前是固定值),呼应WaitNode的arg。

lock:与JarryReentrantLock类似,不过需要手动设置arg。

tryUnlock:与JarryReentrantLock类似,同样需要引入release参数(目前是固定值),呼应WaitNode的arg。

unlock:与JarryReentrantLock类似,不过需要手动设置arg。

共享锁:

tryLockShared:尝试获取共享锁,成功返回true,失败返回false。其实和独占锁的tryLock类似,只不过需要额外考虑独占锁是否已经存在。另外为了实现锁降级,如果独占锁存在,需要判断独占锁的持有者与当前尝试获得共享锁的线程是否一致。

lockShared:获取共享锁,直到成功。由于已经有了WaitNode.type,用于判断锁类型,所以共享锁与独占锁使用的是同一队列。同样的,这里需要手动设置arg。其它方面与独占锁的lock操作基本一致。

tryUnlockShared:尝试释放锁,成功返回true,失败返回false。类似于tryUnlock,只不过增加了release参数(固定值),呼应WaitNode的arg。

unlockShared:释放锁。类似unlock,不过需要手动设置arg。

JarryReentrantLock实现 package tech.jarry.learning.netease; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; /** * @Description: * @Author: jarry */ public class JarryReadWriteLock { // 用于读锁(共享锁)的锁计数器 这里真的有必要volatile嘛(Atomic中的value时volatile的),再看看后续代码 // 这里确实不需要volatile,至于源码,更过分,源码是通过一个变量state的位运算实现readCount与writeCount volatile AtomicInteger readCount = new AtomicInteger(0); // 用于写锁(独占锁)的锁计数器 这里之所以不用volatile是因为独占锁,只有一个线程在改变writeCount(即使有缓存,也还是这个线程,所以不会因为缓存问题,导致问题) AtomicInteger writeCount = new AtomicInteger(0); // 用于保存锁的持有者(这里专指写锁(独占锁)的锁持有者) AtomicReference<Thread> owner = new AtomicReference<>(); // 用于保存期望获得锁的线程(为了区分线程希望获得的锁的类型,这里新建一个新的数据类型(通过内部类实现)) public volatile LinkedBlockingQueue<WaitNode> waiters = new LinkedBlockingQueue<>(); // 内部类实现等待队列中的自定义数据类型 class WaitNode{ // 表示该等待者的线程 Thread thread = null; // 表示希望争取的锁的类型。0表示写锁(独占锁),1表示读锁(共享锁) int type = 0; // 参数,acquire,状态相关,再看看 int arg = 0; public WaitNode(Thread thread, int type, int arg) { this.type = type; this.thread = thread; this.arg = arg; } } /** * 尝试获取独占锁(针对独占锁) * @param acquires 用于加锁次数。一般传入waitNode.arg(本代码中就是1。为什么不用一个常量1,就不知道了?)(可以更好的对接AQS) * @return */ public boolean tryLock(int acquires){ //TODO_FINISHED 这里readCount的判断,与修改writeCount的操作可以被割裂,并不是原子性的。不就有可能出现readCount与writeCount的值同时大于零的情况。 // 该示例代码,确实存在该问题,但实际源码,writeCount与readCount是通过同一变量state实现的,所以可以很好地通过CAS确保原子性 // readCount表示读锁(共享锁)的上锁次数 if (readCount.get() == 0){ // readCount的值为0,表示读锁(共享锁)空置,所以当前线程是有可能获得写锁(独占锁)。 // 接下来判断写锁(独占锁)是否被占用 int writeCountValue = writeCount.get(); if (writeCountValue == 0){ // 写锁(独占锁)的锁次数为0,表示写锁(独占锁)并没未被任何线程持有 if (writeCount.compareAndSet(writeCountValue,writeCountValue+acquires)){ // 修改writeCount,来获得锁。该机制与ReentrantLock相同 // 设置独享锁的持有者owner owner.set(Thread.currentThread()); // 至此,表示当前线程抢锁成功 return true; } } else { // 写锁(独占锁)的锁次数不为0,表示写锁(独占锁)已经被某线程持有 if (Thread.currentThread() == owner.get()){ // 如果持有锁的线程为当前线程,那就进行锁的重入操作 writeCount.set(writeCountValue+acquires); // 重入锁,表示当前线程是持有锁的 return true; } // 读锁未被占用,但写锁被占用,且占据写锁的线程不是当前线程 } } // 读锁被占据 // 其它情况(1.读锁被占据,2读锁未被占用,但写锁被占用,且占据写锁的线程不是当前线程),都返回false return false; } /** * 获取独占锁(针对独占锁) */ public void lock(){ // 设定waitNote中arg参数 int arg = 1; // 尝试获取独占锁。成功便退出方法,失败,则进入“不死不休”逻辑 if (!tryLock(arg)){ // 需要将当前保存至等待队列,在这之前,需要封装当前线程为waitNote WaitNode waitNode = new WaitNode(Thread.currentThread(), 0, arg); // 将封装好的waitNode放入等待队列waiters中(offer方法会在队列满时,直接返回false。put则是阻塞。add则是抛出异常) waiters.offer(waitNode); // 如ReentrantLock一般,开始循环尝试拿锁 while (true){ // 获取队列头部元素 WaitNode headNote = waiters.peek(); // 如果等待队列头部元素headNote不为null(有可能是null嘛?),并且就是当前线程,那就尝试获取锁 if (headNote !=null && headNote.thread == Thread.currentThread()){ // 如果再次尝试获取锁失败,那就只能挂起了 if (!tryLock(headNote.arg)){ LockSupport.park(); } else { // 再次尝试获取锁成功,那就将队列头部元素,踢出等待队列waiters waiters.poll(); return; } }else { // 如果headNote不是当前线程的封装,就直接挂起(这里就没处理headNote==null的情况) LockSupport.park(); } } } } /** * 尝试解锁(针对独占锁) * @param releases 用于设定解锁次数。一般传入waitNode.arg * @return */ public boolean tryUnlock(int releases){ // 首先判断锁的持有者是否为当前线程 if (owner.get() != Thread.currentThread()){ // 锁的持有者不是当前线程(即使锁的持有者为null,锁的持有者是null,还解锁,仍然是抛出异常) throw new IllegalMonitorStateException(); } // 锁的持有者就是当前线程 // 首先按照releases进行解锁(经过一番思考后,这里不会出现类似DoubleCheck中的问题(Atomic中的value是volatile的),所以这个值同时只会有一个线程对其操作) int writeCountValue = writeCount.get(); // 为writeCount设置新值 writeCount.set(writeCountValue-releases); // 根据writeCount的新值,判断锁的持有者是否发生变化 if (writeCount.get() == 0){ // writeCount的值为0,表示当前线程已经完全解锁,所以修改锁的持有者为null owner.set(null); // 而这表示完全解锁成功 return true; } else { // writeCount的值不为0,表示当前线程尚未完全解锁,故锁的持有者未发生变化。即尝试解锁失败 return false; } } /** * 解锁(针对独占锁) */ public void unlock(){ // 设定tryUnlock的参数releases int arg = 1; // 先尝试解锁 if (tryUnlock(arg)){ // 获得等待队列的头部元素 WaitNode head = waiters.peek(); // 检测一下头部元素head是否null(也许等待队列根本就没有元素) if (head == null){ // 如果头部元素head为null,说明队列为null,直接return return; } // 解锁成功,就要把等待队列中的头部元素唤醒(unpark) // 这里有一点注意,即使队列的头元素head被唤醒了,也不一定就是这个头元素head获得锁(详见tryLock,新来的线程可能获得锁) // 如果这个头元素无法获得锁,就会park(while循环嘛)。并且一次park,可以多次unpark(已实践) LockSupport.unpark(head.thread); } } /** * 尝试获取共享锁(针对共享锁) * @param acquires * @return */ public boolean tryLockShared(int acquires){ // 判断写锁(独占锁)是否被别的线程持有(这个条件意味着:同一个线程可以同时持有读锁与写锁) // 该方法是为了进行 锁降级****** if (writeCount.get() == 0 || owner.get() == Thread.currentThread()){ // 如果写锁(独占锁)没有别的被线程持有,就可以继续尝试获取读锁(共享锁) // 通过循环实现自旋,从而实现加锁(避免加锁失败) while(true){ // 由于读锁(共享锁)是共享的,不存在独占行为,故直接在writeCount增加当前线程加锁行为的次数acquires int writeCountValue = writeCount.get(); // 通过CAS进行共享锁的次数的增加 if (writeCount.compareAndSet(writeCountValue, writeCountValue+acquires)){ break; } } } // 写锁已经被别的线程持有,共享锁获取失败 return false; } /** * 获取共享锁(针对共享锁) */ public void lockShared(){ // 设定waitNote中arg参数 int arg = 1; // 判断是否获取共享锁成功 if (!tryLockShared(arg)){ // 如果获取共享锁失败,就进入等待队列 // 与获取同步锁操作一样的,需要先对当前线程进行WaitNote的封装 WaitNode waitNode = new WaitNode(Thread.currentThread(),1,arg); // 将waitNote置入waiters(offer方法会在队列满时,直接返回false。put则是阻塞。add则是抛出异常) waiters.offer(waitNode); // 使用循环。一方面避免伪唤醒,另一方面便于二次尝试获取锁 while (true){ // 获取等待队列waiters的头元素head WaitNode head = waiters.peek(); // 校验head是否为null,并判断等待队列的头元素head是否为当前线程的封装(也许head时当前线程的封装,但并不意味着head就是刚刚放入waiters的元素) if (head != null && head.thread == Thread.currentThread()){ // 如果校验通过,并且等待队列的头元素head为当前线程的封装,就再次尝试获取锁 if (tryLockShared(head.arg)){ // 获取共享锁成功,就从当前队列中移除head元素(poll()方法移除队列头部元素) waiters.poll(); // 在此处就是与独占锁不同的地方了,独占锁意味着只可能有一个线程获得锁,而共享锁是可以有多个线程获得的 // 获得等待队列的新头元素newHead WaitNode newHead = waiters.peek(); // 校验该元素是否为null,并判断它的锁类型是否为共享锁 if (newHead != null && newHead.type == 1){ // 如果等待队列的新头元素是争取共享锁的,那么就唤醒它(这是一个类似迭代的过程,刚唤醒的线程会会做出同样的举动) //TODO_FINISHED 这里有一点,我有些疑惑,那么如果等待队列是这样的{共享锁,共享锁,独占锁,共享锁,共享锁},共享锁们被一个独占锁隔开了。是不是就不能唤醒后面的共享锁了。再看看后面的代码 // 这个实际源码,并不是这样的。老师表示现有代码是这样的,不用理解那么深入,后续有机会看看源码 LockSupport.unpark(newHead.thread); } } else { // 如果再次获取共享锁失败,就挂起 LockSupport.park(); } } else { // 如果校验未通过,或等待队列的头元素head不是当前线程的封装,就挂起当前线程 LockSupport.park(); } } } } /** * 尝试解锁(针对共享锁) * @param releases * @return */ public boolean tryUnlockShared(int releases){ // 通过CAS操作,减少共享锁的锁次数,即readCount的值(由于是共享锁,所以是可能多个线程同时减少该值的,故采用CAS) while (true){ // 获取读锁(共享锁)的值 int readCountValue = readCount.get(); int readCountNext = readCountValue - releases; // 只有成功修改值,才可以跳出 if (readCount.compareAndSet(readCountValue,readCountNext)){ // 用于表明共享锁完全解锁成功 return readCountNext == 0; } } // 由于读锁没有owner,所以不用进行有关owner的操作 } /** * 解锁(针对共享锁) */ public boolean unlockShared(){ // 设定tryUnlockShared的参数releases int arg = 1; // 判断是否尝试解锁成功 if (tryUnlockShared(arg)){ // 如果尝试解锁成功,就需要唤醒等待队列的头元素head的线程 WaitNode head = waiters.peek(); // 校验head是否为null,毕竟可能等待队列为null if (head != null){ // 唤醒等待队列的头元素head的线程 LockSupport.unpark(head.thread); } //TODO_FINISHED 尝试共享锁解锁成功后,就应当返回true(虽然有些不大理解作用) // 用于对应源码 return true; } //TODO_FINISHED 尝试共享锁解锁失败后,就应当返回false(虽然有些不大理解作用) // 用于对应源码 return false; } }

这里同样不进行相关解释了。因为需要的解释,在注释中都写的很明确了,包括我踩的一些坑。

如果依旧有一些看不懂的地方,或者错误的地方,欢迎@我,或者私信我。

四,总结 技术

CAS:通过CAS实现锁持有数量等的原子性操作,从而完成锁的竞争操作。

Atomic:为了简化操作(避免自己获取Unsafe,offset等),通过Atomic实现CAS 操作。

volatile:为了避免多线程下的可见性问题,采用了volatile的no cache特性。

transient:可以避免对应变量序列化,源码中有采用。不过考虑后,并没有使用。

while:一方面通过while避免伪唤醒问题,另一方面,通过while推动流程(这个需要看代码)。

LinkedBlockingQueue:实现线程等待队列。实际的AQS是通过Node构成链表结构的。

LockSupport:通过LockSupport实现线程的挂起,唤醒等操作。

IllegalMonitorStateException:就是一个异常类型,仿Synchronized的,起码看起来更明确,还不用自己实现新的Exception类型。

方案

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyxdzg.html