Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ChangeStream): hasNext/next should work after resume #2333

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
941c93c
test: add new tests demonstrating failure
emadum Apr 14, 2020
639b422
test: only run on servers that support change streams
emadum Apr 14, 2020
9cb32c6
test: fix tests
emadum Apr 26, 2020
1713f57
refactor: add getCursor to prevent iterating on old cursor during res…
emadum Apr 26, 2020
f106a57
test: extract withChangeStream helper
emadum Apr 26, 2020
b61b3a8
test: skip callback resume error tests
emadum Apr 26, 2020
2f3c0c1
test: consolidate tests
emadum Apr 26, 2020
48eff06
test: remove unused helpers to fix lint
emadum Apr 26, 2020
dc82031
fix: NonRetryableChangeStreamError should be NonResumableChangeStream…
emadum Apr 26, 2020
55d17d2
test: fix onResume in eventEmitter case
emadum Apr 26, 2020
5d0efeb
refactor: move waitForTopology into getCursor
emadum Apr 29, 2020
382158b
cleanup
emadum Apr 29, 2020
771af47
add comments
emadum Apr 29, 2020
a5a34f0
jsdoc cleanup
emadum Apr 29, 2020
92d604c
Merge remote-tracking branch 'origin/3.5' into NODE-2548/changestream…
emadum Apr 30, 2020
3a87128
Merge remote-tracking branch 'origin/3.5' into NODE-2548/changestream…
emadum Apr 30, 2020
8c84bf5
test: test for both old versions of non-resumable label to be safe
emadum Apr 30, 2020
a5224c6
refactor: use maybePromise for ChangeStream.close
emadum Apr 30, 2020
a5ba416
test: fix skipped mid-close tests
emadum Apr 30, 2020
faf2a5a
test: skip change stream tests running on mock single topology
emadum Apr 30, 2020
5dd46db
test: fix broken tests
emadum May 1, 2020
37f6cb9
test: unskip test
emadum May 1, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 53 additions & 64 deletions lib/change_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,12 @@ class ChangeStream extends EventEmitter {
* @returns {Promise|void} returns Promise if no callback passed
*/
hasNext(callback) {
return maybePromise(this.parent, callback, cb => this.cursor.hasNext(cb));
return maybePromise(this.parent, callback, cb =>
getCursor(this, (err, cursor) => {
if (err) return cb(err);
cursor.hasNext(cb);
})
);
}

/**
Expand All @@ -139,14 +144,17 @@ class ChangeStream extends EventEmitter {
* @returns {Promise|void} returns Promise if no callback passed
*/
next(callback) {
return maybePromise(this.parent, callback, cb => {
if (this.isClosed()) {
return cb(new Error('Change Stream is not open.'));
}
this.cursor.next((error, change) => {
processNewChange({ changeStream: this, error, change, callback: cb });
});
});
return maybePromise(this.parent, callback, cb =>
getCursor(this, (err, cursor) => {
if (err) return cb(err);
if (this.isClosed()) {
return cb(new Error('Change Stream is not open.'));
}
cursor.next((error, change) => {
processNewChange({ changeStream: this, error, change, callback: cb });
});
})
);
}

/**
Expand Down Expand Up @@ -468,84 +476,56 @@ function processNewChange(args) {
// If the changeStream is closed, then it should not process a change.
if (changeStream.isClosed()) {
// We do not error in the eventEmitter case.
if (eventEmitter) {
return;
}

const error = new MongoError('ChangeStream is closed');
return typeof callback === 'function'
? callback(error, null)
: changeStream.promiseLibrary.reject(error);
if (eventEmitter) return;
return callback(new MongoError('ChangeStream is closed'));
}

const cursor = changeStream.cursor;
const topology = changeStream.topology;
const options = changeStream.cursor.options;

if (error) {
if (isResumableError(error) && !changeStream.attemptingResume) {
changeStream.attemptingResume = true;
if (isResumableError(error) && changeStream.cursor) {
changeStream.cursor = null;
const onResume = [];
changeStream.onResume = cb => onResume.push(cb);

// stop listening to all events from old cursor
['data', 'close', 'end', 'error'].forEach(event =>
changeStream.cursor.removeAllListeners(event)
);
['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));

// close internal cursor, ignore errors
changeStream.cursor.close();
cursor.close();

// attempt recreating the cursor
if (eventEmitter) {
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
return waitForTopologyConnected(
changeStream.topology,
{ readPreference: cursor.options.readPreference },
err => {
if (err) {
changeStream.emit('error', err);
changeStream.emit('close');
return;
if (eventEmitter) {
changeStream.emit('error', err);
changeStream.emit('close');
return;
}
onResume.forEach(cb => cb(err));
return callback(err);
}
changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
});

return;
}

if (callback) {
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
if (err) return callback(err, null);

changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
changeStream.next(callback);
});

return;
}

return new Promise((resolve, reject) => {
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
if (err) return reject(err);
resolve();
});
})
.then(
() => (changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions))
)
.then(() => changeStream.next());
onResume.forEach(cb => cb());
if (!eventEmitter) changeStream.next(callback);
}
);
}

if (eventEmitter) return changeStream.emit('error', error);
if (typeof callback === 'function') return callback(error, null);
return changeStream.promiseLibrary.reject(error);
return callback(error, null);
}

changeStream.attemptingResume = false;

if (change && !change._id) {
const noResumeTokenError = new Error(
'A change stream document has been received that lacks a resume token (_id).'
);

if (eventEmitter) return changeStream.emit('error', noResumeTokenError);
if (typeof callback === 'function') return callback(noResumeTokenError, null);
return changeStream.promiseLibrary.reject(noResumeTokenError);
return callback(noResumeTokenError, null);
}

// cache the resume token
Expand All @@ -561,8 +541,17 @@ function processNewChange(args) {

// Return the change
if (eventEmitter) return changeStream.emit('change', change);
if (typeof callback === 'function') return callback(error, change);
return changeStream.promiseLibrary.resolve(change);
return callback(error, change);
}

function getCursor(changeStream, callback) {
if (!changeStream.cursor) {
return changeStream.onResume(err => {
if (err) return callback(err);
getCursor(changeStream, callback);
});
}
return callback(null, changeStream.cursor);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion lib/error.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ function isResumableError(error) {

return !(
GET_MORE_NON_RESUMABLE_CODES.has(error.code) ||
error.hasErrorLabel('NonRetryableChangeStreamError')
error.hasErrorLabel('NonResumableChangeStreamError')
);
}

Expand Down
127 changes: 124 additions & 3 deletions test/functional/change_stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ var assert = require('assert');
var Transform = require('stream').Transform;
const MongoError = require('../../lib/core').MongoError;
var MongoNetworkError = require('../../lib/core').MongoNetworkError;
var mongoErrorContextSymbol = require('../../lib/core').mongoErrorContextSymbol;
var setupDatabase = require('./shared').setupDatabase;
var delay = require('./shared').delay;
var co = require('co');
Expand All @@ -13,6 +14,38 @@ const sinon = require('sinon');

chai.use(require('chai-subset'));

/**
* Triggers a fake resumable error on a change stream
*
* @param {ChangeStream} changeStream
* @param {Function} onCursorClosed callback when cursor closed due this error
*/
function triggerResumableError(changeStream, onCursorClosed) {
const closeCursor = changeStream.cursor.close.bind(changeStream.cursor);
changeStream.cursor.close = callback => {
closeCursor(err => {
callback && callback(err);
onCursorClosed();
});
};
const fakeResumableError = new MongoNetworkError('fake error');
fakeResumableError[mongoErrorContextSymbol] = { isGetMore: true };
// delay error slightly to better simulate real conditions
setTimeout(() => changeStream.cursor.emit('error', fakeResumableError), 250);
}

/**
* Waits for a change stream to start
*
* @param {ChangeStream} changeStream
* @param {Function} callback
*/
function waitForStarted(changeStream, callback) {
changeStream.cursor.once('response', () => {
callback();
});
}

// Define the pipeline processing changes
var pipeline = [
{ $addFields: { addedField: 'This is a field added using $addFields' } },
Expand Down Expand Up @@ -1792,7 +1825,7 @@ describe('Change Streams', function() {
}
});

it('should not resume when error includes error label NonRetryableChangeStreamError', function() {
it('should not resume when error includes error label NonResumableChangeStreamError', function() {
let server;
let client;
let changeStream;
Expand Down Expand Up @@ -1840,7 +1873,7 @@ describe('Change Streams', function() {
getMoreCount += 1;
request.reply({
ok: 0,
errorLabels: ['NonRetryableChangeStreamError']
errorLabels: ['NonResumableChangeStreamError']
});
} else {
getMoreCount += 1;
Expand Down Expand Up @@ -1883,7 +1916,7 @@ describe('Change Streams', function() {
() => Promise.reject('Expected changeStream to not resume'),
err => {
expect(err).to.be.an.instanceOf(MongoError);
expect(err.hasErrorLabel('NonRetryableChangeStreamError')).to.be.true;
expect(err.hasErrorLabel('NonResumableChangeStreamError')).to.be.true;
expect(aggregateCount).to.equal(1);
expect(getMoreCount).to.equal(1);
}
Expand Down Expand Up @@ -2827,3 +2860,91 @@ describe('Change Streams', function() {
});
});
});

describe('Change Stream Resume Error Tests', function() {
function withChangeStream(callback) {
return function(done) {
const configuration = this.configuration;
const client = configuration.newClient();
client.connect(err => {
expect(err).to.not.exist;
const db = client.db('test');
db.createCollection('changeStreamResumeErrorTest', (err, collection) => {
expect(err).to.not.exist;
const changeStream = collection.watch();
callback(collection, changeStream, () =>
changeStream.close(() => collection.drop(() => client.close(done)))
);
});
});
};
}
it('(events) should continue iterating after a resumable error', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: withChangeStream((collection, changeStream, done) => {
const docs = [];
changeStream.on('change', change => {
expect(change).to.exist;
docs.push(change);
if (docs.length === 2) {
expect(docs[0]).to.containSubset({
operationType: 'insert',
fullDocument: { a: 42 }
});
expect(docs[1]).to.containSubset({
operationType: 'insert',
fullDocument: { b: 24 }
});
done();
}
});
waitForStarted(changeStream, () => {
collection.insertOne({ a: 42 }, err => {
expect(err).to.not.exist;
triggerResumableError(changeStream, () => {
collection.insertOne({ b: 24 }, err => {
expect(err).to.not.exist;
});
});
});
});
})
});

it('(callback) hasNext and next should work after a resumable error', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: withChangeStream((collection, changeStream, done) => {
waitForStarted(changeStream, () => {
collection.insertOne({ a: 42 }, err => {
expect(err).to.not.exist;
triggerResumableError(changeStream, () => {
changeStream.hasNext((err1, hasNext) => {
expect(err1).to.not.exist;
expect(hasNext).to.be.true;
changeStream.next((err, change) => {
expect(err).to.not.exist;
expect(change).to.containSubset({
operationType: 'insert',
fullDocument: { b: 24 }
});
done();
});
});
collection.insertOne({ b: 24 });
});
});
});
changeStream.hasNext((err, hasNext) => {
expect(err).to.not.exist;
expect(hasNext).to.be.true;
changeStream.next((err, change) => {
expect(err).to.not.exist;
expect(change).to.containSubset({
operationType: 'insert',
fullDocument: { a: 42 }
});
});
});
})
});
});