You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

165 lines
5.3 KiB

'use strict';
const maybePromise = require('../utils').maybePromise;
const MongoError = require('../core/error').MongoError;
const Aspect = require('./operation').Aspect;
const OperationBase = require('./operation').OperationBase;
const ReadPreference = require('../core/topologies/read_preference');
const isRetryableError = require('../core/error').isRetryableError;
const maxWireVersion = require('../core/utils').maxWireVersion;
const isUnifiedTopology = require('../core/utils').isUnifiedTopology;
/**
* Executes the given operation with provided arguments.
*
* This method reduces large amounts of duplication in the entire codebase by providing
* a single point for determining whether callbacks or promises should be used. Additionally
* it allows for a single point of entry to provide features such as implicit sessions, which
* are required by the Driver Sessions specification in the event that a ClientSession is
* not provided
*
* @param {object} topology The topology to execute this operation on
* @param {Operation} operation The operation to execute
* @param {function} callback The command result callback
*/
function executeOperation(topology, operation, cb) {
if (topology == null) {
throw new TypeError('This method requires a valid topology instance');
}
if (!(operation instanceof OperationBase)) {
throw new TypeError('This method requires a valid operation instance');
}
return maybePromise(topology, cb, callback => {
if (isUnifiedTopology(topology) && topology.shouldCheckForSessionSupport()) {
// Recursive call to executeOperation after a server selection
return selectServerForSessionSupport(topology, operation, callback);
}
// The driver sessions spec mandates that we implicitly create sessions for operations
// that are not explicitly provided with a session.
let session, owner;
if (topology.hasSessionSupport()) {
if (operation.session == null) {
owner = Symbol();
session = topology.startSession({ owner });
operation.session = session;
} else if (operation.session.hasEnded) {
return callback(new MongoError('Use of expired sessions is not permitted'));
}
} else if (operation.session) {
// If the user passed an explicit session and we are still, after server selection,
// trying to run against a topology that doesn't support sessions we error out.
return callback(new MongoError('Current topology does not support sessions'));
}
function executeCallback(err, result) {
if (session && session.owner === owner) {
session.endSession();
if (operation.session === session) {
operation.clearSession();
}
}
callback(err, result);
}
try {
if (operation.hasAspect(Aspect.EXECUTE_WITH_SELECTION)) {
executeWithServerSelection(topology, operation, executeCallback);
} else {
operation.execute(executeCallback);
}
} catch (error) {
if (session && session.owner === owner) {
session.endSession();
if (operation.session === session) {
operation.clearSession();
}
}
callback(error);
}
});
}
function supportsRetryableReads(server) {
return maxWireVersion(server) >= 6;
}
function executeWithServerSelection(topology, operation, callback) {
const readPreference = operation.readPreference || ReadPreference.primary;
const inTransaction = operation.session && operation.session.inTransaction();
if (inTransaction && !readPreference.equals(ReadPreference.primary)) {
callback(
new MongoError(
`Read preference in a transaction must be primary, not: ${readPreference.mode}`
)
);
return;
}
const serverSelectionOptions = {
readPreference,
session: operation.session
};
function callbackWithRetry(err, result) {
if (err == null) {
return callback(null, result);
}
if (!isRetryableError(err)) {
return callback(err);
}
// select a new server, and attempt to retry the operation
topology.selectServer(serverSelectionOptions, (err, server) => {
if (err || !supportsRetryableReads(server)) {
callback(err, null);
return;
}
operation.execute(server, callback);
});
}
// select a server, and execute the operation against it
topology.selectServer(serverSelectionOptions, (err, server) => {
if (err) {
callback(err, null);
return;
}
const shouldRetryReads =
topology.s.options.retryReads !== false &&
operation.session &&
!inTransaction &&
supportsRetryableReads(server) &&
operation.canRetryRead;
if (operation.hasAspect(Aspect.RETRYABLE) && shouldRetryReads) {
operation.execute(server, callbackWithRetry);
return;
}
operation.execute(server, callback);
});
}
// The Unified Topology runs serverSelection before executing every operation
// Session support is determined by the result of a monitoring check triggered by this selection
function selectServerForSessionSupport(topology, operation, callback) {
topology.selectServer(ReadPreference.primaryPreferred, err => {
if (err) {
return callback(err);
}
executeOperation(topology, operation, callback);
});
}
module.exports = executeOperation;