Android 多任务并发分合策略

前言

在开发一些复杂的业务逻辑涉及多线程环境的情况下,或者对代码做一些优化的时候,我们往往会涉及到一些多线程的策略“等待所有的线程任务执行完成后再进行下一步”,我将这种场景称为多任务的并发分合策略,或者多线程分合策略。这次就来简单聊聊有哪些方法能实现该策略。

正常做法

先写个Demo看正常情况下多线程的执行结果。

    private fun initThreadPool() {
        val threadPool =
            ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, ArrayBlockingQueue<Runnable>(10))

        Log.d("mmp", "========= task start")
        threadPool.execute {
            Log.d("mmp", "--------- A start")
            taskA()
            Log.d("mmp", "--------- A end")
        }
        threadPool.execute {
            Log.d("mmp", "--------- B start")
            taskB()
            Log.d("mmp", "--------- B end")
        }
        threadPool.execute {
            Log.d("mmp", "--------- C start")
            taskC()
            Log.d("mmp", "--------- C end")
        }
        Log.d("mmp", "========= task end")
    }

    private fun taskA() {
        Thread.sleep(1000)
    }

    private fun taskB() {
        Thread.sleep(2000)
    }

    private fun taskC() {
        Thread.sleep(3000)
    }

可以看到结果

image.png

这是一个正常情况下多线程分散执行的结果,要是想让任务有序,那就是执行完一个请求之后再回调中再执行另一个请求。

        Log.d("mmp", "========= task start")
        threadPool.execute {
            Log.d("mmp", "--------- A start")
            taskA()
            Log.d("mmp", "--------- A end")
            threadPool.execute {
                Log.d("mmp", "--------- B start")
                taskB()
                Log.d("mmp", "--------- B end")
                threadPool.execute {
                    Log.d("mmp", "--------- C start")
                    taskC()
                    Log.d("mmp", "--------- C end")
                    Log.d("mmp", "========= task end")
                }
            }

        }

image.png

这样就能按照顺序去执行任务,但是这种就不是一个并发的效果。

多线程并发分合场景

现在讲一种开发时会经常用到的多任务情况下的开发策略。实现并发的情况下并且在所有并发的任务执行完之后再统一做操作。

现在讲一种开发时会经常用到的多任务情况下的开发策略。实现并发的情况下并且在所有并发的任务执行完之后再统一做操作。

比如说我有这么一个需求,我有一个复杂的页面,因为MVVM的设计去拆分子View,每个View有自己的情况,我这个页面要进行3个不同的网络请求拿到页面数据,等全部拿到之后显示在页面后,再去请求一个弹窗的流程接口。可能举这个例子不是太好,反正意思就是等3次请求完成之后,再去执行第四次请求,这样说可能会比较容易解释这个场景。

因为大项目中,如果是一些业务逻辑,或者说优化点,就会用到这样的开发策略,因为我这边是不能直接拿我自己的项目的场景来举例,所以就随便想了上面的这个例子。小项目可能不会碰到这种场景,因为小项目连多线程的场景都少。这个模型大概的执行流程是这样。

image.png

这个策略我找不到一个具体的说法,表述上来说大致就是“并发处理任务,并等所有任务完成之后再进行下一步”,我称为为多任务的并发分合策略,也可以叫多线程分合策略

1. 使用计数器

ok,要实现这个策略,其实方法很多,首先可以用一个计数器来实现吧,我每执行完一次任务,计数器就+1,当计数器达到任务数量时再进行下一步。

把上边的Demo代码改成这样

    private val counter : AtomicInteger = AtomicInteger(0)


    private fun initThreadPool() {

        val threadPool =

            ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, ArrayBlockingQueue<Runnable>(10))



        Log.d("mmp", "========= task start")

        threadPool.execute {

            Log.d("mmp", "--------- A start")

            taskA()

            Log.d("mmp", "--------- A end")

            counter.incrementAndGet()
            nextTask()
        }
        threadPool.execute {
            Log.d("mmp", "--------- B start")
            taskB()
            Log.d("mmp", "--------- B end")
            counter.incrementAndGet()
            nextTask()
        }
        threadPool.execute {
            Log.d("mmp", "--------- C start")
            taskC()
            Log.d("mmp", "--------- C end")
            counter.incrementAndGet()
            nextTask()
        }
    }

    private fun nextTask(){
        if (counter.get() == 3){
            Log.d("mmp", "========= task end")
        }
    }

image.png

可以看到这样就能达到我们的效果了,但是好像不太优雅,而且是不是感觉有点奇怪,奇怪在于“每个任务执行完之后都会调用这个方法,然后由方法内部做判断”,虽然没问题,但是缺少优雅。

2. CountDownLatch

而对于这样的方案,其实java有提供一个类CountDownLatch,可以使用CountDownLatch直接实现我们想要的效果。

    private val counter = CountDownLatch(3)


    private fun initThreadPool() {

        val threadPool =

            ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, ArrayBlockingQueue<Runnable>(10))



        Log.d("mmp", "========= task start")

        threadPool.execute {

            Log.d("mmp", "--------- A start")

            taskA()

            Log.d("mmp", "--------- A end")

            counter.countDown()
        }
        threadPool.execute {
            Log.d("mmp", "--------- B start")
            taskB()
            Log.d("mmp", "--------- B end")
            counter.countDown()
        }
        threadPool.execute {
            Log.d("mmp", "--------- C start")
            taskC()
            Log.d("mmp", "--------- C end")
            counter.countDown()
        }
        counter.await()
        Log.d("mmp", "========= task end")
    }

