内存一致性
在多核处理器中,每个核心都有
自己的缓存
,线程在执行时可能会读取和修改共享的内存数据。如果没有内存一致性保证,不同核心上的线程可能会看到不同的内存值,导致数据不一致的问题。其实也就是常说的
原子性、可见性、有序性
解决方案:
- 使用
volatile
关键字:将共享变量声明为volatile,可以确保对该变量的写
操作立即刷新到主内存
,并且对该变量的读
操作都从主内存中获取
最新的值。 - 使用
synchronized
关键字或Lock
:通过同步机制来保证多线程之间的原子性和有序性。synchronized关键字和Lock都能够创建一个临界区,确保同一时刻只有一个线程能够进入该临界区。 - 使用
原子类
:Java提供了一些原子类,例如AtomicInteger、AtomicLong等,它们提供了基本的原子操作,可以避免多线程竞争的问题。
指令重排序是什么?
如下指令的重排导致例如在双检索中,第一次检查条件满足而直接返回instance
实例,但实际上该对象的构造还未执行完毕
instance = new Singleton();语句优化成如下的序列:1、new分配指针;2、instance 赋值;3、调用Singleton构造函数。instance = new Singleton(); 语句优化成如下的序列: 1、new分配指针; 2、instance 赋值; 3、调用Singleton构造函数。instance = new Singleton(); 语句优化成如下的序列: 1、new分配指针; 2、instance 赋值; 3、调用Singleton构造函数。
你是否了解内存屏障?
举个简单的例子认识下为什么内存模型进行优化的思路,比如x\y都进行写操作,其顺序对整体无影响性,那么其乱序执行对最终结果无干扰,亦或是x\y进行无相关的读写操作。整体上来说乱序的提升比其带来的影响大的多,所以引入内存屏障
来解决特殊情况下的问题
x = 10;y = 1;// 此时虽然x\y无相关,但如果a写入,比y读取a时更晚,那y读到的将是初始值0a = 1;x = 10;y = a;x = 10; y = 1; // 此时虽然x\y无相关,但如果a写入,比y读取a时更晚,那y读到的将是初始值0 a = 1; x = 10; y = a;x = 10; y = 1; // 此时虽然x\y无相关,但如果a写入,比y读取a时更晚,那y读到的将是初始值0 a = 1; x = 10; y = a;
# 要是不明白,那就再看看这篇,前面内容可以快速看,我认为精髓在中间以后部分
对于指令排序,无非就是store(写),load(读)的排列组合:s-s(sfence),s-l(mfence),l-l(lfence),l-s,其中fence(栅栏),可以理解为屏障
- 在
写volatile
变量v之后,插入一个sfence,禁用跨sfence的store重排序;且sfence之前修改的值都会被写回缓存,并标记其他CPU中的缓存失效
。 - 在
读volatile
变量v之前,插入一个lfence,禁用跨lfence的load重排序;且lfence之后,会首先刷新无效缓存
,从而得到最新的修改值,与sfence配合保证内存可见性。
你是否了解final关键字?
- 常量:一旦变量被赋值,就不能再修改它的值
- 不可继承:
final
可以用来修饰类,表示该类不能被继承 - 不可重写:
final
可以用来修饰方法,表示该方法不能被子类重写(覆盖) - 提高性能:在某些情况下,使用
final
关键字可以让编译器对代码进行优化,提高程序的性能。
JVM会在初始化final变量<init>()
后插入一个sfence,保证final字段初始化之前(include
)的内存更新都是可见的。
锁是如何实现可见性的
对总线和缓存上锁,具有mfence语义
再谈线程
线程池提交任务有哪些方法?
-
execute(Runnable task)
: 提交一个Runnable任务给线程池。该方法没有返回值。 -
submit(Runnable task)
: 提交一个Runnable任务给线程池,并返回一个表示该任务的Future对象。可以通过Future对象来获取任务的执行结果。 -
submit(Callable<T> task)
: 提交一个Callable任务给线程池,并返回一个表示该任务的Future对象。可以通过Future对象来获取任务的执行结果。 -
invokeAny(Collection<? extends Callable<T>> tasks)
: 提交一个Callable任务集合给线程池,返回其中一个任务的执行结果。当有一个任务成功执行完毕后,该方法就会返回。 -
invokeAll(Collection<? extends Callable<T>> tasks)
: 提交一个Callable任务集合给线程池,返回所有任务的执行结果。当所有任务都执行完毕后,该方法才会返回。
总结:execute
就是普通的提交,submit
会返回Future,其get()
获取结果时会阻塞,invoke
提交一个集合,根据需要选择返回的时机(注意会阻塞)
线程sleep和yield方法的区别?
Thread.sleep()
和Thread.yield()
方法都可以用于线程的控制,但它们有一些区别。
-
Thread.sleep()
: 暂停线程让渡CPU,休眠指定固定时长,但锁不释放,可能抛出InterruptedException
异常 -
Thread.yield()
: CPU让渡,运行态->就绪态,停滞时长取决于CPU调度器,只能在同步代码块中使用
调用对象hashcode方法,对锁有什么影响?
Java对象的对象头里包含两个部分,一个是Mark Word
,另一个是类型指针。而对于Mark Word
而言,通过精细的划分,其在64 bit的空间内可以变着花样的存储各类信息。而Hash Code
正是其中之一
结论: 一个对象在调用原生hashCode
方法后(来自Object的,未被重写过的
),该对象将无法进入偏向锁状态,起步就会是轻量级锁。若hashCode
方法的调用是在对象已经处于偏向锁状态时调用,它的偏向状态会被立即撤销,并且锁会升级为重量级锁。 hashCode 存储在当前持有锁的线程内部,解锁时就是恢复至加锁前的对象头状态
AQS模型与线程同步工具
CountDownLatch
CountDownLatch通过一个计数器来实现线程之间的同步。在创建CountDownLatch对象时,需要指定一个初始计数值。当一个线程完成了自己的任务后,可以调用CountDownLatch的countDown()方法,将计数值减1。其他等待的线程可以调用CountDownLatch的await()方法来等待计数值变为0。当计数值变为0时,所有等待的线程会被唤醒,继续执行。
import java.util.concurrent.CountDownLatchfun main() {val numThreads = 5val latch = CountDownLatch(numThreads)for (i in 0 until numThreads) {val thread = Thread(Worker(latch))thread.start()}// 等待所有线程完成任务latch.await()println("所有线程已完成任务,继续执行主线程")}class Worker(private val latch: CountDownLatch) : Runnable {override fun run() {try {// 模拟线程执行任务Thread.sleep((Math.random() * 1000).toLong())println("线程${Thread.currentThread().id}完成任务")// 计数器减1latch.countDown()} catch (e: InterruptedException) {e.printStackTrace()}}}import java.util.concurrent.CountDownLatch fun main() { val numThreads = 5 val latch = CountDownLatch(numThreads) for (i in 0 until numThreads) { val thread = Thread(Worker(latch)) thread.start() } // 等待所有线程完成任务 latch.await() println("所有线程已完成任务,继续执行主线程") } class Worker(private val latch: CountDownLatch) : Runnable { override fun run() { try { // 模拟线程执行任务 Thread.sleep((Math.random() * 1000).toLong()) println("线程${Thread.currentThread().id}完成任务") // 计数器减1 latch.countDown() } catch (e: InterruptedException) { e.printStackTrace() } } }import java.util.concurrent.CountDownLatch fun main() { val numThreads = 5 val latch = CountDownLatch(numThreads) for (i in 0 until numThreads) { val thread = Thread(Worker(latch)) thread.start() } // 等待所有线程完成任务 latch.await() println("所有线程已完成任务,继续执行主线程") } class Worker(private val latch: CountDownLatch) : Runnable { override fun run() { try { // 模拟线程执行任务 Thread.sleep((Math.random() * 1000).toLong()) println("线程${Thread.currentThread().id}完成任务") // 计数器减1 latch.countDown() } catch (e: InterruptedException) { e.printStackTrace() } } }
首先在构造时传入计数器初始值
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
当线程工作完成时,调用countDown()
,对计数器-1,当-1后为0的话,需要进行doReleaseShared()
释放,其中通过LockSupport.unpark
进行对阻塞线程的唤醒
public void countDown() {sync.releaseShared(1);}// AQSpublic final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}// CountDownLatch.Syncprotected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c - 1;if (compareAndSetState(c, nextc))return nextc == 0;}}public void countDown() { sync.releaseShared(1); } // AQS public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } // CountDownLatch.Sync protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c - 1; if (compareAndSetState(c, nextc)) return nextc == 0; } }public void countDown() { sync.releaseShared(1); } // AQS public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } // CountDownLatch.Sync protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c - 1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
等待的线程去获取锁,当state为0时,即不会阻塞,否则doAcquireSharedNanos
中,通过LockSupport.parkNanos
进行阻塞
// CountDownLatchpublic boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}// AQSpublic final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();return tryAcquireShared(arg) >= 0 ||doAcquireSharedNanos(arg, nanosTimeout);}// CountDownLatch.Syncprotected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}// CountDownLatch public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } // AQS public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } // CountDownLatch.Sync protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }// CountDownLatch public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } // AQS public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } // CountDownLatch.Sync protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
CyclicBarrier
CyclicBarrier也通过一个计数器来实现线程之间的同步。在创建CyclicBarrier对象时,需要指定一个参与同步的线程数量。当一个线程到达了CyclicBarrier的屏障点时,会调用CyclicBarrier的await()方法进行等待。当所有参与线程都到达了屏障点时,CyclicBarrier会释放所有等待的线程,然后重置计数器,可以继续使用。
import java.util.concurrent.CyclicBarrierfun main() {val numThreads = 5val barrier = CyclicBarrier(numThreads) {println("所有线程已到达屏障点,继续执行主线程")}for (i in 0 until numThreads) {val thread = Thread(Worker(barrier))thread.start()}}class Worker(private val barrier: CyclicBarrier) : Runnable {override fun run() {try {// 模拟线程执行任务Thread.sleep((Math.random() * 1000).toLong())println("线程${Thread.currentThread().id}到达屏障点")// 等待其他线程到达屏障点barrier.await()// 所有线程到达屏障点后继续执行后续操作println("线程${Thread.currentThread().id}继续执行")} catch (e: InterruptedException) {e.printStackTrace()}}}import java.util.concurrent.CyclicBarrier fun main() { val numThreads = 5 val barrier = CyclicBarrier(numThreads) { println("所有线程已到达屏障点,继续执行主线程") } for (i in 0 until numThreads) { val thread = Thread(Worker(barrier)) thread.start() } } class Worker(private val barrier: CyclicBarrier) : Runnable { override fun run() { try { // 模拟线程执行任务 Thread.sleep((Math.random() * 1000).toLong()) println("线程${Thread.currentThread().id}到达屏障点") // 等待其他线程到达屏障点 barrier.await() // 所有线程到达屏障点后继续执行后续操作 println("线程${Thread.currentThread().id}继续执行") } catch (e: InterruptedException) { e.printStackTrace() } } }import java.util.concurrent.CyclicBarrier fun main() { val numThreads = 5 val barrier = CyclicBarrier(numThreads) { println("所有线程已到达屏障点,继续执行主线程") } for (i in 0 until numThreads) { val thread = Thread(Worker(barrier)) thread.start() } } class Worker(private val barrier: CyclicBarrier) : Runnable { override fun run() { try { // 模拟线程执行任务 Thread.sleep((Math.random() * 1000).toLong()) println("线程${Thread.currentThread().id}到达屏障点") // 等待其他线程到达屏障点 barrier.await() // 所有线程到达屏障点后继续执行后续操作 println("线程${Thread.currentThread().id}继续执行") } catch (e: InterruptedException) { e.printStackTrace() } } }
构造获取了需要计数器和同步屏障时需要执行的Runnable,内部借助ReentrantLock,区别于CountDownLatch是内部实现了AQS的Sync。command.run()
为同步屏障Runnable,breakBarrier()
为屏障释放,否则trip.await()
进行休眠。区别于CountDownLatch的LockSupport.park(),这里使用的是Lock.condition.await()\signalAll()
public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;}private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;lock.lock();try {final Generation g = generation;if (g.broken)throw new BrokenBarrierException();if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}int index = --count;// 当计数器为0时,执行内容并释放屏障if (index == 0) { // trippedboolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;nextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}// loop until tripped, broken, interrupted, or timed outfor (;;) {try {if (!timed)trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation)return index;if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; // 当计数器为0时,执行内容并释放屏障 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; // 当计数器为0时,执行内容并释放屏障 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
ReadWriteLock
ReentrantReadWriteLock
通过 AQS 实现了读写锁的高效管理,读锁可以共享,写锁是独占的,可以满足读多写少的并发需求
写操作(独占锁)的时候,需要判断是否有读操作在进行。这是因为在读操作期间,写操作是不允许的,需要等待所有的读操作完成后才能进行写操作,以保证数据的一致性。
ReentrantReadWriteLock通过维护一个计数器来记录当前获取读锁的线程数量。当有线程获取读锁时,计数器会增加;当线程释放读锁时,计数器会减少。通过检查计数器是否为0,可以判断是否有读操作在进行。
在写操作获取写锁之前,会先检查计数器是否为0,如果不为0,说明有读操作在进行,写操作就会被阻塞,直到计数器为0,即所有读操作都完成,才能获取写锁进行写操作。
import java.util.concurrent.locks.ReentrantReadWriteLockclass ReadWriteLockExample {private val lock = ReentrantReadWriteLock()private var value: Int = 0fun writeValue(newValue: Int) {lock.writeLock().lock()try {// 写操作,对共享数据进行更新value = newValue} finally {lock.writeLock().unlock()}}fun readValue(): Int {lock.readLock().lock()try {// 读操作,对共享数据进行读取return value} finally {lock.readLock().unlock()}}}import java.util.concurrent.locks.ReentrantReadWriteLock class ReadWriteLockExample { private val lock = ReentrantReadWriteLock() private var value: Int = 0 fun writeValue(newValue: Int) { lock.writeLock().lock() try { // 写操作,对共享数据进行更新 value = newValue } finally { lock.writeLock().unlock() } } fun readValue(): Int { lock.readLock().lock() try { // 读操作,对共享数据进行读取 return value } finally { lock.readLock().unlock() } } }import java.util.concurrent.locks.ReentrantReadWriteLock class ReadWriteLockExample { private val lock = ReentrantReadWriteLock() private var value: Int = 0 fun writeValue(newValue: Int) { lock.writeLock().lock() try { // 写操作,对共享数据进行更新 value = newValue } finally { lock.writeLock().unlock() } } fun readValue(): Int { lock.readLock().lock() try { // 读操作,对共享数据进行读取 return value } finally { lock.readLock().unlock() } } }
看一下读锁,是对共享资源的操作,当没有写操作时,是允许多线程同时读,当发生写操作时,会获取失败,进入等待状态
public static class ReadLock implements Lock, java.io.Serializable {public void lock() {sync.acquireShared(1);}public void unlock() {sync.releaseShared(1);}}// AQSpublic final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)// 尝试获取失败的话,会进入等待状态doAcquireShared(arg);}// 对计量器c,进行位拆分// 共享部分static int sharedCount(int c) { return c >>> SHARED_SHIFT; }// 独占部分static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }// Syncprotected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();int c = getState();if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;// 只要exclusiveCount独占资源没被占有 且 写线程不为当前线程int r = sharedCount(c);if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {if (r == 0) {firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}return 1;}return fullTryAcquireShared(current);}public static class ReadLock implements Lock, java.io.Serializable { public void lock() { sync.acquireShared(1); } public void unlock() { sync.releaseShared(1); } } // AQS public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) // 尝试获取失败的话,会进入等待状态 doAcquireShared(arg); } // 对计量器c,进行位拆分 // 共享部分 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 独占部分 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // Sync protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 只要exclusiveCount独占资源没被占有 且 写线程不为当前线程 int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }public static class ReadLock implements Lock, java.io.Serializable { public void lock() { sync.acquireShared(1); } public void unlock() { sync.releaseShared(1); } } // AQS public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) // 尝试获取失败的话,会进入等待状态 doAcquireShared(arg); } // 对计量器c,进行位拆分 // 共享部分 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 独占部分 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // Sync protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 只要exclusiveCount独占资源没被占有 且 写线程不为当前线程 int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
再看写锁,是对独占资源的操作,这里就跳过AQS的部分调用,直接看Sync部分了。当需要进行写操作时,由于写操作的互斥性,计数器需要为0,否则即意味着读\写位有计数。其中关于读写发生阻塞时都依赖AQS内部的LockSupport.park()\unpark()
,而实际上Lock.condition.await()
最终也依赖于AQS的ConditionObject
,其本质是一样的,区别于Object.wait()\notify()
,不需要与特定对象关联,也不会出现 IllegalMonitorStateException
异常,因为 LockSupport
使用了许可证的方式来进行线程的阻塞和唤醒。
public static class WriteLock implements Lock, java.io.Serializable {public void lock() {sync.acquire(1);}public void unlock() {sync.release(1);}}protected final boolean tryRelease(int releases) {if (!isHeldExclusively())throw new IllegalMonitorStateException();int nextc = getState() - releases;boolean free = exclusiveCount(nextc) == 0;if (free)setExclusiveOwnerThread(null);setState(nextc);return free;}protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();int c = getState();int w = exclusiveCount(c);if (c != 0) {// (Note: if c != 0 and w == 0 then shared count != 0)// 这里判断c是写还是读,如果没有写的话,那就是读,需要等待。如果是读,那就看读是不是本线程,进行重入if (w == 0 || current != getExclusiveOwnerThread())return false;if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// Reentrant acquiresetState(c + acquires);return true;}if (writerShouldBlock() ||!compareAndSetState(c, c + acquires))return false;setExclusiveOwnerThread(current);return true;}public static class WriteLock implements Lock, java.io.Serializable { public void lock() { sync.acquire(1); } public void unlock() { sync.release(1); } } protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; } protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) // 这里判断c是写还是读,如果没有写的话,那就是读,需要等待。如果是读,那就看读是不是本线程,进行重入 if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }public static class WriteLock implements Lock, java.io.Serializable { public void lock() { sync.acquire(1); } public void unlock() { sync.release(1); } } protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; } protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) // 这里判断c是写还是读,如果没有写的话,那就是读,需要等待。如果是读,那就看读是不是本线程,进行重入 if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }