死磕 java同步系列之ReentrantReadWriteLock源码解析

(1)读写锁是什么?

(2)读写锁具有哪些特性?

(3)ReentrantReadWriteLock是怎么实现读写锁的?

(4)如何使用ReentrantReadWriteLock实现高效安全的TreeMap?

简介

读写锁是一种特殊的锁,它把对共享资源的访问分为读访问和写访问,多个线程可以同时对共享资源进行读访问,但是同一时间只能有一个线程对共享资源进行写访问,使用读写锁可以极大地提高并发量。

特性

读写锁具有以下特性:

是否互斥 读 写
     
     

可以看到,读写锁除了读读不互斥,读写、写读、写写都是互斥的。

那么,ReentrantReadWriteLock是怎么实现读写锁的呢?

类结构

在看源码之前,我们还是先来看一下ReentrantReadWriteLock这个类的主要结构。

ReentrantReadWriteLock

ReentrantReadWriteLock中的类分成三个部分:

(1)ReentrantReadWriteLock本身实现了ReadWriteLock接口,这个接口只提供了两个方法readLock()和writeLock();

(2)同步器,包含一个继承了AQS的Sync内部类,以及其两个子类FairSync和NonfairSync;

(3)ReadLock和WriteLock两个内部类实现了Lock接口,它们具有锁的一些特性。

源码分析 主要属性 // 读锁 private final ReentrantReadWriteLock.ReadLock readerLock; // 写锁 private final ReentrantReadWriteLock.WriteLock writerLock; // 同步器 final Sync sync;

维护了读锁、写锁和同步器。

主要构造方法 // 默认构造方法 public ReentrantReadWriteLock() { this(false); } // 是否使用公平锁的构造方法 public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }

它提供了两个构造方法,默认构造方法使用的是非公平锁模式,在构造方法中初始化了读锁和写锁。

获取读锁和写锁的方法 public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }

属性中的读锁和写锁是私有属性,通过这两个方法暴露出去。

下面我们主要分析读锁和写锁的加锁、解锁方法,且都是基于非公平模式的。

ReadLock.lock() // ReentrantReadWriteLock.ReadLock.lock() public void lock() { sync.acquireShared(1); } // AbstractQueuedSynchronizer.acquireShared() public final void acquireShared(int arg) { // 尝试获取共享锁(返回1表示成功,返回-1表示失败) if (tryAcquireShared(arg) < 0) // 失败了就可能要排队 doAcquireShared(arg); } // ReentrantReadWriteLock.Sync.tryAcquireShared() protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); // 状态变量的值 // 在读写锁模式下,高16位存储的是共享锁(读锁)被获取的次数,低16位存储的是互斥锁(写锁)被获取的次数 int c = getState(); // 互斥锁的次数 // 如果其它线程获得了写锁,直接返回-1 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 读锁被获取的次数 int r = sharedCount(c); // 下面说明此时还没有写锁,尝试去更新state的值获取读锁 // 读者是否需要排队(是否是公平模式) if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 获取读锁成功 if (r == 0) { // 如果之前还没有线程获取读锁 // 记录第一个读者为当前线程 firstReader = current; // 第一个读者重入的次数为1 firstReaderHoldCount = 1; } else if (firstReader == current) { // 如果有线程获取了读锁且是当前线程是第一个读者 // 则把其重入次数加1 firstReaderHoldCount++; } else { // 如果有线程获取了读锁且当前线程不是第一个读者 // 则从缓存中获取重入次数保存器 HoldCounter rh = cachedHoldCounter; // 如果缓存不属性当前线程 // 再从ThreadLocal中获取 // readHolds本身是一个ThreadLocal,里面存储的是HoldCounter if (rh == null || rh.tid != getThreadId(current)) // get()的时候会初始化rh cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) // 如果rh的次数为0,把它放到ThreadLocal中去 readHolds.set(rh); // 重入的次数加1(初始次数为0) rh.count++; } // 获取读锁成功,返回1 return 1; } // 通过这个方法再去尝试获取读锁(如果之前其它线程获取了写锁,一样返回-1表示失败) return fullTryAcquireShared(current); } // AbstractQueuedSynchronizer.doAcquireShared() private void doAcquireShared(int arg) { // 进入AQS的队列中 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { // 当前节点的前一个节点 final Node p = node.predecessor(); // 如果前一个节点是头节点(说明是第一个排队的节点) if (p == head) { // 再次尝试获取读锁 int r = tryAcquireShared(arg); // 如果成功了 if (r >= 0) { // 头节点后移并传播 // 传播即唤醒后面连续的读节点 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 没获取到读锁,阻塞并等待被唤醒 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } // AbstractQueuedSynchronizer.setHeadAndPropagate() private void setHeadAndPropagate(Node node, int propagate) { // h为旧的头节点 Node h = head; // 设置当前节点为新头节点 setHead(node); // 如果旧的头节点或新的头节点为空或者其等待状态小于0(表示状态为SIGNAL/PROPAGATE) if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 需要传播 // 取下一个节点 Node s = node.next; // 如果下一个节点为空,或者是需要获取读锁的节点 if (s == null || s.isShared()) // 唤醒下一个节点 doReleaseShared(); } } // AbstractQueuedSynchronizer.doReleaseShared() // 这个方法只会唤醒一个节点 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 如果头节点状态为SIGNAL,说明要唤醒下一个节点 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒下一个节点 unparkSuccessor(h); } else if (ws == 0 && // 把头节点的状态改为PROPAGATE成功才会跳到下面的if !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 如果唤醒后head没变,则跳出循环 if (h == head) // loop if head changed break; } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpwffd.html