diff --git a/.travis.yml b/.travis.yml index 1c19f603419..97348674a17 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,7 @@ language: node_js branches: only: - master - - 3.6 + - next before_install: # we have to intstall mongo-orchestration ourselves to get around permissions issues in subshells diff --git a/lib/change_stream.js b/lib/change_stream.js index 28e64d860bc..15d2e5ea7ec 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -1,5 +1,6 @@ 'use strict'; +const Denque = require('denque'); const EventEmitter = require('events'); const isResumableError = require('./error').isResumableError; const MongoError = require('./core').MongoError; @@ -8,6 +9,7 @@ const relayEvents = require('./core/utils').relayEvents; const maxWireVersion = require('./core/utils').maxWireVersion; const maybePromise = require('./utils').maybePromise; const AggregateOperation = require('./operations/aggregate'); +const kResumeQueue = Symbol('resumeQueue'); const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument']; const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat( @@ -91,8 +93,12 @@ class ChangeStream extends EventEmitter { this.options.readPreference = parent.s.readPreference; } + this[kResumeQueue] = new Denque(); + // Create contained Change Stream cursor - this.cursor = createChangeStreamCursor(this, options); + this.cursor = createCursor(this, options); + + this.closed = false; // Listen for any `change` listeners being added to ChangeStream this.on('newListener', eventName => { @@ -128,7 +134,12 @@ class ChangeStream extends EventEmitter { * @returns {Promise|void} returns Promise if no callback passed */ hasNext(callback) { - return maybePromise(this.parent, callback, cb => this.cursor.hasNext(cb)); + return maybePromise(this.parent, callback, cb => + getCursor(this, (err, cursor) => { + if (err) return cb(err); + cursor.hasNext(cb); + }) + ); } /** @@ -139,14 +150,14 @@ class ChangeStream extends EventEmitter { * @returns {Promise|void} returns Promise if no callback passed */ next(callback) { - return maybePromise(this.parent, callback, cb => { - if (this.isClosed()) { - return cb(new Error('Change Stream is not open.')); - } - this.cursor.next((error, change) => { - processNewChange({ changeStream: this, error, change, callback: cb }); - }); - }); + return maybePromise(this.parent, callback, cb => + getCursor(this, (err, cursor) => { + if (err) return cb(err); + cursor.next((error, change) => + processNewChange({ changeStream: this, error, change, callback: cb }) + ); + }) + ); } /** @@ -158,7 +169,7 @@ class ChangeStream extends EventEmitter { if (this.cursor) { return this.cursor.isClosed(); } - return true; + return this.closed; } /** @@ -168,31 +179,21 @@ class ChangeStream extends EventEmitter { * @return {Promise} returns Promise if no callback passed */ close(callback) { - if (!this.cursor) { - if (callback) return callback(); - return this.promiseLibrary.resolve(); - } - - // Tidy up the existing cursor - const cursor = this.cursor; + return maybePromise(this.parent, callback, cb => { + this.closed = true; - if (callback) { - return cursor.close(err => { - ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); - delete this.cursor; + if (!this.cursor) { + return cb(); + } - return callback(err); - }); - } + // Tidy up the existing cursor + const cursor = this.cursor; - const PromiseCtor = this.promiseLibrary || Promise; - return new PromiseCtor((resolve, reject) => { - cursor.close(err => { + return cursor.close(err => { ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); delete this.cursor; - if (err) return reject(err); - resolve(); + return cb(err); }); }); } @@ -367,7 +368,7 @@ class ChangeStreamCursor extends Cursor { */ // Create a new change stream cursor based on self's configuration -function createChangeStreamCursor(self, options) { +function createCursor(self, options) { const changeStreamStageOptions = { fullDocument: options.fullDocument || 'default' }; applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS); if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) { @@ -479,85 +480,48 @@ function processNewChange(args) { // If the cursor is null, then it should not process a change. if (cursor == null) { - // We do not error in the eventEmitter case. - if (eventEmitter) { - return; - } - - const error = new MongoError('ChangeStream is closed'); - return typeof callback === 'function' - ? callback(error, null) - : changeStream.promiseLibrary.reject(error); + // do not error in the eventEmitter case or if the change stream has closed due to error + if (eventEmitter || changeStream.closed) return; + return callback(new MongoError('ChangeStream is closed')); } - const topology = changeStream.topology; - const options = changeStream.cursor.options; - const wireVersion = maxWireVersion(cursor.server); - if (error) { - if (isResumableError(error, wireVersion) && !changeStream.attemptingResume) { - changeStream.attemptingResume = true; + if (isResumableError(error, maxWireVersion(cursor.server))) { + changeStream.cursor = null; // stop listening to all events from old cursor - ['data', 'close', 'end', 'error'].forEach(event => - changeStream.cursor.removeAllListeners(event) - ); + ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); // close internal cursor, ignore errors - changeStream.cursor.close(); + cursor.close(); // attempt recreating the cursor - if (eventEmitter) { - waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { - if (err) { + recreateCursor(changeStream, cursor, err => { + if (err) { + changeStream.closed = true; + if (eventEmitter) { changeStream.emit('error', err); changeStream.emit('close'); - return; } - changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); - }); - - return; - } - - if (callback) { - waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { - if (err) return callback(err, null); - - changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); - changeStream.next(callback); - }); - - return; - } - - return new Promise((resolve, reject) => { - waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { - if (err) return reject(err); - resolve(); - }); - }) - .then( - () => (changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions)) - ) - .then(() => changeStream.next()); + return processResumeQueue(changeStream, err); + } + processResumeQueue(changeStream); + if (!eventEmitter) changeStream.next(callback); + }); + return; } if (eventEmitter) return changeStream.emit('error', error); - if (typeof callback === 'function') return callback(error, null); - return changeStream.promiseLibrary.reject(error); + return callback(error, null); } - changeStream.attemptingResume = false; - if (change && !change._id) { const noResumeTokenError = new Error( 'A change stream document has been received that lacks a resume token (_id).' ); if (eventEmitter) return changeStream.emit('error', noResumeTokenError); - if (typeof callback === 'function') return callback(noResumeTokenError, null); - return changeStream.promiseLibrary.reject(noResumeTokenError); + return callback(noResumeTokenError, null); } // cache the resume token @@ -569,8 +533,63 @@ function processNewChange(args) { // Return the change if (eventEmitter) return changeStream.emit('change', change); - if (typeof callback === 'function') return callback(error, change); - return changeStream.promiseLibrary.resolve(change); + return callback(error, change); +} + +/** + * Safely provides a cursor across resume attempts + * + * @param {ChangeStream} changeStream the parent ChangeStream + * @param {function} callback gets the cursor or error + * @param {ChangeStreamCursor} [oldCursor] when resuming from an error, carry over options from previous cursor + */ +function getCursor(changeStream, callback) { + if (changeStream.isClosed()) { + callback(new MongoError('ChangeStream is closed.')); + return; + } + + // if a cursor exists and it is open, return it + if (changeStream.cursor) { + callback(undefined, changeStream.cursor); + return; + } + + // no cursor, queue callback until topology reconnects + changeStream[kResumeQueue].push(callback); +} + +function recreateCursor(changeStream, cursor, cb) { + // attempt to reconnect the topology + changeStream.waitingForTopology = true; + waitForTopologyConnected( + changeStream.topology, + { readPreference: cursor.options.readPreference }, + err => { + changeStream.waitingForTopology = false; + if (err) return cb(err); + changeStream.cursor = createCursor(changeStream, cursor.resumeOptions); + cb(); + } + ); +} + +/** + * Drain the resume queue when a new has become available + * + * @param {ChangeStream} changeStream the parent ChangeStream + * * @param {?ChangeStreamCursor} changeStream.cursor the new cursor + * @param {?Error} err error getting a new cursor + */ +function processResumeQueue(changeStream, err) { + while (changeStream[kResumeQueue].length) { + if (changeStream.isClosed()) { + request(new MongoError('Change Stream is not open.')); + return; + } + const request = changeStream[kResumeQueue].pop(); + request(err, changeStream.cursor); + } } /** diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 54df7c5ec9d..1a1c728d9f6 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -17,17 +17,24 @@ chai.use(require('chai-subset')); * Triggers a fake resumable error on a change stream * * @param {ChangeStream} changeStream - * @param {function} onCursorClosed callback when cursor closed due this error + * @param {Function} onCursorClosed callback when cursor closed due this error + * @param {number} [delay] optional delay before triggering error */ -function triggerResumableError(changeStream, onCursorClosed) { - const closeCursor = changeStream.cursor.close; +function triggerResumableError(changeStream, onCursorClosed, delay) { + const closeCursor = changeStream.cursor.close.bind(changeStream.cursor); changeStream.cursor.close = callback => { - onCursorClosed(); - changeStream.cursor.close = closeCursor; - changeStream.cursor.close(callback); + closeCursor(err => { + callback && callback(err); + onCursorClosed(); + }); }; const fakeResumableError = new MongoNetworkError('fake error'); - changeStream.cursor.emit('error', fakeResumableError); + // delay error slightly to better simulate real conditions + if (delay) { + setTimeout(() => changeStream.cursor.emit('error', fakeResumableError), delay); + } else { + changeStream.cursor.emit('error', fakeResumableError); + } } /** @@ -786,7 +793,7 @@ describe('Change Streams', function() { } }); - it('Should return MongoNetworkError after first retry attempt fails using promises', { + it.skip('Should return MongoNetworkError after first retry attempt fails using promises', { metadata: { requires: { generators: true, @@ -884,7 +891,7 @@ describe('Change Streams', function() { } }); - it('Should return MongoNetworkError after first retry attempt fails using callbacks', { + it.skip('Should return MongoNetworkError after first retry attempt fails using callbacks', { metadata: { requires: { generators: true, @@ -1840,104 +1847,108 @@ describe('Change Streams', function() { } }); - it('should not resume when error includes error label NonRetryableChangeStreamError', function() { - let server; - let client; - let changeStream; + const oldLabelTest = label => + it(`should not resume when error includes error label ${label}`, function() { + let server; + let client; + let changeStream; - function teardown(e) { - return Promise.resolve() - .then(() => changeStream && changeStream.close()) - .catch(() => {}) - .then(() => client && client.close()) - .catch(() => {}) - .then(() => e && Promise.reject(e)); - } + function teardown(e) { + return Promise.resolve() + .then(() => changeStream && changeStream.close()) + .catch(() => {}) + .then(() => client && client.close()) + .catch(() => {}) + .then(() => e && Promise.reject(e)); + } - const db = 'foobar'; - const coll = 'foobar'; - const ns = `${db}.${coll}`; + const db = 'foobar'; + const coll = 'foobar'; + const ns = `${db}.${coll}`; - let aggregateCount = 0; - let getMoreCount = 0; + let aggregateCount = 0; + let getMoreCount = 0; - function messageHandler(request) { - const doc = request.document; + function messageHandler(request) { + const doc = request.document; - if (doc.ismaster) { - request.reply( - Object.assign({}, mock.DEFAULT_ISMASTER_36, { - ismaster: true, - secondary: false, - me: server.uri(), - primary: server.uri() - }) - ); - } else if (doc.aggregate) { - aggregateCount += 1; - request.reply({ - ok: 1, - cursor: { - firstBatch: [], - id: 1, - ns - } - }); - } else if (doc.getMore) { - if (getMoreCount === 0) { - getMoreCount += 1; - request.reply({ - ok: 0, - errorLabels: ['NonRetryableChangeStreamError'] - }); - } else { - getMoreCount += 1; + if (doc.ismaster) { + request.reply( + Object.assign({}, mock.DEFAULT_ISMASTER_36, { + ismaster: true, + secondary: false, + me: server.uri(), + primary: server.uri() + }) + ); + } else if (doc.aggregate) { + aggregateCount += 1; request.reply({ ok: 1, cursor: { - nextBatch: [ - { - _id: {}, - operationType: 'insert', - ns: { db, coll }, - fullDocument: { a: 1 } - } - ], + firstBatch: [], id: 1, ns } }); + } else if (doc.getMore) { + if (getMoreCount === 0) { + getMoreCount += 1; + request.reply({ + ok: 0, + errorLabels: [label] + }); + } else { + getMoreCount += 1; + request.reply({ + ok: 1, + cursor: { + nextBatch: [ + { + _id: {}, + operationType: 'insert', + ns: { db, coll }, + fullDocument: { a: 1 } + } + ], + id: 1, + ns + } + }); + } + } else { + request.reply({ ok: 1 }); } - } else { - request.reply({ ok: 1 }); } - } - return mock - .createServer() - .then(_server => (server = _server)) - .then(() => server.setMessageHandler(messageHandler)) - .then(() => (client = this.configuration.newClient(`mongodb://${server.uri()}`))) - .then(() => client.connect()) - .then( - () => - (changeStream = client - .db(db) - .collection(coll) - .watch()) - ) - .then(() => changeStream.next()) - .then( - () => Promise.reject('Expected changeStream to not resume'), - err => { - expect(err).to.be.an.instanceOf(MongoError); - expect(err.hasErrorLabel('NonRetryableChangeStreamError')).to.be.true; - expect(aggregateCount).to.equal(1); - expect(getMoreCount).to.equal(1); - } - ) - .then(() => teardown(), teardown); - }); + return mock + .createServer() + .then(_server => (server = _server)) + .then(() => server.setMessageHandler(messageHandler)) + .then(() => (client = this.configuration.newClient(`mongodb://${server.uri()}`))) + .then(() => client.connect()) + .then( + () => + (changeStream = client + .db(db) + .collection(coll) + .watch()) + ) + .then(() => changeStream.next()) + .then( + () => Promise.reject('Expected changeStream to not resume'), + err => { + expect(err).to.be.an.instanceOf(MongoError); + expect(err.hasErrorLabel(label)).to.be.true; + expect(aggregateCount).to.equal(1); + expect(getMoreCount).to.equal(1); + } + ) + .then(() => teardown(), teardown); + }); + ['NonRetryableChangeStreamError', 'NonResumableChangeStreamError'].forEach(label => + oldLabelTest(label) + ); it('should emit close event after error event', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, @@ -1981,8 +1992,7 @@ describe('Change Streams', function() { } }); - // TODO: re-enable/fix these tests in NODE-2548 - describe.skip('should properly handle a changeStream event being processed mid-close', function() { + describe('should properly handle a changeStream event being processed mid-close', function() { let client, coll; function write() { @@ -2018,15 +2028,14 @@ describe('Change Streams', function() { .then(() => changeStream.next()) .then(() => changeStream.next()) .then(() => { - const nextP = changeStream.next(); - - return changeStream.close().then(() => nextP); + const nextP = () => changeStream.next(); + return changeStream.close().then(() => nextP()); }); } return Promise.all([read(), write()]).then( () => Promise.reject(new Error('Expected operation to fail with error')), - err => expect(err.message).to.equal('ChangeStream is closed') + err => expect(err.message).to.equal('ChangeStream is closed.') ); } }); @@ -2038,17 +2047,19 @@ describe('Change Streams', function() { changeStream.next(() => { changeStream.next(() => { - changeStream.next(err => { - let _err = null; - try { - expect(err.message).to.equal('ChangeStream is closed'); - } catch (e) { - _err = e; - } finally { - done(_err); - } + changeStream.close(err => { + expect(err).to.not.exist; + changeStream.next(err => { + let _err = null; + try { + expect(err.message).to.equal('ChangeStream is closed.'); + } catch (e) { + _err = e; + } finally { + done(_err); + } + }); }); - changeStream.close(); }); }); @@ -2772,12 +2783,16 @@ describe('Change Streams', function() { }); waitForStarted(changeStream, () => { - triggerResumableError(changeStream, () => { - events.push('error'); - coll.insertOne({ x: 2 }, { w: 'majority', j: true }, err => { - expect(err).to.not.exist; - }); - }); + triggerResumableError( + changeStream, + () => { + events.push('error'); + coll.insertOne({ x: 2 }, { w: 'majority', j: true }, err => { + expect(err).to.not.exist; + }); + }, + 250 + ); }); } }); @@ -2800,7 +2815,7 @@ describe('Change Streams', function() { case 2: // only events after this point are relevant to this test events = []; - triggerResumableError(changeStream, () => events.push('error')); + triggerResumableError(changeStream, () => events.push('error'), 0); break; case 3: expect(events) @@ -2825,3 +2840,95 @@ describe('Change Streams', function() { }); }); }); + +describe('Change Stream Resume Error Tests', function() { + function withChangeStream(callback) { + return function(done) { + const configuration = this.configuration; + const client = configuration.newClient(); + client.connect(err => { + expect(err).to.not.exist; + const db = client.db('test'); + db.createCollection('changeStreamResumeErrorTest', (err, collection) => { + expect(err).to.not.exist; + const changeStream = collection.watch(); + callback(collection, changeStream, () => + changeStream.close(() => collection.drop(() => client.close(done))) + ); + }); + }); + }; + } + it('(events) should continue iterating after a resumable error', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: withChangeStream((collection, changeStream, done) => { + const docs = []; + changeStream.on('change', change => { + expect(change).to.exist; + docs.push(change); + if (docs.length === 2) { + expect(docs[0]).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 42 } + }); + expect(docs[1]).to.containSubset({ + operationType: 'insert', + fullDocument: { b: 24 } + }); + done(); + } + }); + waitForStarted(changeStream, () => { + collection.insertOne({ a: 42 }, err => { + expect(err).to.not.exist; + triggerResumableError(changeStream, () => { + collection.insertOne({ b: 24 }, err => { + expect(err).to.not.exist; + }); + }); + }); + }); + }) + }); + + it('(callback) hasNext and next should work after a resumable error', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: withChangeStream((collection, changeStream, done) => { + waitForStarted(changeStream, () => { + collection.insertOne({ a: 42 }, err => { + expect(err).to.not.exist; + triggerResumableError( + changeStream, + () => { + changeStream.hasNext((err, hasNext) => { + expect(err).to.not.exist; + expect(hasNext).to.be.true; + changeStream.next((err, change) => { + expect(err).to.not.exist; + expect(change).to.containSubset({ + operationType: 'insert', + fullDocument: { b: 24 } + }); + done(); + }); + }); + collection.insertOne({ b: 24 }); + }, + 250 + ); + }); + }); + changeStream.hasNext((err, hasNext) => { + expect(err).to.not.exist; + expect(hasNext).to.be.true; + changeStream.next((err, change) => { + expect(err).to.not.exist; + expect(change).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 42 } + }); + }); + }); + }) + }); +});