Skip to content

Commit 306b5b3

Browse files
authored
fix(ChangeStream): handle null changes
NODE-2626
1 parent 99681e1 commit 306b5b3

File tree

3 files changed

+85
-41
lines changed

3 files changed

+85
-41
lines changed

lib/change_stream.js

+7-2
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,8 @@ class ChangeStreamCursor extends Cursor {
326326

327327
_initializeCursor(callback) {
328328
super._initializeCursor((err, result) => {
329-
if (err) {
330-
callback(err);
329+
if (err || result == null) {
330+
callback(err, result);
331331
return;
332332
}
333333

@@ -483,6 +483,11 @@ function waitForTopologyConnected(topology, options, callback) {
483483
function processNewChange(changeStream, change, callback) {
484484
const cursor = changeStream.cursor;
485485

486+
// a null change means the cursor has been notified, implicitly closing the change stream
487+
if (change == null) {
488+
changeStream.closed = true;
489+
}
490+
486491
if (changeStream.closed) {
487492
if (callback) callback(new MongoError('ChangeStream is closed'));
488493
return;

lib/core/cursor.js

+30-39
Original file line numberDiff line numberDiff line change
@@ -464,50 +464,41 @@ class CoreCursor extends Readable {
464464
}
465465

466466
const result = r.message;
467-
if (result.queryFailure) {
468-
return done(new MongoError(result.documents[0]), null);
469-
}
470467

471-
// Check if we have a command cursor
472-
if (
473-
Array.isArray(result.documents) &&
474-
result.documents.length === 1 &&
475-
(!cursor.cmd.find || (cursor.cmd.find && cursor.cmd.virtual === false)) &&
476-
(typeof result.documents[0].cursor !== 'string' ||
477-
result.documents[0]['$err'] ||
478-
result.documents[0]['errmsg'] ||
479-
Array.isArray(result.documents[0].result))
480-
) {
481-
// We have an error document, return the error
482-
if (result.documents[0]['$err'] || result.documents[0]['errmsg']) {
483-
return done(new MongoError(result.documents[0]), null);
468+
if (Array.isArray(result.documents) && result.documents.length === 1) {
469+
const document = result.documents[0];
470+
471+
if (result.queryFailure) {
472+
return done(new MongoError(document), null);
484473
}
485474

486-
// We have a cursor document
487-
if (result.documents[0].cursor != null && typeof result.documents[0].cursor !== 'string') {
488-
const id = result.documents[0].cursor.id;
489-
// If we have a namespace change set the new namespace for getmores
490-
if (result.documents[0].cursor.ns) {
491-
cursor.ns = result.documents[0].cursor.ns;
475+
// Check if we have a command cursor
476+
if (!cursor.cmd.find || (cursor.cmd.find && cursor.cmd.virtual === false)) {
477+
// We have an error document, return the error
478+
if (document.$err || document.errmsg) {
479+
return done(new MongoError(document), null);
492480
}
493-
// Promote id to long if needed
494-
cursor.cursorState.cursorId = typeof id === 'number' ? Long.fromNumber(id) : id;
495-
cursor.cursorState.lastCursorId = cursor.cursorState.cursorId;
496-
cursor.cursorState.operationTime = result.documents[0].operationTime;
497-
498-
// If we have a firstBatch set it
499-
if (Array.isArray(result.documents[0].cursor.firstBatch)) {
500-
cursor.cursorState.documents = result.documents[0].cursor.firstBatch; //.reverse();
501-
}
502-
503-
// Return after processing command cursor
504-
return done(null, result);
505-
}
506481

507-
if (Array.isArray(result.documents[0].result)) {
508-
cursor.cursorState.documents = result.documents[0].result;
509-
cursor.cursorState.cursorId = Long.ZERO;
510-
return done(null, result);
482+
// We have a cursor document
483+
if (document.cursor != null && typeof document.cursor !== 'string') {
484+
const id = document.cursor.id;
485+
// If we have a namespace change set the new namespace for getmores
486+
if (document.cursor.ns) {
487+
cursor.ns = document.cursor.ns;
488+
}
489+
// Promote id to long if needed
490+
cursor.cursorState.cursorId = typeof id === 'number' ? Long.fromNumber(id) : id;
491+
cursor.cursorState.lastCursorId = cursor.cursorState.cursorId;
492+
cursor.cursorState.operationTime = document.operationTime;
493+
494+
// If we have a firstBatch set it
495+
if (Array.isArray(document.cursor.firstBatch)) {
496+
cursor.cursorState.documents = document.cursor.firstBatch; //.reverse();
497+
}
498+
499+
// Return after processing command cursor
500+
return done(null, result);
501+
}
511502
}
512503
}
513504

test/functional/change_stream.test.js

+48
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ const expect = chai.expect;
1313
const sinon = require('sinon');
1414
const fs = require('fs');
1515
const crypto = require('crypto');
16+
const BSON = require('bson');
17+
const Long = BSON.Long;
1618

1719
chai.use(require('chai-subset'));
1820

@@ -2830,3 +2832,49 @@ describe('Change Stream Resume Error Tests', function() {
28302832
})
28312833
});
28322834
});
2835+
context('NODE-2626', function() {
2836+
let mockServer;
2837+
afterEach(() => mock.cleanup());
2838+
beforeEach(() => mock.createServer().then(server => (mockServer = server)));
2839+
it('changeStream should close if cursor id for initial aggregate is Long.ZERO', function(done) {
2840+
mockServer.setMessageHandler(req => {
2841+
const doc = req.document;
2842+
if (doc.ismaster) {
2843+
return req.reply(mock.DEFAULT_ISMASTER_36);
2844+
}
2845+
if (doc.aggregate) {
2846+
return req.reply({
2847+
ok: 1,
2848+
cursor: {
2849+
id: Long.ZERO,
2850+
firstBatch: []
2851+
}
2852+
});
2853+
}
2854+
if (doc.getMore) {
2855+
return req.reply({
2856+
ok: 1,
2857+
cursor: {
2858+
id: new Long(1407, 1407),
2859+
nextBatch: []
2860+
}
2861+
});
2862+
}
2863+
req.reply({ ok: 1 });
2864+
});
2865+
const client = this.configuration.newClient(`mongodb://${mockServer.uri()}/`, {
2866+
useUnifiedTopology: true
2867+
});
2868+
client.connect(err => {
2869+
expect(err).to.not.exist;
2870+
const collection = client.db('cs').collection('test');
2871+
const changeStream = collection.watch();
2872+
changeStream.next((err, doc) => {
2873+
expect(err).to.exist;
2874+
expect(doc).to.not.exist;
2875+
expect(err.message).to.equal('ChangeStream is closed');
2876+
changeStream.close(() => client.close(done));
2877+
});
2878+
});
2879+
});
2880+
});

0 commit comments

Comments
 (0)