async的parallel具体使用时的一个问题( 已解决)
发布于 8 年前 作者 hfqf 5071 次浏览 来自 问答

untitled1.png

想利用parallel并行执行n的纪录插入数据库后再返回数据给前端,但是不知道哪出问题了。

16 回复

没看到你传给insertContact形参callback的实参是什么…

@stonephp thanks 按你提示解决了,但是如果非要用parallel去使用,可不可以呢,如果可以,该怎么修改,再次感谢

@DevinXian 恩 写的有问题 用async.map就好了

var f = insertContact(item,function(…))

@zhaixg great! 一语点醒梦中人

发现第一个问题: 你的 insertContact 方法很有问题。 insertContact自己有一个参数是 callback.而newContact.save 回调函数参数名字也叫callback。 内层的callback会覆盖外层的callback。 所以说

//第二个参数传不传都可以。因为方法内部没有调用到第二个参数
var f = insertContact(item, '任何值都可以。'); //这个方法是一个异步方法,不会有返回值,所以 f 必定是 undefined

发现第二个问题:

你的for循环内部在执行异步方法。 所以

async.parallel(arrInsertContact, (e, v) => {}); //根据第一个问题返回值可以得知,这里的 arrInsertContact 里面会有值,但是都是undefined。

可以这样写(仅供参考): ps:下次别贴图,帮你回答问题的人都要手动 敲代码,真的好烦。。。

function insertContact (item, callback){
	var newContact = new Contact({
	...
	...
	});
	newContact.save(callback);
}

xxx.post('xxx', (req,  res, next) => {
	var arrInsertContact =[];
	var arrContact = JSON.parse(...);
	// to see : http://caolan.github.io/async/docs.html#eachLimit
	//或者用 async.each. eachLimit 第二个数值参数是并行执行个数。
	async.eachLimit(arrContact, 1, (data, next) => {
		arrInsertContact.push(data);	
		return next();
	},  (err) => {
		if(err){
			//处理错误
		}
		
		//然后就是下面你写的代码了。
		async.parallel(arrInsertContact, (e, v) => {});
	});
});

@KingTree 真心感谢 一步步的分析非常好 再次感谢 以后一定贴代码!

@KingTree 真是有心人,手把手呀…

@hfqf

你这样写代码的方式很不喜欢。我只是在你的代码基础上改改。

建议你用 async的异步流程控制方法。

//to see : http://caolan.github.io/async/docs.html#waterfall
//异步控制瀑布流方式
//在没有ES6 时代,async 模块很重要,因为解决地狱回调 就靠 async~~~~ 建议把 async 文档好好看看。
async.waterfall([
    function(callback) {
        callback(null, 'one', 'two');
    },
    function(arg1, arg2, callback) {
        // arg1 now equals 'one' and arg2 now equals 'two'
        callback(null, 'three');
    },
    function(arg1, callback) {
        // arg1 now equals 'three'
        callback(null, 'done');
    }
], function (err, result) {
    // result now equals 'done'
});

// Or, with named functions:
async.waterfall([
    myFirstFunction,
    mySecondFunction,
    myLastFunction,
], function (err, result) {
    // result now equals 'done'
});
function myFirstFunction(callback) {
    callback(null, 'one', 'two');
}
function mySecondFunction(arg1, arg2, callback) {
    // arg1 now equals 'one' and arg2 now equals 'two'
    callback(null, 'three');
}
function myLastFunction(arg1, callback) {
    // arg1 now equals 'three'
    callback(null, 'done');
}

首先选择合适的控制流方法,你这个是对数组逐一迭代,之后你需要返回结果,自然就用 map 了。你一定说用 parallel , 也可以,你需要先根据你的数组 map 出一个 函数数组,之后还要在全局来记录执行后的结果。费力不讨好

@hfqf 你贴的这段代码,如上面的人所说,有一堆问题啊,一堆问题呀,一堆问题呀…重要的话说3遍,希望你真的明白是什么问题,这个功能我也给你上一段代码把,纯手打: #方案1,使用async模块的mapLimit方法,接受4个参数 mapLimit(datas, parallelLimit, executeForEveryData, finalFunction),

