并发-AQS之ThreadPoolExecutor源码解读(二)

本文接着上文并发-AQS之ThreadPoolExecutor源码解读(一)继续解读源码

关键方法

shutdown

shutdown() 方法的作用是关闭线程池,不再接受新的任务,等待已经提交的任务执行完成

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;


    mainLock.lock();


    try {



        //检查调用者是否有权限关闭执行器
        checkShutdownAccess();

        //将执行器的 `runState` 转换为 `SHUTDOWN` 状态。这将标记执行器不再接受新任务,但仍允许现有任务完成
        advanceRunState(SHUTDOWN);
        //中断任何正在等待分配新任务的空闲工作线程
        interruptIdleWorkers();
        //钩子函数
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {

        mainLock.unlock();

    }

    //尝试终止执行器
    tryTerminate();
}

checkShutdownAccess主要检查当前线程或者worker线程是否有new RuntimePermission("modifyThread")权限

private void checkShutdownAccess() {
    SecurityManager security = System.getSecurityManager();
    if (security != null) {
        security.checkPermission(shutdownPerm);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                security.checkAccess(w.thread);
        } finally {
            mainLock.unlock();
        }

    }
}

advanceRunState(SHUTDOWN)主要是设置SHUTDOWN状态

private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

interruptIdleWorkers中断空闲线程

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;


    mainLock.lock();


    try {



        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

onShutdown钩子函数,子类可以实现

void onShutdown() {
}

tryTerminate前文已经解读,不再赘述

shutdownNow()

立即关闭线程池,尝试中断所有正在执行的任务,并返回等待执行的所有任务的列表

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();

        //设置为STOP
        advanceRunState(STOP);
        //中断所有线程
        interruptWorkers();
        //将等待执行的任务从工作队列中移除并返回它们的列表
        tasks = drainQueue();
    } finally {

        mainLock.unlock();

    }

    tryTerminate();
    return tasks;
}

interruptWorkers中断所有线程

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;


    mainLock.lock();


    try {



        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }

}

void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

drainQueue将等待执行的任务从工作队列中移除并返回它们的列表

private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    q.drainTo(taskList);
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}

invokeAny

一个用于执行一组 Callable 任务并返回执行结果的方法

//不超时
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException {
    try {



        return doInvokeAny(tasks, false, 0);
    } catch (TimeoutException cannotHappen) {
        assert false;
        return null;
    }

}

//超时返回
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                       long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    return doInvokeAny(tasks, true, unit.toNanos(timeout));
}

doInvokeAny是具体执行逻辑,主要是选择其中一个任务执行

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                          boolean timed, long nanos)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (tasks == null)

        throw new NullPointerException();

    int ntasks = tasks.size();
    if (ntasks == 0)
        throw new IllegalArgumentException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
    //ExecutorCompletionService是 Java 中 Executor 框架的一个工具类,用于简化异步执行任务的结果处理过程。ExecutorCompletionService 将 Executor和 BlockingQueue两个接口结合在一起,提供了一个方便的方法来获取已经完成的任务的执行结果
    ExecutorCompletionService<T> ecs =
        new ExecutorCompletionService<T>(this);

    // 为了提高效率,特别是在并行度有限的执行器中,应该在提交更多任务之前先检查之前提交的任务是否已经完成。
    try {
        // Record exceptions so that if we fail to obtain any
        // result, we can throw the last exception we got.
        ExecutionException ee = null;
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Iterator<? extends Callable<T>> it = tasks.iterator();

        // 提交一个任务
        futures.add(ecs.submit(it.next()));
        --ntasks;
        int active = 1;

        for (;;) {
            //取出任务执行结果
            Future<T> f = ecs.poll();
            if (f == null) {
                if (ntasks > 0) {
                    --ntasks;
                    futures.add(ecs.submit(it.next()));
                    ++active;
                }
                else if (active == 0)
                    break;
                else if (timed) {
                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                    if (f == null)
                        throw new TimeoutException();
                    nanos = deadline - System.nanoTime();
                }
                else
                    f = ecs.take();
            }
            if (f != null) {
                --active;
                try {
                    return f.get();
                } catch (ExecutionException eex) {
                    ee = eex;
                } catch (RuntimeException rex) {
                    ee = new ExecutionException(rex);
                }
            }
        }

        if (ee == null)
            ee = new ExecutionException();
        throw ee;

    } finally {
        for (int i = 0, size = futures.size(); i < size; i++)
            futures.get(i).cancel(true);
    }
}

