Skip to content

Commit a9d3965

Browse files
authored
fix(ChangeStream): whitelist resumable errors (#2337)
- Changes which errors are considered resumable on change streams, adding support for the new ResumableChangeStreamError label. - Updates ChangeStream prose tests which described startAfter behavior for unsupported server versions. - Fixes use of startAfter/resumeAfter when resuming from an invalidate event. Implement prose tests #17 and #18. NODE-2478
1 parent 922c3ab commit a9d3965

19 files changed

+6080
-744
lines changed

.travis.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ language: node_js
44
branches:
55
only:
66
- master
7-
- next
7+
- 3.6
88

99
before_install:
1010
# we have to intstall mongo-orchestration ourselves to get around permissions issues in subshells

lib/change_stream.js

+34-26
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,9 @@ class ChangeStreamCursor extends Cursor {
287287
['resumeAfter', 'startAfter', 'startAtOperationTime'].forEach(key => delete result[key]);
288288

289289
if (this.resumeToken) {
290-
result.resumeAfter = this.resumeToken;
290+
const resumeKey =
291+
this.options.startAfter && !this.hasReceived ? 'startAfter' : 'resumeAfter';
292+
result[resumeKey] = this.resumeToken;
291293
} else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) {
292294
result.startAtOperationTime = this.startAtOperationTime;
293295
}
@@ -296,6 +298,26 @@ class ChangeStreamCursor extends Cursor {
296298
return result;
297299
}
298300

301+
cacheResumeToken(resumeToken) {
302+
if (this.bufferedCount() === 0 && this.cursorState.postBatchResumeToken) {
303+
this.resumeToken = this.cursorState.postBatchResumeToken;
304+
} else {
305+
this.resumeToken = resumeToken;
306+
}
307+
this.hasReceived = true;
308+
}
309+
310+
_processBatch(batchName, response) {
311+
const cursor = response.cursor;
312+
if (cursor.postBatchResumeToken) {
313+
this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken;
314+
315+
if (cursor[batchName].length === 0) {
316+
this.resumeToken = cursor.postBatchResumeToken;
317+
}
318+
}
319+
}
320+
299321
_initializeCursor(callback) {
300322
super._initializeCursor((err, result) => {
301323
if (err) {
@@ -314,15 +336,9 @@ class ChangeStreamCursor extends Cursor {
314336
this.startAtOperationTime = response.operationTime;
315337
}
316338

317-
const cursor = response.cursor;
318-
if (cursor.postBatchResumeToken) {
319-
this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken;
320-
321-
if (cursor.firstBatch.length === 0) {
322-
this.resumeToken = cursor.postBatchResumeToken;
323-
}
324-
}
339+
this._processBatch('firstBatch', response);
325340

341+
this.emit('init', result);
326342
this.emit('response');
327343
callback(err, result);
328344
});
@@ -335,15 +351,9 @@ class ChangeStreamCursor extends Cursor {
335351
return;
336352
}
337353

338-
const cursor = response.cursor;
339-
if (cursor.postBatchResumeToken) {
340-
this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken;
341-
342-
if (cursor.nextBatch.length === 0) {
343-
this.resumeToken = cursor.postBatchResumeToken;
344-
}
345-
}
354+
this._processBatch('nextBatch', response);
346355

356+
this.emit('more', response);
347357
this.emit('response');
348358
callback(err, response);
349359
});
@@ -366,6 +376,7 @@ function createChangeStreamCursor(self, options) {
366376

367377
const pipeline = [{ $changeStream: changeStreamStageOptions }].concat(self.pipeline);
368378
const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);
379+
369380
const changeStreamCursor = new ChangeStreamCursor(
370381
self.topology,
371382
new AggregateOperation(self.parent, pipeline, options),
@@ -464,9 +475,10 @@ function processNewChange(args) {
464475
const change = args.change;
465476
const callback = args.callback;
466477
const eventEmitter = args.eventEmitter || false;
478+
const cursor = changeStream.cursor;
467479

468-
// If the changeStream is closed, then it should not process a change.
469-
if (changeStream.isClosed()) {
480+
// If the cursor is null, then it should not process a change.
481+
if (cursor == null) {
470482
// We do not error in the eventEmitter case.
471483
if (eventEmitter) {
472484
return;
@@ -478,12 +490,12 @@ function processNewChange(args) {
478490
: changeStream.promiseLibrary.reject(error);
479491
}
480492

481-
const cursor = changeStream.cursor;
482493
const topology = changeStream.topology;
483494
const options = changeStream.cursor.options;
495+
const wireVersion = maxWireVersion(cursor.server);
484496

485497
if (error) {
486-
if (isResumableError(error) && !changeStream.attemptingResume) {
498+
if (isResumableError(error, wireVersion) && !changeStream.attemptingResume) {
487499
changeStream.attemptingResume = true;
488500

489501
// stop listening to all events from old cursor
@@ -549,11 +561,7 @@ function processNewChange(args) {
549561
}
550562

551563
// cache the resume token
552-
if (cursor.bufferedCount() === 0 && cursor.cursorState.postBatchResumeToken) {
553-
cursor.resumeToken = cursor.cursorState.postBatchResumeToken;
554-
} else {
555-
cursor.resumeToken = change._id;
556-
}
564+
cursor.cacheResumeToken(change._id);
557565

558566
// wipe the startAtOperationTime if there was one so that there won't be a conflict
559567
// between resumeToken and startAtOperationTime if we need to reconnect the cursor

lib/core/cursor.js

-5
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ const Logger = require('./connection/logger');
44
const retrieveBSON = require('./connection/utils').retrieveBSON;
55
const MongoError = require('./error').MongoError;
66
const MongoNetworkError = require('./error').MongoNetworkError;
7-
const mongoErrorContextSymbol = require('./error').mongoErrorContextSymbol;
87
const collationNotSupported = require('./utils').collationNotSupported;
98
const ReadPreference = require('./topologies/read_preference');
109
const isUnifiedTopology = require('./utils').isUnifiedTopology;
@@ -774,10 +773,6 @@ function nextFunction(self, callback) {
774773
// Execute the next get more
775774
self._getMore(function(err, doc, connection) {
776775
if (err) {
777-
if (err instanceof MongoError) {
778-
err[mongoErrorContextSymbol].isGetMore = true;
779-
}
780-
781776
return handleCallback(callback, err);
782777
}
783778

lib/core/error.js

-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
'use strict';
22

3-
const mongoErrorContextSymbol = Symbol('mongoErrorContextSymbol');
4-
53
/**
64
* Creates a new MongoError
75
*
@@ -29,7 +27,6 @@ class MongoError extends Error {
2927
}
3028

3129
this.name = 'MongoError';
32-
this[mongoErrorContextSymbol] = this[mongoErrorContextSymbol] || {};
3330
}
3431

3532
/**
@@ -262,7 +259,6 @@ module.exports = {
262259
MongoTimeoutError,
263260
MongoServerSelectionError,
264261
MongoWriteConcernError,
265-
mongoErrorContextSymbol,
266262
isRetryableError,
267263
isSDAMUnrecoverableError,
268264
isNodeShuttingDownError,

lib/core/index.js

-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ module.exports = {
2222
MongoTimeoutError: require('./error').MongoTimeoutError,
2323
MongoServerSelectionError: require('./error').MongoServerSelectionError,
2424
MongoWriteConcernError: require('./error').MongoWriteConcernError,
25-
mongoErrorContextSymbol: require('./error').mongoErrorContextSymbol,
2625
// Core
2726
Connection: require('./connection/connection'),
2827
Server: require('./topologies/server'),

lib/core/utils.js

+13-11
Original file line numberDiff line numberDiff line change
@@ -83,22 +83,24 @@ function retrieveEJSON() {
8383
* @param {(Topology|Server)} topologyOrServer
8484
*/
8585
function maxWireVersion(topologyOrServer) {
86-
if (topologyOrServer.ismaster) {
87-
return topologyOrServer.ismaster.maxWireVersion;
88-
}
86+
if (topologyOrServer) {
87+
if (topologyOrServer.ismaster) {
88+
return topologyOrServer.ismaster.maxWireVersion;
89+
}
8990

90-
if (typeof topologyOrServer.lastIsMaster === 'function') {
91-
const lastIsMaster = topologyOrServer.lastIsMaster();
92-
if (lastIsMaster) {
93-
return lastIsMaster.maxWireVersion;
91+
if (typeof topologyOrServer.lastIsMaster === 'function') {
92+
const lastIsMaster = topologyOrServer.lastIsMaster();
93+
if (lastIsMaster) {
94+
return lastIsMaster.maxWireVersion;
95+
}
9496
}
95-
}
9697

97-
if (topologyOrServer.description) {
98-
return topologyOrServer.description.maxWireVersion;
98+
if (topologyOrServer.description) {
99+
return topologyOrServer.description.maxWireVersion;
100+
}
99101
}
100102

101-
return null;
103+
return 0;
102104
}
103105

104106
/*

lib/error.js

+26-33
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,38 @@
11
'use strict';
22

33
const MongoNetworkError = require('./core').MongoNetworkError;
4-
const mongoErrorContextSymbol = require('./core').mongoErrorContextSymbol;
54

6-
const GET_MORE_NON_RESUMABLE_CODES = new Set([
7-
136, // CappedPositionLost
8-
237, // CursorKilled
9-
11601 // Interrupted
5+
// From spec@https://github.com/mongodb/specifications/blob/f93d78191f3db2898a59013a7ed5650352ef6da8/source/change-streams/change-streams.rst#resumable-error
6+
const GET_MORE_RESUMABLE_CODES = new Set([
7+
6, // HostUnreachable
8+
7, // HostNotFound
9+
89, // NetworkTimeout
10+
91, // ShutdownInProgress
11+
189, // PrimarySteppedDown
12+
262, // ExceededTimeLimit
13+
9001, // SocketException
14+
10107, // NotMaster
15+
11600, // InterruptedAtShutdown
16+
11602, // InterruptedDueToReplStateChange
17+
13435, // NotMasterNoSlaveOk
18+
13436, // NotMasterOrSecondary
19+
63, // StaleShardVersion
20+
150, // StaleEpoch
21+
13388, // StaleConfig
22+
234, // RetryChangeStream
23+
133 // FailedToSatisfyReadPreference
1024
]);
1125

12-
// From spec@https://github.com/mongodb/specifications/blob/7a2e93d85935ee4b1046a8d2ad3514c657dc74fa/source/change-streams/change-streams.rst#resumable-error:
13-
//
14-
// An error is considered resumable if it meets any of the following criteria:
15-
// - any error encountered which is not a server error (e.g. a timeout error or network error)
16-
// - any server error response from a getMore command excluding those containing the error label
17-
// NonRetryableChangeStreamError and those containing the following error codes:
18-
// - Interrupted: 11601
19-
// - CappedPositionLost: 136
20-
// - CursorKilled: 237
21-
//
22-
// An error on an aggregate command is not a resumable error. Only errors on a getMore command may be considered resumable errors.
23-
24-
function isGetMoreError(error) {
25-
if (error[mongoErrorContextSymbol]) {
26-
return error[mongoErrorContextSymbol].isGetMore;
27-
}
28-
}
29-
30-
function isResumableError(error) {
31-
if (!isGetMoreError(error)) {
32-
return false;
33-
}
34-
26+
function isResumableError(error, wireVersion) {
3527
if (error instanceof MongoNetworkError) {
3628
return true;
3729
}
3830

39-
return !(
40-
GET_MORE_NON_RESUMABLE_CODES.has(error.code) ||
41-
error.hasErrorLabel('NonRetryableChangeStreamError')
42-
);
31+
if (wireVersion >= 9) {
32+
return error.hasErrorLabel('ResumableChangeStreamError');
33+
}
34+
35+
return GET_MORE_RESUMABLE_CODES.has(error.code);
4336
}
4437

45-
module.exports = { GET_MORE_NON_RESUMABLE_CODES, isResumableError };
38+
module.exports = { GET_MORE_RESUMABLE_CODES, isResumableError };

0 commit comments

Comments
 (0)