【源码共读】第31期 | p-limit 限制并发数

1. 前言

2. 基础知识

2.1 Object.defineProperty

Object.defineProperty是为对象添加属性,具体写法Object.defineProperty(obj, prop, descriptor)参考
MDN官网

参数

  • obj

    要定义属性的对象。

  • prop

    一个字符串或 Symbol,指定了要定义或修改的属性键。

  • descriptor

    要定义或修改的属性的描述符。

返回值

传入函数的对象,其指定的属性已被添加或修改。


Object.defineProperty(obj, "name", {
    enumerable: false,// 可枚举
    configurable: false,// 不可删除
    writable: false, // 可修改
    value: "jack", // 值
    get(){ //取值器
            return this.value
    },
    set(value){ //存值器
            this.value = value
    }
});



2.2 异步事件处理

写一个简单案例,有助于理解源码中enqueue函数的写法。

案例:
定义一个queue队列,调用enqueue添加数量,在run调用后打印queue的长度。

const queue = [1, 2, 3];

 function run() {
    const promise = new Promise((resolve) => {
        resolve(4);
    });
    promise.then(result=>{
        queue.push(result);
    })
};



function enqueue() {
    run();
    (function () {
         Promise.resolve().then(()=>{
             console.log(queue.length);
         });
    })();
    console.log(queue.length);
};

enqueue();

借助可视化网站jsv9000运行,上述代码没有使用async,因为网站不支持。

运行流程:

  1. 执行enqueue函数,将enqueue压入调用栈
  2. 运行run函数,将run压入调用栈
  3. 执行new Promise的回调压入调用栈
  4. 遇到.then将其放入微任务队列

image.png

  1. new Promise的回调、run函数依次出栈
  2. 此时遇到enqueue内部的匿名函数执行,压入栈
  3. 遇到Promise.resolve.then,将其放在微任务队列中
  4. 打印console.log(queue.length) 此时queue依然为3
  5. 等enqueue函数执行完成,enqueue出栈
  6. 执行微任务队列,先执行run中的微任务,后执行enqueue中的微任务
  7. 打印结果为4
    image.png

这里借助立即执行函数,调用Promise.resolve.then去等待上一个微任务执行完毕,获取最新的queue长度。

2.3 Array.from的用处

测试用例中使用

Array.from({length: 5}, () => 1);

实际上创建了长度为5的数组,且值为1

image.png

如果用async修饰

Array.from({length: 5}, async() => 1);

image.png

2.4

3. 源码解析

3.1 使用流程

通过测试用例可以看出使用流程:

1.引入p-limit,初始化最大并发数,返回值limit为一个函数

2.对每一请求都调用limit,组成数组input

4.使用Promise.all(input)调用

3.2 运行流程

1. 定义并初始化

if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
        throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}




const queue = new Queue();
let activeCount = 0;

  • concurrency 参数表示最大并发数,需为大于0的整数或正无穷大。
  • queue 是使用 yocto-queue 库创建的队列实例,用于存储待执行的任务。参考前一个文章
  • activeCount 表示当前正在执行的任务数量,初始值为0。

2. generator函数

const generator = (fn, ...args) => new Promise(resolve => {
    enqueue(fn, resolve, args);
});
  • generator 函数用于创建一个新的 Promise 对象,并将任务函数及其参数传递给 enqueue 函数。
  • 创建 Promise 对象时,将任务函数和参数传递给 enqueue 函数,而不直接执行任务函数。
  • 返回的 Promise 对象将在任务执行完成后被 resolve。

3.使用 Object.defineProperties()generator定义属性和方法

Object.defineProperties(generator, {
    activeCount: {
        get: () => activeCount,
    },
    pendingCount: {
        get: () => queue.size,
    },
    clearQueue: {
        value: () => {
            queue.clear();
        },
    },
});

  • Object.defineProperties() 用于定义 generator 函数的属性和方法。
  • activeCount 属性用于获取当前正在执行的任务数量。
  • pendingCount 属性用于获取队列中等待执行的任务数量。
  • clearQueue 方法用于清空队列中的所有任务。

4. enqueue函数

const enqueue = (fn, resolve, args) => {
    queue.enqueue(run.bind(undefined, fn, resolve, args));


    (async () => {
        await Promise.resolve();
        if (activeCount < concurrency && queue.size > 0) {
            queue.dequeue()();
        }
    })();
};



  • enqueue 函数用于将任务加入到队列中进行调度。
  • 首先,将任务函数及其参数使用 run.bind() 绑定到队列的 enqueue() 方法上,以将执行任务的逻辑加入到队列中。
  • 然后,通过异步函数立即执行Promise.resolve()。
  • 在下一个微任务(microtask)时刻,比较 activeCountconcurrency 的值,检查是否可以继续执行下一个任务。此处参考基础知识2.2。
  • 如果 activeCount 小于 concurrency 并且队列中还有待执行的任务,就调用 queue.dequeue() 从队列中取出并执行任务。

5. run函数

const run = async (fn, resolve, args) => {
    activeCount++;
    const result = (async () => fn(...args))();



    resolve(result);


    try {
        await result;
    } catch {}

    next();
};


  • run 函数用于执行具体的任务。
  • 在执行任务前,先将 activeCount 加1。
  • 使用传入的参数 fnargs 执行任务函数,并将返回的 Promise 对象赋值给 result 变量。
  • 通过传入的 resolve 参数将结果 resolve 给外部使用。
  • 在任务执行完成后,将 activeCount 减1,并调用 next 函数处理下一个任务。

6. next函数

const next = () => {
    activeCount--;


    if (queue.size > 0) {
        queue.dequeue()();
    }
};
  • 当任务执行完成后,会调用 next 函数来处理下一个任务。
  • next 函数将 activeCount 减1,并检查队列中是否还有待执行的任务。
  • 如果队列中有待执行的任务,则调用 queue.dequeue() 从队列中取出任务并执行。

3.3 流程调试

test('concurrency: 4', async t => {
	const concurrency = 2;
	let running = 0;



	const limit = pLimit(concurrency);


	const input = Array.from({length: 5}, () => limit(async () => {
		running++;
		t.true(running <= concurrency);
		await delay(randomInt(30, 200));
		running--;
	}));


	await Promise.all(input);
});

初始化时 调用,将limit中包裹的回调传入generator,generator调用enqueue,使用bind将run函数包装,返回新的函数放入queue中
image.png

每次调用enqueue产生一个异步微任务

image.png

等input执行完毕,input装入的是5个generator返回的promise

image.png

使用promise.all(input)调用,进入异步微任务dequeue

image.png

进入run函数

image.png

执行外部回调 返回一个promise

image.png

image.png

注意此时 使用await 会将result放入微任务中

image.png

继续执行第二个微任务,调用dequeue() 将第二个result放入微任务中

image.png

继续执行 当activeCount等于2,执行3,4,5不会调用dequeue,因为并发数限制

if (activeCount < concurrency && queue.size > 0) {
    queue.dequeue()();
}

会直接result1微任务,此时调用next方法

image.png

image.png

如果size有,直接从队列拿出来执行,直到没有。

4. 总结

通过p-limit主要学习到异步任务执行的方式和并发的思想。结合调试过程加深对p-limit执行过程的理解。异步调试过程不像想象中的那么顺利,要有一定的耐心。总之千里之行,始于足下,加油O^O!


参考文章

Node.js 并发能力总结

NewName-【若川视野 x 源码共读】第31期 | p-limit 限制并发数

文章2

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYq9vvDy' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片