如何使用Node.js在MongoDB中使用cursor.forEach()?

我的数据库中有大量的文档,我想知道如何浏览所有文档并更新它们,每个文档具有不同的值。

user3502786 asked 2020-02-17T22:49:16Z
7个解决方案
111 votes

答案取决于您使用的驱动程序。 我知道的所有MongoDB驱动程序都有以某种方式实现的queue

这里有些例子:

node-mongodb-native

collection.find(query).forEach(function(doc) {
  // handle
}, function(err) {
  // done or error
});

mongojs

db.collection.find(query).forEach(function(err, doc) {
  // handle
});

collection.find(query, { stream: true })
  .each(function(doc){
    // handle doc
  })
  .error(function(err){
    // handle error
  })
  .success(function(){
    // final callback
  });

猫鼬

collection.find(query).stream()
  .on('data', function(doc){
    // handle doc
  })
  .on('error', function(err){
    // handle error
  })
  .on('end', function(){
    // final callback
  });

queue回调中更新文档

queue回调内部更新文档的唯一问题是,您不知道何时更新所有文档。

要解决此问题,您应该使用一些异步控制流解决方案。 以下是一些选项:

  • 异步的
  • 承诺(when.js,bluebird)

这是使用queuequeue功能的示例:

var q = async.queue(function (doc, callback) {
  // code for your update
  collection.update({
    _id: doc._id
  }, {
    $set: {hi: 'there'}
  }, {
    w: 1
  }, callback);
}, Infinity);

var cursor = collection.find(query);
cursor.each(function(err, doc) {
  if (err) throw err;
  if (doc) q.push(doc); // dispatching doc to async.queue
});

q.drain = function() {
  if (cursor.isClosed()) {
    console.log('all items have been processed');
    db.close();
  }
}
Leonid Beschastny answered 2020-02-17T22:50:13Z
13 votes

使用next()驱动程序和具有异步/等待功能的现代NodeJS,一个好的解决方案是使用next()

const collection = db.collection('things')
const cursor = collection.find({
  bla: 42 // find all things where bla is 42
});
let document;
while ((document = await cursor.next())) {
  await collection.findOneAndUpdate({
    _id: document._id
  }, {
    $set: {
      blu: 43
    }
  });
}

这导致一次只需要一个文档在存储器中,而不是例如。 开始处理文件之前接受的答案,即许多文档被吸入内存。 在“大量收藏”的情况下(根据问题),这可能很重要。

如果文档很大,则可以通过使用投影来进一步改善,以便仅从数据库中获取所需文档的那些字段。

chris6953 answered 2020-02-17T22:50:43Z
6 votes

请注意,调用find()使应用程序获取整个数据集。


请注意,将find()返回的游标分配给了forEach。使用这种方法,而不是一次获取所有数据并立即使用数据,我们将数据流式传输到我们的应用程序。 toArray可以立即创建游标,因为在我们尝试使用它将提供的一些文档之前,它实际上并未向数据库发出请求。 toArray的意思是描述我们的查询。 cursor.forEach的第二个参数显示了驾驶员精疲力尽或发生错误时的处理方法。

在上述代码的初始版本中,是forEach强制了数据库调用。 这意味着我们需要所有文档,并希望它们位于toArray中。

此外,forEach还以批处理格式返回数据。 下图显示了游标(来自应用程序)对toArray的请求

MongoDB cursor requests

forEachtoArray更好,因为我们可以处理文档,直到它们到达末端为止。 将其与toArray进行对比-在这里我们等待所有文档被检索并构建整个数组。 这意味着驱动程序和数据库系统正在一起将结果批处理到您的应用程序中,因此我们没有任何优势。 批处理旨在提供内存开销和执行时间方面的效率。 如果可以,请充分利用它。

student answered 2020-02-17T22:51:23Z
4 votes

Leonid的答案很好,但是我想强调使用异步/承诺的重要性,并给出一个带有Promise示例的解决方案。

