新人求问:es7 async 如何做并发限制,比如限制为并发5?
发布于 8 年前 作者 mind029 6843 次浏览 来自 问答

刚学nodejs不久,使用es7的async 做了一个采集爬虫 ,想要限制并发,不知道该如何处理。 希望大家给个思路。 比如可以自定义限制并发数,设置一个并发值。下面是代码:最后面for循环 每次都只执行一个,执行完才走下一个。

require('traceur');
var request = require('superagent-charset');
var cheerio = require('cheerio');
var uid = require('uid');
var fs = require('fs');

/**
 * 网络请求
 * [@param](/user/param) url
 * [@param](/user/param) charset
 * [@returns](/user/returns) {Promise}
 */
var doRquest = function(url,charset){
    return new Promise(function (resolve, reject){
        if(charset==1){
            request.get(url).end(function(err,res) {
                if (err) reject(err);
                resolve(res);
            });
        }else {
            request.get(url).charset(charset).end(function(err,res) {
                if (err) reject(err);
                resolve(res);
            });
        }
    });
}

var writeFile = function(buf,newName){
    return new Promise(function (resolve, reject){
        fs.writeFile(newName,buf, function (err) {
            if (err) reject(err);
            console.log(newName+"下载成功");
            resolve(true);
        });
    });
}


/**
 * 下载图片
 */
var downPic = async function(path,newName){
    console.log(path);
    try{
        var bufRes = await doRquest(path,1);
        let buf = bufRes.body;
       /* fs.writeFile(newName,buf, function (err) {
            if (err) reject(err);
            console.log(newName+"下载成功");
            //resolve(true);
        });*/
        let downRs = await writeFile(buf,newName);
        if(!downRs){
            console.log("保存出错了");
        }
    }catch(e) {
        console.log(path+'错误了');
    }
    return 1;
}


/**
 * 采集图片
 */
var getPic = async function(picUrl){
    console.log('采集'+picUrl);
    let linkArr = [];
    try{
        var picRes = await doRquest(picUrl,'gbk');
        var $2 = cheerio.load(picRes.text);
        $2('.puzibody img').each(function (idx, element) {
            let $element = $2(element);
            let src = $element.attr('src');
            src = "http://www.xiayiqu.com"+src;
            let newName = "file/"+uid(10)+src.substr(-4,4);
            linkArr.push({url:src,name:newName});
        });

        //采集图片
        for (let link of linkArr) {
            await downPic(link.url,link.name);
        }
    }catch(e) {
        console.log(picUrl+'错误了');
    }
}


/**
 * 采集网址
 * [@returns](/user/returns) {Array}
 */
var getUrl = async function (){
    let linkArr = [];
    for(let i=1;i<=5;i++){
        let url = 'http://www.xiayiqu.com/pu/list_'+i+'.html';
        console.log("采集"+url);
        try {
            var res = await doRquest(url,'gbk');
            var $ = cheerio.load(res.text);
            $('.zuo li a').each(function (idx, element) {
                var $element = $(element);
                var link = $element.attr('href');
                link = "http://www.xiayiqu.com"+link;
                linkArr.push(link);
            });
        } catch(e) {
            console.log(url+'错误了');
        }
    }
    return linkArr;
};

//采集全部
var getAll = async function() {
    let urls = await getUrl();
    console.log("总页面"+urls.length);
    for (let url of urls) {
        await getPic(url);
    }
    return true;
}
//开始采集
getAll();  \n```
3 回复

现在并发是1 …

e.g 并发5

var getAll = async function() {
    let urls = await getUrl();
    console.log("总页面"+urls.length);
	await Promise.map(urls, function(url) {
	   return getPic(url); 
	}, { concurrency: 5 });
    return true;
}

Promise.map 由 bluebird 提供 note https://github.com/magicdawn/magicdawn/issues/18#issuecomment-70646869

@magicdawn 非常感谢您这个办法,下载bluebird 引进后,终于能耐控制并发了。

@mind029

caolan/async , async.js 有很多可以参考 e.g 参考 async.parallelLimit 写的Promise.map

module.exports = Promise.map = map;

function map(arr, fn, concurrency) {

  concurrency = concurrency || 1;

  return new Promise(function(resolve, reject) {

    var completed = 0;
    var started = 0;
    var running = 0;
    var results = new Array(arr.length);

    (function replenish() {
      if (completed >= arr.length) {
        return resolve(results);
      };

      while (running < concurrency && started < arr.length) {
        running++;
        started++;

        var index = started - 1;
        fn.call(arr[index], arr[index], index) // item,index
          .then(function(result) {
            // console.log('done');
            running--;
            completed++;
            results[index] = result;

            replenish();
          })
          .catch(reject);
      }
    })();
  });
}
回到顶部