如何使用mongoose对一个100万+的mongodb的表进行遍历操作
发布于 12 年前 作者 askie 41642 次浏览 最后一次编辑是 8 年前

这个问题困扰我很久了,一致没有想明白。 这个表有100万的document,我需要遍历他,每次拿出来10条文档,对其进行1个比较耗时的处理,会利用callback返回,进行下10条文档的处理。 直接使用如下代码,把所有100万数据都放入内存,然后利用async处理,我知道比较蠢,测试了一下,内存猛涨

Blockquote MyModel.find({}, function (err, docs) { dosomething(docs,callback)}); Blockquote

这里请教一下大侠们,可否指点一下方法,使得可以遍历完成100万数据的处理,并且不会引起内存骤增,谢谢了!

16 回复

加个定时器试试,个人理解,高手勿喷

定时器是无法搞定的!

100W应该有索引吧例如_id,那每次读10条,callback自己再继续下个10条?

callback 会导致堆栈溢出 有个类似的问题:http://stackoverflow.com/questions/10546193/how-to-return-large-amount-of-rows-from-mongodb-using-node-js-http-server

答案4不明白如何实现的

如果每次读取10条,需要等待这10条处理完毕callback返回后,再去拿下10条去处理。这么考虑是为了不让处理数据的服务负载太大。

如果每次拿10条记录,不等待处理服务处理完毕,就又去领10条记录处理,会让处理数据的服务压力太大!

因此,想采取单线一次只处理10条记录,处理完毕再处理下10条,直至所有100万记录处理完毕。

是不是mongodb输出数据时,是不等待的,无法组塞住?

I guess it means

var MongoClient = require('mongodb').MongoClient;
var url='mongodb://localhost/sample';
MongoClient.connect(url, function(err, db) {
    if(err) { return console.log("connection failed: %s",err); }
    console.log('connected to mongo, ready for duty work');
    var collection=db.collection('files');
    //setInterval(function(){ console.log('memroy usage : %j',process.memoryUsage());},10000);
    /** fetch all records **/
    var stream=collection.find().stream(),cache=[];
    stream.on('data',function(item){
        cache.push(item);
        if(cache.length==10){
            /** signal mongo to pause reading **/
            stream.pause();
            process.nextTick(function(){
                doLotsWork(cache,function(){
                    cache=[];
                    /** signal mongo to continue, fetch next record **/
                    stream.resume();
                });
            });
        }
    });
    stream.on('end',function(){ console.log('query ended'); });
    stream.on('close',function(){ console.log('query closed'); });
});

function doLotsWork(records,callback){
    //.....do lots of work
    //.....
    //all done, ready to deal with next 10 records
    process.nextTick(function(){
        callback();
    });
}

tested with 967615 records, memory usage: 18M --> 110M --> 25M

一次性取100万个太大了,那能不能一次取10个呢?使用分页

function cursor(start,limit){
    //1.游标初始化
    var start;
    if(!start){
        start = start || 0;
    }
    //2.分页初始化
    var limit = limit || 10;
    //3.分页查询
    Model.skip(start).limit(limit).exec(function(err,docs){
        //遍历处理
        if(!docs || docs.length == 0){
            //处理完成
        }
        for(var i = docs.length - 1; ~i; i--,){
            //处理完的判断
            if(){
                //迭代
                cursor(start + docs.length,limit);
            }
        }
    });
}

刚才的没有做过测试,我那自己的项目写了一个完整案例,你看一下对你有没有帮助

//这是我封装后的抽象Model
var Dao = require('./Dao');
//这是我根据抽象的Model查找到实体的Model
var professionDao = Dao('profession');
/**
 * 游标函数
 * @param _start 游标的起始位置
 * @param _limit 游标的分页数量
 * @param _callback 游标执行函数
 */
function cursor(_start,_limit,_callback){
  //初始化数据定义
  var start,limit,flag,len;
  //初始化起始位置
  start = !_start || _start < 0 ? 0 : _start;
  //初始化分页数量
  limit = !_limit || _limit < 1 ? 1 : _limit;
  //使用Model执行分页查询
  professionDao.find().skip(start).limit(limit).exec(function(err,docs){
    //缓存长度
    len = docs.length;
    //如果没有查询到,证明已经查询完毕
    if(len === 0){
      console.log('遍历结束');
    }
    //初始化循环结束标记
    flag = 0;
    //遍历
    docs.forEach(function(doc){
      //如果有执行函数就执行
      if(_callback && toString.call(_callback) === '[object Function]'){
        _callback(doc);
      }
      //如果循环到末尾,则迭代
      if(len == ++flag){
        cursor(start + docs.length,limit);
      }
    });
  });
}
//执行
cursor(0,10,function(doc){
  console.log(doc._id);
});

当然这个是没有经过优化的,如果要优化就要解决变量顺序、默认值等问题,尽量让api简介好用,希望对你有帮助!

@shiedman 跪拜了!

skip方式对大数据有问题,不能使用。这个之前遇到过,后来我们采取的是数字索引排序的方式,index大于起始值,取出来100个,把100个中最大的index记录下来下次查询使用。

这样可以保证拿出的数据是少量的,而且减少了skip引起的速度慢的问题。

如果使用skip方式,在shard环境下,如果跨shard获取数据的话,会引起负载爆升!

相当清晰,非常感谢! 如果要使用在大数据下,不能取过大的数据,否则会引起跨shard扫描的问题。

一般单机环境小数据都是非常棒的方案!

感谢大神!

关注这个问题。 MARK一个

使用Query.prototype.cursor()

  • 流方式
Thing.
  find({ name: /^hello/ }).
  cursor().
  on('data', function(doc) { console.log(doc); }).
  on('end', function() { console.log('Done!'); });
  • 迭代器方式
co(function*() {
  const cursor = Thing.find({ name: /^hello/ }).cursor();
  for (let doc = yield cursor.next(); doc != null; doc = yield cursor.next()) {
    console.log(doc);
  }
});

参考:query_Query-cursor

阔以参考这篇博客:如何高效地遍历 MongoDB 超大集合?

function findAllMembersCursor() {
    return Member.find().cursor();
}

async function test() {
    const membersCursor = await findAllMembersCursor();
    let N = 0;
    await membersCursor.eachAsync(member => {
        N++;
        console.log(`name of the ${N}th member: ${member.name}`);
    });
    console.log(`loop all ${N} members success`);
}

test();
回到顶部