invokeAll

invokeAll会执行所有任务,并等待返回

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }

        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                try {
                    f.get();
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

下面这个是超时返回

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                     long timeout, TimeUnit unit)
    throws InterruptedException {
    if (tasks == null)

        throw new NullPointerException();

    long nanos = unit.toNanos(timeout);
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks)
            futures.add(newTaskFor(t));

        final long deadline = System.nanoTime() + nanos;
        final int size = futures.size();

        // Interleave time checks and calls to execute in case
        // executor doesn't have any/much parallelism.
        for (int i = 0; i < size; i++) {
            execute((Runnable)futures.get(i));
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L)
                return futures;
        }

        for (int i = 0; i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                if (nanos <= 0L)
                    return futures;
                try {
                    f.get(nanos, TimeUnit.NANOSECONDS);
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                } catch (TimeoutException toe) {
                    return futures;
                }
                nanos = deadline - System.nanoTime();
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

常见线程池

java.util.concurrent.Executors 类提供了几种常见的线程池类型

newFixedThreadPool(int nThreads)

创建一个固定大小的线程池,该线程池中的线程数量始终为 nThreads,如果所有线程都处于忙碌状态,则新任务需要等待。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

newCachedThreadPool()

创建一个可缓存的线程池,该线程池中的线程数量不固定,如果有空闲的线程,则会立即用来执行新任务,否则会创建新的线程。在一段时间内没有任务执行时,该线程池中的线程会自动销毁。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                                  60L, TimeUnit.SECONDS,

                                  new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                                  60L, TimeUnit.SECONDS,

                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}



newSingleThreadExecutor()

创建一个单线程的线程池,该线程池中只有一个线程,所有任务都按照先进先出的顺序执行。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService

        (new ThreadPoolExecutor(1, 1,

                                0L, TimeUnit.MILLISECONDS,

                                new LinkedBlockingQueue<Runnable>()));
}



public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService

        (new ThreadPoolExecutor(1, 1,

                                0L, TimeUnit.MILLISECONDS,

                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}

newScheduledThreadPool(int corePoolSize)

创建一个定时任务的线程池,该线程池中的线程数量为 corePoolSize,可以执行定时任务和周期性任务。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

newWorkStealingPool(int parallelism)

创建一个工作窃取线程池,该线程池中的线程数量为 parallelism,它会尽可能地利用多个 CPU 核心,将任务分配给空闲的线程来执行。如果一个线程正在执行一个任务,而其他线程已经执行完了自己的任务,那么这些空闲的线程就会从任务队列中窃取任务并执行。

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool

        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,

         null, true);

}



public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool

        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,

         null, true);

}



常见问题

为什么线程池不推荐使用Executors去创建线程池?

线程池不推荐使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
Executors各个方法的弊端:

  • newFixedThreadPool和newSingleThreadExecutor:   主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
  • newCachedThreadPool和newScheduledThreadPool:   主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

如何选择创建线程的个数?

选择线程池中线程的个数需要根据实际情况进行考虑,以下是一些常见的选择方式

  • CPU 密集型任务:如果线程池中的任务都是 CPU 密集型任务,即任务执行过程中需要大量的 CPU 时间,那么线程的个数应该设置为 CPU 核心数加 1。这样可以充分利用 CPU 资源,同时避免线程数过多导致线程切换的开销
  • IO 密集型任务:如果线程池中的任务都是 IO 密集型任务,即任务执行过程中需要大量的 IO 操作(如读写文件、读写数据库等),那么线程的个数应该设置如下
    N = U * (1 + W/C)
    

    其中,N 表示线程池中线程的个数,U 表示 CPU 利用率,W 表示任务等待时间,C 表示任务执行时间

线程池为什么能维持线程不释放,随时运行各种任务?

主要是在getTask方法里面

...
try {
    Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
        workQueue.take();
    if (r != null)
        return r;
    timedOut = true;
} catch (InterruptedException retry) {
    timedOut = false;
}

在死循环中工作队列workQueue会一直去拿任务:

  • 核心线程的会一直卡在 workQueue.take()方法,让线程一直等待,直到获取到任务,然后返回。
  • 非核心线程会 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ,如果超时还没有拿到,下一次循环判断compareAndDecrementWorkerCount就会返回null,Worker对象的run()方法循环体的判断为null,任务结束,然后线程被系统回收。

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MY79Ty6e' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片