本文接着上文并发-AQS之ThreadPoolExecutor源码解读(一)继续解读源码
关键方法
shutdown
shutdown()
方法的作用是关闭线程池,不再接受新的任务,等待已经提交的任务执行完成
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查调用者是否有权限关闭执行器
checkShutdownAccess();
//将执行器的 `runState` 转换为 `SHUTDOWN` 状态。这将标记执行器不再接受新任务,但仍允许现有任务完成
advanceRunState(SHUTDOWN);
//中断任何正在等待分配新任务的空闲工作线程
interruptIdleWorkers();
//钩子函数
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//尝试终止执行器
tryTerminate();
}
checkShutdownAccess主要检查当前线程或者worker线程是否有new RuntimePermission("modifyThread")
权限
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}
advanceRunState(SHUTDOWN)主要是设置SHUTDOWN状态
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
interruptIdleWorkers中断空闲线程
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
onShutdown钩子函数,子类可以实现
void onShutdown() {
}
tryTerminate前文已经解读,不再赘述
shutdownNow()
立即关闭线程池,尝试中断所有正在执行的任务,并返回等待执行的所有任务的列表
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//设置为STOP
advanceRunState(STOP);
//中断所有线程
interruptWorkers();
//将等待执行的任务从工作队列中移除并返回它们的列表
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
interruptWorkers中断所有线程
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
drainQueue将等待执行的任务从工作队列中移除并返回它们的列表
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
invokeAny
一个用于执行一组 Callable
任务并返回执行结果的方法
//不超时
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
//超时返回
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
doInvokeAny是具体执行逻辑,主要是选择其中一个任务执行
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
//ExecutorCompletionService是 Java 中 Executor 框架的一个工具类,用于简化异步执行任务的结果处理过程。ExecutorCompletionService 将 Executor和 BlockingQueue两个接口结合在一起,提供了一个方便的方法来获取已经完成的任务的执行结果
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
// 为了提高效率,特别是在并行度有限的执行器中,应该在提交更多任务之前先检查之前提交的任务是否已经完成。
try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
// 提交一个任务
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
for (;;) {
//取出任务执行结果
Future<T> f = ecs.poll();
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
f = ecs.take();
}
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
invokeAll
invokeAll会执行所有任务,并等待返回
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
下面这个是超时返回
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
return futures;
}
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (nanos <= 0L)
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
常见线程池
java.util.concurrent.Executors
类提供了几种常见的线程池类型
newFixedThreadPool(int nThreads)
创建一个固定大小的线程池,该线程池中的线程数量始终为 nThreads
,如果所有线程都处于忙碌状态,则新任务需要等待。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newCachedThreadPool()
创建一个可缓存的线程池,该线程池中的线程数量不固定,如果有空闲的线程,则会立即用来执行新任务,否则会创建新的线程。在一段时间内没有任务执行时,该线程池中的线程会自动销毁。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
newSingleThreadExecutor()
创建一个单线程的线程池,该线程池中只有一个线程,所有任务都按照先进先出的顺序执行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
newScheduledThreadPool(int corePoolSize)
创建一个定时任务的线程池,该线程池中的线程数量为 corePoolSize
,可以执行定时任务和周期性任务。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
newWorkStealingPool(int parallelism)
创建一个工作窃取线程池,该线程池中的线程数量为 parallelism
,它会尽可能地利用多个 CPU 核心,将任务分配给空闲的线程来执行。如果一个线程正在执行一个任务,而其他线程已经执行完了自己的任务,那么这些空闲的线程就会从任务队列中窃取任务并执行。
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
常见问题
为什么线程池不推荐使用Executors去创建线程池?
线程池不推荐使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
Executors各个方法的弊端:
- newFixedThreadPool和newSingleThreadExecutor: 主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
- newCachedThreadPool和newScheduledThreadPool: 主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
如何选择创建线程的个数?
选择线程池中线程的个数需要根据实际情况进行考虑,以下是一些常见的选择方式
- CPU 密集型任务:如果线程池中的任务都是 CPU 密集型任务,即任务执行过程中需要大量的 CPU 时间,那么线程的个数应该设置为 CPU 核心数加 1。这样可以充分利用 CPU 资源,同时避免线程数过多导致线程切换的开销
- IO 密集型任务:如果线程池中的任务都是 IO 密集型任务,即任务执行过程中需要大量的 IO 操作(如读写文件、读写数据库等),那么线程的个数应该设置如下
N = U * (1 + W/C)
其中,
N
表示线程池中线程的个数,U
表示 CPU 利用率,W
表示任务等待时间,C
表示任务执行时间
线程池为什么能维持线程不释放,随时运行各种任务?
主要是在getTask方法里面
...
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
在死循环中工作队列workQueue会一直去拿任务:
- 核心线程的会一直卡在 workQueue.take()方法,让线程一直等待,直到获取到任务,然后返回。
- 非核心线程会 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ,如果超时还没有拿到,下一次循环判断compareAndDecrementWorkerCount就会返回null,Worker对象的run()方法循环体的判断为null,任务结束,然后线程被系统回收。