From 941c93c010d0b6f289db0fffb86434829e0997d8 Mon Sep 17 00:00:00 2001 From: emadum Date: Tue, 14 Apr 2020 17:10:49 -0400 Subject: [PATCH 01/20] test: add new tests demonstrating failure --- test/functional/change_stream.test.js | 187 ++++++++++++++++++++++++++ 1 file changed, 187 insertions(+) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 37b3b6266c6..45177a24678 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -13,6 +13,96 @@ const sinon = require('sinon'); chai.use(require('chai-subset')); +/** + * Triggers a fake resumable error on a change stream + * + * @param {ChangeStream} changeStream + * @param {Function} onCursorClosed callback when cursor closed due this error + */ +function triggerResumableError(changeStream, onCursorClosed) { + const closeCursor = changeStream.cursor.close; + changeStream.cursor.close = callback => { + changeStream.cursor.close = closeCursor; + changeStream.cursor.close(callback); + onCursorClosed(); + }; + const fakeResumableError = new MongoNetworkError('fake error'); + fakeResumableError[mongoErrorContextSymbol] = { isGetMore: true }; + changeStream.cursor.emit('error', fakeResumableError); +} + +/** + * Waits for a change stream to start + * + * @param {ChangeStream} changeStream + * @param {Function} callback + */ +function waitForStarted(changeStream, callback) { + changeStream.cursor.once('init', () => { + callback(); + }); +} + +/** + * Iterates the next discrete batch of a change stream non-eagerly. This + * will return `null` if the next bach is empty, rather than waiting forever + * for a non-empty batch. + * + * @param {ChangeStream} changeStream + * @param {Function} callback + */ +function tryNext(changeStream, callback) { + let complete = false; + function done(err, result) { + if (complete) return; + + // if the arity is 1 then this a callback for `more` + if (arguments.length === 1) { + result = err; + const batch = result.cursor.firstBatch || result.cursor.nextBatch; + if (batch.length === 0) { + complete = true; + callback(null, null); + } + + return; + } + + // otherwise, this a normal response to `next` + complete = true; + changeStream.removeListener('more', done); + if (err) return callback(err); + callback(err, result); + } + + // race the two requests + changeStream.next(done); + changeStream.cursor.once('more', done); +} + +/** + * Exhausts a change stream aggregating all responses until the first + * empty batch into a returned array of events. + * + * @param {ChangeStream} changeStream + * @param {Function|Array} bag + * @param {Function} [callback] + */ +function exhaust(changeStream, bag, callback) { + if (typeof bag === 'function') { + callback = bag; + bag = []; + } + + tryNext(changeStream, (err, doc) => { + if (err) return callback(err); + if (doc === null) return callback(undefined, bag); + + bag.push(doc); + exhaust(changeStream, bag, callback); + }); +} + // Define the pipeline processing changes var pipeline = [ { $addFields: { addedField: 'This is a field added using $addFields' } }, @@ -2827,3 +2917,100 @@ describe('Change Streams', function() { }); }); }); + +describe('Change Stream Resume Error Tests', function() { + it('(events) should continue iterating after a resumable error', function(done) { + const configuration = this.configuration; + const client = configuration.newClient(); + client.connect(err => { + expect(err).to.not.exist; + const collection = client.db().collection('test'); + const changeStream = collection.watch(); + const docs = []; + changeStream.on('change', change => { + expect(change).to.exist; + docs.push(change); + if (docs.length === 2) { + expect(docs[0]).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 42 } + }); + expect(docs[1]).to.containSubset({ + operationType: 'insert', + fullDocument: { b: 24 } + }); + changeStream.close(() => client.close(done)); + } + }); + waitForStarted(changeStream, () => { + collection.insertOne({ a: 42 }, err => { + expect(err).to.not.exist; + triggerResumableError(changeStream, () => { + collection.insertOne({ b: 24 }, err => { + expect(err).to.not.exist; + }); + }); + }); + }); + }); + }); + + it('(promises) hasNext should work after a resumable error', function(done) { + const configuration = this.configuration; + const client = configuration.newClient(); + client.connect(err => { + expect(err).to.not.exist; + const collection = client.db().collection('test'); + const changeStream = collection.watch(); + waitForStarted(changeStream, () => { + collection.insertOne({ a: 42 }, err => { + expect(err).to.not.exist; + triggerResumableError(changeStream, () => { + changeStream.hasNext((err1, hasNext) => { + expect(err).to.not.exist; + expect(hasNext).to.be.true; + changeStream.close(() => client.close(done)); + }); + }); + }); + }); + changeStream.hasNext((err, hasNext) => { + expect(err).to.not.exist; + expect(hasNext).to.be.true; + changeStream.close(() => client.close(done)); + }); + }); + }); + + it('(promises) should continue iterating after a resumable error', async function() { + const configuration = this.configuration; + const client = configuration.newClient(); + await client.connect(); + const collection = client.db().collection('test'); + const changeStream = collection.watch(); + waitForStarted(changeStream, () => { + collection.insertOne({ a: 42 }, err => { + expect(err).to.not.exist; + triggerResumableError(changeStream, () => { + collection.insertOne({ b: 24 }, err => { + expect(err).to.not.exist; + }); + }); + }); + }); + const docs = []; + while (await changeStream.hasNext()) { + const change = await changeStream.next(); + docs.push(change.fullDocument); + if (change.fullDocument.b === 24) { + break; + } + } + expect(docs) + .to.be.an('array') + .with.lengthOf(2); + + await changeStream.close(); + await client.close(); + }); +}); From 639b4220a3a3573e747c27e8a0f1498ec4caa74c Mon Sep 17 00:00:00 2001 From: emadum Date: Tue, 14 Apr 2020 17:28:41 -0400 Subject: [PATCH 02/20] test: only run on servers that support change streams --- test/functional/change_stream.test.js | 153 ++++++++++++++------------ 1 file changed, 81 insertions(+), 72 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 45177a24678..b72ef0a4f27 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -2919,98 +2919,107 @@ describe('Change Streams', function() { }); describe('Change Stream Resume Error Tests', function() { - it('(events) should continue iterating after a resumable error', function(done) { - const configuration = this.configuration; - const client = configuration.newClient(); - client.connect(err => { - expect(err).to.not.exist; - const collection = client.db().collection('test'); - const changeStream = collection.watch(); - const docs = []; - changeStream.on('change', change => { - expect(change).to.exist; - docs.push(change); - if (docs.length === 2) { - expect(docs[0]).to.containSubset({ - operationType: 'insert', - fullDocument: { a: 42 } - }); - expect(docs[1]).to.containSubset({ - operationType: 'insert', - fullDocument: { b: 24 } + it('(events) should continue iterating after a resumable error', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: function(done) { + const configuration = this.configuration; + const client = configuration.newClient(); + client.connect(err => { + expect(err).to.not.exist; + const collection = client.db().collection('test'); + const changeStream = collection.watch(); + const docs = []; + changeStream.on('change', change => { + expect(change).to.exist; + docs.push(change); + if (docs.length === 2) { + expect(docs[0]).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 42 } + }); + expect(docs[1]).to.containSubset({ + operationType: 'insert', + fullDocument: { b: 24 } + }); + changeStream.close(() => client.close(done)); + } + }); + waitForStarted(changeStream, () => { + collection.insertOne({ a: 42 }, err => { + expect(err).to.not.exist; + triggerResumableError(changeStream, () => { + collection.insertOne({ b: 24 }, err => { + expect(err).to.not.exist; + }); + }); }); - changeStream.close(() => client.close(done)); - } + }); }); - waitForStarted(changeStream, () => { - collection.insertOne({ a: 42 }, err => { - expect(err).to.not.exist; - triggerResumableError(changeStream, () => { - collection.insertOne({ b: 24 }, err => { - expect(err).to.not.exist; + } + }); + + it('(promises) hasNext should work after a resumable error', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: function(done) { + const configuration = this.configuration; + const client = configuration.newClient(); + client.connect(err => { + expect(err).to.not.exist; + const collection = client.db().collection('test'); + const changeStream = collection.watch(); + waitForStarted(changeStream, () => { + collection.insertOne({ a: 42 }, err => { + expect(err).to.not.exist; + triggerResumableError(changeStream, () => { + changeStream.hasNext((err1, hasNext) => { + expect(err).to.not.exist; + expect(hasNext).to.be.true; + changeStream.close(() => client.close(done)); + }); }); }); }); + changeStream.hasNext((err, hasNext) => { + expect(err).to.not.exist; + expect(hasNext).to.be.true; + changeStream.close(() => client.close(done)); + }); }); - }); + } }); - it('(promises) hasNext should work after a resumable error', function(done) { - const configuration = this.configuration; - const client = configuration.newClient(); - client.connect(err => { - expect(err).to.not.exist; + it('(promises) should continue iterating after a resumable error', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: async function() { + const configuration = this.configuration; + const client = configuration.newClient(); + await client.connect(); const collection = client.db().collection('test'); const changeStream = collection.watch(); waitForStarted(changeStream, () => { collection.insertOne({ a: 42 }, err => { expect(err).to.not.exist; triggerResumableError(changeStream, () => { - changeStream.hasNext((err1, hasNext) => { + collection.insertOne({ b: 24 }, err => { expect(err).to.not.exist; - expect(hasNext).to.be.true; - changeStream.close(() => client.close(done)); }); }); }); }); - changeStream.hasNext((err, hasNext) => { - expect(err).to.not.exist; - expect(hasNext).to.be.true; - changeStream.close(() => client.close(done)); - }); - }); - }); - - it('(promises) should continue iterating after a resumable error', async function() { - const configuration = this.configuration; - const client = configuration.newClient(); - await client.connect(); - const collection = client.db().collection('test'); - const changeStream = collection.watch(); - waitForStarted(changeStream, () => { - collection.insertOne({ a: 42 }, err => { - expect(err).to.not.exist; - triggerResumableError(changeStream, () => { - collection.insertOne({ b: 24 }, err => { - expect(err).to.not.exist; - }); - }); - }); - }); - const docs = []; - while (await changeStream.hasNext()) { - const change = await changeStream.next(); - docs.push(change.fullDocument); - if (change.fullDocument.b === 24) { - break; + const docs = []; + while (await changeStream.hasNext()) { + const change = await changeStream.next(); + docs.push(change.fullDocument); + if (change.fullDocument.b === 24) { + break; + } } - } - expect(docs) - .to.be.an('array') - .with.lengthOf(2); + expect(docs) + .to.be.an('array') + .with.lengthOf(2); - await changeStream.close(); - await client.close(); + await changeStream.close(); + await client.close(); + } }); }); From 9cb32c6ead2d50e58dfd075a90b212d0379955ff Mon Sep 17 00:00:00 2001 From: emadum Date: Sat, 25 Apr 2020 20:36:55 -0400 Subject: [PATCH 03/20] test: fix tests --- test/functional/change_stream.test.js | 71 ++++++++++++++------------- 1 file changed, 37 insertions(+), 34 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index b72ef0a4f27..6c9612adf48 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -3,6 +3,7 @@ var assert = require('assert'); var Transform = require('stream').Transform; const MongoError = require('../../lib/core').MongoError; var MongoNetworkError = require('../../lib/core').MongoNetworkError; +var mongoErrorContextSymbol = require('../../lib/core').mongoErrorContextSymbol; var setupDatabase = require('./shared').setupDatabase; var delay = require('./shared').delay; var co = require('co'); @@ -20,15 +21,17 @@ chai.use(require('chai-subset')); * @param {Function} onCursorClosed callback when cursor closed due this error */ function triggerResumableError(changeStream, onCursorClosed) { - const closeCursor = changeStream.cursor.close; + const closeCursor = changeStream.cursor.close.bind(changeStream.cursor); changeStream.cursor.close = callback => { - changeStream.cursor.close = closeCursor; - changeStream.cursor.close(callback); - onCursorClosed(); + closeCursor(err => { + callback && callback(err); + onCursorClosed(); + }); }; const fakeResumableError = new MongoNetworkError('fake error'); fakeResumableError[mongoErrorContextSymbol] = { isGetMore: true }; - changeStream.cursor.emit('error', fakeResumableError); + // delay error slightly to better simulate real conditions + setTimeout(() => changeStream.cursor.emit('error', fakeResumableError), 250); } /** @@ -38,7 +41,7 @@ function triggerResumableError(changeStream, onCursorClosed) { * @param {Function} callback */ function waitForStarted(changeStream, callback) { - changeStream.cursor.once('init', () => { + changeStream.cursor.once('response', () => { callback(); }); } @@ -2958,7 +2961,7 @@ describe('Change Stream Resume Error Tests', function() { } }); - it('(promises) hasNext should work after a resumable error', { + it('(callback) hasNext should work after a resumable error', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: function(done) { const configuration = this.configuration; @@ -2972,7 +2975,7 @@ describe('Change Stream Resume Error Tests', function() { expect(err).to.not.exist; triggerResumableError(changeStream, () => { changeStream.hasNext((err1, hasNext) => { - expect(err).to.not.exist; + expect(err1).to.not.exist; expect(hasNext).to.be.true; changeStream.close(() => client.close(done)); }); @@ -2982,44 +2985,44 @@ describe('Change Stream Resume Error Tests', function() { changeStream.hasNext((err, hasNext) => { expect(err).to.not.exist; expect(hasNext).to.be.true; - changeStream.close(() => client.close(done)); }); }); } }); - it('(promises) should continue iterating after a resumable error', { + it('(callback) should continue iterating after a resumable error', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: async function() { + test: function(done) { const configuration = this.configuration; const client = configuration.newClient(); - await client.connect(); - const collection = client.db().collection('test'); - const changeStream = collection.watch(); - waitForStarted(changeStream, () => { - collection.insertOne({ a: 42 }, err => { - expect(err).to.not.exist; - triggerResumableError(changeStream, () => { - collection.insertOne({ b: 24 }, err => { - expect(err).to.not.exist; + client.connect(err => { + expect(err).to.not.exist; + const collection = client.db().collection('test'); + const changeStream = collection.watch(); + waitForStarted(changeStream, () => { + collection.insertOne({ a: 42 }, err => { + expect(err).to.not.exist; + triggerResumableError(changeStream, () => { + changeStream.hasNext((err, hasNext) => { + expect(err).to.not.exist; + expect(hasNext).to.be.true; + changeStream.next((err, change) => { + expect(err).to.not.exist; + expect(change).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 42 } + }); + changeStream.close(() => client.close(done)); + }); + }); }); }); }); + changeStream.hasNext((err, hasNext) => { + expect(err).to.not.exist; + expect(hasNext).to.be.true; + }); }); - const docs = []; - while (await changeStream.hasNext()) { - const change = await changeStream.next(); - docs.push(change.fullDocument); - if (change.fullDocument.b === 24) { - break; - } - } - expect(docs) - .to.be.an('array') - .with.lengthOf(2); - - await changeStream.close(); - await client.close(); } }); }); From 1713f57a25931a24941da46aa951cb23481b9f6f Mon Sep 17 00:00:00 2001 From: emadum Date: Sat, 25 Apr 2020 20:38:24 -0400 Subject: [PATCH 04/20] refactor: add getCursor to prevent iterating on old cursor during resume attempt --- lib/change_stream.js | 117 ++++++++++++++++++++----------------------- 1 file changed, 53 insertions(+), 64 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 6e15022282d..59e1af0736f 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -128,7 +128,12 @@ class ChangeStream extends EventEmitter { * @returns {Promise|void} returns Promise if no callback passed */ hasNext(callback) { - return maybePromise(this.parent, callback, cb => this.cursor.hasNext(cb)); + return maybePromise(this.parent, callback, cb => + getCursor(this, (err, cursor) => { + if (err) return cb(err); + cursor.hasNext(cb); + }) + ); } /** @@ -139,14 +144,17 @@ class ChangeStream extends EventEmitter { * @returns {Promise|void} returns Promise if no callback passed */ next(callback) { - return maybePromise(this.parent, callback, cb => { - if (this.isClosed()) { - return cb(new Error('Change Stream is not open.')); - } - this.cursor.next((error, change) => { - processNewChange({ changeStream: this, error, change, callback: cb }); - }); - }); + return maybePromise(this.parent, callback, cb => + getCursor(this, (err, cursor) => { + if (err) return cb(err); + if (this.isClosed()) { + return cb(new Error('Change Stream is not open.')); + } + cursor.next((error, change) => { + processNewChange({ changeStream: this, error, change, callback: cb }); + }); + }) + ); } /** @@ -468,84 +476,56 @@ function processNewChange(args) { // If the changeStream is closed, then it should not process a change. if (changeStream.isClosed()) { // We do not error in the eventEmitter case. - if (eventEmitter) { - return; - } - - const error = new MongoError('ChangeStream is closed'); - return typeof callback === 'function' - ? callback(error, null) - : changeStream.promiseLibrary.reject(error); + if (eventEmitter) return; + return callback(new MongoError('ChangeStream is closed')); } const cursor = changeStream.cursor; - const topology = changeStream.topology; - const options = changeStream.cursor.options; if (error) { - if (isResumableError(error) && !changeStream.attemptingResume) { - changeStream.attemptingResume = true; + if (isResumableError(error) && changeStream.cursor) { + changeStream.cursor = null; + const onResume = []; + changeStream.onResume = cb => onResume.push(cb); // stop listening to all events from old cursor - ['data', 'close', 'end', 'error'].forEach(event => - changeStream.cursor.removeAllListeners(event) - ); + ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); // close internal cursor, ignore errors - changeStream.cursor.close(); + cursor.close(); // attempt recreating the cursor - if (eventEmitter) { - waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { + return waitForTopologyConnected( + changeStream.topology, + { readPreference: cursor.options.readPreference }, + err => { if (err) { - changeStream.emit('error', err); - changeStream.emit('close'); - return; + if (eventEmitter) { + changeStream.emit('error', err); + changeStream.emit('close'); + return; + } + onResume.forEach(cb => cb(err)); + return callback(err); } changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); - }); - - return; - } - - if (callback) { - waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { - if (err) return callback(err, null); - - changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); - changeStream.next(callback); - }); - - return; - } - - return new Promise((resolve, reject) => { - waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { - if (err) return reject(err); - resolve(); - }); - }) - .then( - () => (changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions)) - ) - .then(() => changeStream.next()); + onResume.forEach(cb => cb()); + if (!eventEmitter) changeStream.next(callback); + } + ); } if (eventEmitter) return changeStream.emit('error', error); - if (typeof callback === 'function') return callback(error, null); - return changeStream.promiseLibrary.reject(error); + return callback(error, null); } - changeStream.attemptingResume = false; - if (change && !change._id) { const noResumeTokenError = new Error( 'A change stream document has been received that lacks a resume token (_id).' ); if (eventEmitter) return changeStream.emit('error', noResumeTokenError); - if (typeof callback === 'function') return callback(noResumeTokenError, null); - return changeStream.promiseLibrary.reject(noResumeTokenError); + return callback(noResumeTokenError, null); } // cache the resume token @@ -561,8 +541,17 @@ function processNewChange(args) { // Return the change if (eventEmitter) return changeStream.emit('change', change); - if (typeof callback === 'function') return callback(error, change); - return changeStream.promiseLibrary.resolve(change); + return callback(error, change); +} + +function getCursor(changeStream, callback) { + if (!changeStream.cursor) { + return changeStream.onResume(err => { + if (err) return callback(err); + getCursor(changeStream, callback); + }); + } + return callback(null, changeStream.cursor); } /** From f106a5717d41f6ed5ee44c7480de4fef4e0c271b Mon Sep 17 00:00:00 2001 From: emadum Date: Sun, 26 Apr 2020 10:52:48 -0400 Subject: [PATCH 05/20] test: extract withChangeStream helper --- test/functional/change_stream.test.js | 144 +++++++++++++------------- 1 file changed, 70 insertions(+), 74 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 6c9612adf48..ac6d93333c3 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -2922,107 +2922,103 @@ describe('Change Streams', function() { }); describe('Change Stream Resume Error Tests', function() { - it('(events) should continue iterating after a resumable error', { - metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: function(done) { + function withChangeStream(callback) { + return function(done) { const configuration = this.configuration; const client = configuration.newClient(); client.connect(err => { expect(err).to.not.exist; - const collection = client.db().collection('test'); - const changeStream = collection.watch(); - const docs = []; - changeStream.on('change', change => { - expect(change).to.exist; - docs.push(change); - if (docs.length === 2) { - expect(docs[0]).to.containSubset({ - operationType: 'insert', - fullDocument: { a: 42 } - }); - expect(docs[1]).to.containSubset({ - operationType: 'insert', - fullDocument: { b: 24 } - }); - changeStream.close(() => client.close(done)); - } + const db = client.db('test'); + db.createCollection('changeStreamResumeErrorTest', (err, collection) => { + expect(err).to.not.exist; + const changeStream = collection.watch(); + callback(collection, changeStream, () => + changeStream.close(() => collection.drop(() => client.close(done))) + ); }); - waitForStarted(changeStream, () => { - collection.insertOne({ a: 42 }, err => { - expect(err).to.not.exist; - triggerResumableError(changeStream, () => { - collection.insertOne({ b: 24 }, err => { - expect(err).to.not.exist; - }); + }); + }; + } + it('(events) should continue iterating after a resumable error', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: withChangeStream((collection, changeStream, done) => { + const docs = []; + changeStream.on('change', change => { + expect(change).to.exist; + docs.push(change); + if (docs.length === 2) { + expect(docs[0]).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 42 } + }); + expect(docs[1]).to.containSubset({ + operationType: 'insert', + fullDocument: { b: 24 } + }); + done(); + } + }); + waitForStarted(changeStream, () => { + collection.insertOne({ a: 42 }, err => { + expect(err).to.not.exist; + triggerResumableError(changeStream, () => { + collection.insertOne({ b: 24 }, err => { + expect(err).to.not.exist; }); }); }); }); - } + }) }); it('(callback) hasNext should work after a resumable error', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: function(done) { - const configuration = this.configuration; - const client = configuration.newClient(); - client.connect(err => { - expect(err).to.not.exist; - const collection = client.db().collection('test'); - const changeStream = collection.watch(); - waitForStarted(changeStream, () => { - collection.insertOne({ a: 42 }, err => { - expect(err).to.not.exist; - triggerResumableError(changeStream, () => { - changeStream.hasNext((err1, hasNext) => { - expect(err1).to.not.exist; - expect(hasNext).to.be.true; - changeStream.close(() => client.close(done)); - }); + test: withChangeStream((collection, changeStream, done) => { + waitForStarted(changeStream, () => { + collection.insertOne({ a: 42 }, err => { + expect(err).to.not.exist; + triggerResumableError(changeStream, () => { + changeStream.hasNext((err1, hasNext) => { + expect(err1).to.not.exist; + expect(hasNext).to.be.true; + done(); }); }); }); - changeStream.hasNext((err, hasNext) => { - expect(err).to.not.exist; - expect(hasNext).to.be.true; - }); }); - } + changeStream.hasNext((err, hasNext) => { + expect(err).to.not.exist; + expect(hasNext).to.be.true; + }); + }) }); it('(callback) should continue iterating after a resumable error', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: function(done) { - const configuration = this.configuration; - const client = configuration.newClient(); - client.connect(err => { - expect(err).to.not.exist; - const collection = client.db().collection('test'); - const changeStream = collection.watch(); - waitForStarted(changeStream, () => { - collection.insertOne({ a: 42 }, err => { - expect(err).to.not.exist; - triggerResumableError(changeStream, () => { - changeStream.hasNext((err, hasNext) => { + test: withChangeStream((collection, changeStream, done) => { + waitForStarted(changeStream, () => { + collection.insertOne({ a: 42 }, err => { + expect(err).to.not.exist; + triggerResumableError(changeStream, () => { + changeStream.hasNext((err, hasNext) => { + expect(err).to.not.exist; + expect(hasNext).to.be.true; + changeStream.next((err, change) => { expect(err).to.not.exist; - expect(hasNext).to.be.true; - changeStream.next((err, change) => { - expect(err).to.not.exist; - expect(change).to.containSubset({ - operationType: 'insert', - fullDocument: { a: 42 } - }); - changeStream.close(() => client.close(done)); + expect(change).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 42 } }); + done(); }); }); }); }); - changeStream.hasNext((err, hasNext) => { - expect(err).to.not.exist; - expect(hasNext).to.be.true; - }); }); - } + changeStream.hasNext((err, hasNext) => { + expect(err).to.not.exist; + expect(hasNext).to.be.true; + }); + }) }); }); From b61b3a8d72773dcf33ad28f0260d183f9138b323 Mon Sep 17 00:00:00 2001 From: emadum Date: Sun, 26 Apr 2020 10:53:37 -0400 Subject: [PATCH 06/20] test: skip callback resume error tests --- test/functional/change_stream.test.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index ac6d93333c3..751560380a9 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -2971,7 +2971,7 @@ describe('Change Stream Resume Error Tests', function() { }) }); - it('(callback) hasNext should work after a resumable error', { + it.skip('(callback) hasNext should work after a resumable error', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: withChangeStream((collection, changeStream, done) => { waitForStarted(changeStream, () => { @@ -2993,7 +2993,7 @@ describe('Change Stream Resume Error Tests', function() { }) }); - it('(callback) should continue iterating after a resumable error', { + it.skip('(callback) should continue iterating after a resumable error', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: withChangeStream((collection, changeStream, done) => { waitForStarted(changeStream, () => { From 2f3c0c1cc292fe20f646c8d58a437e9e2b91c55e Mon Sep 17 00:00:00 2001 From: emadum Date: Sun, 26 Apr 2020 11:33:21 -0400 Subject: [PATCH 07/20] test: consolidate tests --- test/functional/change_stream.test.js | 34 ++++++++------------------- 1 file changed, 10 insertions(+), 24 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 751560380a9..dd60290354a 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -2971,7 +2971,7 @@ describe('Change Stream Resume Error Tests', function() { }) }); - it.skip('(callback) hasNext should work after a resumable error', { + it('(callback) hasNext and next should work after a resumable error', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: withChangeStream((collection, changeStream, done) => { waitForStarted(changeStream, () => { @@ -2981,43 +2981,29 @@ describe('Change Stream Resume Error Tests', function() { changeStream.hasNext((err1, hasNext) => { expect(err1).to.not.exist; expect(hasNext).to.be.true; - done(); - }); - }); - }); - }); - changeStream.hasNext((err, hasNext) => { - expect(err).to.not.exist; - expect(hasNext).to.be.true; - }); - }) - }); - - it.skip('(callback) should continue iterating after a resumable error', { - metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: withChangeStream((collection, changeStream, done) => { - waitForStarted(changeStream, () => { - collection.insertOne({ a: 42 }, err => { - expect(err).to.not.exist; - triggerResumableError(changeStream, () => { - changeStream.hasNext((err, hasNext) => { - expect(err).to.not.exist; - expect(hasNext).to.be.true; changeStream.next((err, change) => { expect(err).to.not.exist; expect(change).to.containSubset({ operationType: 'insert', - fullDocument: { a: 42 } + fullDocument: { b: 24 } }); done(); }); }); + collection.insertOne({ b: 24 }); }); }); }); changeStream.hasNext((err, hasNext) => { expect(err).to.not.exist; expect(hasNext).to.be.true; + changeStream.next((err, change) => { + expect(err).to.not.exist; + expect(change).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 42 } + }); + }); }); }) }); From 48eff06d67c5d476b937af8c4e4f2489f1b1de81 Mon Sep 17 00:00:00 2001 From: emadum Date: Sun, 26 Apr 2020 11:51:18 -0400 Subject: [PATCH 08/20] test: remove unused helpers to fix lint --- test/functional/change_stream.test.js | 60 --------------------------- 1 file changed, 60 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index dd60290354a..4c1e7a0cb40 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -46,66 +46,6 @@ function waitForStarted(changeStream, callback) { }); } -/** - * Iterates the next discrete batch of a change stream non-eagerly. This - * will return `null` if the next bach is empty, rather than waiting forever - * for a non-empty batch. - * - * @param {ChangeStream} changeStream - * @param {Function} callback - */ -function tryNext(changeStream, callback) { - let complete = false; - function done(err, result) { - if (complete) return; - - // if the arity is 1 then this a callback for `more` - if (arguments.length === 1) { - result = err; - const batch = result.cursor.firstBatch || result.cursor.nextBatch; - if (batch.length === 0) { - complete = true; - callback(null, null); - } - - return; - } - - // otherwise, this a normal response to `next` - complete = true; - changeStream.removeListener('more', done); - if (err) return callback(err); - callback(err, result); - } - - // race the two requests - changeStream.next(done); - changeStream.cursor.once('more', done); -} - -/** - * Exhausts a change stream aggregating all responses until the first - * empty batch into a returned array of events. - * - * @param {ChangeStream} changeStream - * @param {Function|Array} bag - * @param {Function} [callback] - */ -function exhaust(changeStream, bag, callback) { - if (typeof bag === 'function') { - callback = bag; - bag = []; - } - - tryNext(changeStream, (err, doc) => { - if (err) return callback(err); - if (doc === null) return callback(undefined, bag); - - bag.push(doc); - exhaust(changeStream, bag, callback); - }); -} - // Define the pipeline processing changes var pipeline = [ { $addFields: { addedField: 'This is a field added using $addFields' } }, From dc820319d4b98ed70626b80923d9c85a1c396375 Mon Sep 17 00:00:00 2001 From: emadum Date: Sun, 26 Apr 2020 14:03:34 -0400 Subject: [PATCH 09/20] fix: NonRetryableChangeStreamError should be NonResumableChangeStreamError --- lib/error.js | 2 +- test/functional/change_stream.test.js | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/error.js b/lib/error.js index 4d104e9be8e..912ce417ed4 100644 --- a/lib/error.js +++ b/lib/error.js @@ -38,7 +38,7 @@ function isResumableError(error) { return !( GET_MORE_NON_RESUMABLE_CODES.has(error.code) || - error.hasErrorLabel('NonRetryableChangeStreamError') + error.hasErrorLabel('NonResumableChangeStreamError') ); } diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 4c1e7a0cb40..20e0f389bf7 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1825,7 +1825,7 @@ describe('Change Streams', function() { } }); - it('should not resume when error includes error label NonRetryableChangeStreamError', function() { + it('should not resume when error includes error label NonResumableChangeStreamError', function() { let server; let client; let changeStream; @@ -1873,7 +1873,7 @@ describe('Change Streams', function() { getMoreCount += 1; request.reply({ ok: 0, - errorLabels: ['NonRetryableChangeStreamError'] + errorLabels: ['NonResumableChangeStreamError'] }); } else { getMoreCount += 1; @@ -1916,7 +1916,7 @@ describe('Change Streams', function() { () => Promise.reject('Expected changeStream to not resume'), err => { expect(err).to.be.an.instanceOf(MongoError); - expect(err.hasErrorLabel('NonRetryableChangeStreamError')).to.be.true; + expect(err.hasErrorLabel('NonResumableChangeStreamError')).to.be.true; expect(aggregateCount).to.equal(1); expect(getMoreCount).to.equal(1); } From 55d17d2724b282897120d835f339c7226ca3768e Mon Sep 17 00:00:00 2001 From: emadum Date: Sun, 26 Apr 2020 17:39:00 -0400 Subject: [PATCH 10/20] test: fix onResume in eventEmitter case --- lib/change_stream.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 59e1af0736f..a9b70e566bc 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -500,12 +500,12 @@ function processNewChange(args) { { readPreference: cursor.options.readPreference }, err => { if (err) { + onResume.forEach(cb => cb(err)); if (eventEmitter) { changeStream.emit('error', err); changeStream.emit('close'); return; } - onResume.forEach(cb => cb(err)); return callback(err); } changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); From 5d0efeba1f45dfe799cbc62d1961aaf67ea6f502 Mon Sep 17 00:00:00 2001 From: emadum Date: Wed, 29 Apr 2020 11:26:27 -0400 Subject: [PATCH 11/20] refactor: move waitForTopology into getCursor --- lib/change_stream.js | 74 +++++++++++++++++++++++++++++--------------- 1 file changed, 49 insertions(+), 25 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index a9b70e566bc..9cb0004cd5a 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -1,5 +1,6 @@ 'use strict'; +const Denque = require('denque'); const EventEmitter = require('events'); const isResumableError = require('./error').isResumableError; const MongoError = require('./core').MongoError; @@ -8,6 +9,7 @@ const relayEvents = require('./core/utils').relayEvents; const maxWireVersion = require('./core/utils').maxWireVersion; const maybePromise = require('./utils').maybePromise; const AggregateOperation = require('./operations/aggregate'); +const kResumeQueue = Symbol('resumeQueue'); const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument']; const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat( @@ -91,6 +93,8 @@ class ChangeStream extends EventEmitter { this.options.readPreference = parent.s.readPreference; } + this[kResumeQueue] = new Denque(); + // Create contained Change Stream cursor this.cursor = createChangeStreamCursor(this, options); @@ -147,9 +151,6 @@ class ChangeStream extends EventEmitter { return maybePromise(this.parent, callback, cb => getCursor(this, (err, cursor) => { if (err) return cb(err); - if (this.isClosed()) { - return cb(new Error('Change Stream is not open.')); - } cursor.next((error, change) => { processNewChange({ changeStream: this, error, change, callback: cb }); }); @@ -484,10 +485,6 @@ function processNewChange(args) { if (error) { if (isResumableError(error) && changeStream.cursor) { - changeStream.cursor = null; - const onResume = []; - changeStream.onResume = cb => onResume.push(cb); - // stop listening to all events from old cursor ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); @@ -495,24 +492,23 @@ function processNewChange(args) { cursor.close(); // attempt recreating the cursor - return waitForTopologyConnected( - changeStream.topology, - { readPreference: cursor.options.readPreference }, + changeStream.cursor = null; + getCursor( + changeStream, err => { - if (err) { - onResume.forEach(cb => cb(err)); - if (eventEmitter) { + if (eventEmitter) { + if (err) { changeStream.emit('error', err); changeStream.emit('close'); return; } - return callback(err); + return; } - changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); - onResume.forEach(cb => cb()); - if (!eventEmitter) changeStream.next(callback); - } + changeStream.next(callback); + }, + cursor ); + return; } if (eventEmitter) return changeStream.emit('error', error); @@ -544,14 +540,42 @@ function processNewChange(args) { return callback(error, change); } -function getCursor(changeStream, callback) { - if (!changeStream.cursor) { - return changeStream.onResume(err => { - if (err) return callback(err); - getCursor(changeStream, callback); - }); +function getCursor(changeStream, callback, oldCursor) { + if (changeStream.cursor) { + if (changeStream.isClosed()) { + callback(new MongoError('Change Stream is not open.')); + return; + } + callback(undefined, changeStream.cursor); + return; + } + + changeStream[kResumeQueue].push(callback); + if (changeStream.waitingForTopology || !oldCursor) { + return; + } + + changeStream.waitingForTopology = true; + waitForTopologyConnected( + changeStream.topology, + { readPreference: oldCursor.options.readPreference }, + err => { + changeStream.waitingForTopology = false; + if (err) { + processResumeQueue(changeStream, err); + return; + } + changeStream.cursor = createChangeStreamCursor(changeStream, oldCursor.resumeOptions); + processResumeQueue(changeStream, undefined, changeStream.cursor); + } + ); +} + +function processResumeQueue(changeStream, err, cursor) { + while (changeStream[kResumeQueue].length) { + const request = changeStream[kResumeQueue].pop(); + request(err, cursor); } - return callback(null, changeStream.cursor); } /** From 382158bea14000577ef6a8e77428418fecfd9d1d Mon Sep 17 00:00:00 2001 From: emadum Date: Wed, 29 Apr 2020 11:40:13 -0400 Subject: [PATCH 12/20] cleanup --- lib/change_stream.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 9cb0004cd5a..34531a5935e 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -151,9 +151,9 @@ class ChangeStream extends EventEmitter { return maybePromise(this.parent, callback, cb => getCursor(this, (err, cursor) => { if (err) return cb(err); - cursor.next((error, change) => { - processNewChange({ changeStream: this, error, change, callback: cb }); - }); + cursor.next((error, change) => + processNewChange({ changeStream: this, error, change, callback: cb }) + ); }) ); } @@ -484,7 +484,9 @@ function processNewChange(args) { const cursor = changeStream.cursor; if (error) { - if (isResumableError(error) && changeStream.cursor) { + if (isResumableError(error)) { + changeStream.cursor = null; + // stop listening to all events from old cursor ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); @@ -492,7 +494,6 @@ function processNewChange(args) { cursor.close(); // attempt recreating the cursor - changeStream.cursor = null; getCursor( changeStream, err => { @@ -500,7 +501,6 @@ function processNewChange(args) { if (err) { changeStream.emit('error', err); changeStream.emit('close'); - return; } return; } From 771af477eae21224276b9867df47c702e1dc1888 Mon Sep 17 00:00:00 2001 From: emadum Date: Wed, 29 Apr 2020 14:23:50 -0400 Subject: [PATCH 13/20] add comments --- lib/change_stream.js | 24 +++++++++++++++++++++++- test/functional/change_stream.test.js | 4 ++-- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 34531a5935e..db78ecde639 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -485,6 +485,7 @@ function processNewChange(args) { if (error) { if (isResumableError(error)) { + // set the cursor to null so getCursor returns a fresh cursor changeStream.cursor = null; // stop listening to all events from old cursor @@ -506,6 +507,7 @@ function processNewChange(args) { } changeStream.next(callback); }, + // provide the old cursor so the options can be persisted cursor ); return; @@ -540,7 +542,15 @@ function processNewChange(args) { return callback(error, change); } +/** + * Safely provides a cursor across resume attempts + * + * @param {ChangeStream} changeStream the parent ChangeStream + * @param {function} callback gets the cursor or error + * @param {?ChangeStreamCursor} oldCursor when resuming from an error, carry over options from previous cursor + */ function getCursor(changeStream, callback, oldCursor) { + // if a cursor exists and it is open, return it if (changeStream.cursor) { if (changeStream.isClosed()) { callback(new MongoError('Change Stream is not open.')); @@ -550,11 +560,16 @@ function getCursor(changeStream, callback, oldCursor) { return; } + // no cursor, queue callback until topology reconnects changeStream[kResumeQueue].push(callback); - if (changeStream.waitingForTopology || !oldCursor) { + if ( + changeStream.waitingForTopology || // don't reconnect if already reconnecting + !oldCursor // only reconnect if the previous cursor was provided + ) { return; } + // attempt to reconnect the topology changeStream.waitingForTopology = true; waitForTopologyConnected( changeStream.topology, @@ -571,6 +586,13 @@ function getCursor(changeStream, callback, oldCursor) { ); } +/** + * Drain the resume queue when a new has become available + * + * @param {ChangeStream} changeStream the parent ChangeStream + * @param {?Error} err error getting a new cursor + * @param {?ChangeStreamCursor} cursor the new cursor + */ function processResumeQueue(changeStream, err, cursor) { while (changeStream[kResumeQueue].length) { const request = changeStream[kResumeQueue].pop(); diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 20e0f389bf7..cee097c177c 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -2918,8 +2918,8 @@ describe('Change Stream Resume Error Tests', function() { collection.insertOne({ a: 42 }, err => { expect(err).to.not.exist; triggerResumableError(changeStream, () => { - changeStream.hasNext((err1, hasNext) => { - expect(err1).to.not.exist; + changeStream.hasNext((err, hasNext) => { + expect(err).to.not.exist; expect(hasNext).to.be.true; changeStream.next((err, change) => { expect(err).to.not.exist; From a5a34f06cd28ccfe63a61000037600efc9da12c3 Mon Sep 17 00:00:00 2001 From: emadum Date: Wed, 29 Apr 2020 14:45:13 -0400 Subject: [PATCH 14/20] jsdoc cleanup --- lib/change_stream.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index db78ecde639..b8baf0a9fd4 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -547,7 +547,7 @@ function processNewChange(args) { * * @param {ChangeStream} changeStream the parent ChangeStream * @param {function} callback gets the cursor or error - * @param {?ChangeStreamCursor} oldCursor when resuming from an error, carry over options from previous cursor + * @param {ChangeStreamCursor} [oldCursor] when resuming from an error, carry over options from previous cursor */ function getCursor(changeStream, callback, oldCursor) { // if a cursor exists and it is open, return it From 8c84bf534ce7829fae5dfa35c8719da51a0910f9 Mon Sep 17 00:00:00 2001 From: emadum Date: Thu, 30 Apr 2020 15:34:20 -0400 Subject: [PATCH 15/20] test: test for both old versions of non-resumable label to be safe --- test/functional/change_stream.test.js | 171 +++++++++++++------------- 1 file changed, 87 insertions(+), 84 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 12e45e6c5c4..c7b52430129 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -3,7 +3,6 @@ var assert = require('assert'); var Transform = require('stream').Transform; const MongoError = require('../../lib/core').MongoError; var MongoNetworkError = require('../../lib/core').MongoNetworkError; -var mongoErrorContextSymbol = require('../../lib/core').mongoErrorContextSymbol; var setupDatabase = require('./shared').setupDatabase; var delay = require('./shared').delay; var co = require('co'); @@ -1843,104 +1842,108 @@ describe('Change Streams', function() { } }); - it('should not resume when error includes error label NonResumableChangeStreamError', function() { - let server; - let client; - let changeStream; + const oldLabelTest = label => + it(`should not resume when error includes error label ${label}`, function() { + let server; + let client; + let changeStream; - function teardown(e) { - return Promise.resolve() - .then(() => changeStream && changeStream.close()) - .catch(() => {}) - .then(() => client && client.close()) - .catch(() => {}) - .then(() => e && Promise.reject(e)); - } + function teardown(e) { + return Promise.resolve() + .then(() => changeStream && changeStream.close()) + .catch(() => {}) + .then(() => client && client.close()) + .catch(() => {}) + .then(() => e && Promise.reject(e)); + } - const db = 'foobar'; - const coll = 'foobar'; - const ns = `${db}.${coll}`; + const db = 'foobar'; + const coll = 'foobar'; + const ns = `${db}.${coll}`; - let aggregateCount = 0; - let getMoreCount = 0; + let aggregateCount = 0; + let getMoreCount = 0; - function messageHandler(request) { - const doc = request.document; + function messageHandler(request) { + const doc = request.document; - if (doc.ismaster) { - request.reply( - Object.assign({}, mock.DEFAULT_ISMASTER_36, { - ismaster: true, - secondary: false, - me: server.uri(), - primary: server.uri() - }) - ); - } else if (doc.aggregate) { - aggregateCount += 1; - request.reply({ - ok: 1, - cursor: { - firstBatch: [], - id: 1, - ns - } - }); - } else if (doc.getMore) { - if (getMoreCount === 0) { - getMoreCount += 1; - request.reply({ - ok: 0, - errorLabels: ['NonResumableChangeStreamError'] - }); - } else { - getMoreCount += 1; + if (doc.ismaster) { + request.reply( + Object.assign({}, mock.DEFAULT_ISMASTER_36, { + ismaster: true, + secondary: false, + me: server.uri(), + primary: server.uri() + }) + ); + } else if (doc.aggregate) { + aggregateCount += 1; request.reply({ ok: 1, cursor: { - nextBatch: [ - { - _id: {}, - operationType: 'insert', - ns: { db, coll }, - fullDocument: { a: 1 } - } - ], + firstBatch: [], id: 1, ns } }); + } else if (doc.getMore) { + if (getMoreCount === 0) { + getMoreCount += 1; + request.reply({ + ok: 0, + errorLabels: [label] + }); + } else { + getMoreCount += 1; + request.reply({ + ok: 1, + cursor: { + nextBatch: [ + { + _id: {}, + operationType: 'insert', + ns: { db, coll }, + fullDocument: { a: 1 } + } + ], + id: 1, + ns + } + }); + } + } else { + request.reply({ ok: 1 }); } - } else { - request.reply({ ok: 1 }); } - } - return mock - .createServer() - .then(_server => (server = _server)) - .then(() => server.setMessageHandler(messageHandler)) - .then(() => (client = this.configuration.newClient(`mongodb://${server.uri()}`))) - .then(() => client.connect()) - .then( - () => - (changeStream = client - .db(db) - .collection(coll) - .watch()) - ) - .then(() => changeStream.next()) - .then( - () => Promise.reject('Expected changeStream to not resume'), - err => { - expect(err).to.be.an.instanceOf(MongoError); - expect(err.hasErrorLabel('NonResumableChangeStreamError')).to.be.true; - expect(aggregateCount).to.equal(1); - expect(getMoreCount).to.equal(1); - } - ) - .then(() => teardown(), teardown); - }); + return mock + .createServer() + .then(_server => (server = _server)) + .then(() => server.setMessageHandler(messageHandler)) + .then(() => (client = this.configuration.newClient(`mongodb://${server.uri()}`))) + .then(() => client.connect()) + .then( + () => + (changeStream = client + .db(db) + .collection(coll) + .watch()) + ) + .then(() => changeStream.next()) + .then( + () => Promise.reject('Expected changeStream to not resume'), + err => { + expect(err).to.be.an.instanceOf(MongoError); + expect(err.hasErrorLabel(label)).to.be.true; + expect(aggregateCount).to.equal(1); + expect(getMoreCount).to.equal(1); + } + ) + .then(() => teardown(), teardown); + }); + ['NonResumableChangeStreamError', 'NonResumableChangeStreamError'].forEach(label => + oldLabelTest(label) + ); it('should emit close event after error event', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, From a5224c6b7ec533092b98ed1e00d43e66e4facacd Mon Sep 17 00:00:00 2001 From: emadum Date: Thu, 30 Apr 2020 16:18:06 -0400 Subject: [PATCH 16/20] refactor: use maybePromise for ChangeStream.close --- lib/change_stream.js | 46 +++++++++++++++++++++----------------------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index f744921ff08..e3d053bdcab 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -98,6 +98,8 @@ class ChangeStream extends EventEmitter { // Create contained Change Stream cursor this.cursor = createChangeStreamCursor(this, options); + this.closed = false; + // Listen for any `change` listeners being added to ChangeStream this.on('newListener', eventName => { if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) { @@ -167,7 +169,7 @@ class ChangeStream extends EventEmitter { if (this.cursor) { return this.cursor.isClosed(); } - return true; + return this.closed; } /** @@ -177,31 +179,21 @@ class ChangeStream extends EventEmitter { * @return {Promise} returns Promise if no callback passed */ close(callback) { - if (!this.cursor) { - if (callback) return callback(); - return this.promiseLibrary.resolve(); - } - - // Tidy up the existing cursor - const cursor = this.cursor; + return maybePromise(this.parent, callback, cb => { + this.closed = true; - if (callback) { - return cursor.close(err => { - ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); - delete this.cursor; + if (!this.cursor) { + return cb(); + } - return callback(err); - }); - } + // Tidy up the existing cursor + const cursor = this.cursor; - const PromiseCtor = this.promiseLibrary || Promise; - return new PromiseCtor((resolve, reject) => { - cursor.close(err => { + return cursor.close(err => { ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); delete this.cursor; - if (err) return reject(err); - resolve(); + return cb(err); }); }); } @@ -556,12 +548,13 @@ function processNewChange(args) { * @param {ChangeStreamCursor} [oldCursor] when resuming from an error, carry over options from previous cursor */ function getCursor(changeStream, callback, oldCursor) { + if (changeStream.isClosed()) { + callback(new MongoError('ChangeStream is closed.')); + return; + } + // if a cursor exists and it is open, return it if (changeStream.cursor) { - if (changeStream.isClosed()) { - callback(new MongoError('Change Stream is not open.')); - return; - } callback(undefined, changeStream.cursor); return; } @@ -583,6 +576,7 @@ function getCursor(changeStream, callback, oldCursor) { err => { changeStream.waitingForTopology = false; if (err) { + changeStream.closed = true; processResumeQueue(changeStream, err); return; } @@ -601,6 +595,10 @@ function getCursor(changeStream, callback, oldCursor) { */ function processResumeQueue(changeStream, err, cursor) { while (changeStream[kResumeQueue].length) { + if (changeStream.isClosed()) { + request(new MongoError('Change Stream is not open.')); + return; + } const request = changeStream[kResumeQueue].pop(); request(err, cursor); } From a5ba416affa33a5c2ff0d2bf444f0abfec12c690 Mon Sep 17 00:00:00 2001 From: emadum Date: Thu, 30 Apr 2020 16:18:23 -0400 Subject: [PATCH 17/20] test: fix skipped mid-close tests --- test/functional/change_stream.test.js | 31 ++++++++++++++------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index c7b52430129..4165f8a409f 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1988,7 +1988,7 @@ describe('Change Streams', function() { }); // TODO: re-enable/fix these tests in NODE-2548 - describe.skip('should properly handle a changeStream event being processed mid-close', function() { + describe('should properly handle a changeStream event being processed mid-close', function() { let client, coll; function write() { @@ -2024,15 +2024,14 @@ describe('Change Streams', function() { .then(() => changeStream.next()) .then(() => changeStream.next()) .then(() => { - const nextP = changeStream.next(); - - return changeStream.close().then(() => nextP); + const nextP = () => changeStream.next(); + return changeStream.close().then(() => nextP()); }); } return Promise.all([read(), write()]).then( () => Promise.reject(new Error('Expected operation to fail with error')), - err => expect(err.message).to.equal('ChangeStream is closed') + err => expect(err.message).to.equal('ChangeStream is closed.') ); } }); @@ -2044,17 +2043,19 @@ describe('Change Streams', function() { changeStream.next(() => { changeStream.next(() => { - changeStream.next(err => { - let _err = null; - try { - expect(err.message).to.equal('ChangeStream is closed'); - } catch (e) { - _err = e; - } finally { - done(_err); - } + changeStream.close(err => { + expect(err).to.not.exist; + changeStream.next(err => { + let _err = null; + try { + expect(err.message).to.equal('ChangeStream is closed.'); + } catch (e) { + _err = e; + } finally { + done(_err); + } + }); }); - changeStream.close(); }); }); From faf2a5a163b30138ef0fe2d94766d60d5f17f0c5 Mon Sep 17 00:00:00 2001 From: emadum Date: Thu, 30 Apr 2020 16:39:07 -0400 Subject: [PATCH 18/20] test: skip change stream tests running on mock single topology --- test/functional/change_stream.test.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 4165f8a409f..075a8fd1c56 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -788,7 +788,7 @@ describe('Change Streams', function() { } }); - it('Should return MongoNetworkError after first retry attempt fails using promises', { + it.skip('Should return MongoNetworkError after first retry attempt fails using promises', { metadata: { requires: { generators: true, @@ -886,7 +886,7 @@ describe('Change Streams', function() { } }); - it('Should return MongoNetworkError after first retry attempt fails using callbacks', { + it.skip('Should return MongoNetworkError after first retry attempt fails using callbacks', { metadata: { requires: { generators: true, @@ -981,7 +981,7 @@ describe('Change Streams', function() { } }); - it('Should resume Change Stream when a resumable error is encountered', { + it.skip('Should resume Change Stream when a resumable error is encountered', { metadata: { requires: { generators: true, From 5dd46db72cc2402e94080612e6c79e97d86d4e73 Mon Sep 17 00:00:00 2001 From: emadum Date: Fri, 1 May 2020 08:20:39 -0400 Subject: [PATCH 19/20] test: fix broken tests --- lib/change_stream.js | 60 +++++++++++--------------- test/functional/change_stream.test.js | 61 ++++++++++++++++----------- 2 files changed, 60 insertions(+), 61 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index e3d053bdcab..15d2e5ea7ec 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -96,7 +96,7 @@ class ChangeStream extends EventEmitter { this[kResumeQueue] = new Denque(); // Create contained Change Stream cursor - this.cursor = createChangeStreamCursor(this, options); + this.cursor = createCursor(this, options); this.closed = false; @@ -368,7 +368,7 @@ class ChangeStreamCursor extends Cursor { */ // Create a new change stream cursor based on self's configuration -function createChangeStreamCursor(self, options) { +function createCursor(self, options) { const changeStreamStageOptions = { fullDocument: options.fullDocument || 'default' }; applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS); if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) { @@ -480,14 +480,13 @@ function processNewChange(args) { // If the cursor is null, then it should not process a change. if (cursor == null) { - // We do not error in the eventEmitter case. - if (eventEmitter) return; + // do not error in the eventEmitter case or if the change stream has closed due to error + if (eventEmitter || changeStream.closed) return; return callback(new MongoError('ChangeStream is closed')); } if (error) { if (isResumableError(error, maxWireVersion(cursor.server))) { - // set the cursor to null so getCursor returns a fresh cursor changeStream.cursor = null; // stop listening to all events from old cursor @@ -497,21 +496,18 @@ function processNewChange(args) { cursor.close(); // attempt recreating the cursor - getCursor( - changeStream, - err => { + recreateCursor(changeStream, cursor, err => { + if (err) { + changeStream.closed = true; if (eventEmitter) { - if (err) { - changeStream.emit('error', err); - changeStream.emit('close'); - } - return; + changeStream.emit('error', err); + changeStream.emit('close'); } - changeStream.next(callback); - }, - // provide the old cursor so the options can be persisted - cursor - ); + return processResumeQueue(changeStream, err); + } + processResumeQueue(changeStream); + if (!eventEmitter) changeStream.next(callback); + }); return; } @@ -547,7 +543,7 @@ function processNewChange(args) { * @param {function} callback gets the cursor or error * @param {ChangeStreamCursor} [oldCursor] when resuming from an error, carry over options from previous cursor */ -function getCursor(changeStream, callback, oldCursor) { +function getCursor(changeStream, callback) { if (changeStream.isClosed()) { callback(new MongoError('ChangeStream is closed.')); return; @@ -561,27 +557,19 @@ function getCursor(changeStream, callback, oldCursor) { // no cursor, queue callback until topology reconnects changeStream[kResumeQueue].push(callback); - if ( - changeStream.waitingForTopology || // don't reconnect if already reconnecting - !oldCursor // only reconnect if the previous cursor was provided - ) { - return; - } +} +function recreateCursor(changeStream, cursor, cb) { // attempt to reconnect the topology changeStream.waitingForTopology = true; waitForTopologyConnected( changeStream.topology, - { readPreference: oldCursor.options.readPreference }, + { readPreference: cursor.options.readPreference }, err => { changeStream.waitingForTopology = false; - if (err) { - changeStream.closed = true; - processResumeQueue(changeStream, err); - return; - } - changeStream.cursor = createChangeStreamCursor(changeStream, oldCursor.resumeOptions); - processResumeQueue(changeStream, undefined, changeStream.cursor); + if (err) return cb(err); + changeStream.cursor = createCursor(changeStream, cursor.resumeOptions); + cb(); } ); } @@ -590,17 +578,17 @@ function getCursor(changeStream, callback, oldCursor) { * Drain the resume queue when a new has become available * * @param {ChangeStream} changeStream the parent ChangeStream + * * @param {?ChangeStreamCursor} changeStream.cursor the new cursor * @param {?Error} err error getting a new cursor - * @param {?ChangeStreamCursor} cursor the new cursor */ -function processResumeQueue(changeStream, err, cursor) { +function processResumeQueue(changeStream, err) { while (changeStream[kResumeQueue].length) { if (changeStream.isClosed()) { request(new MongoError('Change Stream is not open.')); return; } const request = changeStream[kResumeQueue].pop(); - request(err, cursor); + request(err, changeStream.cursor); } } diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 075a8fd1c56..e66ad8a3faf 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -18,8 +18,9 @@ chai.use(require('chai-subset')); * * @param {ChangeStream} changeStream * @param {Function} onCursorClosed callback when cursor closed due this error + * @param {number} [delay] optional delay before triggering error */ -function triggerResumableError(changeStream, onCursorClosed) { +function triggerResumableError(changeStream, onCursorClosed, delay) { const closeCursor = changeStream.cursor.close.bind(changeStream.cursor); changeStream.cursor.close = callback => { closeCursor(err => { @@ -29,7 +30,11 @@ function triggerResumableError(changeStream, onCursorClosed) { }; const fakeResumableError = new MongoNetworkError('fake error'); // delay error slightly to better simulate real conditions - setTimeout(() => changeStream.cursor.emit('error', fakeResumableError), 250); + if (delay) { + setTimeout(() => changeStream.cursor.emit('error', fakeResumableError), delay); + } else { + changeStream.cursor.emit('error', fakeResumableError); + } } /** @@ -1941,7 +1946,7 @@ describe('Change Streams', function() { ) .then(() => teardown(), teardown); }); - ['NonResumableChangeStreamError', 'NonResumableChangeStreamError'].forEach(label => + ['NonRetryableChangeStreamError', 'NonResumableChangeStreamError'].forEach(label => oldLabelTest(label) ); @@ -1987,7 +1992,6 @@ describe('Change Streams', function() { } }); - // TODO: re-enable/fix these tests in NODE-2548 describe('should properly handle a changeStream event being processed mid-close', function() { let client, coll; @@ -2779,12 +2783,16 @@ describe('Change Streams', function() { }); waitForStarted(changeStream, () => { - triggerResumableError(changeStream, () => { - events.push('error'); - coll.insertOne({ x: 2 }, { w: 'majority', j: true }, err => { - expect(err).to.not.exist; - }); - }); + triggerResumableError( + changeStream, + () => { + events.push('error'); + coll.insertOne({ x: 2 }, { w: 'majority', j: true }, err => { + expect(err).to.not.exist; + }); + }, + 250 + ); }); } }); @@ -2794,8 +2802,7 @@ describe('Change Streams', function() { // - MUST include a resumeAfter option // - MUST NOT include a startAfter option // when resuming a change stream. - // TODO: unskip before merging - it.skip('$changeStream that has received results must include resumeAfter and not startAfter', { + it('$changeStream that has received results must include resumeAfter and not startAfter', { metadata: { requires: { topology: 'replicaset', mongodb: '>=4.1.1' } }, test: function(done) { let events = []; @@ -2808,7 +2815,7 @@ describe('Change Streams', function() { case 2: // only events after this point are relevant to this test events = []; - triggerResumableError(changeStream, () => events.push('error')); + triggerResumableError(changeStream, () => events.push('error'), 0); break; case 3: expect(events) @@ -2890,21 +2897,25 @@ describe('Change Stream Resume Error Tests', function() { waitForStarted(changeStream, () => { collection.insertOne({ a: 42 }, err => { expect(err).to.not.exist; - triggerResumableError(changeStream, () => { - changeStream.hasNext((err, hasNext) => { - expect(err).to.not.exist; - expect(hasNext).to.be.true; - changeStream.next((err, change) => { + triggerResumableError( + changeStream, + () => { + changeStream.hasNext((err, hasNext) => { expect(err).to.not.exist; - expect(change).to.containSubset({ - operationType: 'insert', - fullDocument: { b: 24 } + expect(hasNext).to.be.true; + changeStream.next((err, change) => { + expect(err).to.not.exist; + expect(change).to.containSubset({ + operationType: 'insert', + fullDocument: { b: 24 } + }); + done(); }); - done(); }); - }); - collection.insertOne({ b: 24 }); - }); + collection.insertOne({ b: 24 }); + }, + 250 + ); }); }); changeStream.hasNext((err, hasNext) => { From 37f6cb9f2a6ed083535453bb2763bcdfd96613b8 Mon Sep 17 00:00:00 2001 From: emadum Date: Fri, 1 May 2020 08:58:43 -0400 Subject: [PATCH 20/20] test: unskip test --- test/functional/change_stream.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index e66ad8a3faf..1a1c728d9f6 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -986,7 +986,7 @@ describe('Change Streams', function() { } }); - it.skip('Should resume Change Stream when a resumable error is encountered', { + it('Should resume Change Stream when a resumable error is encountered', { metadata: { requires: { generators: true,