并发控制器
面试题
实现一个并发控制器,限制同一时间最多执行 limit 个异步任务
面试官视角
这题主要考 Promise 调度能力:
- 能不能维护正在执行的任务数
- 能不能在任务结束后继续补位执行下一个任务
- 能不能保持结果顺序和输入顺序一致
- 单个任务失败后怎么处理
- 有没有考虑边界:空任务、limit 非法、同步抛错
场景
比如有 100 个请求,但浏览器或服务端不能同时打满,需要最多同时执行 3 个:
const tasks = [
() => fetch('/api/1'),
() => fetch('/api/2'),
() => fetch('/api/3')
]
promisePool(tasks, 2).then(console.log)核心思路
用一个游标 index 指向下一个待执行任务,用 running 记录当前正在执行的数量
- 创建结果数组
results,长度和任务数组一致 - 每次调度时,只要
running < limit且还有任务,就启动新任务 - 启动任务前记录当前任务下标,保证结果能放回原位置
- 任务完成后
running--,再继续调度下一个任务 - 当所有任务都完成时,
resolve(results) - 如果任一任务失败,直接
reject(error),行为对齐Promise.all
Promise.all 风格实现
function promisePool(tasks, limit) {
return new Promise((resolve, reject) => {
if (!Array.isArray(tasks)) {
reject(new TypeError('tasks must be an array'))
return
}
if (tasks.length === 0) {
resolve([])
return
}
limit = Math.max(1, Math.floor(limit))
const results = new Array(tasks.length)
let nextIndex = 0
let running = 0
let finished = 0
let rejected = false
function runNext() {
if (rejected) {
return
}
if (finished === tasks.length) {
resolve(results)
return
}
while (running < limit && nextIndex < tasks.length) {
const currentIndex = nextIndex++
const task = tasks[currentIndex]
running++
Promise.resolve()
.then(() => task())
.then((value) => {
results[currentIndex] = value
running--
finished++
runNext()
})
.catch((error) => {
rejected = true
reject(error)
})
}
}
runNext()
})
}async/await 简洁版
这个版本更适合面试现场写,逻辑也比较清楚
async function promisePool(tasks, limit) {
const results = new Array(tasks.length)
let nextIndex = 0
async function worker() {
while (nextIndex < tasks.length) {
const currentIndex = nextIndex++
results[currentIndex] = await tasks[currentIndex]()
}
}
const workers = Array.from(
{ length: Math.min(limit, tasks.length) },
() => worker()
)
await Promise.all(workers)
return results
}注意
nextIndex++ 在 JS 单线程事件循环里是同步执行的,所以不会出现多个 worker 抢到同一个下标的问题
Scheduler 类版本
有些面试会让你实现:
const scheduler = new Scheduler(2)
scheduler.add(task1).then(console.log)
scheduler.add(task2).then(console.log)可以用队列保存待执行任务:
class Scheduler {
constructor(limit) {
this.limit = limit
this.running = 0
this.queue = []
}
add(task) {
return new Promise((resolve, reject) => {
this.queue.push({
task,
resolve,
reject
})
this.run()
})
}
run() {
while (this.running < this.limit && this.queue.length > 0) {
const { task, resolve, reject } = this.queue.shift()
this.running++
Promise.resolve()
.then(task)
.then(resolve, reject)
.finally(() => {
this.running--
this.run()
})
}
}
}验证用例
const delay = (time, value) => {
return () => new Promise((resolve) => {
setTimeout(() => {
console.log('done', value)
resolve(value)
}, time)
})
}
promisePool([
delay(1000, 'a'),
delay(500, 'b'),
delay(300, 'c'),
delay(400, 'd')
], 2).then((res) => {
console.log(res) // ['a', 'b', 'c', 'd']
})面试回答模板
我会用一个队列或下标保存待执行任务,用一个计数器记录当前执行中的任务数。每次启动任务前让计数器加一,任务结束后减一,并立即补充下一个任务。结果数组按任务原始下标写入,这样即使任务完成顺序不同,最终返回顺序也能和输入一致
易错点
- 不能一开始就执行所有任务,否则就没有并发控制了;任务必须包装成函数
- 结果顺序要按输入顺序,而不是完成顺序
- 任务同步抛错也要能被捕获,所以用
Promise.resolve().then(task) finally里一定要释放并发名额并继续调度limit要做兜底,至少为 1
