这个问题困扰我很久了,一致没有想明白。 这个表有100万的document,我需要遍历他,每次拿出来10条文档,对其进行1个比较耗时的处理,会利用callback返回,进行下10条文档的处理。 直接使用如下代码,把所有100万数据都放入内存,然后利用async处理,我知道比较蠢,测试了一下,内存猛涨
Blockquote MyModel.find({}, function (err, docs) { dosomething(docs,callback)}); Blockquote
这里请教一下大侠们,可否指点一下方法,使得可以遍历完成100万数据的处理,并且不会引起内存骤增,谢谢了!
加个定时器试试,个人理解,高手勿喷
定时器是无法搞定的!
100W应该有索引吧例如_id,那每次读10条,callback自己再继续下个10条?
callback 会导致堆栈溢出 有个类似的问题:http://stackoverflow.com/questions/10546193/how-to-return-large-amount-of-rows-from-mongodb-using-node-js-http-server
答案4不明白如何实现的
如果每次读取10条,需要等待这10条处理完毕callback返回后,再去拿下10条去处理。这么考虑是为了不让处理数据的服务负载太大。
如果每次拿10条记录,不等待处理服务处理完毕,就又去领10条记录处理,会让处理数据的服务压力太大!
因此,想采取单线一次只处理10条记录,处理完毕再处理下10条,直至所有100万记录处理完毕。
是不是mongodb输出数据时,是不等待的,无法组塞住?
I guess it means
var MongoClient = require('mongodb').MongoClient;
var url='mongodb://localhost/sample';
MongoClient.connect(url, function(err, db) {
if(err) { return console.log("connection failed: %s",err); }
console.log('connected to mongo, ready for duty work');
var collection=db.collection('files');
//setInterval(function(){ console.log('memroy usage : %j',process.memoryUsage());},10000);
/** fetch all records **/
var stream=collection.find().stream(),cache=[];
stream.on('data',function(item){
cache.push(item);
if(cache.length==10){
/** signal mongo to pause reading **/
stream.pause();
process.nextTick(function(){
doLotsWork(cache,function(){
cache=[];
/** signal mongo to continue, fetch next record **/
stream.resume();
});
});
}
});
stream.on('end',function(){ console.log('query ended'); });
stream.on('close',function(){ console.log('query closed'); });
});
function doLotsWork(records,callback){
//.....do lots of work
//.....
//all done, ready to deal with next 10 records
process.nextTick(function(){
callback();
});
}
tested with 967615 records, memory usage: 18M --> 110M --> 25M
一次性取100万个太大了,那能不能一次取10个呢?使用分页
function cursor(start,limit){
//1.游标初始化
var start;
if(!start){
start = start || 0;
}
//2.分页初始化
var limit = limit || 10;
//3.分页查询
Model.skip(start).limit(limit).exec(function(err,docs){
//遍历处理
if(!docs || docs.length == 0){
//处理完成
}
for(var i = docs.length - 1; ~i; i--,){
//处理完的判断
if(){
//迭代
cursor(start + docs.length,limit);
}
}
});
}
刚才的没有做过测试,我那自己的项目写了一个完整案例,你看一下对你有没有帮助
//这是我封装后的抽象Model
var Dao = require('./Dao');
//这是我根据抽象的Model查找到实体的Model
var professionDao = Dao('profession');
/**
* 游标函数
* @param _start 游标的起始位置
* @param _limit 游标的分页数量
* @param _callback 游标执行函数
*/
function cursor(_start,_limit,_callback){
//初始化数据定义
var start,limit,flag,len;
//初始化起始位置
start = !_start || _start < 0 ? 0 : _start;
//初始化分页数量
limit = !_limit || _limit < 1 ? 1 : _limit;
//使用Model执行分页查询
professionDao.find().skip(start).limit(limit).exec(function(err,docs){
//缓存长度
len = docs.length;
//如果没有查询到,证明已经查询完毕
if(len === 0){
console.log('遍历结束');
}
//初始化循环结束标记
flag = 0;
//遍历
docs.forEach(function(doc){
//如果有执行函数就执行
if(_callback && toString.call(_callback) === '[object Function]'){
_callback(doc);
}
//如果循环到末尾,则迭代
if(len == ++flag){
cursor(start + docs.length,limit);
}
});
});
}
//执行
cursor(0,10,function(doc){
console.log(doc._id);
});
当然这个是没有经过优化的,如果要优化就要解决变量顺序、默认值等问题,尽量让api简介好用,希望对你有帮助!
@shiedman 跪拜了!
skip方式对大数据有问题,不能使用。这个之前遇到过,后来我们采取的是数字索引排序的方式,index大于起始值,取出来100个,把100个中最大的index记录下来下次查询使用。
这样可以保证拿出的数据是少量的,而且减少了skip引起的速度慢的问题。
如果使用skip方式,在shard环境下,如果跨shard获取数据的话,会引起负载爆升!
相当清晰,非常感谢! 如果要使用在大数据下,不能取过大的数据,否则会引起跨shard扫描的问题。
一般单机环境小数据都是非常棒的方案!
感谢大神!
关注这个问题。 MARK一个
学习了
使用Query.prototype.cursor()
- 流方式
Thing.
find({ name: /^hello/ }).
cursor().
on('data', function(doc) { console.log(doc); }).
on('end', function() { console.log('Done!'); });
- 迭代器方式
co(function*() {
const cursor = Thing.find({ name: /^hello/ }).cursor();
for (let doc = yield cursor.next(); doc != null; doc = yield cursor.next()) {
console.log(doc);
}
});
阔以参考这篇博客:如何高效地遍历 MongoDB 超大集合?
function findAllMembersCursor() {
return Member.find().cursor();
}
async function test() {
const membersCursor = await findAllMembersCursor();
let N = 0;
await membersCursor.eachAsync(member => {
N++;
console.log(`name of the ${N}th member: ${member.name}`);
});
console.log(`loop all ${N} members success`);
}
test();