1. 前言
-
本文参加了由公众号@若川视野 发起的每周源码共读活动, 点击了解详情一起参与。
-
这是源码共读的第31期,链接:p-limit 限制并发数
2. 基础知识
2.1 Object.defineProperty
Object.defineProperty是为对象添加属性,具体写法Object.defineProperty(obj, prop, descriptor)参考
MDN官网。
参数
-
obj
要定义属性的对象。
-
prop
一个字符串或
Symbol
,指定了要定义或修改的属性键。 -
descriptor
要定义或修改的属性的描述符。
返回值
传入函数的对象,其指定的属性已被添加或修改。
Object.defineProperty(obj, "name", {
enumerable: false,// 可枚举
configurable: false,// 不可删除
writable: false, // 可修改
value: "jack", // 值
get(){ //取值器
return this.value
},
set(value){ //存值器
this.value = value
}
});
2.2 异步事件处理
写一个简单案例,有助于理解源码中enqueue函数的写法。
案例:
定义一个queue队列,调用enqueue添加数量,在run调用后打印queue的长度。
const queue = [1, 2, 3];
function run() {
const promise = new Promise((resolve) => {
resolve(4);
});
promise.then(result=>{
queue.push(result);
})
};
function enqueue() {
run();
(function () {
Promise.resolve().then(()=>{
console.log(queue.length);
});
})();
console.log(queue.length);
};
enqueue();
借助可视化网站jsv9000运行,上述代码没有使用async,因为网站不支持。
运行流程:
- 执行enqueue函数,将enqueue压入调用栈
- 运行run函数,将run压入调用栈
- 执行new Promise的回调压入调用栈
- 遇到
.then
将其放入微任务队列
- new Promise的回调、run函数依次出栈
- 此时遇到enqueue内部的匿名函数执行,压入栈
- 遇到Promise.resolve.then,将其放在微任务队列中
- 打印console.log(queue.length) 此时queue依然为3
- 等enqueue函数执行完成,enqueue出栈
- 执行微任务队列,先执行run中的微任务,后执行enqueue中的微任务
- 打印结果为4
这里借助立即执行函数,调用Promise.resolve.then去等待上一个微任务执行完毕,获取最新的queue长度。
2.3 Array.from的用处
测试用例中使用
Array.from({length: 5}, () => 1);
实际上创建了长度为5的数组,且值为1
如果用async修饰
Array.from({length: 5}, async() => 1);
2.4
3. 源码解析
3.1 使用流程
通过测试用例可以看出使用流程:
1.引入p-limit,初始化最大并发数,返回值limit为一个函数
2.对每一请求都调用limit,组成数组input
4.使用Promise.all(input)调用
3.2 运行流程
1. 定义并初始化
if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
const queue = new Queue();
let activeCount = 0;
concurrency
参数表示最大并发数,需为大于0的整数或正无穷大。queue
是使用 yocto-queue 库创建的队列实例,用于存储待执行的任务。参考前一个文章activeCount
表示当前正在执行的任务数量,初始值为0。
2. generator
函数
const generator = (fn, ...args) => new Promise(resolve => {
enqueue(fn, resolve, args);
});
generator
函数用于创建一个新的 Promise 对象,并将任务函数及其参数传递给enqueue
函数。- 创建 Promise 对象时,将任务函数和参数传递给
enqueue
函数,而不直接执行任务函数。 - 返回的 Promise 对象将在任务执行完成后被 resolve。
3.使用 Object.defineProperties()
为generator
定义属性和方法
Object.defineProperties(generator, {
activeCount: {
get: () => activeCount,
},
pendingCount: {
get: () => queue.size,
},
clearQueue: {
value: () => {
queue.clear();
},
},
});
Object.defineProperties()
用于定义generator
函数的属性和方法。activeCount
属性用于获取当前正在执行的任务数量。pendingCount
属性用于获取队列中等待执行的任务数量。clearQueue
方法用于清空队列中的所有任务。
4. enqueue
函数
const enqueue = (fn, resolve, args) => {
queue.enqueue(run.bind(undefined, fn, resolve, args));
(async () => {
await Promise.resolve();
if (activeCount < concurrency && queue.size > 0) {
queue.dequeue()();
}
})();
};
enqueue
函数用于将任务加入到队列中进行调度。- 首先,将任务函数及其参数使用
run.bind()
绑定到队列的enqueue()
方法上,以将执行任务的逻辑加入到队列中。 - 然后,通过异步函数立即执行Promise.resolve()。
- 在下一个微任务(microtask)时刻,比较
activeCount
和concurrency
的值,检查是否可以继续执行下一个任务。此处参考基础知识2.2。 - 如果
activeCount
小于concurrency
并且队列中还有待执行的任务,就调用queue.dequeue()
从队列中取出并执行任务。
5. run
函数
const run = async (fn, resolve, args) => {
activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {}
next();
};
run
函数用于执行具体的任务。- 在执行任务前,先将
activeCount
加1。 - 使用传入的参数
fn
和args
执行任务函数,并将返回的 Promise 对象赋值给result
变量。 - 通过传入的
resolve
参数将结果 resolve 给外部使用。 - 在任务执行完成后,将
activeCount
减1,并调用next
函数处理下一个任务。
6. next
函数
const next = () => {
activeCount--;
if (queue.size > 0) {
queue.dequeue()();
}
};
- 当任务执行完成后,会调用
next
函数来处理下一个任务。 next
函数将activeCount
减1,并检查队列中是否还有待执行的任务。- 如果队列中有待执行的任务,则调用
queue.dequeue()
从队列中取出任务并执行。
3.3 流程调试
test('concurrency: 4', async t => {
const concurrency = 2;
let running = 0;
const limit = pLimit(concurrency);
const input = Array.from({length: 5}, () => limit(async () => {
running++;
t.true(running <= concurrency);
await delay(randomInt(30, 200));
running--;
}));
await Promise.all(input);
});
初始化时 调用,将limit中包裹的回调传入generator,generator调用enqueue,使用bind将run函数包装,返回新的函数放入queue中
每次调用enqueue产生一个异步微任务
等input执行完毕,input装入的是5个generator返回的promise
使用promise.all(input)调用,进入异步微任务dequeue
进入run函数
执行外部回调 返回一个promise
注意此时 使用await 会将result放入微任务中
继续执行第二个微任务,调用dequeue() 将第二个result放入微任务中
继续执行 当activeCount等于2,执行3,4,5不会调用dequeue,因为并发数限制
if (activeCount < concurrency && queue.size > 0) {
queue.dequeue()();
}
会直接result1微任务,此时调用next方法
如果size有,直接从队列拿出来执行,直到没有。
4. 总结
通过p-limit主要学习到异步任务执行的方式和并发的思想。结合调试过程加深对p-limit执行过程的理解。异步调试过程不像想象中的那么顺利,要有一定的耐心。总之千里之行,始于足下,加油O^O!
参考文章