EventHelper 异步并发处理
发布于 7 年前 作者 yvanwangl 4013 次浏览 来自 分享

前两天在掘金上看到一篇博客,《Promise 异步流程控制》,作者首先抛出一个如下的应用场景:

网页中预加载20张图片资源,分步加载,一次加载10张,两次完成,怎么控制图片请求的并发,怎样感知当前异步请求是否已完成? 然后作者从 单一请求—>并发请求—>并发请求,按顺序处理结果—>控制最大并发数 这么几个小节逐步阐述了对上述应用场景的Promise解决方案,可以看出作者功底相当深厚,深入浅出逐步阐述了运用Promise如何控制异步流程,具体内容大家可以访问上面的链接,必定会有所获益。

这时就有了想法,如果用 EventHelper这个工具库该如何解决上述问题呢?

首先,我们先了解一下 EventHelper 是什么?EventHelper 是一个基于事件机制的异步事件处理工具。EventHelper 只做一件事,就是将使用者从异步回调的地狱中解脱出来!既然是一个异步事件处理工具那么用来控制异步流程应该是专业对口的。

所以就封装了一个 concurrent 方法,专门用于异步并发队列的处理,代码如下:

EventHelper.prototype.group = function (eventType, handler) {
    let that = this;
    that._groupIndex[eventType] = that._groupIndex[eventType] || { index: 0 };
    let index = that._groupIndex[eventType]['index'];
    that._groupIndex[eventType]['index']++;
    return function (err, data) {
        if (err) {
            // put all arguments to the error handler
            debug(`group for event: ${eventType}; item-${index} has an error: ${err.message}`);
            err.index = index;
            //emit eventType with data: undefined
            that.emit(eventType, {
                type: 'group',
                index: index,
                // callback(err, args1, args2, ...)
                result: undefined
            });
            return that.emit.apply(that, ['error', eventType, err]);
        }
        that.emit(eventType, {
            type: 'group',
            index: index,
            // callback(err, args1, args2, ...)
            result: handler ? handler.apply(null, data) : data
        });
    };
};
EventHelper.prototype.concurrent = function(eventType, limit, asyncHandler, asyncParams){    
    let that = this;
    let asyncParamsClone = [...asyncParams];
    let queue = asyncParamsClone.splice(0, limit).map((param) => asyncHandler(param, that.group(eventType)));
    let indexMap = {};
    let resultArr = [];
    let dataIndex, nextParam, indexMapKey, indexMapValue, handler, executeCount=0;
    [...new Array(limit)].map((item, index)=> indexMap[index] = index);
    handler = (data)=> {
        if(typeof data == 'object' && data.type == 'group' && data.hasOwnProperty('index')){
            executeCount++;
            dataIndex = data.index;
            //if asyncParamsClone's length does not equals to 0, then fill the queue
            if(asyncParamsClone.length!=0){
                nextParam = asyncParamsClone.shift();
                indexMapValue = indexMap[dataIndex];
                queue.splice(indexMapValue, 1, asyncHandler(nextParam, that.group(eventType)));
                indexMapKey = asyncParams.findIndex(param=> nextParam == param);
                indexMap[indexMapKey] = indexMapValue;
            }
            resultArr[dataIndex] = data.result;
            //execute count equal asyncParams length, then fire `${eventType}Finish`
            if(executeCount == asyncParams.length){
                that.emit(`${eventType}Finish`, resultArr);
            }
        }
    };
    return that.on(eventType, handler);
};

经过封装后的 concurrent 方法接受 4 个参数即可事件异步队列的并发控制。传递参数:eventType为自定义异步事件名称;limit为异步并发数;asyncHandler为异步事件处理函数,注意该函数需要接收一个error first风格的回调函数作为参数,该回调函数为concurrent内部自动生成,用于接收异步事件处理成功后的数据或错误信息,例如上面例子中的loadImg函数;asyncParams 为异步事件队列的参数集合。该方法调用成功后,会触发一个 eventType+‘Finish’ 事件,通过该事件即可监听并发完成的事件,例如上例中的 load 事件完成后会出发 loadFinish 事件, 该事件注册的监听函数的参数为一个数组,即异步事件队列的结果集合,结果顺序与asyncParams 参数集合的顺序一致。

最终的应用代码为:

