ReentrantReadWriteLock是Java中的一个可重入读写锁,它允许多个线程同时读取一个共享资源,但只允许一个线程写入该共享资源,当一个线程持有写锁时,任何其他线程都不能持有读或写锁。
该锁具有以下特点:
- 可重入性:线程可以多次获得同一个锁。
- 公平性:可选择公平或非公平模式。
- 读写分离:支持多个线程同时读取共享资源,但只允许一个线程写入共享资源。
- 锁降级:写锁可以降级为读锁,但读锁不能升级为写锁。
使用ReentrantReadWriteLock可以提高并发访问性能,因为多个线程可以同时读取共享资源,而不会相互干扰。但是,在写操作期间,所有读和写的访问都会被暂停,直到写操作完成。
其实现较为复杂,UML类图如下
这张图更为形象
读写锁特性如下
是否互斥 | 读 | 写 |
---|---|---|
读 | 否 | 是 |
写 | 是 | 是 |
如何实现读写锁,下面我们来依据源码一一道来
构造函数
//默认构造方法public ReentrantReadWriteLock() {this(false);}//默认构造方法 public ReentrantReadWriteLock() { this(false); }//默认构造方法 public ReentrantReadWriteLock() { this(false); }
//是否使用公平锁的构造方法public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();readerLock = new ReadLock(this);writerLock = new WriteLock(this);}//是否使用公平锁的构造方法 public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }//是否使用公平锁的构造方法 public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
成员变量设置如下
//读锁private final ReentrantReadWriteLock.ReadLock readerLock;//写锁private final ReentrantReadWriteLock.WriteLock writerLock;//继承AQS锁实现final Sync sync;//读锁 private final ReentrantReadWriteLock.ReadLock readerLock; //写锁 private final ReentrantReadWriteLock.WriteLock writerLock; //继承AQS锁实现 final Sync sync;//读锁 private final ReentrantReadWriteLock.ReadLock readerLock; //写锁 private final ReentrantReadWriteLock.WriteLock writerLock; //继承AQS锁实现 final Sync sync;
Sync
Sync是ReentrantReadWriteLock的内部静态类,它是ReentrantReadWriteLock的核心实现。Sync继承自AbstractQueuedSynchronizer(AQS),并重写了其方法来实现读写锁的语义。
abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 6317671515068378041L;//表示共享锁状态在锁状态中占用的二进制位数(16位),即共享锁状态的左移位数static final int SHARED_SHIFT = 16;//表示每个共享锁状态的数量(1 << SHARED_SHIFT)static final int SHARED_UNIT = (1 << SHARED_SHIFT);//表示共享锁状态的最大数量((1 << SHARED_SHIFT) - 1)。static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;//返回一个掩码,用于提取锁状态中的排它锁状态(即低16位)static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;...abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 6317671515068378041L; //表示共享锁状态在锁状态中占用的二进制位数(16位),即共享锁状态的左移位数 static final int SHARED_SHIFT = 16; //表示每个共享锁状态的数量(1 << SHARED_SHIFT) static final int SHARED_UNIT = (1 << SHARED_SHIFT); //表示共享锁状态的最大数量((1 << SHARED_SHIFT) - 1)。 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //返回一个掩码,用于提取锁状态中的排它锁状态(即低16位) static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; ...abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 6317671515068378041L; //表示共享锁状态在锁状态中占用的二进制位数(16位),即共享锁状态的左移位数 static final int SHARED_SHIFT = 16; //表示每个共享锁状态的数量(1 << SHARED_SHIFT) static final int SHARED_UNIT = (1 << SHARED_SHIFT); //表示共享锁状态的最大数量((1 << SHARED_SHIFT) - 1)。 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //返回一个掩码,用于提取锁状态中的排它锁状态(即低16位) static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; ...
量和方法的作用是将锁状态分成两部分:高16位表示读锁状态,低16位表示写锁状态。
这种方式,可以高效地实现读写分离的锁机制。在获取和释放锁时,可以根据不同的状态进行不同的处理,以保证锁的正确性和高效性。
Sync有两个实现,非公平锁和公平锁
//非公平锁static final class NonfairSync extends Sync {private static final long serialVersionUID = -8159625535654395037L;final boolean writerShouldBlock() {return false; // writers can always barge}final boolean readerShouldBlock() {return apparentlyFirstQueuedIsExclusive();}}//非公平锁 static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; final boolean writerShouldBlock() { return false; // writers can always barge } final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } }//非公平锁 static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; final boolean writerShouldBlock() { return false; // writers can always barge } final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } }
//公平锁static final class FairSync extends Sync {private static final long serialVersionUID = -2274990926593161451L;final boolean writerShouldBlock() {return hasQueuedPredecessors();}final boolean readerShouldBlock() {return hasQueuedPredecessors();}}//公平锁 static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L; final boolean writerShouldBlock() { return hasQueuedPredecessors(); } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } }//公平锁 static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L; final boolean writerShouldBlock() { return hasQueuedPredecessors(); } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } }
ReadLock、WriteLock
//读锁public static class ReadLock implements Lock, java.io.Serializable {private static final long serialVersionUID = -5992448646407690164L;private final Sync sync;protected ReadLock(ReentrantReadWriteLock lock) {sync = lock.sync;}...//读锁 public static class ReadLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -5992448646407690164L; private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } ...//读锁 public static class ReadLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -5992448646407690164L; private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } ...
//写锁public static class WriteLock implements Lock, java.io.Serializable {private static final long serialVersionUID = -4992448646407690164L;private final Sync sync;protected WriteLock(ReentrantReadWriteLock lock) {sync = lock.sync;}//写锁 public static class WriteLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -4992448646407690164L; private final Sync sync; protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; }//写锁 public static class WriteLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -4992448646407690164L; private final Sync sync; protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; }
读锁和写锁是私有属性,通过这两个方法暴露出去
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
非公平锁读写锁加锁解锁
读锁ReadLock
加锁
获取读锁。
如果写锁没有被另一个线程持有,则立即获取读锁并返回。
如果写锁被另一个线程持有,则当前线程为了线程调度目的变为无效状态,并处于休眠状态,直到读锁被获取。
lock()
//ReentrantReadWriteLock.ReadLockpublic void lock() {sync.acquireShared(1);}//ReentrantReadWriteLock.ReadLock public void lock() { sync.acquireShared(1); }//ReentrantReadWriteLock.ReadLock public void lock() { sync.acquireShared(1); }
共享状态获取锁,acquireShared是AQS方法
// AbstractQueuedSynchronizerpublic final void acquireShared(int arg) {// 尝试获取共享锁(返回1表示成功,返回-1表示失败)if (tryAcquireShared(arg) < 0)// 失败了就可能要排队等待doAcquireShared(arg);}// AbstractQueuedSynchronizer public final void acquireShared(int arg) { // 尝试获取共享锁(返回1表示成功,返回-1表示失败) if (tryAcquireShared(arg) < 0) // 失败了就可能要排队等待 doAcquireShared(arg); }// AbstractQueuedSynchronizer public final void acquireShared(int arg) { // 尝试获取共享锁(返回1表示成功,返回-1表示失败) if (tryAcquireShared(arg) < 0) // 失败了就可能要排队等待 doAcquireShared(arg); }
下面主要解读ReentrantReadWriteLock.Sync中重写的tryAcquireShared方法
//ReentrantReadWriteLock.Syncprotected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();//// 状态变量的值 在读写锁模式下,高16位存储的是共享锁(读锁)被获取的次数,低16位存储的是互斥锁(写锁)被获取的次数int c = getState();//写锁本占用,判断是否被当前线程占用,如果不是直接返回-1if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;//读锁获取次数int r = sharedCount(c);//读锁不需要block且读锁次数小于最大值,尝试更新state值if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {//获取读锁成功//如果之前还没有线程获取读锁if (r == 0) {// 记录第一个读者为当前线程firstReader = current;// 第一个读者重入的次数为1firstReaderHoldCount = 1;// 如果有线程获取了读锁且是当前线程} else if (firstReader == current) {firstReaderHoldCount++;} else {// 如果有线程获取了读锁且当前线程不是第一个// 则从缓存中获取重入次数保存器HoldCounter rh = cachedHoldCounter;// 如果缓存不属性当前线程 再从ThreadLocal中获取if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)// 如果rh的次数为0,把它放到ThreadLocal中去readHolds.set(rh);// 重入的次数加1(初始次数为0)rh.count++;}// 获取读锁成功,返回1return 1;}// 通过这个方法再去尝试获取读锁(如果之前其它线程获取了写锁,一样返回-1表示失败)return fullTryAcquireShared(current);}//ReentrantReadWriteLock.Sync protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); //// 状态变量的值 在读写锁模式下,高16位存储的是共享锁(读锁)被获取的次数,低16位存储的是互斥锁(写锁)被获取的次数 int c = getState(); //写锁本占用,判断是否被当前线程占用,如果不是直接返回-1 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //读锁获取次数 int r = sharedCount(c); //读锁不需要block且读锁次数小于最大值,尝试更新state值 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //获取读锁成功 //如果之前还没有线程获取读锁 if (r == 0) { // 记录第一个读者为当前线程 firstReader = current; // 第一个读者重入的次数为1 firstReaderHoldCount = 1; // 如果有线程获取了读锁且是当前线程 } else if (firstReader == current) { firstReaderHoldCount++; } else { // 如果有线程获取了读锁且当前线程不是第一个 // 则从缓存中获取重入次数保存器 HoldCounter rh = cachedHoldCounter; // 如果缓存不属性当前线程 再从ThreadLocal中获取 if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) // 如果rh的次数为0,把它放到ThreadLocal中去 readHolds.set(rh); // 重入的次数加1(初始次数为0) rh.count++; } // 获取读锁成功,返回1 return 1; } // 通过这个方法再去尝试获取读锁(如果之前其它线程获取了写锁,一样返回-1表示失败) return fullTryAcquireShared(current); }//ReentrantReadWriteLock.Sync protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); //// 状态变量的值 在读写锁模式下,高16位存储的是共享锁(读锁)被获取的次数,低16位存储的是互斥锁(写锁)被获取的次数 int c = getState(); //写锁本占用,判断是否被当前线程占用,如果不是直接返回-1 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //读锁获取次数 int r = sharedCount(c); //读锁不需要block且读锁次数小于最大值,尝试更新state值 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //获取读锁成功 //如果之前还没有线程获取读锁 if (r == 0) { // 记录第一个读者为当前线程 firstReader = current; // 第一个读者重入的次数为1 firstReaderHoldCount = 1; // 如果有线程获取了读锁且是当前线程 } else if (firstReader == current) { firstReaderHoldCount++; } else { // 如果有线程获取了读锁且当前线程不是第一个 // 则从缓存中获取重入次数保存器 HoldCounter rh = cachedHoldCounter; // 如果缓存不属性当前线程 再从ThreadLocal中获取 if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) // 如果rh的次数为0,把它放到ThreadLocal中去 readHolds.set(rh); // 重入的次数加1(初始次数为0) rh.count++; } // 获取读锁成功,返回1 return 1; } // 通过这个方法再去尝试获取读锁(如果之前其它线程获取了写锁,一样返回-1表示失败) return fullTryAcquireShared(current); }
exclusiveCount是写锁获取次数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
sharedCount是读锁获取次数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }static int sharedCount(int c) { return c >>> SHARED_SHIFT; }static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
readerShouldBlock是判断获取读锁是否需要block
// AbstractQueuedSynchronizerabstract boolean readerShouldBlock();// AbstractQueuedSynchronizer abstract boolean readerShouldBlock();// AbstractQueuedSynchronizer abstract boolean readerShouldBlock();
我们看非公平锁的实现
//ReentrantReadWriteLock.NonfairSyncfinal boolean readerShouldBlock() {//该方法检查在队列中是否存在等待的写入线程,如果存在,则返回truereturn apparentlyFirstQueuedIsExclusive();}//ReentrantReadWriteLock.NonfairSync final boolean readerShouldBlock() { //该方法检查在队列中是否存在等待的写入线程,如果存在,则返回true return apparentlyFirstQueuedIsExclusive(); }//ReentrantReadWriteLock.NonfairSync final boolean readerShouldBlock() { //该方法检查在队列中是否存在等待的写入线程,如果存在,则返回true return apparentlyFirstQueuedIsExclusive(); }
//AbstractQueuedSynchronizerfinal boolean apparentlyFirstQueuedIsExclusive() {Node h, s;// 获取当前的头结点。// 如果头结点不为null,则获取头结点的下一个结点。// 如果下一个结点不为null,并且该结点不是共享结点,且该结点的线程不为null,则返回true;否则返回false。return (h = head) != null &&(s = h.next) != null &&!s.isShared() &&s.thread != null;}//AbstractQueuedSynchronizer final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; // 获取当前的头结点。 // 如果头结点不为null,则获取头结点的下一个结点。 // 如果下一个结点不为null,并且该结点不是共享结点,且该结点的线程不为null,则返回true;否则返回false。 return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }//AbstractQueuedSynchronizer final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; // 获取当前的头结点。 // 如果头结点不为null,则获取头结点的下一个结点。 // 如果下一个结点不为null,并且该结点不是共享结点,且该结点的线程不为null,则返回true;否则返回false。 return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }
HoldCounter readHolds定义如下
private transient HoldCounter cachedHoldCounter;private transient ThreadLocalHoldCounter readHolds;static final class ThreadLocalHoldCounterextends ThreadLocal<HoldCounter> {public HoldCounter initialValue() {return new HoldCounter();}}static final class HoldCounter {int count = 0;// Use id, not reference, to avoid garbage retentionfinal long tid = getThreadId(Thread.currentThread());}private transient HoldCounter cachedHoldCounter; private transient ThreadLocalHoldCounter readHolds; static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } static final class HoldCounter { int count = 0; // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread()); }private transient HoldCounter cachedHoldCounter; private transient ThreadLocalHoldCounter readHolds; static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } static final class HoldCounter { int count = 0; // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread()); }
fullTryAcquireShared尝试在此获取共享锁,法的实现与 tryAcquireShared
方法中的代码在一定程度上重复,但是此方法不会在重试和惰性读取保持计数之间增加复杂性,因此更加简单。
//ReentrantReadWriteLock.Syncfinal int fullTryAcquireShared(Thread current) {HoldCounter rh = null;for (;;) {int c = getState();if (exclusiveCount(c) != 0) {if (getExclusiveOwnerThread() != current)return -1;// else we hold the exclusive lock; blocking here// would cause deadlock.} else if (readerShouldBlock()) {// Make sure we're not acquiring read lock reentrantlyif (firstReader == current) {// assert firstReaderHoldCount > 0;} else {if (rh == null) {rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) {rh = readHolds.get();if (rh.count == 0)readHolds.remove();}}if (rh.count == 0)return -1;}}if (sharedCount(c) == MAX_COUNT)throw new Error("Maximum lock count exceeded");if (compareAndSetState(c, c + SHARED_UNIT)) {if (sharedCount(c) == 0) {firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null)rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;cachedHoldCounter = rh; // cache for release}return 1;}}}//ReentrantReadWriteLock.Sync final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }//ReentrantReadWriteLock.Sync final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
后面doAcquireShared共享模式下是否需要等待,前文已经分析,具体看下这篇
并发-AQS之Semaphore源码解读
lockInterruptibly()
中断获取锁
//ReentrantReadWriteLock.ReadLockpublic void lockInterruptibly() throws InterruptedException {sync.acquireSharedInterruptibly(1);}//ReentrantReadWriteLock.ReadLock public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); }//ReentrantReadWriteLock.ReadLock public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
acquireSharedInterruptibly增加了线程中断相关逻辑,获取共享锁的方式没变
//AbstractQueuedSynchronizerpublic final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}//AbstractQueuedSynchronizer public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }//AbstractQueuedSynchronizer public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
doAcquireSharedInterruptibly前文也分析过,可看下
并发-AQS之Semaphore源码解读
尝试获取锁
tryLock()
//ReentrantReadWriteLock.ReadLockpublic boolean tryLock() {return sync.tryReadLock();}//ReentrantReadWriteLock.ReadLock public boolean tryLock() { return sync.tryReadLock(); }//ReentrantReadWriteLock.ReadLock public boolean tryLock() { return sync.tryReadLock(); }
tryReadLock方法逻辑与fullTryAcquireShared类似,不再赘述
//ReentrantReadWriteLock.Syncfinal boolean tryReadLock() {Thread current = Thread.currentThread();for (;;) {int c = getState();if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return false;int r = sharedCount(c);if (r == MAX_COUNT)throw new Error("Maximum lock count exceeded");if (compareAndSetState(c, c + SHARED_UNIT)) {if (r == 0) {firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}return true;}}}//ReentrantReadWriteLock.Sync final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false; int r = sharedCount(c); if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return true; } } }//ReentrantReadWriteLock.Sync final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false; int r = sharedCount(c); if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return true; } } }
tryLock(long timeout, TimeUnit unit)
//ReentrantReadWriteLock.ReadLockpublic boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}//ReentrantReadWriteLock.ReadLock public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }//ReentrantReadWriteLock.ReadLock public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
//AbstractQueuedSynchronizerpublic final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();return tryAcquireShared(arg) >= 0 ||doAcquireSharedNanos(arg, nanosTimeout);}//AbstractQueuedSynchronizer public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }//AbstractQueuedSynchronizer public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
doAcquireSharedNanos前文也分析过,可看下
并发-AQS之Semaphore源码解读
解锁
unlock()
//ReentrantReadWriteLock.ReadLockpublic void unlock() {sync.releaseShared(1);}//ReentrantReadWriteLock.ReadLock public void unlock() { sync.releaseShared(1); }//ReentrantReadWriteLock.ReadLock public void unlock() { sync.releaseShared(1); }
//AbstractQueuedSynchronizerpublic final boolean releaseShared(int arg) {//如果尝试释放成功了(共享锁全部释放),就唤醒下一个节点if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}//AbstractQueuedSynchronizer public final boolean releaseShared(int arg) { //如果尝试释放成功了(共享锁全部释放),就唤醒下一个节点 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }//AbstractQueuedSynchronizer public final boolean releaseShared(int arg) { //如果尝试释放成功了(共享锁全部释放),就唤醒下一个节点 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
tryReleaseShared实现如下
//ReentrantReadWriteLock.Syncprotected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();if (firstReader == current) {// 如果第一个读线程是当前线程 就把它重入的次数减1 如果减到0了就把第一个读者置为空if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;} else {// 如果第一个读者不是当前线程// 一样地,把它重入的次数减1HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count;}for (;;) {// 共享锁获取的次数减1// 如果减为0了说明完全释放了,才返回trueint c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))// Releasing the read lock has no effect on readers,// but it may allow waiting writers to proceed if// both read and write locks are now free.return nextc == 0;}}//ReentrantReadWriteLock.Sync protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // 如果第一个读线程是当前线程 就把它重入的次数减1 如果减到0了就把第一个读者置为空 if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { // 如果第一个读者不是当前线程 // 一样地,把它重入的次数减1 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { // 共享锁获取的次数减1 // 如果减为0了说明完全释放了,才返回true int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }//ReentrantReadWriteLock.Sync protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // 如果第一个读线程是当前线程 就把它重入的次数减1 如果减到0了就把第一个读者置为空 if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { // 如果第一个读者不是当前线程 // 一样地,把它重入的次数减1 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { // 共享锁获取的次数减1 // 如果减为0了说明完全释放了,才返回true int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
doReleaseShared唤醒下一个节点,前文也分析过,可看下
并发-AQS之Semaphore源码解读
newCondition()
不支持
//ReentrantReadWriteLock.ReadLockpublic Condition newCondition() {throw new UnsupportedOperationException();}//ReentrantReadWriteLock.ReadLock public Condition newCondition() { throw new UnsupportedOperationException(); }//ReentrantReadWriteLock.ReadLock public Condition newCondition() { throw new UnsupportedOperationException(); }
写锁WriteLock
加锁
lock()
独占式获取锁
//ReentrantReadWriteLock.WriteLockpublic void lock() {sync.acquire(1);}//ReentrantReadWriteLock.WriteLock public void lock() { sync.acquire(1); }//ReentrantReadWriteLock.WriteLock public void lock() { sync.acquire(1); }
//AbstractQueuedSynchronizerpublic final void acquire(int arg) {// 先尝试获取锁 如果失败,则会进入队列中排队,后面的逻辑跟ReentrantLock一样if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}//AbstractQueuedSynchronizer public final void acquire(int arg) { // 先尝试获取锁 如果失败,则会进入队列中排队,后面的逻辑跟ReentrantLock一样 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }//AbstractQueuedSynchronizer public final void acquire(int arg) { // 先尝试获取锁 如果失败,则会进入队列中排队,后面的逻辑跟ReentrantLock一样 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
//ReentrantReadWriteLock.Syncprotected final boolean tryAcquire(int acquires) {/** Walkthrough:* 1. If read count nonzero or write count nonzero* and owner is a different thread, fail.* 2. If count would saturate, fail. (This can only* happen if count is already nonzero.)* 3. Otherwise, this thread is eligible for lock if* it is either a reentrant acquire or* queue policy allows it. If so, update state* and set owner.*/Thread current = Thread.currentThread();int c = getState();// 互斥锁被获取的次数int w = exclusiveCount(c);if (c != 0) {// 如果共享锁被获取的次数不为0,或者被其它线程获取了互斥锁(写锁) 那么就返回false,获取写锁失败if (w == 0 || current != getExclusiveOwnerThread())return false;// 溢出检测if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// 共享锁不为0且当前线程=占用线程setState(c + acquires);return true;}// 如果c等于0,就尝试更新state的值(非公平模式writerShouldBlock()返回false)// 如果失败了,说明获取写锁失败,返回false// 如果成功了,说明获取写锁成功,把自己设置为占有者,并返回trueif (writerShouldBlock() ||!compareAndSetState(c, c + acquires))return false;setExclusiveOwnerThread(current);return true;}//ReentrantReadWriteLock.Sync protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); // 互斥锁被获取的次数 int w = exclusiveCount(c); if (c != 0) { // 如果共享锁被获取的次数不为0,或者被其它线程获取了互斥锁(写锁) 那么就返回false,获取写锁失败 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 溢出检测 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 共享锁不为0且当前线程=占用线程 setState(c + acquires); return true; } // 如果c等于0,就尝试更新state的值(非公平模式writerShouldBlock()返回false) // 如果失败了,说明获取写锁失败,返回false // 如果成功了,说明获取写锁成功,把自己设置为占有者,并返回true if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }//ReentrantReadWriteLock.Sync protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); // 互斥锁被获取的次数 int w = exclusiveCount(c); if (c != 0) { // 如果共享锁被获取的次数不为0,或者被其它线程获取了互斥锁(写锁) 那么就返回false,获取写锁失败 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 溢出检测 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 共享锁不为0且当前线程=占用线程 setState(c + acquires); return true; } // 如果c等于0,就尝试更新state的值(非公平模式writerShouldBlock()返回false) // 如果失败了,说明获取写锁失败,返回false // 如果成功了,说明获取写锁成功,把自己设置为占有者,并返回true if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
NonfairSync的writerShouldBlock直接返回false
//ReentrantReadWriteLock.NonfairSyncfinal boolean writerShouldBlock() {return false; // writers can always barge}//ReentrantReadWriteLock.NonfairSync final boolean writerShouldBlock() { return false; // writers can always barge }//ReentrantReadWriteLock.NonfairSync final boolean writerShouldBlock() { return false; // writers can always barge }
acquireQueued参考Reentrantlonk实现
并发-AQS之Reentrantlonk源码解读
lockInterruptibly()
//ReentrantReadWriteLock.WriteLockpublic void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}//ReentrantReadWriteLock.WriteLock public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }//ReentrantReadWriteLock.WriteLock public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
//AbstractQueuedSynchronizerpublic final void acquireInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (!tryAcquire(arg))doAcquireInterruptibly(arg);}//AbstractQueuedSynchronizer public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }//AbstractQueuedSynchronizer public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
doAcquireInterruptibly参考Reentrantlonk实现
并发-AQS之Reentrantlonk源码解读
尝试获取锁
tryLock( )
//ReentrantReadWriteLock.WriteLockpublic boolean tryLock( ) {return sync.tryWriteLock();}//ReentrantReadWriteLock.WriteLock public boolean tryLock( ) { return sync.tryWriteLock(); }//ReentrantReadWriteLock.WriteLock public boolean tryLock( ) { return sync.tryWriteLock(); }
//ReentrantReadWriteLock.Syncfinal boolean tryWriteLock() {Thread current = Thread.currentThread();int c = getState();if (c != 0) {int w = exclusiveCount(c);if (w == 0 || current != getExclusiveOwnerThread())return false;if (w == MAX_COUNT)throw new Error("Maximum lock count exceeded");}if (!compareAndSetState(c, c + 1))return false;setExclusiveOwnerThread(current);return true;}//ReentrantReadWriteLock.Sync final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); if (c != 0) { int w = exclusiveCount(c); if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } if (!compareAndSetState(c, c + 1)) return false; setExclusiveOwnerThread(current); return true; }//ReentrantReadWriteLock.Sync final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); if (c != 0) { int w = exclusiveCount(c); if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } if (!compareAndSetState(c, c + 1)) return false; setExclusiveOwnerThread(current); return true; }
tryLock(long timeout, TimeUnit unit)
//ReentrantReadWriteLock.WriteLockpublic boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));}//ReentrantReadWriteLock.WriteLock public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }//ReentrantReadWriteLock.WriteLock public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }
//AbstractQueuedSynchronizerpublic final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();return tryAcquire(arg) ||doAcquireNanos(arg, nanosTimeout);}//AbstractQueuedSynchronizer public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }//AbstractQueuedSynchronizer public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
doAcquireNanos 参考Reentrantlonk实现
并发-AQS之Reentrantlonk源码解读
解锁
unlock()
//ReentrantReadWriteLock.WriteLockpublic void unlock() {sync.release(1);}//ReentrantReadWriteLock.WriteLock public void unlock() { sync.release(1); }//ReentrantReadWriteLock.WriteLock public void unlock() { sync.release(1); }
//AbstractQueuedSynchronizerpublic final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}//AbstractQueuedSynchronizer public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }//AbstractQueuedSynchronizer public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
//ReentrantReadWriteLock.Syncprotected final boolean tryRelease(int releases) {if (!isHeldExclusively())throw new IllegalMonitorStateException();int nextc = getState() - releases;boolean free = exclusiveCount(nextc) == 0;if (free)setExclusiveOwnerThread(null);setState(nextc);return free;}//ReentrantReadWriteLock.Sync protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }//ReentrantReadWriteLock.Sync protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
其他方法
newCondition()
//ReentrantReadWriteLock.WriteLockpublic Condition newCondition() {return sync.newCondition();}//ReentrantReadWriteLock.WriteLock public Condition newCondition() { return sync.newCondition(); }//ReentrantReadWriteLock.WriteLock public Condition newCondition() { return sync.newCondition(); }
isHeldByCurrentThread()
//ReentrantReadWriteLock.WriteLockpublic boolean isHeldByCurrentThread() {return sync.isHeldExclusively();}//ReentrantReadWriteLock.WriteLock public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); }//ReentrantReadWriteLock.WriteLock public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); }
//ReentrantReadWriteLock.Syncprotected final boolean isHeldExclusively() {return getExclusiveOwnerThread() == Thread.currentThread();}//ReentrantReadWriteLock.Sync protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); }//ReentrantReadWriteLock.Sync protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); }
getHoldCount()
//ReentrantReadWriteLock.WriteLockpublic int getHoldCount() {return sync.getWriteHoldCount();}//ReentrantReadWriteLock.WriteLock public int getHoldCount() { return sync.getWriteHoldCount(); }//ReentrantReadWriteLock.WriteLock public int getHoldCount() { return sync.getWriteHoldCount(); }
//ReentrantReadWriteLock.Syncfinal int getWriteHoldCount() {return isHeldExclusively() ? exclusiveCount(getState()) : 0;}//ReentrantReadWriteLock.Sync final int getWriteHoldCount() { return isHeldExclusively() ? exclusiveCount(getState()) : 0; }//ReentrantReadWriteLock.Sync final int getWriteHoldCount() { return isHeldExclusively() ? exclusiveCount(getState()) : 0; }
篇幅较长,剩余部分放到另外一篇
并发-AQS之ReentrantReadWriteLock源码解读(二)