解决此问题的最简单方法是循环forEach文档并调用更新。 通常,您不需要在每次请求后关闭db连接,但是如果需要关闭连接,请小心。 如果您确定所有更新都已完成执行,则必须将其关闭。

常见的错误是在调度所有更新后不知道它们是否已完成而调用db.close()。 如果这样做,将会出现错误。

实施错误:

collection.find(query).each(function(err, doc) {
  if (err) throw err;

  if (doc) {
    collection.update(query, update, function(err, updated) {
      // handle
    });
  } 
  else {
    db.close(); // if there is any pending update, it will throw an error there
  }
});

但是,由于db.close()也是异步操作(其签名具有回调选项),因此您可能很幸运,此代码可以正确完成。 仅当您只需要更新一个小集合中的几个文档时,它才可能起作用(因此,请不要尝试)。


正确的解决方案:

由于Leonid已经提出了使用异步的解决方案,因此下面介绍了使用Q承诺的解决方案。

var Q = require('q');
var client = require('mongodb').MongoClient;

var url = 'mongodb://localhost:27017/test';

client.connect(url, function(err, db) {
  if (err) throw err;

  var promises = [];
  var query = {}; // select all docs
  var collection = db.collection('demo');
  var cursor = collection.find(query);

  // read all docs
  cursor.each(function(err, doc) {
    if (err) throw err;

    if (doc) {

      // create a promise to update the doc
      var query = doc;
      var update = { $set: {hi: 'there'} };

      var promise = 
        Q.npost(collection, 'update', [query, update])
        .then(function(updated){ 
          console.log('Updated: ' + updated); 
        });

      promises.push(promise);
    } else {

      // close the connection after executing all promises
      Q.all(promises)
      .then(function() {
        if (cursor.isClosed()) {
          console.log('all items have been processed');
          db.close();
        }
      })
      .fail(console.error);
    }
  });
});
Zanon answered 2020-02-17T22:52:11Z
4 votes

以下是将Mongoose游标与promise异步使用的示例:

new Promise(function (resolve, reject) {
  collection.find(query).cursor()
    .on('data', function(doc) {
      // ...
    })
    .on('error', reject)
    .on('end', resolve);
})
.then(function () {
  // ...
});

参考:

  • 猫鼬光标
  • 流和承诺
Wtower answered 2020-02-17T22:52:44Z
4 votes

node-mongodb-native现在支持cursor.forEachendCallback参数,以便在整个迭代后处理事件,有关详细信息,请参考官方文档[http://mongodb.github.io/node-mongodb-native/2.2/api/Cursor .html#forEach。]

另请注意,现在在nodejs本机驱动程序中已弃用.each。

Yongfeng Lu answered 2020-02-17T22:53:09Z
0 votes

先前的答案均未提及批处理更新。 这使它们非常慢-比使用bulkWrite的解决方案慢几十倍或几百倍。

假设您想将每个文档中字段的值加倍。 这是在固定内存消耗下快速执行💨的方法:

// Double the value of the 'foo' field in all documents
let bulkWrites = [];
const bulkDocumentsSize = 100;  // how many documents to write at once
let i = 0;
db.collection.find({ ... }).forEach(doc => {
  i++;

  // Update the document...
  doc.foo = doc.foo * 2;

  // Add the update to an array of bulk operations to execute later
  bulkWrites.push({
    replaceOne: {
      filter: { _id: doc._id },
      replacement: doc,
    },
  });

  // Update the documents and log progress every `bulkDocumentsSize` documents
  if (i % bulkDocumentsSize === 0) {
    db.collection.bulkWrite(bulkWrites);
    bulkWrites = [];
    print(`Updated ${i} documents`);
  }
});
// Flush the last <100 bulk writes
db.collection.bulkWrite(bulkWrites);
Dan Dascalescu answered 2020-02-17T22:53:33Z
translate from https://stackoverflow.com:/questions/25507866/how-can-i-use-a-cursor-foreach-in-mongodb-using-node-js