今天继续看一个 promise 限制并发的工具
上源码 直接上 js 的源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 import Queue from 'yocto-queue' export default function pLimit (concurrency ) { if ( !( (Number .isInteger (concurrency) || concurrency === Number .POSITIVE_INFINITY ) && concurrency > 0 ) ) { throw new TypeError ('Expected `concurrency` to be a number from 1 and up' ) } const queue = new Queue () let activeCount = 0 const next = ( ) => { activeCount-- if (queue.size > 0 ) { queue.dequeue ()() } } const run = async (fn, resolve, args ) => { activeCount++ const result = (async () => fn (...args))() resolve (result) try { await result } catch {} next () } const enqueue = (fn, resolve, args ) => { queue.enqueue (run.bind (undefined , fn, resolve, args)) ;(async () => { await Promise .resolve () if (activeCount < concurrency && queue.size > 0 ) { queue.dequeue ()() } })() } const generator = (fn, ...args ) => new Promise ((resolve ) => { enqueue (fn, resolve, args) }) Object .defineProperties (generator, { activeCount : { get : () => activeCount, }, pendingCount : { get : () => queue.size , }, clearQueue : { value : () => { queue.clear () }, }, }) return generator }
讲解 首先展示一下使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import pLimit from 'p-limit' const limit = pLimit (1 )const input = [ limit (() => fetchSomething ('foo' )), limit (() => fetchSomething ('bar' )), limit (() => doSomething ()), ] const result = await Promise .all (input)console .log (result)
首先是 pLimit(1),创建一个只能有一个 promise 并发的工具函数。
然后调用三次 limit(fn),将 3 个 fn 存入队列,直到本次循环完成(同步任务)
同步任务执行完成,开始执行:await Promise.resolve()
后的:
1 2 3 if (activeCount < concurrency && queue.size > 0 ) { queue.dequeue ()() }
首先是第一次limit(() => fetchSomething('foo'))
创建的 promise 微任务,这时候 activeCount 是 0 小于 concurrency 的 1,而且 queue.size 是 3,开始从 queue 取出第一个任务开始执行,activeCount++ 变成 1,并且开始 await 异步等待任务完成。
然后是 limit(() => fetchSomething('bar'))
创建的微任务,这时候因为 activeCount 是 1,不符合 activeCount < concurrency,不执行出列。第三个 limit 同理。
当 fetchSomething('foo')
异步等待完成,执行 next()
, activeCount– ,并且取出第二个任务开始执行,activeCount++,异步等待,等待完成、activeCount– …… 直到整个队列空了,执行完成。
所以简单总结一下就是,所有任务入列,等到下一个事件循环,拿出n个任务开始执行,每一个任务执行完成,继续从队列拿出一个任务进行执行,直到队列为空。