summaryrefslogtreecommitdiffstats
path: root/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/replset.js
diff options
context:
space:
mode:
Diffstat (limited to 'common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/replset.js')
-rw-r--r--common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/replset.js1403
1 files changed, 1403 insertions, 0 deletions
diff --git a/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/replset.js b/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/replset.js
new file mode 100644
index 0000000..0082da8
--- /dev/null
+++ b/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/replset.js
@@ -0,0 +1,1403 @@
+"use strict"
+
+var inherits = require('util').inherits,
+ f = require('util').format,
+ EventEmitter = require('events').EventEmitter,
+ ReadPreference = require('./read_preference'),
+ BasicCursor = require('../cursor'),
+ retrieveBSON = require('../connection/utils').retrieveBSON,
+ Logger = require('../connection/logger'),
+ MongoError = require('../error'),
+ Server = require('./server'),
+ ReplSetState = require('./replset_state'),
+ assign = require('./shared').assign,
+ clone = require('./shared').clone,
+ createClientInfo = require('./shared').createClientInfo;
+
+var MongoCR = require('../auth/mongocr')
+ , X509 = require('../auth/x509')
+ , Plain = require('../auth/plain')
+ , GSSAPI = require('../auth/gssapi')
+ , SSPI = require('../auth/sspi')
+ , ScramSHA1 = require('../auth/scram');
+
+var BSON = retrieveBSON();
+
+//
+// States
+var DISCONNECTED = 'disconnected';
+var CONNECTING = 'connecting';
+var CONNECTED = 'connected';
+var DESTROYED = 'destroyed';
+
+function stateTransition(self, newState) {
+ var legalTransitions = {
+ 'disconnected': [CONNECTING, DESTROYED, DISCONNECTED],
+ 'connecting': [CONNECTING, DESTROYED, CONNECTED, DISCONNECTED],
+ 'connected': [CONNECTED, DISCONNECTED, DESTROYED],
+ 'destroyed': [DESTROYED]
+ }
+
+ // Get current state
+ var legalStates = legalTransitions[self.state];
+ if(legalStates && legalStates.indexOf(newState) != -1) {
+ self.state = newState;
+ } else {
+ self.logger.error(f('Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]'
+ , self.id, self.state, newState, legalStates));
+ }
+}
+
+//
+// ReplSet instance id
+var id = 1;
+var handlers = ['connect', 'close', 'error', 'timeout', 'parseError'];
+
+/**
+ * Creates a new Replset instance
+ * @class
+ * @param {array} seedlist A list of seeds for the replicaset
+ * @param {boolean} options.setName The Replicaset set name
+ * @param {boolean} [options.secondaryOnlyConnectionAllowed=false] Allow connection to a secondary only replicaset
+ * @param {number} [options.haInterval=10000] The High availability period for replicaset inquiry
+ * @param {boolean} [options.emitError=false] Server will emit errors events
+ * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
+ * @param {number} [options.size=5] Server connection pool size
+ * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
+ * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
+ * @param {boolean} [options.noDelay=true] TCP Connection no delay
+ * @param {number} [options.connectionTimeout=10000] TCP Connection timeout setting
+ * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
+ * @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed
+ * @param {boolean} [options.ssl=false] Use SSL for connection
+ * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
+ * @param {Buffer} [options.ca] SSL Certificate store binary buffer
+ * @param {Buffer} [options.cert] SSL Certificate binary buffer
+ * @param {Buffer} [options.key] SSL Key file binary buffer
+ * @param {string} [options.passphrase] SSL Certificate pass phrase
+ * @param {string} [options.servername=null] String containing the server name requested via TLS SNI.
+ * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
+ * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
+ * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
+ * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
+ * @param {number} [options.pingInterval=5000] Ping interval to check the response time to the different servers
+ * @param {number} [options.localThresholdMS=15] Cutoff latency point in MS for MongoS proxy selection
+ * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
+ * @return {ReplSet} A cursor instance
+ * @fires ReplSet#connect
+ * @fires ReplSet#ha
+ * @fires ReplSet#joined
+ * @fires ReplSet#left
+ * @fires ReplSet#failed
+ * @fires ReplSet#fullsetup
+ * @fires ReplSet#all
+ * @fires ReplSet#error
+ * @fires ReplSet#serverHeartbeatStarted
+ * @fires ReplSet#serverHeartbeatSucceeded
+ * @fires ReplSet#serverHeartbeatFailed
+ * @fires ReplSet#topologyOpening
+ * @fires ReplSet#topologyClosed
+ * @fires ReplSet#topologyDescriptionChanged
+ * @property {string} type the topology type.
+ * @property {string} parserType the parser type used (c++ or js).
+ */
+var ReplSet = function(seedlist, options) {
+ var self = this;
+ options = options || {};
+
+ // Validate seedlist
+ if(!Array.isArray(seedlist)) throw new MongoError("seedlist must be an array");
+ // Validate list
+ if(seedlist.length == 0) throw new MongoError("seedlist must contain at least one entry");
+ // Validate entries
+ seedlist.forEach(function(e) {
+ if(typeof e.host != 'string' || typeof e.port != 'number')
+ throw new MongoError("seedlist entry must contain a host and port");
+ });
+
+ // Add event listener
+ EventEmitter.call(this);
+
+ // Get replSet Id
+ this.id = id++;
+
+ // Get the localThresholdMS
+ var localThresholdMS = options.localThresholdMS || 15;
+ // Backward compatibility
+ if(options.acceptableLatency) localThresholdMS = options.acceptableLatency;
+
+ // Create a logger
+ var logger = Logger('ReplSet', options);
+
+ // Internal state
+ this.s = {
+ options: assign({}, options),
+ // BSON instance
+ bson: options.bson || new BSON([BSON.Binary, BSON.Code, BSON.DBRef, BSON.Decimal128,
+ BSON.Double, BSON.Int32, BSON.Long, BSON.Map, BSON.MaxKey, BSON.MinKey,
+ BSON.ObjectId, BSON.BSONRegExp, BSON.Symbol, BSON.Timestamp]),
+ // Factory overrides
+ Cursor: options.cursorFactory || BasicCursor,
+ // Logger instance
+ logger: logger,
+ // Seedlist
+ seedlist: seedlist,
+ // Replicaset state
+ replicaSetState: new ReplSetState({
+ id: this.id, setName: options.setName,
+ acceptableLatency: localThresholdMS,
+ heartbeatFrequencyMS: options.haInterval ? options.haInterval : 10000,
+ logger: logger
+ }),
+ // Current servers we are connecting to
+ connectingServers: [],
+ // Ha interval
+ haInterval: options.haInterval ? options.haInterval : 10000,
+ // Minimum heartbeat frequency used if we detect a server close
+ minHeartbeatFrequencyMS: 500,
+ // Disconnect handler
+ disconnectHandler: options.disconnectHandler,
+ // Server selection index
+ index: 0,
+ // Connect function options passed in
+ connectOptions: {},
+ // Are we running in debug mode
+ debug: typeof options.debug == 'boolean' ? options.debug : false,
+ // Client info
+ clientInfo: createClientInfo(options)
+ }
+
+ // Add handler for topology change
+ this.s.replicaSetState.on('topologyDescriptionChanged', function(r) { self.emit('topologyDescriptionChanged', r); });
+
+ // Log info warning if the socketTimeout < haInterval as it will cause
+ // a lot of recycled connections to happen.
+ if(this.s.logger.isWarn()
+ && this.s.options.socketTimeout != 0
+ && this.s.options.socketTimeout < this.s.haInterval) {
+ this.s.logger.warn(f('warning socketTimeout %s is less than haInterval %s. This might cause unnecessary server reconnections due to socket timeouts'
+ , this.s.options.socketTimeout, this.s.haInterval));
+ }
+
+ // All the authProviders
+ this.authProviders = options.authProviders || {
+ 'mongocr': new MongoCR(this.s.bson), 'x509': new X509(this.s.bson)
+ , 'plain': new Plain(this.s.bson), 'gssapi': new GSSAPI(this.s.bson)
+ , 'sspi': new SSPI(this.s.bson), 'scram-sha-1': new ScramSHA1(this.s.bson)
+ }
+
+ // Add forwarding of events from state handler
+ var types = ['joined', 'left'];
+ types.forEach(function(x) {
+ self.s.replicaSetState.on(x, function(t, s) {
+ if(self.state === CONNECTED && x === 'joined' && t == 'primary') {
+ self.emit('reconnect', self);
+ }
+
+ self.emit(x, t, s);
+ });
+ });
+
+ // Connect stat
+ this.initialConnectState = {
+ connect: false, fullsetup: false, all: false
+ }
+
+ // Disconnected state
+ this.state = DISCONNECTED;
+ this.haTimeoutId = null;
+ // Are we authenticating
+ this.authenticating = false;
+ // Last ismaster
+ this.ismaster = null;
+}
+
+inherits(ReplSet, EventEmitter);
+
+Object.defineProperty(ReplSet.prototype, 'type', {
+ enumerable:true, get: function() { return 'replset'; }
+});
+
+Object.defineProperty(ReplSet.prototype, 'parserType', {
+ enumerable:true, get: function() {
+ return BSON.native ? "c++" : "js";
+ }
+});
+
+function attemptReconnect(self) {
+ if(self.runningAttempReconnect) return;
+ // Set as running
+ self.runningAttempReconnect = true;
+ // Wait before execute
+ self.haTimeoutId = setTimeout(function() {
+ if(self.state == DESTROYED) return;
+
+ // Debug log
+ if(self.s.logger.isDebug()) {
+ self.s.logger.debug(f('attemptReconnect for replset with id %s', self.id));
+ }
+
+ // Get all known hosts
+ var keys = Object.keys(self.s.replicaSetState.set);
+ var servers = keys.map(function(x) {
+ return new Server(assign({}, self.s.options, {
+ host: x.split(':')[0], port: parseInt(x.split(':')[1], 10)
+ }, {
+ authProviders: self.authProviders, reconnect:false, monitoring: false, inTopology: true
+ }, {
+ clientInfo: clone(self.s.clientInfo)
+ }));
+ });
+
+ // Create the list of servers
+ self.s.connectingServers = servers.slice(0);
+
+ // Handle all events coming from servers
+ function _handleEvent(self, event) {
+ return function() {
+ // Destroy the instance
+ if(self.state == DESTROYED) {
+ return this.destroy();
+ }
+
+ // Debug log
+ if(self.s.logger.isDebug()) {
+ self.s.logger.debug(f('attemptReconnect for replset with id %s using server %s ended with event %s', self.id, this.name, event));
+ }
+
+ // Check if we are done
+ function done() {
+ // Done with the reconnection attempt
+ if(self.s.connectingServers.length == 0) {
+ if(self.state == DESTROYED) return;
+
+ // If we have a primary and a disconnect handler, execute
+ // buffered operations
+ if(self.s.replicaSetState.hasPrimaryAndSecondary() && self.s.disconnectHandler) {
+ self.s.disconnectHandler.execute();
+ } else if(self.s.replicaSetState.hasPrimary() && self.s.disconnectHandler) {
+ self.s.disconnectHandler.execute({ executePrimary:true });
+ } else if(self.s.replicaSetState.hasSecondary() && self.s.disconnectHandler) {
+ self.s.disconnectHandler.execute({ executeSecondary:true });
+ }
+
+ // Do we have a primary
+ if(self.s.replicaSetState.hasPrimary()) {
+ // Emit reconnect as new primary was discovered
+ self.emit('reconnect', self);
+
+ // Connect any missing servers
+ connectNewServers(self, self.s.replicaSetState.unknownServers, function() {
+ // Debug log
+ if(self.s.logger.isDebug()) {
+ self.s.logger.debug(f('attemptReconnect for replset with id successful resuming topologyMonitor', self.id));
+ }
+
+ // Reset the running
+ self.runningAttempReconnect = false;
+
+ // Go back to normal topology monitoring
+ // Schedule a topology monitoring sweep
+ setTimeout(function() {
+ topologyMonitor(self);
+ }, self.s.haInterval);
+ });
+ } else {
+ if(self.listeners("close").length > 0) {
+ self.emit('close', self);
+ }
+
+ // Reset the running
+ self.runningAttempReconnect = false;
+ // Attempt a new reconnect
+ attemptReconnect(self);
+ }
+ }
+ }
+
+ // Remove the server from our list
+ for(var i = 0; i < self.s.connectingServers.length; i++) {
+ if(self.s.connectingServers[i].equals(this)) {
+ self.s.connectingServers.splice(i, 1);
+ }
+ }
+
+ // Keep reference to server
+ var _self = this;
+
+ // Debug log
+ if(self.s.logger.isDebug()) {
+ self.s.logger.debug(f('attemptReconnect in replset with id %s for', self.id));
+ }
+
+ // Connect and not authenticating
+ if(event == 'connect' && !self.authenticating) {
+ if(self.state == DESTROYED) {
+ return _self.destroy();
+ }
+
+ // Update the replicaset state
+ if(self.s.replicaSetState.update(_self)) {
+ // Primary lastIsMaster store it
+ if(_self.lastIsMaster() && _self.lastIsMaster().ismaster) {
+ self.ismaster = _self.lastIsMaster();
+ }
+
+ // Remove the handlers
+ for(i = 0; i < handlers.length; i++) {
+ _self.removeAllListeners(handlers[i]);
+ }
+
+ // Add stable state handlers
+ _self.on('error', handleEvent(self, 'error'));
+ _self.on('close', handleEvent(self, 'close'));
+ _self.on('timeout', handleEvent(self, 'timeout'));
+ _self.on('parseError', handleEvent(self, 'parseError'));
+ } else {
+ _self.destroy();
+ }
+ } else if(event == 'connect' && self.authenticating) {
+ this.destroy();
+ }
+
+ done();
+ }
+ }
+
+ // Index used to interleaf the server connects, avoiding
+ // runtime issues on io constrained vm's
+ var timeoutInterval = 0;
+
+ function connect(server, timeoutInterval) {
+ setTimeout(function() {
+ server.once('connect', _handleEvent(self, 'connect'));
+ server.once('close', _handleEvent(self, 'close'));
+ server.once('timeout', _handleEvent(self, 'timeout'));
+ server.once('error', _handleEvent(self, 'error'));
+ server.once('parseError', _handleEvent(self, 'parseError'));
+
+ // SDAM Monitoring events
+ server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
+ server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
+ server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
+
+ server.connect(self.s.connectOptions);
+ }, timeoutInterval);
+ }
+
+ // Connect all servers
+ while(servers.length > 0) {
+ connect(servers.shift(), timeoutInterval++);
+ }
+ }, self.s.minHeartbeatFrequencyMS);
+}
+
+function connectNewServers(self, servers, callback) {
+ // Count lefts
+ var count = servers.length;
+
+ // Handle events
+ var _handleEvent = function(self, event) {
+ return function() {
+ var _self = this;
+ count = count - 1;
+
+ // Destroyed
+ if(self.state == DESTROYED) {
+ return this.destroy();
+ }
+
+ if(event == 'connect' && !self.authenticating) {
+ // Destroyed
+ if(self.state == DESTROYED) {
+ return _self.destroy();
+ }
+
+ var result = self.s.replicaSetState.update(_self);
+ // Update the state with the new server
+ if(result) {
+ // Primary lastIsMaster store it
+ if(_self.lastIsMaster() && _self.lastIsMaster().ismaster) {
+ self.ismaster = _self.lastIsMaster();
+ }
+
+ // Remove the handlers
+ for(var i = 0; i < handlers.length; i++) {
+ _self.removeAllListeners(handlers[i]);
+ }
+
+ // Add stable state handlers
+ _self.on('error', handleEvent(self, 'error'));
+ _self.on('close', handleEvent(self, 'close'));
+ _self.on('timeout', handleEvent(self, 'timeout'));
+ _self.on('parseError', handleEvent(self, 'parseError'));
+ } else {
+ _self.destroy();
+ }
+ } else if(event == 'connect' && self.authenticating) {
+ this.destroy();
+ }
+
+ // Are we done finish up callback
+ if(count == 0) { callback(); }
+ }
+ }
+
+ // No new servers
+ if(count == 0) return callback();
+
+ // Execute method
+ function execute(_server, i) {
+ setTimeout(function() {
+ // Destroyed
+ if(self.state == DESTROYED) {
+ return;
+ }
+
+ // Create a new server instance
+ var server = new Server(assign({}, self.s.options, {
+ host: _server.split(':')[0],
+ port: parseInt(_server.split(':')[1], 10)
+ }, {
+ authProviders: self.authProviders, reconnect:false, monitoring: false, inTopology: true
+ }, {
+ clientInfo: clone(self.s.clientInfo)
+ }));
+
+ // Add temp handlers
+ server.once('connect', _handleEvent(self, 'connect'));
+ server.once('close', _handleEvent(self, 'close'));
+ server.once('timeout', _handleEvent(self, 'timeout'));
+ server.once('error', _handleEvent(self, 'error'));
+ server.once('parseError', _handleEvent(self, 'parseError'));
+
+ // SDAM Monitoring events
+ server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
+ server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
+ server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
+ server.connect(self.s.connectOptions);
+ }, i);
+ }
+
+ // Create new instances
+ for(var i = 0; i < servers.length; i++) {
+ execute(servers[i], i);
+ }
+}
+
+function topologyMonitor(self, options) {
+ options = options || {};
+
+ // Set momitoring timeout
+ self.haTimeoutId = setTimeout(function() {
+ if(self.state == DESTROYED) return;
+
+ // Is this a on connect topology discovery
+ // Schedule a proper topology monitoring to happen
+ // To ensure any discovered servers do not timeout
+ // while waiting for the initial discovery to happen.
+ if(options.haInterval) {
+ topologyMonitor(self);
+ }
+
+ // If we have a primary and a disconnect handler, execute
+ // buffered operations
+ if(self.s.replicaSetState.hasPrimaryAndSecondary() && self.s.disconnectHandler) {
+ self.s.disconnectHandler.execute();
+ } else if(self.s.replicaSetState.hasPrimary() && self.s.disconnectHandler) {
+ self.s.disconnectHandler.execute({ executePrimary:true });
+ } else if(self.s.replicaSetState.hasSecondary() && self.s.disconnectHandler) {
+ self.s.disconnectHandler.execute({ executeSecondary:true });
+ }
+
+ // Get the connectingServers
+ var connectingServers = self.s.replicaSetState.allServers();
+ // Debug log
+ if(self.s.logger.isDebug()) {
+ self.s.logger.debug(f('topologyMonitor in replset with id %s connected servers [%s]'
+ , self.id
+ , connectingServers.map(function(x) {
+ return x.name;
+ })));
+ }
+ // Get the count
+ var count = connectingServers.length;
+
+ // If we have no servers connected
+ if(count == 0 && !options.haInterval) {
+ if(self.listeners("close").length > 0) {
+ self.emit('close', self);
+ }
+
+ return attemptReconnect(self);
+ }
+
+ // If the count is zero schedule a new fast
+ function pingServer(_self, _server, cb) {
+ // Measure running time
+ var start = new Date().getTime();
+
+ // Emit the server heartbeat start
+ emitSDAMEvent(self, 'serverHeartbeatStarted', { connectionId: _server.name });
+ // Execute ismaster
+ // Set the socketTimeout for a monitoring message to a low number
+ // Ensuring ismaster calls are timed out quickly
+ _server.command('admin.$cmd', {
+ ismaster:true
+ }, {
+ monitoring: true,
+ socketTimeout: self.s.options.connectionTimeout || 2000,
+ }, function(err, r) {
+ if(self.state == DESTROYED) {
+ _server.destroy();
+ return cb(err, r);
+ }
+
+ // Calculate latency
+ var latencyMS = new Date().getTime() - start;
+
+ // Set the last updatedTime
+ var hrTime = process.hrtime();
+ // Calculate the last update time
+ _server.lastUpdateTime = hrTime[0] * 1000 + Math.round(hrTime[1]/1000);
+
+ // We had an error, remove it from the state
+ if(err) {
+ // Emit the server heartbeat failure
+ emitSDAMEvent(self, 'serverHeartbeatFailed', { durationMS: latencyMS, failure: err, connectionId: _server.name });
+ // Remove server from the state
+ _self.s.replicaSetState.remove(_server);
+ } else {
+ // Update the server ismaster
+ _server.ismaster = r.result;
+
+ // Check if we have a lastWriteDate convert it to MS
+ // and store on the server instance for later use
+ if(_server.ismaster.lastWrite && _server.ismaster.lastWrite.lastWriteDate) {
+ _server.lastWriteDate = _server.ismaster.lastWrite.lastWriteDate.getTime();
+ }
+
+ // Do we have a brand new server
+ if(_server.lastIsMasterMS == -1) {
+ _server.lastIsMasterMS = latencyMS;
+ } else if(_server.lastIsMasterMS) {
+ // After the first measurement, average RTT MUST be computed using an
+ // exponentially-weighted moving average formula, with a weighting factor (alpha) of 0.2.
+ // If the prior average is denoted old_rtt, then the new average (new_rtt) is
+ // computed from a new RTT measurement (x) using the following formula:
+ // alpha = 0.2
+ // new_rtt = alpha * x + (1 - alpha) * old_rtt
+ _server.lastIsMasterMS = 0.2 * latencyMS + (1 - 0.2) * _server.lastIsMasterMS;
+ }
+
+ if(_self.s.replicaSetState.update(_server)) {
+ // Primary lastIsMaster store it
+ if(_server.lastIsMaster() && _server.lastIsMaster().ismaster) {
+ self.ismaster = _server.lastIsMaster();
+ }
+ }
+
+ // Server heart beat event
+ emitSDAMEvent(self, 'serverHeartbeatSucceeded', { durationMS: latencyMS, reply: r.result, connectionId: _server.name });
+ }
+
+ // Calculate the stalness for this server
+ self.s.replicaSetState.updateServerMaxStaleness(_server, self.s.haInterval);
+
+ // Callback
+ cb(err, r);
+ });
+ }
+
+ // Connect any missing servers
+ function connectMissingServers() {
+ if(self.state == DESTROYED) return;
+
+ // Attempt to connect to any unknown servers
+ connectNewServers(self, self.s.replicaSetState.unknownServers, function() {
+ if(self.state == DESTROYED) return;
+
+ // Check if we have an options.haInterval (meaning it was triggered from connect)
+ if(options.haInterval) {
+ // Do we have a primary and secondary
+ if(self.state == CONNECTING
+ && self.s.replicaSetState.hasPrimaryAndSecondary()) {
+ // Transition to connected
+ stateTransition(self, CONNECTED);
+ // Update initial state
+ self.initialConnectState.connect = true;
+ self.initialConnectState.fullsetup = true;
+ self.initialConnectState.all = true;
+ // Emit fullsetup and all events
+ process.nextTick(function() {
+ self.emit('connect', self);
+ self.emit('fullsetup', self);
+ self.emit('all', self);
+ });
+ } else if(self.state == CONNECTING
+ && self.s.replicaSetState.hasPrimary()) {
+ // Transition to connected
+ stateTransition(self, CONNECTED);
+ // Update initial state
+ self.initialConnectState.connect = true;
+ // Emit connected sign
+ process.nextTick(function() {
+ self.emit('connect', self);
+ });
+ } else if(self.state == CONNECTING
+ && self.s.replicaSetState.hasSecondary()
+ && self.s.options.secondaryOnlyConnectionAllowed) {
+ // Transition to connected
+ stateTransition(self, CONNECTED);
+ // Update initial state
+ self.initialConnectState.connect = true;
+ // Emit connected sign
+ process.nextTick(function() {
+ self.emit('connect', self);
+ });
+ } else if(self.state == CONNECTING) {
+ self.emit('error', new MongoError('no primary found in replicaset'));
+ // Destroy the topology
+ return self.destroy();
+ } else if(self.state == CONNECTED
+ && self.s.replicaSetState.hasPrimaryAndSecondary()
+ && !self.initialConnectState.fullsetup) {
+ self.initialConnectState.fullsetup = true;
+ // Emit fullsetup and all events
+ process.nextTick(function() {
+ self.emit('fullsetup', self);
+ self.emit('all', self);
+ });
+ }
+ }
+
+ if(!options.haInterval) topologyMonitor(self);
+ });
+ }
+
+ // No connectingServers but unknown servers
+ if(connectingServers.length == 0
+ && self.s.replicaSetState.unknownServers.length > 0 && options.haInterval) {
+ return connectMissingServers();
+ } else if(connectingServers.length == 0 && options.haInterval) {
+ self.destroy();
+ return self.emit('error', new MongoError('no valid replicaset members found'));
+ }
+
+ // Ping all servers
+ for(var i = 0; i < connectingServers.length; i++) {
+ pingServer(self, connectingServers[i], function() {
+ count = count - 1;
+
+ if(count == 0) {
+ connectMissingServers();
+ }
+ });
+ }
+ }, options.haInterval || self.s.haInterval)
+}
+
+function handleEvent(self, event) {
+ return function() {
+ if(self.state == DESTROYED) return;
+ // Debug log
+ if(self.s.logger.isDebug()) {
+ self.s.logger.debug(f('handleEvent %s from server %s in replset with id %s', event, this.name, self.id));
+ }
+
+ self.s.replicaSetState.remove(this);
+ }
+}
+
+function handleInitialConnectEvent(self, event) {
+ return function() {
+ // Debug log
+ if(self.s.logger.isDebug()) {
+ self.s.logger.debug(f('handleInitialConnectEvent %s from server %s in replset with id %s', event, this.name, self.id));
+ }
+
+ // Destroy the instance
+ if(self.state == DESTROYED) {
+ return this.destroy();
+ }
+
+ // Check the type of server
+ if(event == 'connect') {
+ // Update the state
+ var result = self.s.replicaSetState.update(this);
+ if(result == true) {
+ // Primary lastIsMaster store it
+ if(this.lastIsMaster() && this.lastIsMaster().ismaster) {
+ self.ismaster = this.lastIsMaster();
+ }
+
+ // Debug log
+ if(self.s.logger.isDebug()) {
+ self.s.logger.debug(f('handleInitialConnectEvent %s from server %s in replset with id %s has state [%s]', event, this.name, self.id, JSON.stringify(self.s.replicaSetState.set)));
+ }
+
+ // Remove the handlers
+ for(var i = 0; i < handlers.length; i++) {
+ this.removeAllListeners(handlers[i]);
+ }
+
+ // Add stable state handlers
+ this.on('error', handleEvent(self, 'error'));
+ this.on('close', handleEvent(self, 'close'));
+ this.on('timeout', handleEvent(self, 'timeout'));
+ this.on('parseError', handleEvent(self, 'parseError'));
+ } else if(result instanceof MongoError) {
+ this.destroy();
+ self.destroy();
+ return self.emit('error', result);
+ } else {
+ this.destroy();
+ }
+ } else {
+ // Emit failure to connect
+ self.emit('failed', this);
+ // Remove from the state
+ self.s.replicaSetState.remove(this);
+ }
+
+ // Remove from the list from connectingServers
+ for(i = 0; i < self.s.connectingServers.length; i++) {
+ if(self.s.connectingServers[i].equals(this)) {
+ self.s.connectingServers.splice(i, 1);
+ }
+ }
+
+ // Trigger topologyMonitor
+ if(self.s.connectingServers.length == 0) {
+ topologyMonitor(self, {haInterval: 1});
+ }
+ };
+}
+
+function connectServers(self, servers) {
+ // Update connectingServers
+ self.s.connectingServers = self.s.connectingServers.concat(servers);
+
+ // Index used to interleaf the server connects, avoiding
+ // runtime issues on io constrained vm's
+ var timeoutInterval = 0;
+
+ function connect(server, timeoutInterval) {
+ setTimeout(function() {
+ // Add the server to the state
+ if(self.s.replicaSetState.update(server)) {
+ // Primary lastIsMaster store it
+ if(server.lastIsMaster() && server.lastIsMaster().ismaster) {
+ self.ismaster = server.lastIsMaster();
+ }
+ }
+
+ // Add event handlers
+ server.once('close', handleInitialConnectEvent(self, 'close'));
+ server.once('timeout', handleInitialConnectEvent(self, 'timeout'));
+ server.once('parseError', handleInitialConnectEvent(self, 'parseError'));
+ server.once('error', handleInitialConnectEvent(self, 'error'));
+ server.once('connect', handleInitialConnectEvent(self, 'connect'));
+ // SDAM Monitoring events
+ server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
+ server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
+ server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
+ // Start connection
+ server.connect(self.s.connectOptions);
+ }, timeoutInterval);
+ }
+
+ // Start all the servers
+ while(servers.length > 0) {
+ connect(servers.shift(), timeoutInterval++);
+ }
+}
+
+/**
+ * Emit event if it exists
+ * @method
+ */
+function emitSDAMEvent(self, event, description) {
+ if(self.listeners(event).length > 0) {
+ self.emit(event, description);
+ }
+}
+
+/**
+ * Initiate server connect
+ * @method
+ * @param {array} [options.auth=null] Array of auth options to apply on connect
+ */
+ReplSet.prototype.connect = function(options) {
+ var self = this;
+ // Add any connect level options to the internal state
+ this.s.connectOptions = options || {};
+ // Set connecting state
+ stateTransition(this, CONNECTING);
+ // Create server instances
+ var servers = this.s.seedlist.map(function(x) {
+ return new Server(assign({}, self.s.options, x, {
+ authProviders: self.authProviders, reconnect:false, monitoring:false, inTopology: true
+ }, {
+ clientInfo: clone(self.s.clientInfo)
+ }));
+ });
+
+ // Error out as high availbility interval must be < than socketTimeout
+ if(this.s.options.socketTimeout > 0 && this.s.options.socketTimeout <= this.s.options.haInterval) {
+ return self.emit('error', new MongoError(f("haInterval [%s] MS must be set to less than socketTimeout [%s] MS"
+ , this.s.options.haInterval, this.s.options.socketTimeout)));
+ }
+
+ // Emit the topology opening event
+ emitSDAMEvent(this, 'topologyOpening', { topologyId: this.id });
+ // Start all server connections
+ connectServers(self, servers);
+}
+
+/**
+ * Destroy the server connection
+ * @param {boolean} [options.force=false] Force destroy the pool
+ * @method
+ */
+ReplSet.prototype.destroy = function(options) {
+ options = options || {};
+ // Transition state
+ stateTransition(this, DESTROYED);
+ // Clear out any monitoring process
+ if(this.haTimeoutId) clearTimeout(this.haTimeoutId);
+ // Destroy the replicaset
+ this.s.replicaSetState.destroy(options);
+
+ // Destroy all connecting servers
+ this.s.connectingServers.forEach(function(x) {
+ x.destroy(options);
+ });
+
+ // Emit toplogy closing event
+ emitSDAMEvent(this, 'topologyClosed', { topologyId: this.id });
+}
+
+/**
+ * Unref all connections belong to this server
+ * @method
+ */
+ReplSet.prototype.unref = function() {
+ // Transition state
+ stateTransition(this, DISCONNECTED);
+
+ this.s.replicaSetState.allServers().forEach(function(x) {
+ x.unref();
+ });
+
+ clearTimeout(this.haTimeoutId);
+}
+
+/**
+ * Returns the last known ismaster document for this server
+ * @method
+ * @return {object}
+ */
+ReplSet.prototype.lastIsMaster = function() {
+ return this.s.replicaSetState.primary
+ ? this.s.replicaSetState.primary.lastIsMaster() : this.ismaster;
+}
+
+/**
+ * All raw connections
+ * @method
+ * @return {Connection[]}
+ */
+ReplSet.prototype.connections = function() {
+ var servers = this.s.replicaSetState.allServers();
+ var connections = [];
+ for(var i = 0; i < servers.length; i++) {
+ connections = connections.concat(servers[i].connections());
+ }
+
+ return connections;
+}
+
+/**
+ * Figure out if the server is connected
+ * @method
+ * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
+ * @return {boolean}
+ */
+ReplSet.prototype.isConnected = function(options) {
+ options = options || {};
+
+ // If we are authenticating signal not connected
+ // To avoid interleaving of operations
+ if(this.authenticating) return false;
+
+ // If we specified a read preference check if we are connected to something
+ // than can satisfy this
+ if(options.readPreference
+ && options.readPreference.equals(ReadPreference.secondary)) {
+ return this.s.replicaSetState.hasSecondary();
+ }
+
+ if(options.readPreference
+ && options.readPreference.equals(ReadPreference.primary)) {
+ return this.s.replicaSetState.hasPrimary();
+ }
+
+ if(options.readPreference
+ && options.readPreference.equals(ReadPreference.primaryPreferred)) {
+ return this.s.replicaSetState.hasSecondary() || this.s.replicaSetState.hasPrimary();
+ }
+
+ if(options.readPreference
+ && options.readPreference.equals(ReadPreference.secondaryPreferred)) {
+ return this.s.replicaSetState.hasSecondary() || this.s.replicaSetState.hasPrimary();
+ }
+
+ if(this.s.secondaryOnlyConnectionAllowed
+ && this.s.replicaSetState.hasSecondary()) {
+ return true;
+ }
+
+ return this.s.replicaSetState.hasPrimary();
+}
+
+/**
+ * Figure out if the replicaset instance was destroyed by calling destroy
+ * @method
+ * @return {boolean}
+ */
+ReplSet.prototype.isDestroyed = function() {
+ return this.state == DESTROYED;
+}
+
+/**
+ * Get server
+ * @method
+ * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
+ * @return {Server}
+ */
+ReplSet.prototype.getServer = function(options) {
+ // Ensure we have no options
+ options = options || {};
+
+ // Pick the right server baspickServerd on readPreference
+ var server = this.s.replicaSetState.pickServer(options.readPreference);
+ if(this.s.debug) this.emit('pickedServer', options.readPreference, server);
+ return server;
+}
+
+/**
+ * Get all connected servers
+ * @method
+ * @return {Server[]}
+ */
+ReplSet.prototype.getServers = function() {
+ return this.s.replicaSetState.allServers();
+}
+
+//
+// Execute write operation
+var executeWriteOperation = function(self, op, ns, ops, options, callback) {
+ if(typeof options == 'function') callback = options, options = {}, options = options || {};
+ // Ensure we have no options
+ options = options || {};
+
+ // No server returned we had an error
+ if(self.s.replicaSetState.primary == null) {
+ return callback(new MongoError("no primary server found"));
+ }
+
+ // Execute the command
+ self.s.replicaSetState.primary[op](ns, ops, options, callback);
+}
+
+/**
+ * Insert one or more documents
+ * @method
+ * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
+ * @param {array} ops An array of documents to insert
+ * @param {boolean} [options.ordered=true] Execute in order or out of order
+ * @param {object} [options.writeConcern={}] Write concern for the operation
+ * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
+ * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
+ * @param {opResultCallback} callback A callback function
+ */
+ReplSet.prototype.insert = function(ns, ops, options, callback) {
+ if(typeof options == 'function') callback = options, options = {}, options = options || {};
+ if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
+
+ // Not connected but we have a disconnecthandler
+ if(!this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
+ return this.s.disconnectHandler.add('insert', ns, ops, options, callback);
+ }
+
+ // Execute write operation
+ executeWriteOperation(this, 'insert', ns, ops, options, callback);
+}
+
+/**
+ * Perform one or more update operations
+ * @method
+ * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
+ * @param {array} ops An array of updates
+ * @param {boolean} [options.ordered=true] Execute in order or out of order
+ * @param {object} [options.writeConcern={}] Write concern for the operation
+ * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
+ * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
+ * @param {opResultCallback} callback A callback function
+ */
+ReplSet.prototype.update = function(ns, ops, options, callback) {
+ if(typeof options == 'function') callback = options, options = {}, options = options || {};
+ if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
+
+ // Not connected but we have a disconnecthandler
+ if(!this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
+ return this.s.disconnectHandler.add('update', ns, ops, options, callback);
+ }
+
+ // Execute write operation
+ executeWriteOperation(this, 'update', ns, ops, options, callback);
+}
+
+/**
+ * Perform one or more remove operations
+ * @method
+ * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
+ * @param {array} ops An array of removes
+ * @param {boolean} [options.ordered=true] Execute in order or out of order
+ * @param {object} [options.writeConcern={}] Write concern for the operation
+ * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
+ * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
+ * @param {opResultCallback} callback A callback function
+ */
+ReplSet.prototype.remove = function(ns, ops, options, callback) {
+ if(typeof options == 'function') callback = options, options = {}, options = options || {};
+ if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
+
+ // Not connected but we have a disconnecthandler
+ if(!this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
+ return this.s.disconnectHandler.add('remove', ns, ops, options, callback);
+ }
+
+ // Execute write operation
+ executeWriteOperation(this, 'remove', ns, ops, options, callback);
+}
+
+/**
+ * Execute a command
+ * @method
+ * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
+ * @param {object} cmd The command hash
+ * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
+ * @param {Connection} [options.connection] Specify connection object to execute command against
+ * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
+ * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
+ * @param {opResultCallback} callback A callback function
+ */
+ReplSet.prototype.command = function(ns, cmd, options, callback) {
+ if(typeof options == 'function') callback = options, options = {}, options = options || {};
+ if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
+ var self = this;
+
+ // Establish readPreference
+ var readPreference = options.readPreference ? options.readPreference : ReadPreference.primary;
+
+ // If the readPreference is primary and we have no primary, store it
+ if(readPreference.preference == 'primary' && !this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
+ return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
+ } else if(readPreference.preference == 'secondary' && !this.s.replicaSetState.hasSecondary() && this.s.disconnectHandler != null) {
+ return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
+ } else if(readPreference.preference != 'primary' && !this.s.replicaSetState.hasSecondary() && !this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
+ return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
+ }
+
+ // Pick a server
+ var server = this.s.replicaSetState.pickServer(readPreference);
+ // We received an error, return it
+ if(!(server instanceof Server)) return callback(server);
+ // Emit debug event
+ if(self.s.debug) self.emit('pickedServer', ReadPreference.primary, server);
+
+ // No server returned we had an error
+ if(server == null) {
+ return callback(new MongoError(f("no server found that matches the provided readPreference %s", readPreference)));
+ }
+
+ // Execute the command
+ server.command(ns, cmd, options, callback);
+}
+
+/**
+ * Authenticate using a specified mechanism
+ * @method
+ * @param {string} mechanism The Auth mechanism we are invoking
+ * @param {string} db The db we are invoking the mechanism against
+ * @param {...object} param Parameters for the specific mechanism
+ * @param {authResultCallback} callback A callback function
+ */
+ReplSet.prototype.auth = function(mechanism, db) {
+ var allArgs = Array.prototype.slice.call(arguments, 0).slice(0);
+ var self = this;
+ var args = Array.prototype.slice.call(arguments, 2);
+ var callback = args.pop();
+
+ // If we don't have the mechanism fail
+ if(this.authProviders[mechanism] == null && mechanism != 'default') {
+ return callback(new MongoError(f("auth provider %s does not exist", mechanism)));
+ }
+
+ // Are we already authenticating, throw
+ if(this.authenticating) {
+ return callback(new MongoError('authentication or logout allready in process'));
+ }
+
+ // Topology is not connected, save the call in the provided store to be
+ // Executed at some point when the handler deems it's reconnected
+ if(!self.s.replicaSetState.hasPrimary() && self.s.disconnectHandler != null) {
+ return self.s.disconnectHandler.add('auth', db, allArgs, {}, callback);
+ }
+
+ // Set to authenticating
+ this.authenticating = true;
+ // All errors
+ var errors = [];
+
+ // Get all the servers
+ var servers = this.s.replicaSetState.allServers();
+ // No servers return
+ if(servers.length == 0) {
+ this.authenticating = false;
+ callback(null, true);
+ }
+
+ // Authenticate
+ function auth(server) {
+ // Arguments without a callback
+ var argsWithoutCallback = [mechanism, db].concat(args.slice(0));
+ // Create arguments
+ var finalArguments = argsWithoutCallback.concat([function(err) {
+ count = count - 1;
+ // Save all the errors
+ if(err) errors.push({name: server.name, err: err});
+ // We are done
+ if(count == 0) {
+ // Auth is done
+ self.authenticating = false;
+
+ // Return the auth error
+ if(errors.length) return callback(MongoError.create({
+ message: 'authentication fail', errors: errors
+ }), false);
+
+ // Successfully authenticated session
+ callback(null, self);
+ }
+ }]);
+
+ if(!server.lastIsMaster().arbiterOnly) {
+ // Execute the auth only against non arbiter servers
+ server.auth.apply(server, finalArguments);
+ } else {
+ // If we are authenticating against an arbiter just ignore it
+ finalArguments.pop()(null);
+ }
+ }
+
+ // Get total count
+ var count = servers.length;
+ // Authenticate against all servers
+ while(servers.length > 0) {
+ auth(servers.shift());
+ }
+}
+
+/**
+ * Logout from a database
+ * @method
+ * @param {string} db The db we are logging out from
+ * @param {authResultCallback} callback A callback function
+ */
+ReplSet.prototype.logout = function(dbName, callback) {
+ var self = this;
+ // Are we authenticating or logging out, throw
+ if(this.authenticating) {
+ throw new MongoError('authentication or logout allready in process');
+ }
+
+ // Ensure no new members are processed while logging out
+ this.authenticating = true;
+
+ // Remove from all auth providers (avoid any reaplication of the auth details)
+ var providers = Object.keys(this.authProviders);
+ for(var i = 0; i < providers.length; i++) {
+ this.authProviders[providers[i]].logout(dbName);
+ }
+
+ // Now logout all the servers
+ var servers = this.s.replicaSetState.allServers();
+ var count = servers.length;
+ if(count == 0) return callback();
+ var errors = [];
+
+ function logoutServer(_server, cb) {
+ _server.logout(dbName, function(err) {
+ if(err) errors.push({name: _server.name, err: err});
+ cb();
+ });
+ }
+
+ // Execute logout on all server instances
+ for(i = 0; i < servers.length; i++) {
+ logoutServer(servers[i], function() {
+ count = count - 1;
+
+ if(count == 0) {
+ // Do not block new operations
+ self.authenticating = false;
+ // If we have one or more errors
+ if(errors.length) return callback(MongoError.create({
+ message: f('logout failed against db %s', dbName), errors: errors
+ }), false);
+
+ // No errors
+ callback();
+ }
+ })
+ }
+}
+
+/**
+ * Perform one or more remove operations
+ * @method
+ * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
+ * @param {{object}|{Long}} cmd Can be either a command returning a cursor or a cursorId
+ * @param {object} [options.batchSize=0] Batchsize for the operation
+ * @param {array} [options.documents=[]] Initial documents list for cursor
+ * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
+ * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
+ * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
+ * @param {opResultCallback} callback A callback function
+ */
+ReplSet.prototype.cursor = function(ns, cmd, cursorOptions) {
+ cursorOptions = cursorOptions || {};
+ var FinalCursor = cursorOptions.cursorFactory || this.s.Cursor;
+ return new FinalCursor(this.s.bson, ns, cmd, cursorOptions, this, this.s.options);
+}
+
+/**
+ * A replset connect event, used to verify that the connection is up and running
+ *
+ * @event ReplSet#connect
+ * @type {ReplSet}
+ */
+
+/**
+ * A replset reconnect event, used to verify that the topology reconnected
+ *
+ * @event ReplSet#reconnect
+ * @type {ReplSet}
+ */
+
+/**
+ * A replset fullsetup event, used to signal that all topology members have been contacted.
+ *
+ * @event ReplSet#fullsetup
+ * @type {ReplSet}
+ */
+
+/**
+ * A replset all event, used to signal that all topology members have been contacted.
+ *
+ * @event ReplSet#all
+ * @type {ReplSet}
+ */
+
+/**
+ * A replset failed event, used to signal that initial replset connection failed.
+ *
+ * @event ReplSet#failed
+ * @type {ReplSet}
+ */
+
+/**
+ * A server member left the replicaset
+ *
+ * @event ReplSet#left
+ * @type {function}
+ * @param {string} type The type of member that left (primary|secondary|arbiter)
+ * @param {Server} server The server object that left
+ */
+
+/**
+ * A server member joined the replicaset
+ *
+ * @event ReplSet#joined
+ * @type {function}
+ * @param {string} type The type of member that joined (primary|secondary|arbiter)
+ * @param {Server} server The server object that joined
+ */
+
+/**
+ * A server opening SDAM monitoring event
+ *
+ * @event ReplSet#serverOpening
+ * @type {object}
+ */
+
+/**
+ * A server closed SDAM monitoring event
+ *
+ * @event ReplSet#serverClosed
+ * @type {object}
+ */
+
+/**
+ * A server description SDAM change monitoring event
+ *
+ * @event ReplSet#serverDescriptionChanged
+ * @type {object}
+ */
+
+/**
+ * A topology open SDAM event
+ *
+ * @event ReplSet#topologyOpening
+ * @type {object}
+ */
+
+/**
+ * A topology closed SDAM event
+ *
+ * @event ReplSet#topologyClosed
+ * @type {object}
+ */
+
+/**
+ * A topology structure SDAM change event
+ *
+ * @event ReplSet#topologyDescriptionChanged
+ * @type {object}
+ */
+
+/**
+ * A topology serverHeartbeatStarted SDAM event
+ *
+ * @event ReplSet#serverHeartbeatStarted
+ * @type {object}
+ */
+
+/**
+ * A topology serverHeartbeatFailed SDAM event
+ *
+ * @event ReplSet#serverHeartbeatFailed
+ * @type {object}
+ */
+
+/**
+ * A topology serverHeartbeatSucceeded SDAM change event
+ *
+ * @event ReplSet#serverHeartbeatSucceeded
+ * @type {object}
+ */
+
+module.exports = ReplSet;