发布于

如何实现一个 Promise 池

作者
  • avatar
    姓名
    Jacob
    Twitter

一、背景

最近在写一个批量上传的 node 脚本,其实大家可以想到批量上传带来的接口性能问题,如果将需要上传的文件通过多个异步请求同时上传的话,服务器方面应该是吃不消的,所以这里提出一个 promise 池的概念。

其原理就是固定每次运行的 promise 函数个数,其中有一个完成之后再继续再向这个池子中新增一个 promise 函数运行,这样就可以保持一直有固定个数的 promise 函数在运行,并且服务端的压力不会太大。

二、实现

具体的实现就像是接力跑步,比如我们规定最多同时执行 5 个异步函数。

赛道上只能有 5 个运动员在跑步,但是要让所有的运动员都跑一次,最好的方法就是 5 个运动员分别在 5 个赛道上跑,谁先跑到终点就将接力棒传递给下一个运动员,这样就能使每个运动员跑一次并且赛道上最多有 5 个运动员。

当然,这里只是说跑到终点会将接力棒传给下一个运动员,但是如果一个运行员跑步过程中受伤了,不能继续完成跑步的话就要放弃,并将接力棒传递给下一个运动员,也就是 promise 函数中的 reject。

还有一个点就是如何监听所有的 promise 函数已经结束,我们在每个 promise 函数运行结束之后都要运行判断一下当前正在执行的函数个数以及还没有运行的函数个数是否为 0,为 0 的话则表示函数运行结束。这里我们使用 EventEmitter 来发送结束消息,表示所有 promise 函数已经执行完成。

具体的实现代码如下:

class PromisePool extends EventEmitter {
  constructor(params, fn, max) {
    super()
    this.params = params // 异步函数的参数数组
    this.fn = fn // 异步函数
    this.max = max // 同时运行的最大数量
    this.pool = [] // 存放task
    this.successNum = 0 // 成功个数
    this.failNum = 0 // 失败个数
    this.count = params.length // 当前正在运行的异步函数个数
    this.init()
  }
  // 初始化函数
  init() {
    while (this.count < this.max) {
      const param = this.params.shift()
      if (!param) {
        break
      }
      this.genTask(param)
    }
  }
  genTask(param) {
    this.count++
    const task = this.fn(param)
    task
      .then(() => {
        this.successNum++
      })
      .catch(() => {
        this.failNum++
      })
      .finally(() => {
        this.count--
        this.init()
        if (this.count === 0 && this.params.length === 0) {
          this.emit('finish')
        }
      })
  }
}