Skip to content

Commit 0394f9d

Browse files
committed
fix: always clear cancelled wait queue members during processing
Each time the wait queue in the connection pool is processed it is currently gated by having available connections. This can result in a memory leak where many wait queue members are cancelled, but the pool is unable to clear them out before all connections are used for new operations. NODE-2413
1 parent 58b4f94 commit 0394f9d

File tree

2 files changed

+49
-4
lines changed

2 files changed

+49
-4
lines changed

lib/cmap/connection_pool.js

+11-3
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,10 @@ class ConnectionPool extends EventEmitter {
198198
return this[kConnections].length;
199199
}
200200

201+
get waitQueueSize() {
202+
return this[kWaitQueue].length;
203+
}
204+
201205
/**
202206
* Check a connection out of this pool. The connection will continue to be tracked, but no reference to it
203207
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
@@ -295,7 +299,7 @@ class ConnectionPool extends EventEmitter {
295299
this[kCancellationToken].emit('cancel');
296300

297301
// drain the wait queue
298-
while (this[kWaitQueue].length) {
302+
while (this.waitQueueSize) {
299303
const waitQueueMember = this[kWaitQueue].pop();
300304
clearTimeout(waitQueueMember.timer);
301305
if (!waitQueueMember[kCancelled]) {
@@ -449,13 +453,17 @@ function processWaitQueue(pool) {
449453
return;
450454
}
451455

452-
while (pool[kWaitQueue].length && pool.availableConnectionCount) {
456+
while (pool.waitQueueSize) {
453457
const waitQueueMember = pool[kWaitQueue].peekFront();
454458
if (waitQueueMember[kCancelled]) {
455459
pool[kWaitQueue].shift();
456460
continue;
457461
}
458462

463+
if (!pool.availableConnectionCount) {
464+
break;
465+
}
466+
459467
const connection = pool[kConnections].shift();
460468
const isStale = connectionIsStale(pool, connection);
461469
const isIdle = connectionIsIdle(pool, connection);
@@ -472,7 +480,7 @@ function processWaitQueue(pool) {
472480
}
473481

474482
const maxPoolSize = pool.options.maxPoolSize;
475-
if (pool[kWaitQueue].length && (maxPoolSize <= 0 || pool.totalConnectionCount < maxPoolSize)) {
483+
if (pool.waitQueueSize && (maxPoolSize <= 0 || pool.totalConnectionCount < maxPoolSize)) {
476484
createConnection(pool, (err, connection) => {
477485
const waitQueueMember = pool[kWaitQueue].shift();
478486
if (waitQueueMember == null) {

test/unit/cmap/connection_pool.test.js

+38-1
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ require('util.promisify/shim')();
44
const util = require('util');
55
const loadSpecTests = require('../../spec').loadSpecTests;
66
const ConnectionPool = require('../../../lib/cmap/connection_pool').ConnectionPool;
7+
const WaitQueueTimeoutError = require('../../../lib/cmap/errors').WaitQueueTimeoutError;
78
const EventEmitter = require('events').EventEmitter;
89
const mock = require('mongodb-mock-server');
910
const BSON = require('bson');
1011
const cmapEvents = require('../../../lib/cmap/events');
11-
12+
const sinon = require('sinon');
1213
const chai = require('chai');
1314
chai.use(require('../../functional/spec-runner/matcher').default);
1415
const expect = chai.expect;
@@ -117,6 +118,42 @@ describe('Connection Pool', function() {
117118
);
118119
});
119120

121+
it('should clear timed out wait queue members if no connections are available', function(done) {
122+
server.setMessageHandler(request => {
123+
const doc = request.document;
124+
if (doc.ismaster) {
125+
request.reply(mock.DEFAULT_ISMASTER_36);
126+
}
127+
});
128+
129+
const pool = new ConnectionPool(
130+
Object.assign({ bson: new BSON(), maxPoolSize: 1, waitQueueTimeoutMS: 200 }, server.address())
131+
);
132+
133+
pool.checkOut((err, conn) => {
134+
expect(err).to.not.exist;
135+
expect(conn).to.exist;
136+
137+
pool.checkOut(err => {
138+
expect(err).to.exist.and.be.instanceOf(WaitQueueTimeoutError);
139+
140+
// We can only process the wait queue with `checkIn` and `checkOut`, so we
141+
// force the pool here to think there are no available connections, even though
142+
// we are checking the connection back in. This simulates a slow leak where
143+
// incoming requests outpace the ability of the queue to fully process cancelled
144+
// wait queue members
145+
sinon.stub(pool, 'availableConnectionCount').get(() => 0);
146+
pool.checkIn(conn);
147+
148+
expect(pool)
149+
.property('waitQueueSize')
150+
.to.equal(0);
151+
152+
done();
153+
});
154+
});
155+
});
156+
120157
describe('withConnection', function() {
121158
it('should manage a connection for a successful operation', function(done) {
122159
server.setMessageHandler(request => {

0 commit comments

Comments
 (0)