Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ChangeStream): whitelist resumable errors #2337

Merged
merged 4 commits into from
Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ language: node_js
branches:
only:
- master
- next
- 3.6

before_install:
# we have to intstall mongo-orchestration ourselves to get around permissions issues in subshells
Expand Down
60 changes: 34 additions & 26 deletions lib/change_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,9 @@ class ChangeStreamCursor extends Cursor {
['resumeAfter', 'startAfter', 'startAtOperationTime'].forEach(key => delete result[key]);

if (this.resumeToken) {
result.resumeAfter = this.resumeToken;
const resumeKey =
this.options.startAfter && !this.hasReceived ? 'startAfter' : 'resumeAfter';
result[resumeKey] = this.resumeToken;
} else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) {
result.startAtOperationTime = this.startAtOperationTime;
}
Expand All @@ -296,6 +298,26 @@ class ChangeStreamCursor extends Cursor {
return result;
}

cacheResumeToken(resumeToken) {
if (this.bufferedCount() === 0 && this.cursorState.postBatchResumeToken) {
this.resumeToken = this.cursorState.postBatchResumeToken;
} else {
this.resumeToken = resumeToken;
}
this.hasReceived = true;
}

_processBatch(batchName, response) {
const cursor = response.cursor;
if (cursor.postBatchResumeToken) {
this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken;

if (cursor[batchName].length === 0) {
this.resumeToken = cursor.postBatchResumeToken;
}
}
}

_initializeCursor(callback) {
super._initializeCursor((err, result) => {
if (err) {
Expand All @@ -314,15 +336,9 @@ class ChangeStreamCursor extends Cursor {
this.startAtOperationTime = response.operationTime;
}

const cursor = response.cursor;
if (cursor.postBatchResumeToken) {
this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken;

if (cursor.firstBatch.length === 0) {
this.resumeToken = cursor.postBatchResumeToken;
}
}
this._processBatch('firstBatch', response);

this.emit('init', result);
this.emit('response');
callback(err, result);
});
Expand All @@ -335,15 +351,9 @@ class ChangeStreamCursor extends Cursor {
return;
}

const cursor = response.cursor;
if (cursor.postBatchResumeToken) {
this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken;

if (cursor.nextBatch.length === 0) {
this.resumeToken = cursor.postBatchResumeToken;
}
}
this._processBatch('nextBatch', response);

this.emit('more', response);
this.emit('response');
callback(err, response);
});
Expand All @@ -366,6 +376,7 @@ function createChangeStreamCursor(self, options) {

const pipeline = [{ $changeStream: changeStreamStageOptions }].concat(self.pipeline);
const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);

