From 36bc1fd592da199b3e2b8b0fea1d1a0576ae0ae4 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Wed, 10 Jul 2019 21:18:49 -0400 Subject: [PATCH] feat(execute-operation): allow execution with server selection This is the first step to moving all server selection into the operation executor. An aspect on the operation called `EXECUTE_WITH_SELECTION` indicates that the operation will use a variant of the `execute` method which accepts a selected `Server` instance. NOTE: the retry logic is current read specific. --- lib/operations/connect.js | 1 + lib/operations/execute_operation.js | 54 +++++++++++++++++++++++++++-- 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/lib/operations/connect.js b/lib/operations/connect.js index 2845149aa3f..fe600ff30ee 100644 --- a/lib/operations/connect.js +++ b/lib/operations/connect.js @@ -133,6 +133,7 @@ const validOptionNames = [ 'minSize', 'monitorCommands', 'retryWrites', + 'retryReads', 'useNewUrlParser', 'useUnifiedTopology', 'serverSelectionTimeoutMS', diff --git a/lib/operations/execute_operation.js b/lib/operations/execute_operation.js index b57406f7dea..f6e7e45d397 100644 --- a/lib/operations/execute_operation.js +++ b/lib/operations/execute_operation.js @@ -3,6 +3,8 @@ const MongoError = require('../core').MongoError; const Aspect = require('./operation').Aspect; const OperationBase = require('./operation').OperationBase; +const ReadPreference = require('../core').ReadPreference; +const isRetryableError = require('../core/error').isRetryableError; /** * Executes the given operation with provided arguments. @@ -65,7 +67,11 @@ function executeOperation(topology, operation, callback) { ); try { - return operation.execute(handler); + if (operation.hasAspect(Aspect.EXECUTE_WITH_SELECTION)) { + return executeWithServerSelection(topology, operation, handler); + } else { + return operation.execute(handler); + } } catch (e) { handler(e); throw e; @@ -76,11 +82,55 @@ function executeOperation(topology, operation, callback) { const handler = makeExecuteCallback(resolve, reject); try { - return operation.execute(handler); + if (operation.hasAspect(Aspect.EXECUTE_WITH_SELECTION)) { + return executeWithServerSelection(topology, operation, handler); + } else { + return operation.execute(handler); + } } catch (e) { handler(e); } }); } +function executeWithServerSelection(topology, operation, callback) { + const readPreference = operation.readPreference || ReadPreference.primary; + const shouldRetryReads = topology.s.options.retryReads !== false; + + function callbackWithRetry(err, result) { + if (err == null) { + return callback(null, result); + } + + if (!isRetryableError(err)) { + return callback(err); + } + + // select a new server, and attempt to retry the operation + topology.selectServer(readPreference, (err, server) => { + if (err) { + callback(err, null); + return; + } + + operation.execute(server, callback); + }); + } + + // select a server, and execute the operation against it + topology.selectServer(readPreference, (err, server) => { + if (err) { + callback(err, null); + return; + } + + if (operation.hasAspect(Aspect.RETRYABLE) && shouldRetryReads) { + operation.execute(server, callbackWithRetry); + return; + } + + operation.execute(server, callback); + }); +} + module.exports = executeOperation;