原函数说明是 function (arr, limit, iterator, callback)

  • param1: 就是一个数组啦,
  • param2: 就是一个并发数控制啦
  • param3: 是一个中间函数,数组每一个元素都会被这个函数处理, function( itemInDatas, callback ){ //your code here }
  • param4: 就是所有中间函数执行完毕后,最后要执行的函数啦

··· var async = require(‘async’);

var saveMethod = (toSave, cb)=>{ //you save logic code here cb(null); };

function someRouter(res){ async.mapLimit([{name: ‘data1’}, {name: ‘data2’}], 1000, (item, callback)=> { saveMethod(item, (err)=> { if (err)callback(err); else callback(null); }) }, (err, results)=> { // your code here like: if(err) return res.send(‘xxxx’); else return res.send(‘yyyy’); }); }

···

#方案2,基于最原始的事件对象,了解这个更有助于你接近nodejs的核心本质 ··· function saveManager(){} require(‘util’).inherits(yourSaver,require(‘events’).EventEmitter); var saveMethod = (toSave)=>{ //you save logic code here, newContact是你自己的那个存储对象 newContact.save((err)=>{ if(err)saveManager.emit(‘error’,err); else saveManager.emit(‘one_saved’); }); };

function someRouter(res){ var dataArr = [{name: ‘data1’}, {name: ‘data2’}]; var dataCount = dataArr.length; var savedCount = 0;

dataArr.forEach(val=> saveMethod(val));//并行执行saveMethod,保存数据

saveManager.on('one_saved',function(){
	savedCount++;
	if(dataCount==savedCount){//所有数据插入完毕
		res.send('插入完毕');
	}
});
saveManager.on('error',function(err){
	res.send(err);
});

} ···

#方案 3 promizse 保裝,然后Promise.all 并行保存 , 方案 4 基于co和thunkify,用yield去并行,我就不上代码了,楼主你要总结你的代码错误之处啊,并且明白为什么这样写不对

@fantasticsoul 非常感谢 你的各种方案,这是我后来改的写法: router.post(’/contact’,function (req, res, next) { let arrContact = JSON.parse(req.body.contact);

async.map(arrContact,function (item,callback) {

    let newContact = new  Contact({
        carcode:item.carcode,
        name:item.name,
        tel:item.tel,
        cartype:item.cartype,
        owner:item.owner
    });

    newContact.save(function (err,docs) {
        if(err){
            callback(err,1)
        }else{
            callback(null,docs);
        }

    });

},(e,v)=>{
    "use strict";
    if(e){
        console.log(e);
        return res.send(global.retFormate(0,e,'存入数据失败'));
    }
    else {
        console.log(v);
        return res.send(global.retFormate(1,'存入数据成功','存入数据成功'));
    }
})

}),

@hfqf 是的就是这样,map 和 mapLimit其实差不多,只是说mapLimit可以控制并发数,通常来说如果你的数组长度不超过1000,map是ok的,超过1000的话可能会保错 :maximum call stack size exceeded,这个就要看你实际业务逻辑,保错的数组会不会大于1000了

@fantasticsoul 恩 感谢 这个到时通过客户端代码把数组数量控制下

对事件EventEmiter 搞的还是不到位啊,明显是callback猛蹬了

async.parallel(
	[
	  function(err,cb){
			newConcat.save(function(err_save,cb_save){
				if (err_save) cb_save(err_save);
				cb(err,1)
			});
	  },
	  function(err,cb){
	  		newConcat.save(function(err_save,cb_save){
	  			if (err_save) cb_save(err_save);
				cb(err,2)
			});
	  },
	],
	function(err,result){
			console.log(result);
	}
);

写正确的情况是 print: [1,2]

备注: 回调函数 cb = function(err,result){…} 回调函数 cbsave = newConcat 参数传进来的回调函数

回到顶部