const changeStreamCursor = new ChangeStreamCursor(
self.topology,
new AggregateOperation(self.parent, pipeline, options),
Expand Down Expand Up @@ -464,9 +475,10 @@ function processNewChange(args) {
const change = args.change;
const callback = args.callback;
const eventEmitter = args.eventEmitter || false;
const cursor = changeStream.cursor;

// If the changeStream is closed, then it should not process a change.
if (changeStream.isClosed()) {
// If the cursor is null, then it should not process a change.
if (cursor == null) {
// We do not error in the eventEmitter case.
if (eventEmitter) {
return;
Expand All @@ -478,12 +490,12 @@ function processNewChange(args) {
: changeStream.promiseLibrary.reject(error);
}

const cursor = changeStream.cursor;
const topology = changeStream.topology;
const options = changeStream.cursor.options;
const wireVersion = maxWireVersion(cursor.server);

if (error) {
if (isResumableError(error) && !changeStream.attemptingResume) {
if (isResumableError(error, wireVersion) && !changeStream.attemptingResume) {
changeStream.attemptingResume = true;

// stop listening to all events from old cursor
Expand Down Expand Up @@ -549,11 +561,7 @@ function processNewChange(args) {
}

// cache the resume token
if (cursor.bufferedCount() === 0 && cursor.cursorState.postBatchResumeToken) {
cursor.resumeToken = cursor.cursorState.postBatchResumeToken;
} else {
cursor.resumeToken = change._id;
}
cursor.cacheResumeToken(change._id);

// wipe the startAtOperationTime if there was one so that there won't be a conflict
// between resumeToken and startAtOperationTime if we need to reconnect the cursor
Expand Down
5 changes: 0 additions & 5 deletions lib/core/cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ const Logger = require('./connection/logger');
const retrieveBSON = require('./connection/utils').retrieveBSON;
const MongoError = require('./error').MongoError;
const MongoNetworkError = require('./error').MongoNetworkError;
const mongoErrorContextSymbol = require('./error').mongoErrorContextSymbol;
const collationNotSupported = require('./utils').collationNotSupported;
const ReadPreference = require('./topologies/read_preference');
const isUnifiedTopology = require('./utils').isUnifiedTopology;
Expand Down Expand Up @@ -774,10 +773,6 @@ function nextFunction(self, callback) {
// Execute the next get more
self._getMore(function(err, doc, connection) {
if (err) {
if (err instanceof MongoError) {
err[mongoErrorContextSymbol].isGetMore = true;
}

return handleCallback(callback, err);
}

Expand Down
4 changes: 0 additions & 4 deletions lib/core/error.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
'use strict';

const mongoErrorContextSymbol = Symbol('mongoErrorContextSymbol');

/**
* Creates a new MongoError
*
Expand Down Expand Up @@ -29,7 +27,6 @@ class MongoError extends Error {
}

this.name = 'MongoError';
this[mongoErrorContextSymbol] = this[mongoErrorContextSymbol] || {};
}

/**
Expand Down Expand Up @@ -262,7 +259,6 @@ module.exports = {
MongoTimeoutError,
MongoServerSelectionError,
MongoWriteConcernError,
mongoErrorContextSymbol,
isRetryableError,
isSDAMUnrecoverableError,
isNodeShuttingDownError,
Expand Down
1 change: 0 additions & 1 deletion lib/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ module.exports = {
MongoTimeoutError: require('./error').MongoTimeoutError,
MongoServerSelectionError: require('./error').MongoServerSelectionError,
MongoWriteConcernError: require('./error').MongoWriteConcernError,
mongoErrorContextSymbol: require('./error').mongoErrorContextSymbol,
// Core
Connection: require('./connection/connection'),
Server: require('./topologies/server'),
Expand Down
24 changes: 13 additions & 11 deletions lib/core/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,24 @@ function retrieveEJSON() {
* @param {(Topology|Server)} topologyOrServer
*/
function maxWireVersion(topologyOrServer) {
if (topologyOrServer.ismaster) {
return topologyOrServer.ismaster.maxWireVersion;
}
if (topologyOrServer) {
if (topologyOrServer.ismaster) {
return topologyOrServer.ismaster.maxWireVersion;
}

if (typeof topologyOrServer.lastIsMaster === 'function') {
const lastIsMaster = topologyOrServer.lastIsMaster();
if (lastIsMaster) {
return lastIsMaster.maxWireVersion;
if (typeof topologyOrServer.lastIsMaster === 'function') {
const lastIsMaster = topologyOrServer.lastIsMaster();
if (lastIsMaster) {
return lastIsMaster.maxWireVersion;
}
}
}

if (topologyOrServer.description) {
return topologyOrServer.description.maxWireVersion;
if (topologyOrServer.description) {
return topologyOrServer.description.maxWireVersion;
}
}

return null;
return 0;
}

/*
Expand Down
59 changes: 26 additions & 33 deletions lib/error.js
Original file line number Diff line number Diff line change
@@ -1,45 +1,38 @@
'use strict';

const MongoNetworkError = require('./core').MongoNetworkError;
const mongoErrorContextSymbol = require('./core').mongoErrorContextSymbol;

const GET_MORE_NON_RESUMABLE_CODES = new Set([
136, // CappedPositionLost
237, // CursorKilled
11601 // Interrupted
// From spec@https://github.com/mongodb/specifications/blob/f93d78191f3db2898a59013a7ed5650352ef6da8/source/change-streams/change-streams.rst#resumable-error
const GET_MORE_RESUMABLE_CODES = new Set([
6, // HostUnreachable
7, // HostNotFound
89, // NetworkTimeout
91, // ShutdownInProgress
189, // PrimarySteppedDown
262, // ExceededTimeLimit
9001, // SocketException
10107, // NotMaster
11600, // InterruptedAtShutdown
11602, // InterruptedDueToReplStateChange
13435, // NotMasterNoSlaveOk
13436, // NotMasterOrSecondary
63, // StaleShardVersion
150, // StaleEpoch
13388, // StaleConfig
234, // RetryChangeStream
133 // FailedToSatisfyReadPreference
]);

// From spec@https://github.com/mongodb/specifications/blob/7a2e93d85935ee4b1046a8d2ad3514c657dc74fa/source/change-streams/change-streams.rst#resumable-error:
//
// An error is considered resumable if it meets any of the following criteria:
// - any error encountered which is not a server error (e.g. a timeout error or network error)
// - any server error response from a getMore command excluding those containing the error label
// NonRetryableChangeStreamError and those containing the following error codes:
// - Interrupted: 11601
// - CappedPositionLost: 136
// - CursorKilled: 237
//
// An error on an aggregate command is not a resumable error. Only errors on a getMore command may be considered resumable errors.

function isGetMoreError(error) {
if (error[mongoErrorContextSymbol]) {
return error[mongoErrorContextSymbol].isGetMore;
}
}

function isResumableError(error) {
if (!isGetMoreError(error)) {
return false;
}

function isResumableError(error, wireVersion) {
if (error instanceof MongoNetworkError) {
return true;
}

return !(
GET_MORE_NON_RESUMABLE_CODES.has(error.code) ||
error.hasErrorLabel('NonRetryableChangeStreamError')
);
if (wireVersion >= 9) {
return error.hasErrorLabel('ResumableChangeStreamError');
}

return GET_MORE_RESUMABLE_CODES.has(error.code);
}

module.exports = { GET_MORE_NON_RESUMABLE_CODES, isResumableError };
module.exports = { GET_MORE_RESUMABLE_CODES, isResumableError };
Loading