diff --git a/lib/operations/connect.js b/lib/operations/connect.js index 2845149aa3f..fe600ff30ee 100644 --- a/lib/operations/connect.js +++ b/lib/operations/connect.js @@ -133,6 +133,7 @@ const validOptionNames = [ 'minSize', 'monitorCommands', 'retryWrites', + 'retryReads', 'useNewUrlParser', 'useUnifiedTopology', 'serverSelectionTimeoutMS', diff --git a/lib/operations/execute_operation.js b/lib/operations/execute_operation.js index b57406f7dea..f6e7e45d397 100644 --- a/lib/operations/execute_operation.js +++ b/lib/operations/execute_operation.js @@ -3,6 +3,8 @@ const MongoError = require('../core').MongoError; const Aspect = require('./operation').Aspect; const OperationBase = require('./operation').OperationBase; +const ReadPreference = require('../core').ReadPreference; +const isRetryableError = require('../core/error').isRetryableError; /** * Executes the given operation with provided arguments. @@ -65,7 +67,11 @@ function executeOperation(topology, operation, callback) { ); try { - return operation.execute(handler); + if (operation.hasAspect(Aspect.EXECUTE_WITH_SELECTION)) { + return executeWithServerSelection(topology, operation, handler); + } else { + return operation.execute(handler); + } } catch (e) { handler(e); throw e; @@ -76,11 +82,55 @@ function executeOperation(topology, operation, callback) { const handler = makeExecuteCallback(resolve, reject); try { - return operation.execute(handler); + if (operation.hasAspect(Aspect.EXECUTE_WITH_SELECTION)) { + return executeWithServerSelection(topology, operation, handler); + } else { + return operation.execute(handler); + } } catch (e) { handler(e); } }); } +function executeWithServerSelection(topology, operation, callback) { + const readPreference = operation.readPreference || ReadPreference.primary; + const shouldRetryReads = topology.s.options.retryReads !== false; + + function callbackWithRetry(err, result) { + if (err == null) { + return callback(null, result); + } + + if (!isRetryableError(err)) { + return callback(err); + } + + // select a new server, and attempt to retry the operation + topology.selectServer(readPreference, (err, server) => { + if (err) { + callback(err, null); + return; + } + + operation.execute(server, callback); + }); + } + + // select a server, and execute the operation against it + topology.selectServer(readPreference, (err, server) => { + if (err) { + callback(err, null); + return; + } + + if (operation.hasAspect(Aspect.RETRYABLE) && shouldRetryReads) { + operation.execute(server, callbackWithRetry); + return; + } + + operation.execute(server, callback); + }); +} + module.exports = executeOperation;