let images = [
	"https://www.baidu.com/img/bd_logo1.png", 
        "https://www.baidu.com/img/baidu_jgylogo3.gif",
        "https://ss1.baidu.com/6ONXsjip0QIZ8tyhnq/it/u=265704898,674087460&fm=58", 
        "https://ss0.baidu.com/6ONWsjip0QIZ8tyhnq/it/u=3105518379,1723849651&fm=58", 
        "https://ss0.baidu.com/6ONWsjip0QIZ8tyhnq/it/u=2292095202,1784829557&fm=58", 
        "https://ss0.baidu.com/6ONWsjip0QIZ8tyhnq/it/u=1219708893,1812014204&fm=5832222",
        "https://ss0.baidu.com/6ONWsjip0QIZ8tyhnq/it/u=3669175269,2356096679&fm=58123122", 
        "https://ss3.baidu.com/-rVXeDTa2gU2pMbgoY3K/it/u=154063165,2016512383&fm=202&mola=new&crop=v1", 
        "https://ss3.bdstatic.com/70cFv8Sh_Q1YnxGkpoWK1HF6hhy/it/u=3536541845,739399450&fm=27&gp=0.jpg", 
        "https://ss1.bdstatic.com/70cFvXSh_Q1YnxGkpoWK1HF6hhy/it/u=594559231,2167829292&fm=27&gp=0.jpg", 
        "https://ss0.bdstatic.com/70cFvHSh_Q1YnxGkpoWK1HF6hhy/it/u=3138365389,851751545&fm=27&gp=0.jpg", 
        "https://ss2.bdstatic.com/70cFvnSh_Q1YnxGkpoWK1HF6hhy/it/u=3965705221,2010595691&fm=27&gp=0.jpg", 
        "https://ss0.bdstatic.com/70cFuHSh_Q1YnxGkpoWK1HF6hhy/it/u=1742626185,2547278809&fm=27&gp=0.jpg", 
        "https://ss3.bdstatic.com/70cFv8Sh_Q1YnxGkpoWK1HF6hhy/it/u=1060387669,1498970204&fm=27&gp=0.jpg", 
        "https://ss3.bdstatic.com/70cFv8Sh_Q1YnxGkpoWK1HF6hhy/it/u=4271053251,2424464488&fm=27&gp=0.jpg", 
        "https://ss1.bdstatic.com/70cFuXSh_Q1YnxGkpoWK1HF6hhy/it/u=4140230371,1584894266&fm=27&gp=0.jpg", 
        "https://ss1.bdstatic.com/70cFvXSh_Q1YnxGkpoWK1HF6hhy/it/u=2710025438,2030209695&fm=27&gp=0.jpg", 
        "https://ss0.bdstatic.com/70cFuHSh_Q1YnxGkpoWK1HF6hhy/it/u=3432487329,2901563519&fm=27&gp=0.jpg", 
        "https://ss1.bdstatic.com/70cFuXSh_Q1YnxGkpoWK1HF6hhy/it/u=2993396273,3023277058&fm=27&gp=0.jpg", 
        "https://ss2.bdstatic.com/70cFvnSh_Q1YnxGkpoWK1HF6hhy/it/u=2838934065,571280381&fm=27&gp=0.jpg"
];
let loadImg = (url, callback)=>{
    let img = document.createElement('img');
    img.onload = ()=> callback(null, img);
    img.onerror = (error)=> callback(error, null);
    img.src = url;
};
emmiter.fail({
    load: function(err){
        console.log(err);
    }
});
emmiter.concurrent('load', 5, loadImg, images);
emmiter.on('loadFinish', (result)=> {
    console.log(result);
});

由于该方法的实现需要借助 EventHelper 的内部方法 group 所以讲解起来可能有些混乱,但是如果串起来看下源码就很容易理解了,重要的是这个封装的方法使用起来非常的方便。

10 回复

是我的就用最新的await/async或者 co 来控制,直接 for 循环就可以非常简单的控制并发,代码量也很少.

@cctv1005s 有相关案例可以分享学习一下没

就拿你上面的那个代码来说把,我要把并发限制在5,我就这么写.

// 封装成一个promise
let loadImgPromise = (url) =>{
 return new Promise((res,rej)=>{
  loadImg(url,(err,result)=>{
   if(err){
    return rej(err);
   }
   return res(result);
  })
 })
}

// 用 co 控制并发
co(function*(){
  // images 是你上述代码中的一个数组
  const results = [];
  for(var i = 0;i < images.length;i+=5){
     const loaders = [];
	 for(var j = i; J < i + 5; j++ ){
	 if(images[j]){
	   loaders.push(loadImgPromise(images[j]));
	  }
	 }
	 const data = yield loaders;
	 results = results.concat(data);
  }
})

@cctv1005s 你的这种实现是这样加载的5,10,15,20,也就是说只有第一组5张全部加载完成才会开始下一组的加载,在第一组5张的加载过程中,请求的并发数为5,4,3,2,1,0,然后开始加载一下组。而我的这种实现是如果第一组的5张中有一张完成加载,马上会开始加载第六张,以此类推,可以保证并发数始终为5。

@yvanwangl

原来如此,一开始没理解题意,我这样实现呢?


function* coLoad (url){
  // 封装成一个promise
  let loadImgPromise = (url) =>{
    return new Promise((res,rej)=>{
      loadImg(url,(err,result)=>{
        if(err){
        return rej(err);
        }
        return res(result);
      })
    });
  }
  return yield loadImgPromise(url);
}

function thread(fn, n) {
  var gens = [];
  while (n--) gens.push(fn);
  return gens;
}

function *parallel(thunks, n){
  var n = Math.min(n || 5, thunks.length);
  var ret = [];
  var index = 0;

  function *next() {
    var i = index++;
    ret[i] = yield thunks[i];
    if (index < thunks.length) yield next;
  }

  yield thread(next, n);

  return ret;
};

 // 用 co 控制并发
 co(function*(){
  // 构建chunks
  const reqs = images.map(coLoad);
  var res = yield parallel(reqs, 5);
  console.log(res);  
 });

@cctv1005s 看得出功力很深厚啊,向你学习

@atian25 解决方案还是很多的。

@yvanwangl 社区嘛,就是大家互相交流学习的地方 ; )

回到顶部