image.png

CountDownLatch为什么能实现这个效果呢?它内部其实是基于AQS,有兴趣的可以看看我以前写的这篇文章 juejin.cn/post/716801…

如果你想快速实现多任务的并发分合策略,CountDownLatch会是一个不错的选择,而且它是官方帮你封装好的,质量肯定是有保障的。

3. wait/notify

但是我们不能只会使用官方的东西,这样会禁锢我们的思维,要知道,那些牛逼的框架,那些主流的框架,就是做到了官方想不到的事情。所以既然提到AQS,那又能联想到另外一个模型“等待通知范式”,用它也能来实现这个效果。

这个模型大概是这样的

    synchronized (object){
        while(条件不满足){
            object.wait();
        }
        对应的处理逻辑
    }
    
    synchronized (object){
        改变条件
        object.notifyAll();
    }

那运用在这个策略上,大概的写法可以是这样

    private var counter: Int = 0
    private val lock = java.lang.Object()

    private fun initThreadPool() {
        val threadPool =
            ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, ArrayBlockingQueue<Runnable>(10))


        Log.d("mmp", "========= task start")
        threadPool.execute {
            Log.d("mmp", "--------- A start")
            taskA()
            Log.d("mmp", "--------- A end")
            synchronized(lock) {
                counter++
                lock.notifyAll()
            }

        }

        threadPool.execute {
            Log.d("mmp", "--------- B start")
            taskB()
            Log.d("mmp", "--------- B end")
            synchronized(lock) {
                counter++
                lock.notifyAll()
            }
        }
        threadPool.execute {
            Log.d("mmp", "--------- C start")
            taskC()
            Log.d("mmp", "--------- C end")
            synchronized(lock) {
                counter++
                lock.notifyAll()
            }
        }

        synchronized(lock) {
            while (counter < 3) {
                lock.wait()
            }
            Log.d("mmp", "========= task end")
        }

    }

image.png

看到是能实现这么一个效果,但是效果好像和计数器一样,是的,有一部分的解题思路是用了计数器,但是另一半的思路,第一个例子是每个任务执行完成后都调用一个方法,这个例子是通过wait/notify。既然如此,这里扩展一下提出一个问题,为什么第一个例子中counter对象用了AtomicInteger而这里直接用Int?

当然注意一下,主线程中可不能直接这样玩。

4. 协程

ok,Java的东西玩完了,我们来玩一点kotlin的东西,我想都不用想,我就觉得协程应该是能快速的实现这样的策略。把代码改成使用协程的方式。

        lifecycle.coroutineScope.launch {
            Log.d("mmp", "========= task start")
            val joinA = GlobalScope.launch{
                Log.d("mmp", "--------- A start")
                taskA()
                Log.d("mmp", "--------- A end")
            }
            val joinB = GlobalScope.launch{
                Log.d("mmp", "--------- B start")
                taskB()
                Log.d("mmp", "--------- B end")
            }
            val joinC = GlobalScope.launch{
                Log.d("mmp", "--------- C start")
                taskC()
                Log.d("mmp", "--------- C end")
            }
            joinA.join()
            joinB.join()
            joinC.join()
            Log.d("mmp", "========= task end")

image.png

可以看到使用协程来实现还是很方便的。

5. flow

既然使用协程能实现这个效果,那同样的使用flow肯定也能实现。 声明一下,flow用得不多,我这个写法可能不太好,只是用于演示,如果各位大佬有比较好的写法,也可以分享让我学习一下。

private fun initThreadPool() {
    runBlocking {
        Log.d("mmp", "========= task start")
        val actionA = flow {
            emit(taskA())
        }.flowOn(Dispatchers.IO)


        val actionB = flow {
            emit(taskB())
        }.flowOn(Dispatchers.IO)

        val actionC = flow {
            emit(taskC())
        }.flowOn(Dispatchers.IO)

        actionA.zip(actionB){_,_ ->

        }.zip(actionC){_,_ ->

        }.collect{
            Log.d("mmp", "========= task end")

        }
    }

}

private fun taskA() {
    Log.d("mmp", "--------- A start")
    Thread.sleep(1000)
    Log.d("mmp", "--------- A end")
}

private fun taskB() {
    Log.d("mmp", "--------- B start")
    Thread.sleep(2000)
    Log.d("mmp", "--------- B end")
}

private fun taskC() {
    Log.d("mmp", "--------- C start")
    Thread.sleep(3000)
    Log.d("mmp", "--------- C end")
}

image.png

ok,这样就是用flow实现这个效果,当然这写法是有点不美观,我是肯定相信flow能有更美观的写法来实现这个效果,主要我用得不多,所以这里就献丑了。当然既然flow能实现的话,rxjava等方法也同样能实现这个效果,这里就不再演示了。

总结

总结一下,其实这里主要是拿多任务并发的一个场景进行分析举例。我们平常的开发中,是有一些比较好的策略、思考去解决一些问题,可以是实现某些功能,也可以是做某些优化,因为我最近看文章的话会觉得写一些开发策略、思维的文章会比较少。我觉得这对开发来说也是比较重要的一个东西,先有了一些处理问题的思路之后(这很重要),然后你可以围绕这个思路,有很多种方式去实现。

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYGTs8OQ' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片