diff options
Diffstat (limited to 'common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies')
6 files changed, 4706 insertions, 0 deletions
diff --git a/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/mongos.js b/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/mongos.js new file mode 100644 index 0000000..d0ffcff --- /dev/null +++ b/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/mongos.js @@ -0,0 +1,1133 @@ +"use strict" + +var inherits = require('util').inherits, + f = require('util').format, + EventEmitter = require('events').EventEmitter, + BasicCursor = require('../cursor'), + Logger = require('../connection/logger'), + retrieveBSON = require('../connection/utils').retrieveBSON, + MongoError = require('../error'), + Server = require('./server'), + assign = require('./shared').assign, + clone = require('./shared').clone, + createClientInfo = require('./shared').createClientInfo; + +var BSON = retrieveBSON(); + +/** + * @fileOverview The **Mongos** class is a class that represents a Mongos Proxy topology and is + * used to construct connections. + * + * @example + * var Mongos = require('mongodb-core').Mongos + * , ReadPreference = require('mongodb-core').ReadPreference + * , assert = require('assert'); + * + * var server = new Mongos([{host: 'localhost', port: 30000}]); + * // Wait for the connection event + * server.on('connect', function(server) { + * server.destroy(); + * }); + * + * // Start connecting + * server.connect(); + */ + +var MongoCR = require('../auth/mongocr') + , X509 = require('../auth/x509') + , Plain = require('../auth/plain') + , GSSAPI = require('../auth/gssapi') + , SSPI = require('../auth/sspi') + , ScramSHA1 = require('../auth/scram'); + +// +// States +var DISCONNECTED = 'disconnected'; +var CONNECTING = 'connecting'; +var CONNECTED = 'connected'; +var DESTROYED = 'destroyed'; + +function stateTransition(self, newState) { + var legalTransitions = { + 'disconnected': [CONNECTING, DESTROYED, DISCONNECTED], + 'connecting': [CONNECTING, DESTROYED, CONNECTED, DISCONNECTED], + 'connected': [CONNECTED, DISCONNECTED, DESTROYED], + 'destroyed': [DESTROYED] + } + + // Get current state + var legalStates = legalTransitions[self.state]; + if(legalStates && legalStates.indexOf(newState) != -1) { + self.state = newState; + } else { + self.logger.error(f('Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]' + , self.id, self.state, newState, legalStates)); + } +} + +// +// ReplSet instance id +var id = 1; +var handlers = ['connect', 'close', 'error', 'timeout', 'parseError']; + +/** + * Creates a new Mongos instance + * @class + * @param {array} seedlist A list of seeds for the replicaset + * @param {number} [options.haInterval=5000] The High availability period for replicaset inquiry + * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors + * @param {number} [options.size=5] Server connection pool size + * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled + * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled + * @param {number} [options.localThresholdMS=15] Cutoff latency point in MS for MongoS proxy selection + * @param {boolean} [options.noDelay=true] TCP Connection no delay + * @param {number} [options.connectionTimeout=1000] TCP Connection timeout setting + * @param {number} [options.socketTimeout=0] TCP Socket timeout setting + * @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed + * @param {boolean} [options.ssl=false] Use SSL for connection + * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function. + * @param {Buffer} [options.ca] SSL Certificate store binary buffer + * @param {Buffer} [options.cert] SSL Certificate binary buffer + * @param {Buffer} [options.key] SSL Key file binary buffer + * @param {string} [options.passphrase] SSL Certificate pass phrase + * @param {string} [options.servername=null] String containing the server name requested via TLS SNI. + * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates + * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits + * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types. + * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers. + * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit. + * @return {Mongos} A cursor instance + * @fires Mongos#connect + * @fires Mongos#reconnect + * @fires Mongos#joined + * @fires Mongos#left + * @fires Mongos#failed + * @fires Mongos#fullsetup + * @fires Mongos#all + * @fires Mongos#serverHeartbeatStarted + * @fires Mongos#serverHeartbeatSucceeded + * @fires Mongos#serverHeartbeatFailed + * @fires Mongos#topologyOpening + * @fires Mongos#topologyClosed + * @fires Mongos#topologyDescriptionChanged + * @property {string} type the topology type. + * @property {string} parserType the parser type used (c++ or js). + */ +var Mongos = function(seedlist, options) { + options = options || {}; + + // Get replSet Id + this.id = id++; + + // Internal state + this.s = { + options: assign({}, options), + // BSON instance + bson: options.bson || new BSON([BSON.Binary, BSON.Code, BSON.DBRef, BSON.Decimal128, + BSON.Double, BSON.Int32, BSON.Long, BSON.Map, BSON.MaxKey, BSON.MinKey, + BSON.ObjectId, BSON.BSONRegExp, BSON.Symbol, BSON.Timestamp]), + // Factory overrides + Cursor: options.cursorFactory || BasicCursor, + // Logger instance + logger: Logger('Mongos', options), + // Seedlist + seedlist: seedlist, + // Ha interval + haInterval: options.haInterval ? options.haInterval : 10000, + // Disconnect handler + disconnectHandler: options.disconnectHandler, + // Server selection index + index: 0, + // Connect function options passed in + connectOptions: {}, + // Are we running in debug mode + debug: typeof options.debug == 'boolean' ? options.debug : false, + // localThresholdMS + localThresholdMS: options.localThresholdMS || 15, + // Client info + clientInfo: createClientInfo(options) + } + + // Set the client info + this.s.options.clientInfo = createClientInfo(options); + + // Log info warning if the socketTimeout < haInterval as it will cause + // a lot of recycled connections to happen. + if(this.s.logger.isWarn() + && this.s.options.socketTimeout != 0 + && this.s.options.socketTimeout < this.s.haInterval) { + this.s.logger.warn(f('warning socketTimeout %s is less than haInterval %s. This might cause unnecessary server reconnections due to socket timeouts' + , this.s.options.socketTimeout, this.s.haInterval)); + } + + // All the authProviders + this.authProviders = options.authProviders || { + 'mongocr': new MongoCR(this.s.bson), 'x509': new X509(this.s.bson) + , 'plain': new Plain(this.s.bson), 'gssapi': new GSSAPI(this.s.bson) + , 'sspi': new SSPI(this.s.bson), 'scram-sha-1': new ScramSHA1(this.s.bson) + } + + // Disconnected state + this.state = DISCONNECTED; + + // Current proxies we are connecting to + this.connectingProxies = []; + // Currently connected proxies + this.connectedProxies = []; + // Disconnected proxies + this.disconnectedProxies = []; + // Are we authenticating + this.authenticating = false; + // Index of proxy to run operations against + this.index = 0; + // High availability timeout id + this.haTimeoutId = null; + // Last ismaster + this.ismaster = null; + + // Add event listener + EventEmitter.call(this); +} + +inherits(Mongos, EventEmitter); + +Object.defineProperty(Mongos.prototype, 'type', { + enumerable:true, get: function() { return 'mongos'; } +}); + +Object.defineProperty(Mongos.prototype, 'parserType', { + enumerable:true, get: function() { + return BSON.native ? "c++" : "js"; + } +}); + +/** + * Emit event if it exists + * @method + */ +function emitSDAMEvent(self, event, description) { + if(self.listeners(event).length > 0) { + self.emit(event, description); + } +} + +/** + * Initiate server connect + * @method + * @param {array} [options.auth=null] Array of auth options to apply on connect + */ +Mongos.prototype.connect = function(options) { + var self = this; + // Add any connect level options to the internal state + this.s.connectOptions = options || {}; + // Set connecting state + stateTransition(this, CONNECTING); + // Create server instances + var servers = this.s.seedlist.map(function(x) { + return new Server(assign({}, self.s.options, x, { + authProviders: self.authProviders, reconnect:false, monitoring:false, inTopology: true + }, { + clientInfo: clone(self.s.clientInfo) + })); + }); + + // Emit the topology opening event + emitSDAMEvent(this, 'topologyOpening', { topologyId: this.id }); + + // Start all server connections + connectProxies(self, servers); +} + +function handleEvent(self) { + return function() { + if(self.state == DESTROYED) return; + // Move to list of disconnectedProxies + moveServerFrom(self.connectedProxies, self.disconnectedProxies, this); + // Emit the left signal + self.emit('left', 'mongos', this); + } +} + +function handleInitialConnectEvent(self, event) { + return function() { + // Destroy the instance + if(self.state == DESTROYED) { + // Move from connectingProxies + moveServerFrom(self.connectingProxies, self.disconnectedProxies, this); + return this.destroy(); + } + + // Check the type of server + if(event == 'connect') { + // Get last known ismaster + self.ismaster = this.lastIsMaster(); + + // Is this not a proxy, remove t + if(self.ismaster.msg == 'isdbgrid') { + // Add to the connectd list + for(var i = 0; i < self.connectedProxies.length; i++) { + if(self.connectedProxies[i].name == this.name) { + // Move from connectingProxies + moveServerFrom(self.connectingProxies, self.disconnectedProxies, this); + this.destroy(); + return self.emit('failed', this); + } + } + + // Remove the handlers + for(i = 0; i < handlers.length; i++) { + this.removeAllListeners(handlers[i]); + } + + // Add stable state handlers + this.on('error', handleEvent(self, 'error')); + this.on('close', handleEvent(self, 'close')); + this.on('timeout', handleEvent(self, 'timeout')); + this.on('parseError', handleEvent(self, 'parseError')); + + // Move from connecting proxies connected + moveServerFrom(self.connectingProxies, self.connectedProxies, this); + // Emit the joined event + self.emit('joined', 'mongos', this); + } else { + + // Print warning if we did not find a mongos proxy + if(self.s.logger.isWarn()) { + var message = 'expected mongos proxy, but found replicaset member mongod for server %s'; + // We have a standalone server + if(!self.ismaster.hosts) { + message = 'expected mongos proxy, but found standalone mongod for server %s'; + } + + self.s.logger.warn(f(message, this.name)); + } + + // This is not a mongos proxy, remove it completely + removeProxyFrom(self.connectingProxies, this); + // Emit the left event + self.emit('left', 'server', this); + // Emit failed event + self.emit('failed', this); + } + } else { + moveServerFrom(self.connectingProxies, self.disconnectedProxies, this); + // Emit the left event + self.emit('left', 'mongos', this); + // Emit failed event + self.emit('failed', this); + } + + // Trigger topologyMonitor + if(self.connectingProxies.length == 0) { + // Emit connected if we are connected + if(self.connectedProxies.length > 0) { + // Set the state to connected + stateTransition(self, CONNECTED); + // Emit the connect event + self.emit('connect', self); + self.emit('fullsetup', self); + self.emit('all', self); + } else if(self.disconnectedProxies.length == 0) { + // Print warning if we did not find a mongos proxy + if(self.s.logger.isWarn()) { + self.s.logger.warn(f('no mongos proxies found in seed list, did you mean to connect to a replicaset')); + } + + // Emit the error that no proxies were found + return self.emit('error', new MongoError('no mongos proxies found in seed list')); + } + + // Topology monitor + topologyMonitor(self, {firstConnect:true}); + } + }; +} + +function connectProxies(self, servers) { + // Update connectingProxies + self.connectingProxies = self.connectingProxies.concat(servers); + + // Index used to interleaf the server connects, avoiding + // runtime issues on io constrained vm's + var timeoutInterval = 0; + + function connect(server, timeoutInterval) { + setTimeout(function() { + // Add event handlers + server.once('close', handleInitialConnectEvent(self, 'close')); + server.once('timeout', handleInitialConnectEvent(self, 'timeout')); + server.once('parseError', handleInitialConnectEvent(self, 'parseError')); + server.once('error', handleInitialConnectEvent(self, 'error')); + server.once('connect', handleInitialConnectEvent(self, 'connect')); + // SDAM Monitoring events + server.on('serverOpening', function(e) { self.emit('serverOpening', e); }); + server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); }); + server.on('serverClosed', function(e) { self.emit('serverClosed', e); }); + // Start connection + server.connect(self.s.connectOptions); + }, timeoutInterval); + } + // Start all the servers + while(servers.length > 0) { + connect(servers.shift(), timeoutInterval++); + } +} + +function pickProxy(self) { + // Get the currently connected Proxies + var connectedProxies = self.connectedProxies.slice(0); + + // Set lower bound + var lowerBoundLatency = Number.MAX_VALUE; + + // Determine the lower bound for the Proxies + for(var i = 0; i < connectedProxies.length; i++) { + if(connectedProxies[i].lastIsMasterMS < lowerBoundLatency) { + lowerBoundLatency = connectedProxies[i].lastIsMasterMS; + } + } + + // Filter out the possible servers + connectedProxies = connectedProxies.filter(function(server) { + if((server.lastIsMasterMS <= (lowerBoundLatency + self.s.localThresholdMS)) + && server.isConnected()) { + return true; + } + }); + + // We have no connectedProxies pick first of the connected ones + if(connectedProxies.length == 0) { + return self.connectedProxies[0]; + } + + // Get proxy + var proxy = connectedProxies[self.index % connectedProxies.length]; + // Update the index + self.index = (self.index + 1) % connectedProxies.length; + // Return the proxy + return proxy; +} + +function moveServerFrom(from, to, proxy) { + for(var i = 0; i < from.length; i++) { + if(from[i].name == proxy.name) { + from.splice(i, 1); + } + } + + for(i = 0; i < to.length; i++) { + if(to[i].name == proxy.name) { + to.splice(i, 1); + } + } + + to.push(proxy); +} + +function removeProxyFrom(from, proxy) { + for(var i = 0; i < from.length; i++) { + if(from[i].name == proxy.name) { + from.splice(i, 1); + } + } +} + +function reconnectProxies(self, proxies, callback) { + // Count lefts + var count = proxies.length; + + // Handle events + var _handleEvent = function(self, event) { + return function() { + var _self = this; + count = count - 1; + + // Destroyed + if(self.state == DESTROYED) { + moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self); + return this.destroy(); + } + + if(event == 'connect' && !self.authenticating) { + // Destroyed + if(self.state == DESTROYED) { + moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self); + return _self.destroy(); + } + + // Remove the handlers + for(var i = 0; i < handlers.length; i++) { + _self.removeAllListeners(handlers[i]); + } + + // Add stable state handlers + _self.on('error', handleEvent(self, 'error')); + _self.on('close', handleEvent(self, 'close')); + _self.on('timeout', handleEvent(self, 'timeout')); + _self.on('parseError', handleEvent(self, 'parseError')); + + // Move to the connected servers + moveServerFrom(self.disconnectedProxies, self.connectedProxies, _self); + // Emit joined event + self.emit('joined', 'mongos', _self); + } else if(event == 'connect' && self.authenticating) { + // Move from connectingProxies + moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self); + this.destroy(); + } + + // Are we done finish up callback + if(count == 0) { + callback(); + } + } + } + + // No new servers + if(count == 0) { + return callback(); + } + + // Execute method + function execute(_server, i) { + setTimeout(function() { + // Destroyed + if(self.state == DESTROYED) { + return; + } + + // Create a new server instance + var server = new Server(assign({}, self.s.options, { + host: _server.name.split(':')[0], + port: parseInt(_server.name.split(':')[1], 10) + }, { + authProviders: self.authProviders, reconnect:false, monitoring: false, inTopology: true + }, { + clientInfo: clone(self.s.clientInfo) + })); + + // Add temp handlers + server.once('connect', _handleEvent(self, 'connect')); + server.once('close', _handleEvent(self, 'close')); + server.once('timeout', _handleEvent(self, 'timeout')); + server.once('error', _handleEvent(self, 'error')); + server.once('parseError', _handleEvent(self, 'parseError')); + + // SDAM Monitoring events + server.on('serverOpening', function(e) { self.emit('serverOpening', e); }); + server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); }); + server.on('serverClosed', function(e) { self.emit('serverClosed', e); }); + server.connect(self.s.connectOptions); + }, i); + } + + // Create new instances + for(var i = 0; i < proxies.length; i++) { + execute(proxies[i], i); + } +} + +function topologyMonitor(self, options) { + options = options || {}; + + // Set momitoring timeout + self.haTimeoutId = setTimeout(function() { + if(self.state == DESTROYED) return; + // If we have a primary and a disconnect handler, execute + // buffered operations + if(self.isConnected() && self.s.disconnectHandler) { + self.s.disconnectHandler.execute(); + } + + // Get the connectingServers + var proxies = self.connectedProxies.slice(0); + // Get the count + var count = proxies.length; + + // If the count is zero schedule a new fast + function pingServer(_self, _server, cb) { + // Measure running time + var start = new Date().getTime(); + + // Emit the server heartbeat start + emitSDAMEvent(self, 'serverHeartbeatStarted', { connectionId: _server.name }); + + // Execute ismaster + _server.command('admin.$cmd', { + ismaster:true + }, { + monitoring: true, + socketTimeout: self.s.options.connectionTimeout || 2000, + }, function(err, r) { + if(self.state == DESTROYED) { + // Move from connectingProxies + moveServerFrom(self.connectedProxies, self.disconnectedProxies, _server); + _server.destroy(); + return cb(err, r); + } + + // Calculate latency + var latencyMS = new Date().getTime() - start; + + // We had an error, remove it from the state + if(err) { + // Emit the server heartbeat failure + emitSDAMEvent(self, 'serverHeartbeatFailed', { durationMS: latencyMS, failure: err, connectionId: _server.name }); + // Move from connected proxies to disconnected proxies + moveServerFrom(self.connectedProxies, self.disconnectedProxies, _server); + } else { + // Update the server ismaster + _server.ismaster = r.result; + _server.lastIsMasterMS = latencyMS; + + // Server heart beat event + emitSDAMEvent(self, 'serverHeartbeatSucceeded', { durationMS: latencyMS, reply: r.result, connectionId: _server.name }); + } + + cb(err, r); + }); + } + + // No proxies initiate monitor again + if(proxies.length == 0) { + // Emit close event if any listeners registered + if(self.listeners("close").length > 0 && self.state == CONNECTING) { + self.emit('error', new MongoError('no mongos proxy available')); + } else { + self.emit('close', self); + } + + // Attempt to connect to any unknown servers + return reconnectProxies(self, self.disconnectedProxies, function() { + if(self.state == DESTROYED) return; + + // Are we connected ? emit connect event + if(self.state == CONNECTING && options.firstConnect) { + self.emit('connect', self); + self.emit('fullsetup', self); + self.emit('all', self); + } else if(self.isConnected()) { + self.emit('reconnect', self); + } else if(!self.isConnected() && self.listeners("close").length > 0) { + self.emit('close', self); + } + + // Perform topology monitor + topologyMonitor(self); + }); + } + + // Ping all servers + for(var i = 0; i < proxies.length; i++) { + pingServer(self, proxies[i], function() { + count = count - 1; + + if(count == 0) { + if(self.state == DESTROYED) return; + + // Attempt to connect to any unknown servers + reconnectProxies(self, self.disconnectedProxies, function() { + if(self.state == DESTROYED) return; + // Perform topology monitor + topologyMonitor(self); + }); + } + }); + } + }, self.s.haInterval); +} + +/** + * Returns the last known ismaster document for this server + * @method + * @return {object} + */ +Mongos.prototype.lastIsMaster = function() { + return this.ismaster; +} + +/** + * Unref all connections belong to this server + * @method + */ +Mongos.prototype.unref = function() { + // Transition state + stateTransition(this, DISCONNECTED); + // Get all proxies + var proxies = this.connectedProxies.concat(this.connectingProxies); + proxies.forEach(function(x) { + x.unref(); + }); + + clearTimeout(this.haTimeoutId); +} + +/** + * Destroy the server connection + * @param {boolean} [options.force=false] Force destroy the pool + * @method + */ +Mongos.prototype.destroy = function(options) { + // Transition state + stateTransition(this, DESTROYED); + // Get all proxies + var proxies = this.connectedProxies.concat(this.connectingProxies); + // Clear out any monitoring process + if(this.haTimeoutId) clearTimeout(this.haTimeoutId); + + // Destroy all connecting servers + proxies.forEach(function(x) { + x.destroy(options); + }); + + // Emit toplogy closing event + emitSDAMEvent(this, 'topologyClosed', { topologyId: this.id }); +} + +/** + * Figure out if the server is connected + * @method + * @return {boolean} + */ +Mongos.prototype.isConnected = function() { + return this.connectedProxies.length > 0; +} + +/** + * Figure out if the server instance was destroyed by calling destroy + * @method + * @return {boolean} + */ +Mongos.prototype.isDestroyed = function() { + return this.state == DESTROYED; +} + +// +// Operations +// + +// Execute write operation +var executeWriteOperation = function(self, op, ns, ops, options, callback) { + if(typeof options == 'function') callback = options, options = {}, options = options || {}; + // Ensure we have no options + options = options || {}; + // Pick a server + var server = pickProxy(self); + // No server found error out + if(!server) return callback(new MongoError('no mongos proxy available')); + // Execute the command + server[op](ns, ops, options, callback); +} + +/** + * Insert one or more documents + * @method + * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) + * @param {array} ops An array of documents to insert + * @param {boolean} [options.ordered=true] Execute in order or out of order + * @param {object} [options.writeConcern={}] Write concern for the operation + * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. + * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. + * @param {opResultCallback} callback A callback function + */ +Mongos.prototype.insert = function(ns, ops, options, callback) { + if(typeof options == 'function') callback = options, options = {}, options = options || {}; + if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed'))); + + // Not connected but we have a disconnecthandler + if(!this.isConnected() && this.s.disconnectHandler != null) { + return this.s.disconnectHandler.add('insert', ns, ops, options, callback); + } + + // No mongos proxy available + if(!this.isConnected()) { + return callback(new MongoError('no mongos proxy available')); + } + + // Execute write operation + executeWriteOperation(this, 'insert', ns, ops, options, callback); +} + +/** + * Perform one or more update operations + * @method + * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) + * @param {array} ops An array of updates + * @param {boolean} [options.ordered=true] Execute in order or out of order + * @param {object} [options.writeConcern={}] Write concern for the operation + * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. + * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. + * @param {opResultCallback} callback A callback function + */ +Mongos.prototype.update = function(ns, ops, options, callback) { + if(typeof options == 'function') callback = options, options = {}, options = options || {}; + if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed'))); + + // Not connected but we have a disconnecthandler + if(!this.isConnected() && this.s.disconnectHandler != null) { + return this.s.disconnectHandler.add('update', ns, ops, options, callback); + } + + // No mongos proxy available + if(!this.isConnected()) { + return callback(new MongoError('no mongos proxy available')); + } + + // Execute write operation + executeWriteOperation(this, 'update', ns, ops, options, callback); +} + +/** + * Perform one or more remove operations + * @method + * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) + * @param {array} ops An array of removes + * @param {boolean} [options.ordered=true] Execute in order or out of order + * @param {object} [options.writeConcern={}] Write concern for the operation + * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. + * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. + * @param {opResultCallback} callback A callback function + */ +Mongos.prototype.remove = function(ns, ops, options, callback) { + if(typeof options == 'function') callback = options, options = {}, options = options || {}; + if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed'))); + + // Not connected but we have a disconnecthandler + if(!this.isConnected() && this.s.disconnectHandler != null) { + return this.s.disconnectHandler.add('remove', ns, ops, options, callback); + } + + // No mongos proxy available + if(!this.isConnected()) { + return callback(new MongoError('no mongos proxy available')); + } + + // Execute write operation + executeWriteOperation(this, 'remove', ns, ops, options, callback); +} + +/** + * Execute a command + * @method + * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) + * @param {object} cmd The command hash + * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it + * @param {Connection} [options.connection] Specify connection object to execute command against + * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. + * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. + * @param {opResultCallback} callback A callback function + */ +Mongos.prototype.command = function(ns, cmd, options, callback) { + if(typeof options == 'function') callback = options, options = {}, options = options || {}; + if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed'))); + var self = this; + + // Pick a proxy + var server = pickProxy(self); + + // Topology is not connected, save the call in the provided store to be + // Executed at some point when the handler deems it's reconnected + if((server == null || !server.isConnected()) && this.s.disconnectHandler != null) { + return this.s.disconnectHandler.add('command', ns, cmd, options, callback); + } + + // No server returned we had an error + if(server == null) { + return callback(new MongoError('no mongos proxy available')); + } + + // Execute the command + server.command(ns, cmd, options, callback); +} + +/** + * Perform one or more remove operations + * @method + * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) + * @param {{object}|{Long}} cmd Can be either a command returning a cursor or a cursorId + * @param {object} [options.batchSize=0] Batchsize for the operation + * @param {array} [options.documents=[]] Initial documents list for cursor + * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it + * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. + * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. + * @param {opResultCallback} callback A callback function + */ +Mongos.prototype.cursor = function(ns, cmd, cursorOptions) { + cursorOptions = cursorOptions || {}; + var FinalCursor = cursorOptions.cursorFactory || this.s.Cursor; + return new FinalCursor(this.s.bson, ns, cmd, cursorOptions, this, this.s.options); +} + +/** + * Authenticate using a specified mechanism + * @method + * @param {string} mechanism The Auth mechanism we are invoking + * @param {string} db The db we are invoking the mechanism against + * @param {...object} param Parameters for the specific mechanism + * @param {authResultCallback} callback A callback function + */ +Mongos.prototype.auth = function(mechanism, db) { + var allArgs = Array.prototype.slice.call(arguments, 0).slice(0); + var self = this; + var args = Array.prototype.slice.call(arguments, 2); + var callback = args.pop(); + + // If we don't have the mechanism fail + if(this.authProviders[mechanism] == null && mechanism != 'default') { + return callback(new MongoError(f("auth provider %s does not exist", mechanism))); + } + + // Are we already authenticating, throw + if(this.authenticating) { + return callback(new MongoError('authentication or logout allready in process')); + } + + // Topology is not connected, save the call in the provided store to be + // Executed at some point when the handler deems it's reconnected + if(!self.isConnected() && self.s.disconnectHandler != null) { + return self.s.disconnectHandler.add('auth', db, allArgs, {}, callback); + } + + // Set to authenticating + this.authenticating = true; + // All errors + var errors = []; + + // Get all the servers + var servers = this.connectedProxies.slice(0); + // No servers return + if(servers.length == 0) { + this.authenticating = false; + callback(null, true); + } + + // Authenticate + function auth(server) { + // Arguments without a callback + var argsWithoutCallback = [mechanism, db].concat(args.slice(0)); + // Create arguments + var finalArguments = argsWithoutCallback.concat([function(err) { + count = count - 1; + // Save all the errors + if(err) errors.push({name: server.name, err: err}); + // We are done + if(count == 0) { + // Auth is done + self.authenticating = false; + + // Return the auth error + if(errors.length) return callback(MongoError.create({ + message: 'authentication fail', errors: errors + }), false); + + // Successfully authenticated session + callback(null, self); + } + }]); + + // Execute the auth only against non arbiter servers + if(!server.lastIsMaster().arbiterOnly) { + server.auth.apply(server, finalArguments); + } + } + + // Get total count + var count = servers.length; + // Authenticate against all servers + while(servers.length > 0) { + auth(servers.shift()); + } +} + +/** + * Logout from a database + * @method + * @param {string} db The db we are logging out from + * @param {authResultCallback} callback A callback function + */ +Mongos.prototype.logout = function(dbName, callback) { + var self = this; + // Are we authenticating or logging out, throw + if(this.authenticating) { + throw new MongoError('authentication or logout allready in process'); + } + + // Ensure no new members are processed while logging out + this.authenticating = true; + + // Remove from all auth providers (avoid any reaplication of the auth details) + var providers = Object.keys(this.authProviders); + for(var i = 0; i < providers.length; i++) { + this.authProviders[providers[i]].logout(dbName); + } + + // Now logout all the servers + var servers = this.connectedProxies.slice(0); + var count = servers.length; + if(count == 0) return callback(); + var errors = []; + + function logoutServer(_server, cb) { + _server.logout(dbName, function(err) { + if(err) errors.push({name: _server.name, err: err}); + cb(); + }); + } + + // Execute logout on all server instances + for(i = 0; i < servers.length; i++) { + logoutServer(servers[i], function() { + count = count - 1; + + if(count == 0) { + // Do not block new operations + self.authenticating = false; + // If we have one or more errors + if(errors.length) return callback(MongoError.create({ + message: f('logout failed against db %s', dbName), errors: errors + }), false); + + // No errors + callback(); + } + }) + } +} + +/** + * Get server + * @method + * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it + * @return {Server} + */ +Mongos.prototype.getServer = function() { + var server = pickProxy(this); + if(this.s.debug) this.emit('pickedServer', null, server); + return server; +} + +/** + * All raw connections + * @method + * @return {Connection[]} + */ +Mongos.prototype.connections = function() { + var connections = []; + + for(var i = 0; i < this.connectedProxies.length; i++) { + connections = connections.concat(this.connectedProxies[i].connections()); + } + + return connections; +} + +/** + * A mongos connect event, used to verify that the connection is up and running + * + * @event Mongos#connect + * @type {Mongos} + */ + +/** + * A mongos reconnect event, used to verify that the mongos topology has reconnected + * + * @event Mongos#reconnect + * @type {Mongos} + */ + +/** + * A mongos fullsetup event, used to signal that all topology members have been contacted. + * + * @event Mongos#fullsetup + * @type {Mongos} + */ + +/** + * A mongos all event, used to signal that all topology members have been contacted. + * + * @event Mongos#all + * @type {Mongos} + */ + +/** + * A server member left the mongos list + * + * @event Mongos#left + * @type {Mongos} + * @param {string} type The type of member that left (mongos) + * @param {Server} server The server object that left + */ + +/** + * A server member joined the mongos list + * + * @event Mongos#joined + * @type {Mongos} + * @param {string} type The type of member that left (mongos) + * @param {Server} server The server object that joined + */ + +/** + * A server opening SDAM monitoring event + * + * @event Mongos#serverOpening + * @type {object} + */ + +/** + * A server closed SDAM monitoring event + * + * @event Mongos#serverClosed + * @type {object} + */ + +/** + * A server description SDAM change monitoring event + * + * @event Mongos#serverDescriptionChanged + * @type {object} + */ + +/** + * A topology open SDAM event + * + * @event Mongos#topologyOpening + * @type {object} + */ + +/** + * A topology closed SDAM event + * + * @event Mongos#topologyClosed + * @type {object} + */ + +/** + * A topology structure SDAM change event + * + * @event Mongos#topologyDescriptionChanged + * @type {object} + */ + +/** + * A topology serverHeartbeatStarted SDAM event + * + * @event Mongos#serverHeartbeatStarted + * @type {object} + */ + +/** + * A topology serverHeartbeatFailed SDAM event + * + * @event Mongos#serverHeartbeatFailed + * @type {object} + */ + +/** + * A topology serverHeartbeatSucceeded SDAM change event + * + * @event Mongos#serverHeartbeatSucceeded + * @type {object} + */ + +module.exports = Mongos; diff --git a/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/read_preference.js b/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/read_preference.js new file mode 100644 index 0000000..1f10dd9 --- /dev/null +++ b/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/read_preference.js @@ -0,0 +1,118 @@ +"use strict"; + +var needSlaveOk = ['primaryPreferred', 'secondary', 'secondaryPreferred', 'nearest']; + +/** + * @fileOverview The **ReadPreference** class is a class that represents a MongoDB ReadPreference and is + * used to construct connections. + * + * @example + * var ReplSet = require('mongodb-core').ReplSet + * , ReadPreference = require('mongodb-core').ReadPreference + * , assert = require('assert'); + * + * var server = new ReplSet([{host: 'localhost', port: 30000}], {setName: 'rs'}); + * // Wait for the connection event + * server.on('connect', function(server) { + * var cursor = server.cursor('db.test' + * , {find: 'db.test', query: {}} + * , {readPreference: new ReadPreference('secondary')}); + * cursor.next(function(err, doc) { + * server.destroy(); + * }); + * }); + * + * // Start connecting + * server.connect(); + */ + +/** + * Creates a new Pool instance + * @class + * @param {string} preference A string describing the preference (primary|primaryPreferred|secondary|secondaryPreferred|nearest) + * @param {array} tags The tags object + * @param {object} [options] Additional read preference options + * @param {number} [options.maxStalenessSeconds] Max Secondary Read Stalleness in Seconds, Minimum value is 90 seconds. + * @property {string} preference The preference string (primary|primaryPreferred|secondary|secondaryPreferred|nearest) + * @property {array} tags The tags object + * @property {object} options Additional read preference options + * @property {number} maxStalenessSeconds MaxStalenessSeconds value for the read preference + * @return {ReadPreference} + */ +var ReadPreference = function(preference, tags, options) { + this.preference = preference; + this.tags = tags; + this.options = options; + + // Add the maxStalenessSeconds value to the read Preference + if(this.options && this.options.maxStalenessSeconds != null) { + this.options = options; + this.maxStalenessSeconds = this.options.maxStalenessSeconds >= 0 + ? this.options.maxStalenessSeconds : null; + } else if(tags && typeof tags == 'object') { + this.options = tags, tags = null; + } +} + +/** + * This needs slaveOk bit set + * @method + * @return {boolean} + */ +ReadPreference.prototype.slaveOk = function() { + return needSlaveOk.indexOf(this.preference) != -1; +} + +/** + * Are the two read preference equal + * @method + * @return {boolean} + */ +ReadPreference.prototype.equals = function(readPreference) { + return readPreference.preference == this.preference; +} + +/** + * Return JSON representation + * @method + * @return {Object} + */ +ReadPreference.prototype.toJSON = function() { + var readPreference = {mode: this.preference}; + if(Array.isArray(this.tags)) readPreference.tags = this.tags; + if(this.maxStalenessSeconds) readPreference.maxStalenessSeconds = this.maxStalenessSeconds; + return readPreference; +} + +/** + * Primary read preference + * @method + * @return {ReadPreference} + */ +ReadPreference.primary = new ReadPreference('primary'); +/** + * Primary Preferred read preference + * @method + * @return {ReadPreference} + */ +ReadPreference.primaryPreferred = new ReadPreference('primaryPreferred'); +/** + * Secondary read preference + * @method + * @return {ReadPreference} + */ +ReadPreference.secondary = new ReadPreference('secondary'); +/** + * Secondary Preferred read preference + * @method + * @return {ReadPreference} + */ +ReadPreference.secondaryPreferred = new ReadPreference('secondaryPreferred'); +/** + * Nearest read preference + * @method + * @return {ReadPreference} + */ +ReadPreference.nearest = new ReadPreference('nearest'); + +module.exports = ReadPreference; diff --git a/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/replset.js b/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/replset.js new file mode 100644 index 0000000..0082da8 --- /dev/null +++ b/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/replset.js @@ -0,0 +1,1403 @@ +"use strict" + +var inherits = require('util').inherits, + f = require('util').format, + EventEmitter = require('events').EventEmitter, + ReadPreference = require('./read_preference'), + BasicCursor = require('../cursor'), + retrieveBSON = require('../connection/utils').retrieveBSON, + Logger = require('../connection/logger'), + MongoError = require('../error'), + Server = require('./server'), + ReplSetState = require('./replset_state'), + assign = require('./shared').assign, + clone = require('./shared').clone, + createClientInfo = require('./shared').createClientInfo; + +var MongoCR = require('../auth/mongocr') + , X509 = require('../auth/x509') + , Plain = require('../auth/plain') + , GSSAPI = require('../auth/gssapi') + , SSPI = require('../auth/sspi') + , ScramSHA1 = require('../auth/scram'); + +var BSON = retrieveBSON(); + +// +// States +var DISCONNECTED = 'disconnected'; +var CONNECTING = 'connecting'; +var CONNECTED = 'connected'; +var DESTROYED = 'destroyed'; + +function stateTransition(self, newState) { + var legalTransitions = { + 'disconnected': [CONNECTING, DESTROYED, DISCONNECTED], + 'connecting': [CONNECTING, DESTROYED, CONNECTED, DISCONNECTED], + 'connected': [CONNECTED, DISCONNECTED, DESTROYED], + 'destroyed': [DESTROYED] + } + + // Get current state + var legalStates = legalTransitions[self.state]; + if(legalStates && legalStates.indexOf(newState) != -1) { + self.state = newState; + } else { + self.logger.error(f('Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]' + , self.id, self.state, newState, legalStates)); + } +} + +// +// ReplSet instance id +var id = 1; +var handlers = ['connect', 'close', 'error', 'timeout', 'parseError']; + +/** + * Creates a new Replset instance + * @class + * @param {array} seedlist A list of seeds for the replicaset + * @param {boolean} options.setName The Replicaset set name + * @param {boolean} [options.secondaryOnlyConnectionAllowed=false] Allow connection to a secondary only replicaset + * @param {number} [options.haInterval=10000] The High availability period for replicaset inquiry + * @param {boolean} [options.emitError=false] Server will emit errors events + * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors + * @param {number} [options.size=5] Server connection pool size + * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled + * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled + * @param {boolean} [options.noDelay=true] TCP Connection no delay + * @param {number} [options.connectionTimeout=10000] TCP Connection timeout setting + * @param {number} [options.socketTimeout=0] TCP Socket timeout setting + * @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed + * @param {boolean} [options.ssl=false] Use SSL for connection + * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function. + * @param {Buffer} [options.ca] SSL Certificate store binary buffer + * @param {Buffer} [options.cert] SSL Certificate binary buffer + * @param {Buffer} [options.key] SSL Key file binary buffer + * @param {string} [options.passphrase] SSL Certificate pass phrase + * @param {string} [options.servername=null] String containing the server name requested via TLS SNI. + * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates + * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits + * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types. + * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers. + * @param {number} [options.pingInterval=5000] Ping interval to check the response time to the different servers + * @param {number} [options.localThresholdMS=15] Cutoff latency point in MS for MongoS proxy selection + * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit. + * @return {ReplSet} A cursor instance + * @fires ReplSet#connect + * @fires ReplSet#ha + * @fires ReplSet#joined + * @fires ReplSet#left + * @fires ReplSet#failed + * @fires ReplSet#fullsetup + * @fires ReplSet#all + * @fires ReplSet#error + * @fires ReplSet#serverHeartbeatStarted + * @fires ReplSet#serverHeartbeatSucceeded + * @fires ReplSet#serverHeartbeatFailed + * @fires ReplSet#topologyOpening + * @fires ReplSet#topologyClosed + * @fires ReplSet#topologyDescriptionChanged + * @property {string} type the topology type. + * @property {string} parserType the parser type used (c++ or js). + */ +var ReplSet = function(seedlist, options) { + var self = this; + options = options || {}; + + // Validate seedlist + if(!Array.isArray(seedlist)) throw new MongoError("seedlist must be an array"); + // Validate list + if(seedlist.length == 0) throw new MongoError("seedlist must contain at least one entry"); + // Validate entries + seedlist.forEach(function(e) { + if(typeof e.host != 'string' || typeof e.port != 'number') + throw new MongoError("seedlist entry must contain a host and port"); + }); + + // Add event listener + EventEmitter.call(this); + + // Get replSet Id + this.id = id++; + + // Get the localThresholdMS + var localThresholdMS = options.localThresholdMS || 15; + // Backward compatibility + if(options.acceptableLatency) localThresholdMS = options.acceptableLatency; + + // Create a logger + var logger = Logger('ReplSet', options); + + // Internal state + this.s = { + options: assign({}, options), + // BSON instance + bson: options.bson || new BSON([BSON.Binary, BSON.Code, BSON.DBRef, BSON.Decimal128, + BSON.Double, BSON.Int32, BSON.Long, BSON.Map, BSON.MaxKey, BSON.MinKey, + BSON.ObjectId, BSON.BSONRegExp, BSON.Symbol, BSON.Timestamp]), + // Factory overrides + Cursor: options.cursorFactory || BasicCursor, + // Logger instance + logger: logger, + // Seedlist + seedlist: seedlist, + // Replicaset state + replicaSetState: new ReplSetState({ + id: this.id, setName: options.setName, + acceptableLatency: localThresholdMS, + heartbeatFrequencyMS: options.haInterval ? options.haInterval : 10000, + logger: logger + }), + // Current servers we are connecting to + connectingServers: [], + // Ha interval + haInterval: options.haInterval ? options.haInterval : 10000, + // Minimum heartbeat frequency used if we detect a server close + minHeartbeatFrequencyMS: 500, + // Disconnect handler + disconnectHandler: options.disconnectHandler, + // Server selection index + index: 0, + // Connect function options passed in + connectOptions: {}, + // Are we running in debug mode + debug: typeof options.debug == 'boolean' ? options.debug : false, + // Client info + clientInfo: createClientInfo(options) + } + + // Add handler for topology change + this.s.replicaSetState.on('topologyDescriptionChanged', function(r) { self.emit('topologyDescriptionChanged', r); }); + + // Log info warning if the socketTimeout < haInterval as it will cause + // a lot of recycled connections to happen. + if(this.s.logger.isWarn() + && this.s.options.socketTimeout != 0 + && this.s.options.socketTimeout < this.s.haInterval) { + this.s.logger.warn(f('warning socketTimeout %s is less than haInterval %s. This might cause unnecessary server reconnections due to socket timeouts' + , this.s.options.socketTimeout, this.s.haInterval)); + } + + // All the authProviders + this.authProviders = options.authProviders || { + 'mongocr': new MongoCR(this.s.bson), 'x509': new X509(this.s.bson) + , 'plain': new Plain(this.s.bson), 'gssapi': new GSSAPI(this.s.bson) + , 'sspi': new SSPI(this.s.bson), 'scram-sha-1': new ScramSHA1(this.s.bson) + } + + // Add forwarding of events from state handler + var types = ['joined', 'left']; + types.forEach(function(x) { + self.s.replicaSetState.on(x, function(t, s) { + if(self.state === CONNECTED && x === 'joined' && t == 'primary') { + self.emit('reconnect', self); + } + + self.emit(x, t, s); + }); + }); + + // Connect stat + this.initialConnectState = { + connect: false, fullsetup: false, all: false + } + + // Disconnected state + this.state = DISCONNECTED; + this.haTimeoutId = null; + // Are we authenticating + this.authenticating = false; + // Last ismaster + this.ismaster = null; +} + +inherits(ReplSet, EventEmitter); + +Object.defineProperty(ReplSet.prototype, 'type', { + enumerable:true, get: function() { return 'replset'; } +}); + +Object.defineProperty(ReplSet.prototype, 'parserType', { + enumerable:true, get: function() { + return BSON.native ? "c++" : "js"; + } +}); + +function attemptReconnect(self) { + if(self.runningAttempReconnect) return; + // Set as running + self.runningAttempReconnect = true; + // Wait before execute + self.haTimeoutId = setTimeout(function() { + if(self.state == DESTROYED) return; + + // Debug log + if(self.s.logger.isDebug()) { + self.s.logger.debug(f('attemptReconnect for replset with id %s', self.id)); + } + + // Get all known hosts + var keys = Object.keys(self.s.replicaSetState.set); + var servers = keys.map(function(x) { + return new Server(assign({}, self.s.options, { + host: x.split(':')[0], port: parseInt(x.split(':')[1], 10) + }, { + authProviders: self.authProviders, reconnect:false, monitoring: false, inTopology: true + }, { + clientInfo: clone(self.s.clientInfo) + })); + }); + + // Create the list of servers + self.s.connectingServers = servers.slice(0); + + // Handle all events coming from servers + function _handleEvent(self, event) { + return function() { + // Destroy the instance + if(self.state == DESTROYED) { + return this.destroy(); + } + + // Debug log + if(self.s.logger.isDebug()) { + self.s.logger.debug(f('attemptReconnect for replset with id %s using server %s ended with event %s', self.id, this.name, event)); + } + + // Check if we are done + function done() { + // Done with the reconnection attempt + if(self.s.connectingServers.length == 0) { + if(self.state == DESTROYED) return; + + // If we have a primary and a disconnect handler, execute + // buffered operations + if(self.s.replicaSetState.hasPrimaryAndSecondary() && self.s.disconnectHandler) { + self.s.disconnectHandler.execute(); + } else if(self.s.replicaSetState.hasPrimary() && self.s.disconnectHandler) { + self.s.disconnectHandler.execute({ executePrimary:true }); + } else if(self.s.replicaSetState.hasSecondary() && self.s.disconnectHandler) { + self.s.disconnectHandler.execute({ executeSecondary:true }); + } + + // Do we have a primary + if(self.s.replicaSetState.hasPrimary()) { + // Emit reconnect as new primary was discovered + self.emit('reconnect', self); + + // Connect any missing servers + connectNewServers(self, self.s.replicaSetState.unknownServers, function() { + // Debug log + if(self.s.logger.isDebug()) { + self.s.logger.debug(f('attemptReconnect for replset with id successful resuming topologyMonitor', self.id)); + } + + // Reset the running + self.runningAttempReconnect = false; + + // Go back to normal topology monitoring + // Schedule a topology monitoring sweep + setTimeout(function() { + topologyMonitor(self); + }, self.s.haInterval); + }); + } else { + if(self.listeners("close").length > 0) { + self.emit('close', self); + } + + // Reset the running + self.runningAttempReconnect = false; + // Attempt a new reconnect + attemptReconnect(self); + } + } + } + + // Remove the server from our list + for(var i = 0; i < self.s.connectingServers.length; i++) { + if(self.s.connectingServers[i].equals(this)) { + self.s.connectingServers.splice(i, 1); + } + } + + // Keep reference to server + var _self = this; + + // Debug log + if(self.s.logger.isDebug()) { + self.s.logger.debug(f('attemptReconnect in replset with id %s for', self.id)); + } + + // Connect and not authenticating + if(event == 'connect' && !self.authenticating) { + if(self.state == DESTROYED) { + return _self.destroy(); + } + + // Update the replicaset state + if(self.s.replicaSetState.update(_self)) { + // Primary lastIsMaster store it + if(_self.lastIsMaster() && _self.lastIsMaster().ismaster) { + self.ismaster = _self.lastIsMaster(); + } + + // Remove the handlers + for(i = 0; i < handlers.length; i++) { + _self.removeAllListeners(handlers[i]); + } + + // Add stable state handlers + _self.on('error', handleEvent(self, 'error')); + _self.on('close', handleEvent(self, 'close')); + _self.on('timeout', handleEvent(self, 'timeout')); + _self.on('parseError', handleEvent(self, 'parseError')); + } else { + _self.destroy(); + } + } else if(event == 'connect' && self.authenticating) { + this.destroy(); + } + + done(); + } + } + + // Index used to interleaf the server connects, avoiding + // runtime issues on io constrained vm's + var timeoutInterval = 0; + + function connect(server, timeoutInterval) { + setTimeout(function() { + server.once('connect', _handleEvent(self, 'connect')); + server.once('close', _handleEvent(self, 'close')); + server.once('timeout', _handleEvent(self, 'timeout')); + server.once('error', _handleEvent(self, 'error')); + server.once('parseError', _handleEvent(self, 'parseError')); + + // SDAM Monitoring events + server.on('serverOpening', function(e) { self.emit('serverOpening', e); }); + server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); }); + server.on('serverClosed', function(e) { self.emit('serverClosed', e); }); + + server.connect(self.s.connectOptions); + }, timeoutInterval); + } + + // Connect all servers + while(servers.length > 0) { + connect(servers.shift(), timeoutInterval++); + } + }, self.s.minHeartbeatFrequencyMS); +} + +function connectNewServers(self, servers, callback) { + // Count lefts + var count = servers.length; + + // Handle events + var _handleEvent = function(self, event) { + return function() { + var _self = this; + count = count - 1; + + // Destroyed + if(self.state == DESTROYED) { + return this.destroy(); + } + + if(event == 'connect' && !self.authenticating) { + // Destroyed + if(self.state == DESTROYED) { + return _self.destroy(); + } + + var result = self.s.replicaSetState.update(_self); + // Update the state with the new server + if(result) { + // Primary lastIsMaster store it + if(_self.lastIsMaster() && _self.lastIsMaster().ismaster) { + self.ismaster = _self.lastIsMaster(); + } + + // Remove the handlers + for(var i = 0; i < handlers.length; i++) { + _self.removeAllListeners(handlers[i]); + } + + // Add stable state handlers + _self.on('error', handleEvent(self, 'error')); + _self.on('close', handleEvent(self, 'close')); + _self.on('timeout', handleEvent(self, 'timeout')); + _self.on('parseError', handleEvent(self, 'parseError')); + } else { + _self.destroy(); + } + } else if(event == 'connect' && self.authenticating) { + this.destroy(); + } + + // Are we done finish up callback + if(count == 0) { callback(); } + } + } + + // No new servers + if(count == 0) return callback(); + + // Execute method + function execute(_server, i) { + setTimeout(function() { + // Destroyed + if(self.state == DESTROYED) { + return; + } + + // Create a new server instance + var server = new Server(assign({}, self.s.options, { + host: _server.split(':')[0], + port: parseInt(_server.split(':')[1], 10) + }, { + authProviders: self.authProviders, reconnect:false, monitoring: false, inTopology: true + }, { + clientInfo: clone(self.s.clientInfo) + })); + + // Add temp handlers + server.once('connect', _handleEvent(self, 'connect')); + server.once('close', _handleEvent(self, 'close')); + server.once('timeout', _handleEvent(self, 'timeout')); + server.once('error', _handleEvent(self, 'error')); + server.once('parseError', _handleEvent(self, 'parseError')); + + // SDAM Monitoring events + server.on('serverOpening', function(e) { self.emit('serverOpening', e); }); + server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); }); + server.on('serverClosed', function(e) { self.emit('serverClosed', e); }); + server.connect(self.s.connectOptions); + }, i); + } + + // Create new instances + for(var i = 0; i < servers.length; i++) { + execute(servers[i], i); + } +} + +function topologyMonitor(self, options) { + options = options || {}; + + // Set momitoring timeout + self.haTimeoutId = setTimeout(function() { + if(self.state == DESTROYED) return; + + // Is this a on connect topology discovery + // Schedule a proper topology monitoring to happen + // To ensure any discovered servers do not timeout + // while waiting for the initial discovery to happen. + if(options.haInterval) { + topologyMonitor(self); + } + + // If we have a primary and a disconnect handler, execute + // buffered operations + if(self.s.replicaSetState.hasPrimaryAndSecondary() && self.s.disconnectHandler) { + self.s.disconnectHandler.execute(); + } else if(self.s.replicaSetState.hasPrimary() && self.s.disconnectHandler) { + self.s.disconnectHandler.execute({ executePrimary:true }); + } else if(self.s.replicaSetState.hasSecondary() && self.s.disconnectHandler) { + self.s.disconnectHandler.execute({ executeSecondary:true }); + } + + // Get the connectingServers + var connectingServers = self.s.replicaSetState.allServers(); + // Debug log + if(self.s.logger.isDebug()) { + self.s.logger.debug(f('topologyMonitor in replset with id %s connected servers [%s]' + , self.id + , connectingServers.map(function(x) { + return x.name; + }))); + } + // Get the count + var count = connectingServers.length; + + // If we have no servers connected + if(count == 0 && !options.haInterval) { + if(self.listeners("close").length > 0) { + self.emit('close', self); + } + + return attemptReconnect(self); + } + + // If the count is zero schedule a new fast + function pingServer(_self, _server, cb) { + // Measure running time + var start = new Date().getTime(); + + // Emit the server heartbeat start + emitSDAMEvent(self, 'serverHeartbeatStarted', { connectionId: _server.name }); + // Execute ismaster + // Set the socketTimeout for a monitoring message to a low number + // Ensuring ismaster calls are timed out quickly + _server.command('admin.$cmd', { + ismaster:true + }, { + monitoring: true, + socketTimeout: self.s.options.connectionTimeout || 2000, + }, function(err, r) { + if(self.state == DESTROYED) { + _server.destroy(); + return cb(err, r); + } + + // Calculate latency + var latencyMS = new Date().getTime() - start; + + // Set the last updatedTime + var hrTime = process.hrtime(); + // Calculate the last update time + _server.lastUpdateTime = hrTime[0] * 1000 + Math.round(hrTime[1]/1000); + + // We had an error, remove it from the state + if(err) { + // Emit the server heartbeat failure + emitSDAMEvent(self, 'serverHeartbeatFailed', { durationMS: latencyMS, failure: err, connectionId: _server.name }); + // Remove server from the state + _self.s.replicaSetState.remove(_server); + } else { + // Update the server ismaster + _server.ismaster = r.result; + + // Check if we have a lastWriteDate convert it to MS + // and store on the server instance for later use + if(_server.ismaster.lastWrite && _server.ismaster.lastWrite.lastWriteDate) { + _server.lastWriteDate = _server.ismaster.lastWrite.lastWriteDate.getTime(); + } + + // Do we have a brand new server + if(_server.lastIsMasterMS == -1) { + _server.lastIsMasterMS = latencyMS; + } else if(_server.lastIsMasterMS) { + // After the first measurement, average RTT MUST be computed using an + // exponentially-weighted moving average formula, with a weighting factor (alpha) of 0.2. + // If the prior average is denoted old_rtt, then the new average (new_rtt) is + // computed from a new RTT measurement (x) using the following formula: + // alpha = 0.2 + // new_rtt = alpha * x + (1 - alpha) * old_rtt + _server.lastIsMasterMS = 0.2 * latencyMS + (1 - 0.2) * _server.lastIsMasterMS; + } + + if(_self.s.replicaSetState.update(_server)) { + // Primary lastIsMaster store it + if(_server.lastIsMaster() && _server.lastIsMaster().ismaster) { + self.ismaster = _server.lastIsMaster(); + } + } + + // Server heart beat event + emitSDAMEvent(self, 'serverHeartbeatSucceeded', { durationMS: latencyMS, reply: r.result, connectionId: _server.name }); + } + + // Calculate the stalness for this server + self.s.replicaSetState.updateServerMaxStaleness(_server, self.s.haInterval); + + // Callback + cb(err, r); + }); + } + + // Connect any missing servers + function connectMissingServers() { + if(self.state == DESTROYED) return; + + // Attempt to connect to any unknown servers + connectNewServers(self, self.s.replicaSetState.unknownServers, function() { + if(self.state == DESTROYED) return; + + // Check if we have an options.haInterval (meaning it was triggered from connect) + if(options.haInterval) { + // Do we have a primary and secondary + if(self.state == CONNECTING + && self.s.replicaSetState.hasPrimaryAndSecondary()) { + // Transition to connected + stateTransition(self, CONNECTED); + // Update initial state + self.initialConnectState.connect = true; + self.initialConnectState.fullsetup = true; + self.initialConnectState.all = true; + // Emit fullsetup and all events + process.nextTick(function() { + self.emit('connect', self); + self.emit('fullsetup', self); + self.emit('all', self); + }); + } else if(self.state == CONNECTING + && self.s.replicaSetState.hasPrimary()) { + // Transition to connected + stateTransition(self, CONNECTED); + // Update initial state + self.initialConnectState.connect = true; + // Emit connected sign + process.nextTick(function() { + self.emit('connect', self); + }); + } else if(self.state == CONNECTING + && self.s.replicaSetState.hasSecondary() + && self.s.options.secondaryOnlyConnectionAllowed) { + // Transition to connected + stateTransition(self, CONNECTED); + // Update initial state + self.initialConnectState.connect = true; + // Emit connected sign + process.nextTick(function() { + self.emit('connect', self); + }); + } else if(self.state == CONNECTING) { + self.emit('error', new MongoError('no primary found in replicaset')); + // Destroy the topology + return self.destroy(); + } else if(self.state == CONNECTED + && self.s.replicaSetState.hasPrimaryAndSecondary() + && !self.initialConnectState.fullsetup) { + self.initialConnectState.fullsetup = true; + // Emit fullsetup and all events + process.nextTick(function() { + self.emit('fullsetup', self); + self.emit('all', self); + }); + } + } + + if(!options.haInterval) topologyMonitor(self); + }); + } + + // No connectingServers but unknown servers + if(connectingServers.length == 0 + && self.s.replicaSetState.unknownServers.length > 0 && options.haInterval) { + return connectMissingServers(); + } else if(connectingServers.length == 0 && options.haInterval) { + self.destroy(); + return self.emit('error', new MongoError('no valid replicaset members found')); + } + + // Ping all servers + for(var i = 0; i < connectingServers.length; i++) { + pingServer(self, connectingServers[i], function() { + count = count - 1; + + if(count == 0) { + connectMissingServers(); + } + }); + } + }, options.haInterval || self.s.haInterval) +} + +function handleEvent(self, event) { + return function() { + if(self.state == DESTROYED) return; + // Debug log + if(self.s.logger.isDebug()) { + self.s.logger.debug(f('handleEvent %s from server %s in replset with id %s', event, this.name, self.id)); + } + + self.s.replicaSetState.remove(this); + } +} + +function handleInitialConnectEvent(self, event) { + return function() { + // Debug log + if(self.s.logger.isDebug()) { + self.s.logger.debug(f('handleInitialConnectEvent %s from server %s in replset with id %s', event, this.name, self.id)); + } + + // Destroy the instance + if(self.state == DESTROYED) { + return this.destroy(); + } + + // Check the type of server + if(event == 'connect') { + // Update the state + var result = self.s.replicaSetState.update(this); + if(result == true) { + // Primary lastIsMaster store it + if(this.lastIsMaster() && this.lastIsMaster().ismaster) { + self.ismaster = this.lastIsMaster(); + } + + // Debug log + if(self.s.logger.isDebug()) { + self.s.logger.debug(f('handleInitialConnectEvent %s from server %s in replset with id %s has state [%s]', event, this.name, self.id, JSON.stringify(self.s.replicaSetState.set))); + } + + // Remove the handlers + for(var i = 0; i < handlers.length; i++) { + this.removeAllListeners(handlers[i]); + } + + // Add stable state handlers + this.on('error', handleEvent(self, 'error')); + this.on('close', handleEvent(self, 'close')); + this.on('timeout', handleEvent(self, 'timeout')); + this.on('parseError', handleEvent(self, 'parseError')); + } else if(result instanceof MongoError) { + this.destroy(); + self.destroy(); + return self.emit('error', result); + } else { + this.destroy(); + } + } else { + // Emit failure to connect + self.emit('failed', this); + // Remove from the state + self.s.replicaSetState.remove(this); + } + + // Remove from the list from connectingServers + for(i = 0; i < self.s.connectingServers.length; i++) { + if(self.s.connectingServers[i].equals(this)) { + self.s.connectingServers.splice(i, 1); + } + } + + // Trigger topologyMonitor + if(self.s.connectingServers.length == 0) { + topologyMonitor(self, {haInterval: 1}); + } + }; +} + +function connectServers(self, servers) { + // Update connectingServers + self.s.connectingServers = self.s.connectingServers.concat(servers); + + // Index used to interleaf the server connects, avoiding + // runtime issues on io constrained vm's + var timeoutInterval = 0; + + function connect(server, timeoutInterval) { + setTimeout(function() { + // Add the server to the state + if(self.s.replicaSetState.update(server)) { + // Primary lastIsMaster store it + if(server.lastIsMaster() && server.lastIsMaster().ismaster) { + self.ismaster = server.lastIsMaster(); + } + } + + // Add event handlers + server.once('close', handleInitialConnectEvent(self, 'close')); + server.once('timeout', handleInitialConnectEvent(self, 'timeout')); + server.once('parseError', handleInitialConnectEvent(self, 'parseError')); + server.once('error', handleInitialConnectEvent(self, 'error')); + server.once('connect', handleInitialConnectEvent(self, 'connect')); + // SDAM Monitoring events + server.on('serverOpening', function(e) { self.emit('serverOpening', e); }); + server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); }); + server.on('serverClosed', function(e) { self.emit('serverClosed', e); }); + // Start connection + server.connect(self.s.connectOptions); + }, timeoutInterval); + } + + // Start all the servers + while(servers.length > 0) { + connect(servers.shift(), timeoutInterval++); + } +} + +/** + * Emit event if it exists + * @method + */ +function emitSDAMEvent(self, event, description) { + if(self.listeners(event).length > 0) { + self.emit(event, description); + } +} + +/** + * Initiate server connect + * @method + * @param {array} [options.auth=null] Array of auth options to apply on connect + */ +ReplSet.prototype.connect = function(options) { + var self = this; + // Add any connect level options to the internal state + this.s.connectOptions = options || {}; + // Set connecting state + stateTransition(this, CONNECTING); + // Create server instances + var servers = this.s.seedlist.map(function(x) { + return new Server(assign({}, self.s.options, x, { + authProviders: self.authProviders, reconnect:false, monitoring:false, inTopology: true + }, { + clientInfo: clone(self.s.clientInfo) + })); + }); + + // Error out as high availbility interval must be < than socketTimeout + if(this.s.options.socketTimeout > 0 && this.s.options.socketTimeout <= this.s.options.haInterval) { + return self.emit('error', new MongoError(f("haInterval [%s] MS must be set to less than socketTimeout [%s] MS" + , this.s.options.haInterval, this.s.options.socketTimeout))); + } + + // Emit the topology opening event + emitSDAMEvent(this, 'topologyOpening', { topologyId: this.id }); + // Start all server connections + connectServers(self, servers); +} + +/** + * Destroy the server connection + * @param {boolean} [options.force=false] Force destroy the pool + * @method + */ +ReplSet.prototype.destroy = function(options) { + options = options || {}; + // Transition state + stateTransition(this, DESTROYED); + // Clear out any monitoring process + if(this.haTimeoutId) clearTimeout(this.haTimeoutId); + // Destroy the replicaset + this.s.replicaSetState.destroy(options); + + // Destroy all connecting servers + this.s.connectingServers.forEach(function(x) { + x.destroy(options); + }); + + // Emit toplogy closing event + emitSDAMEvent(this, 'topologyClosed', { topologyId: this.id }); +} + +/** + * Unref all connections belong to this server + * @method + */ +ReplSet.prototype.unref = function() { + // Transition state + stateTransition(this, DISCONNECTED); + + this.s.replicaSetState.allServers().forEach(function(x) { + x.unref(); + }); + + clearTimeout(this.haTimeoutId); +} + +/** + * Returns the last known ismaster document for this server + * @method + * @return {object} + */ +ReplSet.prototype.lastIsMaster = function() { + return this.s.replicaSetState.primary + ? this.s.replicaSetState.primary.lastIsMaster() : this.ismaster; +} + +/** + * All raw connections + * @method + * @return {Connection[]} + */ +ReplSet.prototype.connections = function() { + var servers = this.s.replicaSetState.allServers(); + var connections = []; + for(var i = 0; i < servers.length; i++) { + connections = connections.concat(servers[i].connections()); + } + + return connections; +} + +/** + * Figure out if the server is connected + * @method + * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it + * @return {boolean} + */ +ReplSet.prototype.isConnected = function(options) { + options = options || {}; + + // If we are authenticating signal not connected + // To avoid interleaving of operations + if(this.authenticating) return false; + + // If we specified a read preference check if we are connected to something + // than can satisfy this + if(options.readPreference + && options.readPreference.equals(ReadPreference.secondary)) { + return this.s.replicaSetState.hasSecondary(); + } + + if(options.readPreference + && options.readPreference.equals(ReadPreference.primary)) { + return this.s.replicaSetState.hasPrimary(); + } + + if(options.readPreference + && options.readPreference.equals(ReadPreference.primaryPreferred)) { + return this.s.replicaSetState.hasSecondary() || this.s.replicaSetState.hasPrimary(); + } + + if(options.readPreference + && options.readPreference.equals(ReadPreference.secondaryPreferred)) { + return this.s.replicaSetState.hasSecondary() || this.s.replicaSetState.hasPrimary(); + } + + if(this.s.secondaryOnlyConnectionAllowed + && this.s.replicaSetState.hasSecondary()) { + return true; + } + + return this.s.replicaSetState.hasPrimary(); +} + +/** + * Figure out if the replicaset instance was destroyed by calling destroy + * @method + * @return {boolean} + */ +ReplSet.prototype.isDestroyed = function() { + return this.state == DESTROYED; +} + +/** + * Get server + * @method + * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it + * @return {Server} + */ +ReplSet.prototype.getServer = function(options) { + // Ensure we have no options + options = options || {}; + + // Pick the right server baspickServerd on readPreference + var server = this.s.replicaSetState.pickServer(options.readPreference); + if(this.s.debug) this.emit('pickedServer', options.readPreference, server); + return server; +} + +/** + * Get all connected servers + * @method + * @return {Server[]} + */ +ReplSet.prototype.getServers = function() { + return this.s.replicaSetState.allServers(); +} + +// +// Execute write operation +var executeWriteOperation = function(self, op, ns, ops, options, callback) { + if(typeof options == 'function') callback = options, options = {}, options = options || {}; + // Ensure we have no options + options = options || {}; + + // No server returned we had an error + if(self.s.replicaSetState.primary == null) { + return callback(new MongoError("no primary server found")); + } + + // Execute the command + self.s.replicaSetState.primary[op](ns, ops, options, callback); +} + +/** + * Insert one or more documents + * @method + * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) + * @param {array} ops An array of documents to insert + * @param {boolean} [options.ordered=true] Execute in order or out of order + * @param {object} [options.writeConcern={}] Write concern for the operation + * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. + * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. + * @param {opResultCallback} callback A callback function + */ +ReplSet.prototype.insert = function(ns, ops, options, callback) { + if(typeof options == 'function') callback = options, options = {}, options = options || {}; + if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed'))); + + // Not connected but we have a disconnecthandler + if(!this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) { + return this.s.disconnectHandler.add('insert', ns, ops, options, callback); + } + + // Execute write operation + executeWriteOperation(this, 'insert', ns, ops, options, callback); +} + +/** + * Perform one or more update operations + * @method + * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) + * @param {array} ops An array of updates + * @param {boolean} [options.ordered=true] Execute in order or out of order + * @param {object} [options.writeConcern={}] Write concern for the operation + * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. + * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. + * @param {opResultCallback} callback A callback function + */ +ReplSet.prototype.update = function(ns, ops, options, callback) { + if(typeof options == 'function') callback = options, options = {}, options = options || {}; + if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed'))); + + // Not connected but we have a disconnecthandler + if(!this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) { + return this.s.disconnectHandler.add('update', ns, ops, options, callback); + } + + // Execute write operation + executeWriteOperation(this, 'update', ns, ops, options, callback); +} + +/** + * Perform one or more remove operations + * @method + * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) + * @param {array} ops An array of removes + * @param {boolean} [options.ordered=true] Execute in order or out of order + * @param {object} [options.writeConcern={}] Write concern for the operation + * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. + * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. + * @param {opResultCallback} callback A callback function + */ +ReplSet.prototype.remove = function(ns, ops, options, callback) { + if(typeof options == 'function') callback = options, options = {}, options = options || {}; + if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed'))); + + // Not connected but we have a disconnecthandler + if(!this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) { + return this.s.disconnectHandler.add('remove', ns, ops, options, callback); + } + + // Execute write operation + executeWriteOperation(this, 'remove', ns, ops, options, callback); +} + +/** + * Execute a command + * @method + * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) + * @param {object} cmd The command hash + * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it + * @param {Connection} [options.connection] Specify connection object to execute command against + * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. + * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. + * @param {opResultCallback} callback A callback function + */ +ReplSet.prototype.command = function(ns, cmd, options, callback) { + if(typeof options == 'function') callback = options, options = {}, options = options || {}; + if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed'))); + var self = this; + + // Establish readPreference + var readPreference = options.readPreference ? options.readPreference : ReadPreference.primary; + + // If the readPreference is primary and we have no primary, store it + if(readPreference.preference == 'primary' && !this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) { + return this.s.disconnectHandler.add('command', ns, cmd, options, callback); + } else if(readPreference.preference == 'secondary' && !this.s.replicaSetState.hasSecondary() && this.s.disconnectHandler != null) { + return this.s.disconnectHandler.add('command', ns, cmd, options, callback); + } else if(readPreference.preference != 'primary' && !this.s.replicaSetState.hasSecondary() && !this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) { + return this.s.disconnectHandler.add('command', ns, cmd, options, callback); + } + + // Pick a server + var server = this.s.replicaSetState.pickServer(readPreference); + // We received an error, return it + if(!(server instanceof Server)) return callback(server); + // Emit debug event + if(self.s.debug) self.emit('pickedServer', ReadPreference.primary, server); + + // No server returned we had an error + if(server == null) { + return callback(new MongoError(f("no server found that matches the provided readPreference %s", readPreference))); + } + + // Execute the command + server.command(ns, cmd, options, callback); +} + +/** + * Authenticate using a specified mechanism + * @method + * @param {string} mechanism The Auth mechanism we are invoking + * @param {string} db The db we are invoking the mechanism against + * @param {...object} param Parameters for the specific mechanism + * @param {authResultCallback} callback A callback function + */ +ReplSet.prototype.auth = function(mechanism, db) { + var allArgs = Array.prototype.slice.call(arguments, 0).slice(0); + var self = this; + var args = Array.prototype.slice.call(arguments, 2); + var callback = args.pop(); + + // If we don't have the mechanism fail + if(this.authProviders[mechanism] == null && mechanism != 'default') { + return callback(new MongoError(f("auth provider %s does not exist", mechanism))); + } + + // Are we already authenticating, throw + if(this.authenticating) { + return callback(new MongoError('authentication or logout allready in process')); + } + + // Topology is not connected, save the call in the provided store to be + // Executed at some point when the handler deems it's reconnected + if(!self.s.replicaSetState.hasPrimary() && self.s.disconnectHandler != null) { + return self.s.disconnectHandler.add('auth', db, allArgs, {}, callback); + } + + // Set to authenticating + this.authenticating = true; + // All errors + var errors = []; + + // Get all the servers + var servers = this.s.replicaSetState.allServers(); + // No servers return + if(servers.length == 0) { + this.authenticating = false; + callback(null, true); + } + + // Authenticate + function auth(server) { + // Arguments without a callback + var argsWithoutCallback = [mechanism, db].concat(args.slice(0)); + // Create arguments + var finalArguments = argsWithoutCallback.concat([function(err) { + count = count - 1; + // Save all the errors + if(err) errors.push({name: server.name, err: err}); + // We are done + if(count == 0) { + // Auth is done + self.authenticating = false; + + // Return the auth error + if(errors.length) return callback(MongoError.create({ + message: 'authentication fail', errors: errors + }), false); + + // Successfully authenticated session + callback(null, self); + } + }]); + + if(!server.lastIsMaster().arbiterOnly) { + // Execute the auth only against non arbiter servers + server.auth.apply(server, finalArguments); + } else { + // If we are authenticating against an arbiter just ignore it + finalArguments.pop()(null); + } + } + + // Get total count + var count = servers.length; + // Authenticate against all servers + while(servers.length > 0) { + auth(servers.shift()); + } +} + +/** + * Logout from a database + * @method + * @param {string} db The db we are logging out from + * @param {authResultCallback} callback A callback function + */ +ReplSet.prototype.logout = function(dbName, callback) { + var self = this; + // Are we authenticating or logging out, throw + if(this.authenticating) { + throw new MongoError('authentication or logout allready in process'); + } + + // Ensure no new members are processed while logging out + this.authenticating = true; + + // Remove from all auth providers (avoid any reaplication of the auth details) + var providers = Object.keys(this.authProviders); + for(var i = 0; i < providers.length; i++) { + this.authProviders[providers[i]].logout(dbName); + } + + // Now logout all the servers + var servers = this.s.replicaSetState.allServers(); + var count = servers.length; + if(count == 0) return callback(); + var errors = []; + + function logoutServer(_server, cb) { + _server.logout(dbName, function(err) { + if(err) errors.push({name: _server.name, err: err}); + cb(); + }); + } + + // Execute logout on all server instances + for(i = 0; i < servers.length; i++) { + logoutServer(servers[i], function() { + count = count - 1; + + if(count == 0) { + // Do not block new operations + self.authenticating = false; + // If we have one or more errors + if(errors.length) return callback(MongoError.create({ + message: f('logout failed against db %s', dbName), errors: errors + }), false); + + // No errors + callback(); + } + }) + } +} + +/** + * Perform one or more remove operations + * @method + * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) + * @param {{object}|{Long}} cmd Can be either a command returning a cursor or a cursorId + * @param {object} [options.batchSize=0] Batchsize for the operation + * @param {array} [options.documents=[]] Initial documents list for cursor + * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it + * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. + * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. + * @param {opResultCallback} callback A callback function + */ +ReplSet.prototype.cursor = function(ns, cmd, cursorOptions) { + cursorOptions = cursorOptions || {}; + var FinalCursor = cursorOptions.cursorFactory || this.s.Cursor; + return new FinalCursor(this.s.bson, ns, cmd, cursorOptions, this, this.s.options); +} + +/** + * A replset connect event, used to verify that the connection is up and running + * + * @event ReplSet#connect + * @type {ReplSet} + */ + +/** + * A replset reconnect event, used to verify that the topology reconnected + * + * @event ReplSet#reconnect + * @type {ReplSet} + */ + +/** + * A replset fullsetup event, used to signal that all topology members have been contacted. + * + * @event ReplSet#fullsetup + * @type {ReplSet} + */ + +/** + * A replset all event, used to signal that all topology members have been contacted. + * + * @event ReplSet#all + * @type {ReplSet} + */ + +/** + * A replset failed event, used to signal that initial replset connection failed. + * + * @event ReplSet#failed + * @type {ReplSet} + */ + +/** + * A server member left the replicaset + * + * @event ReplSet#left + * @type {function} + * @param {string} type The type of member that left (primary|secondary|arbiter) + * @param {Server} server The server object that left + */ + +/** + * A server member joined the replicaset + * + * @event ReplSet#joined + * @type {function} + * @param {string} type The type of member that joined (primary|secondary|arbiter) + * @param {Server} server The server object that joined + */ + +/** + * A server opening SDAM monitoring event + * + * @event ReplSet#serverOpening + * @type {object} + */ + +/** + * A server closed SDAM monitoring event + * + * @event ReplSet#serverClosed + * @type {object} + */ + +/** + * A server description SDAM change monitoring event + * + * @event ReplSet#serverDescriptionChanged + * @type {object} + */ + +/** + * A topology open SDAM event + * + * @event ReplSet#topologyOpening + * @type {object} + */ + +/** + * A topology closed SDAM event + * + * @event ReplSet#topologyClosed + * @type {object} + */ + +/** + * A topology structure SDAM change event + * + * @event ReplSet#topologyDescriptionChanged + * @type {object} + */ + +/** + * A topology serverHeartbeatStarted SDAM event + * + * @event ReplSet#serverHeartbeatStarted + * @type {object} + */ + +/** + * A topology serverHeartbeatFailed SDAM event + * + * @event ReplSet#serverHeartbeatFailed + * @type {object} + */ + +/** + * A topology serverHeartbeatSucceeded SDAM change event + * + * @event ReplSet#serverHeartbeatSucceeded + * @type {object} + */ + +module.exports = ReplSet; diff --git a/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/replset_state.js b/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/replset_state.js new file mode 100644 index 0000000..4fb1d4d --- /dev/null +++ b/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/replset_state.js @@ -0,0 +1,972 @@ +"use strict" + +var inherits = require('util').inherits, + f = require('util').format, + EventEmitter = require('events').EventEmitter, + Logger = require('../connection/logger'), + ReadPreference = require('./read_preference'), + MongoError = require('../error'); + +var TopologyType = { + 'Single': 'Single', 'ReplicaSetNoPrimary': 'ReplicaSetNoPrimary', + 'ReplicaSetWithPrimary': 'ReplicaSetWithPrimary', 'Sharded': 'Sharded', + 'Unknown': 'Unknown' +}; + +var ServerType = { + 'Standalone': 'Standalone', 'Mongos': 'Mongos', 'PossiblePrimary': 'PossiblePrimary', + 'RSPrimary': 'RSPrimary', 'RSSecondary': 'RSSecondary', 'RSArbiter': 'RSArbiter', + 'RSOther': 'RSOther', 'RSGhost': 'RSGhost', 'Unknown': 'Unknown' +}; + +var ReplSetState = function(options) { + options = options || {}; + // Add event listener + EventEmitter.call(this); + // Topology state + this.topologyType = TopologyType.ReplicaSetNoPrimary; + this.setName = options.setName; + + // Server set + this.set = {}; + + // Unpacked options + this.id = options.id; + this.setName = options.setName; + + // Replicaset logger + this.logger = options.logger || Logger('ReplSet', options); + + // Server selection index + this.index = 0; + // Acceptable latency + this.acceptableLatency = options.acceptableLatency || 15; + + // heartbeatFrequencyMS + this.heartbeatFrequencyMS = options.heartbeatFrequencyMS || 10000; + + // Server side + this.primary = null; + this.secondaries = []; + this.arbiters = []; + this.passives = []; + this.ghosts = []; + // Current unknown hosts + this.unknownServers = []; + // In set status + this.set = {}; + // Status + this.maxElectionId = null; + this.maxSetVersion = 0; + // Description of the Replicaset + this.replicasetDescription = { + "topologyType": "Unknown", "servers": [] + }; +} + +inherits(ReplSetState, EventEmitter); + +ReplSetState.prototype.hasPrimaryAndSecondary = function() { + return this.primary != null && this.secondaries.length > 0; +} + +ReplSetState.prototype.hasPrimary = function() { + return this.primary != null; +} + +ReplSetState.prototype.hasSecondary = function() { + return this.secondaries.length > 0; +} + +ReplSetState.prototype.allServers = function(options) { + options = options || {}; + var servers = this.primary ? [this.primary] : []; + servers = servers.concat(this.secondaries); + if(!options.ignoreArbiters) servers = servers.concat(this.arbiters); + servers = servers.concat(this.passives); + return servers; +} + +ReplSetState.prototype.destroy = function(options) { + // Destroy all sockets + if(this.primary) this.primary.destroy(options); + this.secondaries.forEach(function(x) { x.destroy(options); }); + this.arbiters.forEach(function(x) { x.destroy(options); }); + this.passives.forEach(function(x) { x.destroy(options); }); + this.ghosts.forEach(function(x) { x.destroy(options); }); + // Clear out the complete state + this.secondaries = []; + this.arbiters = []; + this.passives = []; + this.ghosts = []; + this.unknownServers = []; + this.set = {}; +} + +ReplSetState.prototype.remove = function(server, options) { + options = options || {}; + + // Only remove if the current server is not connected + var servers = this.primary ? [this.primary] : []; + servers = servers.concat(this.secondaries); + servers = servers.concat(this.arbiters); + servers = servers.concat(this.passives); + + // Check if it's active and this is just a failed connection attempt + for(var i = 0; i < servers.length; i++) { + if(!options.force && servers[i].equals(server) && servers[i].isConnected && servers[i].isConnected()) { + return; + } + } + + // If we have it in the set remove it + if(this.set[server.name.toLowerCase()]) { + this.set[server.name.toLowerCase()].type = ServerType.Unknown; + this.set[server.name.toLowerCase()].electionId = null; + this.set[server.name.toLowerCase()].setName = null; + this.set[server.name.toLowerCase()].setVersion = null; + } + + // Remove type + var removeType = null; + + // Remove from any lists + if(this.primary && this.primary.equals(server)) { + this.primary = null; + this.topologyType = TopologyType.ReplicaSetNoPrimary; + removeType = 'primary'; + } + + // Remove from any other server lists + removeType = removeFrom(server, this.secondaries) ? 'secondary' : removeType; + removeType = removeFrom(server, this.arbiters) ? 'arbiter' : removeType; + removeType = removeFrom(server, this.passives) ? 'secondary' : removeType; + removeFrom(server, this.ghosts); + removeFrom(server, this.unknownServers); + + // Do we have a removeType + if(removeType) { + this.emit('left', removeType, server); + } +} + +ReplSetState.prototype.update = function(server) { + var self = this; + // Get the current ismaster + var ismaster = server.lastIsMaster(); + + // + // Add any hosts + // + if(ismaster) { + // Join all the possible new hosts + var hosts = Array.isArray(ismaster.hosts) ? ismaster.hosts : []; + hosts = hosts.concat(Array.isArray(ismaster.arbiters) ? ismaster.arbiters : []); + hosts = hosts.concat(Array.isArray(ismaster.passives) ? ismaster.passives : []); + + // Add all hosts as unknownServers + for(var i = 0; i < hosts.length; i++) { + // Add to the list of unknown server + if(this.unknownServers.indexOf(hosts[i]) == -1 + && (!this.set[hosts[i].toLowerCase()] || this.set[hosts[i].toLowerCase()].type == ServerType.Unknown)) { + this.unknownServers.push(hosts[i]); + } + + if(!this.set[hosts[i].toLowerCase()]) { + this.set[hosts[i].toLowerCase()] = { + type: ServerType.Unknown, + electionId: null, + setName: null, + setVersion: null + } + } + } + } + + // + // Unknown server + // + if(!ismaster && !inList(ismaster, server, this.unknownServers)) { + self.set[server.name.toLowerCase()] = { + type: ServerType.Unknown, setVersion: null, electionId: null, setName: null + } + // Update set information about the server instance + self.set[server.name.toLowerCase()].type = ServerType.Unknown; + self.set[server.name.toLowerCase()].electionId = ismaster ? ismaster.electionId : ismaster; + self.set[server.name.toLowerCase()].setName = ismaster ? ismaster.setName : ismaster; + self.set[server.name.toLowerCase()].setVersion = ismaster ? ismaster.setVersion : ismaster; + + if(self.unknownServers.indexOf(server.name) == -1) { + self.unknownServers.push(server.name); + } + + // Set the topology + return false; + } + + // + // Is this a mongos + // + if(ismaster && ismaster.msg == 'isdbgrid') { + return false; + } + + // A RSOther instance + if((ismaster.setName && ismaster.hidden) + || (ismaster.setName && !ismaster.ismaster && !ismaster.secondary && !ismaster.arbiterOnly && !ismaster.passive)) { + self.set[server.name.toLowerCase()] = { + type: ServerType.RSOther, setVersion: null, + electionId: null, setName: ismaster.setName + } + // Set the topology + this.topologyType = this.primary ? TopologyType.ReplicaSetWithPrimary : TopologyType.ReplicaSetNoPrimary; + if(ismaster.setName) this.setName = ismaster.setName; + return false; + } + + // A RSGhost instance + if(ismaster.isreplicaset) { + self.set[server.name.toLowerCase()] = { + type: ServerType.RSGhost, setVersion: null, + electionId: null, setName: null + } + + // Set the topology + this.topologyType = this.primary ? TopologyType.ReplicaSetWithPrimary : TopologyType.ReplicaSetNoPrimary; + if(ismaster.setName) this.setName = ismaster.setName; + + // Set the topology + return false; + } + + // + // Standalone server, destroy and return + // + if(ismaster && ismaster.ismaster && !ismaster.setName) { + this.topologyType = this.primary ? TopologyType.ReplicaSetWithPrimary : TopologyType.Unknown; + this.remove(server, {force:true}); + return false; + } + + // + // Server in maintanance mode + // + if(ismaster && !ismaster.ismaster && !ismaster.secondary && !ismaster.arbiterOnly) { + this.remove(server, {force:true}); + return false; + } + + // + // If the .me field does not match the passed in server + // + if(ismaster.me && ismaster.me != server.name) { + if(this.logger.isWarn()) { + this.logger.warn(f('the seedlist server was removed due to its address %s not matching its ismaster.me address %s', server.name, ismaster.me)); + } + + // Delete from the set + delete this.set[server.name.toLowerCase()]; + + // Set the type of topology we have + if(this.primary && !this.primary.equals(server)) { + this.topologyType = TopologyType.ReplicaSetWithPrimary; + } else { + this.topologyType = TopologyType.ReplicaSetNoPrimary; + } + + // + // We have a potential primary + // + if(!this.primary && ismaster.primary) { + this.set[ismaster.primary.toLowerCase()] = { + type: ServerType.PossiblePrimary, + setName: null, + electionId: null, + setVersion: null, + } + } + + return false; + } + + // + // Primary handling + // + if(!this.primary && ismaster.ismaster && ismaster.setName) { + var ismasterElectionId = server.lastIsMaster().electionId; + if(this.setName && this.setName != ismaster.setName) { + this.topologyType = TopologyType.ReplicaSetNoPrimary; + return new MongoError(f('setName from ismaster does not match provided connection setName [%s] != [%s]', ismaster.setName, this.setName)); + } + + if(!this.maxElectionId && ismasterElectionId) { + this.maxElectionId = ismasterElectionId; + } else if(this.maxElectionId && ismasterElectionId) { + var result = compareObjectIds(this.maxElectionId, ismasterElectionId); + // Get the electionIds + var ismasterSetVersion = server.lastIsMaster().setVersion; + + // if(result == 1 || result == 0) { + if(result == 1) { + this.topologyType = TopologyType.ReplicaSetNoPrimary; + return false; + } else if(result == 0 && ismasterSetVersion) { + if(ismasterSetVersion < this.maxSetVersion) { + this.topologyType = TopologyType.ReplicaSetNoPrimary; + return false; + } + } + + this.maxSetVersion = ismasterSetVersion; + this.maxElectionId = ismasterElectionId; + } + + // Hande normalization of server names + var normalizedHosts = ismaster.hosts.map(function(x) { return x.toLowerCase() }); + var locationIndex = normalizedHosts.indexOf(server.name.toLowerCase()); + + // Validate that the server exists in the host list + if(locationIndex != -1) { + self.primary = server; + self.set[server.name.toLowerCase()] = { + type: ServerType.RSPrimary, + setVersion: ismaster.setVersion, + electionId: ismaster.electionId, + setName: ismaster.setName + } + + // Set the topology + this.topologyType = TopologyType.ReplicaSetWithPrimary; + if(ismaster.setName) this.setName = ismaster.setName; + removeFrom(server, self.unknownServers); + removeFrom(server, self.secondaries); + removeFrom(server, self.passives); + self.emit('joined', 'primary', server); + } else { + this.topologyType = TopologyType.ReplicaSetNoPrimary; + } + + emitTopologyDescriptionChanged(self); + return true; + } else if(ismaster.ismaster && ismaster.setName) { + // Get the electionIds + var currentElectionId = self.set[self.primary.name.toLowerCase()].electionId; + var currentSetVersion = self.set[self.primary.name.toLowerCase()].setVersion; + var currentSetName = self.set[self.primary.name.toLowerCase()].setName; + ismasterElectionId = server.lastIsMaster().electionId; + ismasterSetVersion = server.lastIsMaster().setVersion; + var ismasterSetName = server.lastIsMaster().setName; + + // Is it the same server instance + if(this.primary.equals(server) + && currentSetName == ismasterSetName) { + return false; + } + + // If we do not have the same rs name + if(currentSetName && currentSetName != ismasterSetName) { + if(!this.primary.equals(server)) { + this.topologyType = TopologyType.ReplicaSetWithPrimary; + } else { + this.topologyType = TopologyType.ReplicaSetNoPrimary; + } + + return false; + } + + // Check if we need to replace the server + if(currentElectionId && ismasterElectionId) { + result = compareObjectIds(currentElectionId, ismasterElectionId); + + if(result == 1) { + return false; + } else if(result == 0 && (currentSetVersion > ismasterSetVersion)) { + return false; + } + } else if(!currentElectionId && ismasterElectionId + && ismasterSetVersion) { + if(ismasterSetVersion < this.maxSetVersion) { + return false; + } + } + + if(!this.maxElectionId && ismasterElectionId) { + this.maxElectionId = ismasterElectionId; + } else if(this.maxElectionId && ismasterElectionId) { + result = compareObjectIds(this.maxElectionId, ismasterElectionId); + + if(result == 1) { + return false; + } else if(result == 0 && currentSetVersion && ismasterSetVersion) { + if(ismasterSetVersion < this.maxSetVersion) { + return false; + } + } else { + if(ismasterSetVersion < this.maxSetVersion) { + return false; + } + } + + this.maxElectionId = ismasterElectionId; + this.maxSetVersion = ismasterSetVersion; + } else { + this.maxSetVersion = ismasterSetVersion; + } + + // Modify the entry to unknown + self.set[self.primary.name.toLowerCase()] = { + type: ServerType.Unknown, setVersion: null, + electionId: null, setName: null + } + + // Signal primary left + self.emit('left', 'primary', this.primary); + // Destroy the instance + self.primary.destroy(); + // Set the new instance + self.primary = server; + // Set the set information + self.set[server.name.toLowerCase()] = { + type: ServerType.RSPrimary, setVersion: ismaster.setVersion, + electionId: ismaster.electionId, setName: ismaster.setName + } + + // Set the topology + this.topologyType = TopologyType.ReplicaSetWithPrimary; + if(ismaster.setName) this.setName = ismaster.setName; + removeFrom(server, self.unknownServers); + removeFrom(server, self.secondaries); + removeFrom(server, self.passives); + self.emit('joined', 'primary', server); + emitTopologyDescriptionChanged(self); + return true; + } + + // A possible instance + if(!this.primary && ismaster.primary) { + self.set[ismaster.primary.toLowerCase()] = { + type: ServerType.PossiblePrimary, setVersion: null, + electionId: null, setName: null + } + } + + // + // Secondary handling + // + if(ismaster.secondary && ismaster.setName + && !inList(ismaster, server, this.secondaries) + && this.setName && this.setName == ismaster.setName) { + addToList(self, ServerType.RSSecondary, ismaster, server, this.secondaries); + // Set the topology + this.topologyType = this.primary ? TopologyType.ReplicaSetWithPrimary : TopologyType.ReplicaSetNoPrimary; + if(ismaster.setName) this.setName = ismaster.setName; + removeFrom(server, self.unknownServers); + + // Remove primary + if(this.primary && this.primary.name == server.name) { + server.destroy(); + this.primary = null; + self.emit('left', 'primary', server); + } + + self.emit('joined', 'secondary', server); + emitTopologyDescriptionChanged(self); + return true; + } + + // + // Arbiter handling + // + if(ismaster.arbiterOnly && ismaster.setName + && !inList(ismaster, server, this.arbiters) + && this.setName && this.setName == ismaster.setName) { + addToList(self, ServerType.RSArbiter, ismaster, server, this.arbiters); + // Set the topology + this.topologyType = this.primary ? TopologyType.ReplicaSetWithPrimary : TopologyType.ReplicaSetNoPrimary; + if(ismaster.setName) this.setName = ismaster.setName; + removeFrom(server, self.unknownServers); + self.emit('joined', 'arbiter', server); + emitTopologyDescriptionChanged(self); + return true; + } + + // + // Passive handling + // + if(ismaster.passive && ismaster.setName + && !inList(ismaster, server, this.passives) + && this.setName && this.setName == ismaster.setName) { + addToList(self, ServerType.RSSecondary, ismaster, server, this.passives); + // Set the topology + this.topologyType = this.primary ? TopologyType.ReplicaSetWithPrimary : TopologyType.ReplicaSetNoPrimary; + if(ismaster.setName) this.setName = ismaster.setName; + removeFrom(server, self.unknownServers); + + // Remove primary + if(this.primary && this.primary.name == server.name) { + server.destroy(); + this.primary = null; + self.emit('left', 'primary', server); + } + + self.emit('joined', 'secondary', server); + emitTopologyDescriptionChanged(self); + return true; + } + + // + // Remove the primary + // + if(this.set[server.name.toLowerCase()] && this.set[server.name.toLowerCase()].type == ServerType.RSPrimary) { + self.emit('left', 'primary', this.primary); + this.primary.destroy(); + this.primary = null; + this.topologyType = TopologyType.ReplicaSetNoPrimary; + return false; + } + + this.topologyType = this.primary ? TopologyType.ReplicaSetWithPrimary : TopologyType.ReplicaSetNoPrimary; + return false; +} + +/** + * Recalculate single server max staleness + * @method + */ +ReplSetState.prototype.updateServerMaxStaleness = function(server, haInterval) { + // Locate the max secondary lastwrite + var max = 0; + // Go over all secondaries + for(var i = 0; i < this.secondaries.length; i++) { + max = Math.max(max, this.secondaries[i].lastWriteDate); + } + + // Perform this servers staleness calculation + if(server.ismaster.maxWireVersion >= 5 + && server.ismaster.secondary + && this.hasPrimary()) { + server.staleness = (server.lastUpdateTime - server.lastWriteDate) + - (this.primary.lastUpdateTime - this.primary.lastWriteDate) + + haInterval; + } else if(server.ismaster.maxWireVersion >= 5 + && server.ismaster.secondary){ + server.staleness = max - server.lastWriteDate + haInterval; + } +} + +/** + * Recalculate all the stalness values for secodaries + * @method + */ +ReplSetState.prototype.updateSecondariesMaxStaleness = function(haInterval) { + for(var i = 0; i < this.secondaries.length; i++) { + this.updateServerMaxStaleness(this.secondaries[i], haInterval); + } +} + +/** + * Pick a server by the passed in ReadPreference + * @method + * @param {ReadPreference} readPreference The ReadPreference instance to use + */ +ReplSetState.prototype.pickServer = function(readPreference) { + // If no read Preference set to primary by default + readPreference = readPreference || ReadPreference.primary; + + // maxStalenessSeconds is not allowed with a primary read + if(readPreference.preference == 'primary' && readPreference.maxStalenessSeconds != null) { + return new MongoError('primary readPreference incompatible with maxStalenessSeconds'); + } + + // Check if we have any non compatible servers for maxStalenessSeconds + var allservers = this.primary ? [this.primary] : []; + allservers = allservers.concat(this.secondaries); + + // Does any of the servers not support the right wire protocol version + // for maxStalenessSeconds when maxStalenessSeconds specified on readPreference. Then error out + if(readPreference.maxStalenessSeconds != null) { + for(var i = 0; i < allservers.length; i++) { + if(allservers[i].ismaster.maxWireVersion < 5) { + return new MongoError('maxStalenessSeconds not supported by at least one of the replicaset members'); + } + } + } + + // Do we have the nearest readPreference + if(readPreference.preference == 'nearest' && readPreference.maxStalenessSeconds == null) { + return pickNearest(this, readPreference); + } else if(readPreference.preference == 'nearest' && readPreference.maxStalenessSeconds != null) { + return pickNearestMaxStalenessSeconds(this, readPreference); + } + + // Get all the secondaries + var secondaries = this.secondaries; + + // Check if we can satisfy and of the basic read Preferences + if(readPreference.equals(ReadPreference.secondary) + && secondaries.length == 0) { + return new MongoError("no secondary server available"); + } + + if(readPreference.equals(ReadPreference.secondaryPreferred) + && secondaries.length == 0 + && this.primary == null) { + return new MongoError("no secondary or primary server available"); + } + + if(readPreference.equals(ReadPreference.primary) + && this.primary == null) { + return new MongoError("no primary server available"); + } + + // Secondary preferred or just secondaries + if(readPreference.equals(ReadPreference.secondaryPreferred) + || readPreference.equals(ReadPreference.secondary)) { + + if(secondaries.length > 0 && readPreference.maxStalenessSeconds == null) { + // Pick nearest of any other servers available + var server = pickNearest(this, readPreference); + // No server in the window return primary + if(server) { + return server; + } + } else if(secondaries.length > 0 && readPreference.maxStalenessSeconds != null) { + // Pick nearest of any other servers available + server = pickNearestMaxStalenessSeconds(this, readPreference); + // No server in the window return primary + if(server) { + return server; + } + } + + if(readPreference.equals(ReadPreference.secondaryPreferred)){ + return this.primary; + } + + return null; + } + + // Primary preferred + if(readPreference.equals(ReadPreference.primaryPreferred)) { + server = null; + + // We prefer the primary if it's available + if(this.primary) { + return this.primary; + } + + // Pick a secondary + if(secondaries.length > 0 && readPreference.maxStalenessSeconds == null) { + server = pickNearest(this, readPreference); + } else if(secondaries.length > 0 && readPreference.maxStalenessSeconds != null) { + server = pickNearestMaxStalenessSeconds(this, readPreference); + } + + // Did we find a server + if(server) return server; + } + + // Return the primary + return this.primary; +} + +// +// Filter serves by tags +var filterByTags = function(readPreference, servers) { + if(readPreference.tags == null) return servers; + var filteredServers = []; + var tagsArray = Array.isArray(readPreference.tags) ? readPreference.tags : [readPreference.tags]; + + // Iterate over the tags + for(var j = 0; j < tagsArray.length; j++) { + var tags = tagsArray[j]; + + // Iterate over all the servers + for(var i = 0; i < servers.length; i++) { + var serverTag = servers[i].lastIsMaster().tags || {}; + + // Did we find the a matching server + var found = true; + // Check if the server is valid + for(var name in tags) { + if(serverTag[name] != tags[name]) { + found = false; + } + } + + // Add to candidate list + if(found) { + filteredServers.push(servers[i]); + } + } + } + + // Returned filtered servers + return filteredServers; +} + +function pickNearestMaxStalenessSeconds(self, readPreference) { + // Only get primary and secondaries as seeds + var servers = []; + var heartbeatFrequencyMS = self.heartbeatFrequencyMS; + + // Get the maxStalenessMS + var maxStalenessMS = readPreference.maxStalenessSeconds * 1000; + + // Check if the maxStalenessMS > 90 seconds + if(maxStalenessMS < 90 * 1000) { + return new MongoError('maxStalenessSeconds must be set to at least 90 seconds'); + } + + // Add primary to list if not a secondary read preference + if(self.primary && readPreference.preference != 'secondary') { + servers.push(self.primary); + } + + // Add all the secondaries + for(var i = 0; i < self.secondaries.length; i++) { + servers.push(self.secondaries[i]); + } + + // Filter by tags + servers = filterByTags(readPreference, servers); + + // + // Locate lowest time (picked servers are lowest time + acceptable Latency margin) + // var lowest = servers.length > 0 ? servers[0].lastIsMasterMS : 0; + + // Filter by latency + servers = servers.filter(function(s) { + return s.staleness <= maxStalenessMS; + }); + + // Sort by time + servers.sort(function(a, b) { + // return a.time > b.time; + return a.lastIsMasterMS > b.lastIsMasterMS + }); + + // No servers, default to primary + if(servers.length == 0) { + return null + } + + // Ensure index does not overflow the number of available servers + self.index = self.index % servers.length; + + // Get the server + var server = servers[self.index]; + // Add to the index + self.index = self.index + 1; + // Return the first server of the sorted and filtered list + return server; +} + +function pickNearest(self, readPreference) { + // Only get primary and secondaries as seeds + var servers = []; + + // Add primary to list if not a secondary read preference + if(self.primary && readPreference.preference != 'secondary') { + servers.push(self.primary); + } + + // Add all the secondaries + for(var i = 0; i < self.secondaries.length; i++) { + servers.push(self.secondaries[i]); + } + + // Filter by tags + servers = filterByTags(readPreference, servers); + + // Sort by time + servers.sort(function(a, b) { + // return a.time > b.time; + return a.lastIsMasterMS > b.lastIsMasterMS + }); + + // Locate lowest time (picked servers are lowest time + acceptable Latency margin) + var lowest = servers.length > 0 ? servers[0].lastIsMasterMS : 0; + + // Filter by latency + servers = servers.filter(function(s) { + return s.lastIsMasterMS <= lowest + self.acceptableLatency; + }); + + // No servers, default to primary + if(servers.length == 0) { + return null + } + + // Ensure index does not overflow the number of available servers + self.index = self.index % servers.length; + // Get the server + var server = servers[self.index]; + // Add to the index + self.index = self.index + 1; + // Return the first server of the sorted and filtered list + return server; +} + +function inList(ismaster, server, list) { + for(var i = 0; i < list.length; i++) { + if(list[i].name == server.name) return true; + } + + return false; +} + +function addToList(self, type, ismaster, server, list) { + // Update set information about the server instance + self.set[server.name.toLowerCase()].type = type; + self.set[server.name.toLowerCase()].electionId = ismaster ? ismaster.electionId : ismaster; + self.set[server.name.toLowerCase()].setName = ismaster ? ismaster.setName : ismaster; + self.set[server.name.toLowerCase()].setVersion = ismaster ? ismaster.setVersion : ismaster; + // Add to the list + list.push(server); +} + +function compareObjectIds(id1, id2) { + var a = new Buffer(id1.toHexString(), 'hex'); + var b = new Buffer(id2.toHexString(), 'hex'); + + if(a === b) { + return 0; + } + + if(typeof Buffer.compare === 'function') { + return Buffer.compare(a, b); + } + + var x = a.length; + var y = b.length; + var len = Math.min(x, y); + + for (var i = 0; i < len; i++) { + if (a[i] !== b[i]) { + break; + } + } + + if (i !== len) { + x = a[i]; + y = b[i]; + } + + return x < y ? -1 : y < x ? 1 : 0; +} + +function removeFrom(server, list) { + for(var i = 0; i < list.length; i++) { + if(list[i].equals && list[i].equals(server)) { + list.splice(i, 1); + return true; + } else if(typeof list[i] == 'string' && list[i] == server.name) { + list.splice(i, 1); + return true; + } + } + + return false; +} + +function emitTopologyDescriptionChanged(self) { + if(self.listeners('topologyDescriptionChanged').length > 0) { + var topology = 'Unknown'; + var setName = self.setName; + + if(self.hasPrimaryAndSecondary()) { + topology = 'ReplicaSetWithPrimary'; + } else if(!self.hasPrimary() && self.hasSecondary()) { + topology = 'ReplicaSetNoPrimary'; + } + + // Generate description + var description = { + topologyType: topology, + setName: setName, + servers: [] + } + + // Add the primary to the list + if(self.hasPrimary()) { + var desc = self.primary.getDescription(); + desc.type = 'RSPrimary'; + description.servers.push(desc); + } + + // Add all the secondaries + description.servers = description.servers.concat(self.secondaries.map(function(x) { + var description = x.getDescription(); + description.type = 'RSSecondary'; + return description; + })); + + // Add all the arbiters + description.servers = description.servers.concat(self.arbiters.map(function(x) { + var description = x.getDescription(); + description.type = 'RSArbiter'; + return description; + })); + + // Add all the passives + description.servers = description.servers.concat(self.passives.map(function(x) { + var description = x.getDescription(); + description.type = 'RSSecondary'; + return description; + })); + + // Create the result + var result = { + topologyId: self.id, + previousDescription: self.replicasetDescription, + newDescription: description, + diff: diff(self.replicasetDescription, description) + }; + + // Emit the topologyDescription change + self.emit('topologyDescriptionChanged', result); + + // Set the new description + self.replicasetDescription = description; + } +} + +function diff(previous, current) { + // Difference document + var diff = { + servers: [] + } + + // Previous entry + if(!previous) { + previous = { servers: [] }; + } + + // Got through all the servers + for(var i = 0; i < previous.servers.length; i++) { + var prevServer = previous.servers[i]; + + // Go through all current servers + for(var j = 0; j < current.servers.length; j++) { + var currServer = current.servers[j]; + + // Matching server + if(prevServer.address === currServer.address) { + // We had a change in state + if(prevServer.type != currServer.type) { + diff.servers.push({ + address: prevServer.address, + from: prevServer.type, + to: currServer.type + }); + } + } + } + } + + // Return difference + return diff; +} + +module.exports = ReplSetState; diff --git a/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/server.js b/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/server.js new file mode 100644 index 0000000..fac82b5 --- /dev/null +++ b/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/server.js @@ -0,0 +1,855 @@ +"use strict" + +var inherits = require('util').inherits, + require_optional = require('require_optional'), + f = require('util').format, + EventEmitter = require('events').EventEmitter, + ReadPreference = require('./read_preference'), + Logger = require('../connection/logger'), + debugOptions = require('../connection/utils').debugOptions, + retrieveBSON = require('../connection/utils').retrieveBSON, + Pool = require('../connection/pool'), + Query = require('../connection/commands').Query, + MongoError = require('../error'), + PreTwoSixWireProtocolSupport = require('../wireprotocol/2_4_support'), + TwoSixWireProtocolSupport = require('../wireprotocol/2_6_support'), + ThreeTwoWireProtocolSupport = require('../wireprotocol/3_2_support'), + BasicCursor = require('../cursor'), + sdam = require('./shared'), + assign = require('./shared').assign, + createClientInfo = require('./shared').createClientInfo; + +// Used for filtering out fields for loggin +var debugFields = ['reconnect', 'reconnectTries', 'reconnectInterval', 'emitError', 'cursorFactory', 'host' + , 'port', 'size', 'keepAlive', 'keepAliveInitialDelay', 'noDelay', 'connectionTimeout', 'checkServerIdentity' + , 'socketTimeout', 'singleBufferSerializtion', 'ssl', 'ca', 'cert', 'key', 'rejectUnauthorized', 'promoteLongs', 'promoteValues' + , 'promoteBuffers', 'servername']; + +// Server instance id +var id = 0; +var serverAccounting = false; +var servers = {}; +var BSON = retrieveBSON(); + +/** + * Creates a new Server instance + * @class + * @param {boolean} [options.reconnect=true] Server will attempt to reconnect on loss of connection + * @param {number} [options.reconnectTries=30] Server attempt to reconnect #times + * @param {number} [options.reconnectInterval=1000] Server will wait # milliseconds between retries + * @param {number} [options.monitoring=true] Enable the server state monitoring (calling ismaster at monitoringInterval) + * @param {number} [options.monitoringInterval=5000] The interval of calling ismaster when monitoring is enabled. + * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors + * @param {string} options.host The server host + * @param {number} options.port The server port + * @param {number} [options.size=5] Server connection pool size + * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled + * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled + * @param {boolean} [options.noDelay=true] TCP Connection no delay + * @param {number} [options.connectionTimeout=0] TCP Connection timeout setting + * @param {number} [options.socketTimeout=0] TCP Socket timeout setting + * @param {boolean} [options.ssl=false] Use SSL for connection + * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function. + * @param {Buffer} [options.ca] SSL Certificate store binary buffer + * @param {Buffer} [options.cert] SSL Certificate binary buffer + * @param {Buffer} [options.key] SSL Key file binary buffer + * @param {string} [options.passphrase] SSL Certificate pass phrase + * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates + * @param {string} [options.servername=null] String containing the server name requested via TLS SNI. + * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits + * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types. + * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers. + * @param {string} [options.appname=null] Application name, passed in on ismaster call and logged in mongod server logs. Maximum size 128 bytes. + * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit. + * @return {Server} A cursor instance + * @fires Server#connect + * @fires Server#close + * @fires Server#error + * @fires Server#timeout + * @fires Server#parseError + * @fires Server#reconnect + * @fires Server#reconnectFailed + * @fires Server#serverHeartbeatStarted + * @fires Server#serverHeartbeatSucceeded + * @fires Server#serverHeartbeatFailed + * @fires Server#topologyOpening + * @fires Server#topologyClosed + * @fires Server#topologyDescriptionChanged + * @property {string} type the topology type. + * @property {string} parserType the parser type used (c++ or js). + */ +var Server = function(options) { + options = options || {}; + + // Add event listener + EventEmitter.call(this); + + // Server instance id + this.id = id++; + + // Internal state + this.s = { + // Options + options: options, + // Logger + logger: Logger('Server', options), + // Factory overrides + Cursor: options.cursorFactory || BasicCursor, + // BSON instance + bson: options.bson || new BSON([BSON.Binary, BSON.Code, BSON.DBRef, BSON.Decimal128, + BSON.Double, BSON.Int32, BSON.Long, BSON.Map, BSON.MaxKey, BSON.MinKey, + BSON.ObjectId, BSON.BSONRegExp, BSON.Symbol, BSON.Timestamp]), + // Pool + pool: null, + // Disconnect handler + disconnectHandler: options.disconnectHandler, + // Monitor thread (keeps the connection alive) + monitoring: typeof options.monitoring == 'boolean' ? options.monitoring : true, + // Is the server in a topology + inTopology: typeof options.inTopology == 'boolean' ? options.inTopology : false, + // Monitoring timeout + monitoringInterval: typeof options.monitoringInterval == 'number' + ? options.monitoringInterval + : 5000, + // Topology id + topologyId: -1 + } + + // Curent ismaster + this.ismaster = null; + // Current ping time + this.lastIsMasterMS = -1; + // The monitoringProcessId + this.monitoringProcessId = null; + // Initial connection + this.initalConnect = true; + // Wire protocol handler, default to oldest known protocol handler + // this gets changed when the first ismaster is called. + this.wireProtocolHandler = new PreTwoSixWireProtocolSupport(); + // Default type + this._type = 'server'; + // Set the client info + this.clientInfo = createClientInfo(options); + + // Max Stalleness values + // last time we updated the ismaster state + this.lastUpdateTime = 0; + // Last write time + this.lastWriteDate = 0; + // Stalleness + this.staleness = 0; +} + +inherits(Server, EventEmitter); + +Object.defineProperty(Server.prototype, 'type', { + enumerable:true, get: function() { return this._type; } +}); + +Object.defineProperty(Server.prototype, 'parserType', { + enumerable:true, get: function() { + return BSON.native ? "c++" : "js"; + } +}); + +Server.enableServerAccounting = function() { + serverAccounting = true; + servers = {}; +} + +Server.disableServerAccounting = function() { + serverAccounting = false; +} + +Server.servers = function() { + return servers; +} + +Object.defineProperty(Server.prototype, 'name', { + enumerable:true, + get: function() { return this.s.options.host + ":" + this.s.options.port; } +}); + +function configureWireProtocolHandler(self, ismaster) { + // 3.2 wire protocol handler + if(ismaster.maxWireVersion >= 4) { + return new ThreeTwoWireProtocolSupport(new TwoSixWireProtocolSupport()); + } + + // 2.6 wire protocol handler + if(ismaster.maxWireVersion >= 2) { + return new TwoSixWireProtocolSupport(); + } + + // 2.4 or earlier wire protocol handler + return new PreTwoSixWireProtocolSupport(); +} + +function disconnectHandler(self, type, ns, cmd, options, callback) { + // Topology is not connected, save the call in the provided store to be + // Executed at some point when the handler deems it's reconnected + if(!self.s.pool.isConnected() && self.s.disconnectHandler != null && !options.monitoring) { + self.s.disconnectHandler.add(type, ns, cmd, options, callback); + return true; + } + + // If we have no connection error + if(!self.s.pool.isConnected()) { + callback(MongoError.create(f("no connection available to server %s", self.name))); + return true; + } +} + +function monitoringProcess(self) { + return function() { + // Pool was destroyed do not continue process + if(self.s.pool.isDestroyed()) return; + // Emit monitoring Process event + self.emit('monitoring', self); + // Perform ismaster call + // Query options + var queryOptions = { numberToSkip: 0, numberToReturn: -1, checkKeys: false, slaveOk: true }; + // Create a query instance + var query = new Query(self.s.bson, 'admin.$cmd', {ismaster:true}, queryOptions); + // Get start time + var start = new Date().getTime(); + // Execute the ismaster query + self.s.pool.write(query, { + socketTimeout: self.s.options.connectionTimeout || 2000, + }, function(err, result) { + // Set initial lastIsMasterMS + self.lastIsMasterMS = new Date().getTime() - start; + if(self.s.pool.isDestroyed()) return; + // Update the ismaster view if we have a result + if(result) { + self.ismaster = result.result; + } + // Re-schedule the monitoring process + self.monitoringProcessId = setTimeout(monitoringProcess(self), self.s.monitoringInterval); + }); + } +} + +var eventHandler = function(self, event) { + return function(err) { + // Log information of received information if in info mode + if(self.s.logger.isInfo()) { + var object = err instanceof MongoError ? JSON.stringify(err) : {} + self.s.logger.info(f('server %s fired event %s out with message %s' + , self.name, event, object)); + } + + // Handle connect event + if(event == 'connect') { + // Issue an ismaster command at connect + // Query options + var queryOptions = { numberToSkip: 0, numberToReturn: -1, checkKeys: false, slaveOk: true }; + // Create a query instance + var query = new Query(self.s.bson, 'admin.$cmd', {ismaster:true, client: self.clientInfo}, queryOptions); + // Get start time + var start = new Date().getTime(); + // Execute the ismaster query + self.s.pool.write(query, { + socketTimeout: self.s.options.connectionTimeout || 2000, + }, function(err, result) { + // Set initial lastIsMasterMS + self.lastIsMasterMS = new Date().getTime() - start; + if(err) { + self.destroy(); + if(self.listeners('error').length > 0) self.emit('error', err); + return; + } + + // Ensure no error emitted after initial connect when reconnecting + self.initalConnect = false; + // Save the ismaster + self.ismaster = result.result; + + // It's a proxy change the type so + // the wireprotocol will send $readPreference + if(self.ismaster.msg == 'isdbgrid') { + self._type = 'mongos'; + } + // Add the correct wire protocol handler + self.wireProtocolHandler = configureWireProtocolHandler(self, self.ismaster); + // Have we defined self monitoring + if(self.s.monitoring) { + self.monitoringProcessId = setTimeout(monitoringProcess(self), self.s.monitoringInterval); + } + + // Emit server description changed if something listening + sdam.emitServerDescriptionChanged(self, { + address: self.name, arbiters: [], hosts: [], passives: [], type: !self.s.inTopology ? 'Standalone' : sdam.getTopologyType(self) + }); + + // Emit topology description changed if something listening + sdam.emitTopologyDescriptionChanged(self, { + topologyType: 'Single', servers: [{address: self.name, arbiters: [], hosts: [], passives: [], type: 'Standalone'}] + }); + + // Log the ismaster if available + if(self.s.logger.isInfo()) { + self.s.logger.info(f('server %s connected with ismaster [%s]', self.name, JSON.stringify(self.ismaster))); + } + + // Emit connect + self.emit('connect', self); + }); + } else if(event == 'error' || event == 'parseError' + || event == 'close' || event == 'timeout' || event == 'reconnect' + || event == 'attemptReconnect' || 'reconnectFailed') { + // Remove server instance from accounting + if(serverAccounting && ['close', 'timeout', 'error', 'parseError', 'reconnectFailed'].indexOf(event) != -1) { + // Emit toplogy opening event if not in topology + if(!self.s.inTopology) { + self.emit('topologyOpening', { topologyId: self.id }); + } + + delete servers[self.id]; + } + + // Reconnect failed return error + if(event == 'reconnectFailed') { + self.emit('reconnectFailed', err); + // Emit error if any listeners + if(self.listeners('error').length > 0) { + self.emit('error', err); + } + // Terminate + return; + } + + // On first connect fail + if(self.s.pool.state == 'disconnected' && self.initalConnect && ['close', 'timeout', 'error', 'parseError'].indexOf(event) != -1) { + self.initalConnect = false; + return self.emit('error', new MongoError(f('failed to connect to server [%s] on first connect', self.name))); + } + + // Reconnect event, emit the server + if(event == 'reconnect') { + return self.emit(event, self); + } + + // Emit the event + self.emit(event, err); + } + } +} + +/** + * Initiate server connect + * @method + * @param {array} [options.auth=null] Array of auth options to apply on connect + */ +Server.prototype.connect = function(options) { + var self = this; + options = options || {}; + + // Set the connections + if(serverAccounting) servers[this.id] = this; + + // Do not allow connect to be called on anything that's not disconnected + if(self.s.pool && !self.s.pool.isDisconnected() && !self.s.pool.isDestroyed()) { + throw MongoError.create(f('server instance in invalid state %s', self.s.state)); + } + + // Create a pool + self.s.pool = new Pool(assign(self.s.options, options, {bson: this.s.bson})); + + // Set up listeners + self.s.pool.on('close', eventHandler(self, 'close')); + self.s.pool.on('error', eventHandler(self, 'error')); + self.s.pool.on('timeout', eventHandler(self, 'timeout')); + self.s.pool.on('parseError', eventHandler(self, 'parseError')); + self.s.pool.on('connect', eventHandler(self, 'connect')); + self.s.pool.on('reconnect', eventHandler(self, 'reconnect')); + self.s.pool.on('reconnectFailed', eventHandler(self, 'reconnectFailed')); + + // Emit toplogy opening event if not in topology + if(!self.s.inTopology) { + this.emit('topologyOpening', { topologyId: self.id }); + } + + // Emit opening server event + self.emit('serverOpening', { + topologyId: self.s.topologyId != -1 ? self.s.topologyId : self.id, + address: self.name + }); + + // Connect with optional auth settings + if(options.auth) { + self.s.pool.connect.apply(self.s.pool, options.auth); + } else { + self.s.pool.connect(); + } +} + +/** + * Get the server description + * @method + * @return {object} +*/ +Server.prototype.getDescription = function() { + var ismaster = this.ismaster || {}; + var description = { + type: sdam.getTopologyType(this), + address: this.name, + }; + + // Add fields if available + if(ismaster.hosts) description.hosts = ismaster.hosts; + if(ismaster.arbiters) description.arbiters = ismaster.arbiters; + if(ismaster.passives) description.passives = ismaster.passives; + if(ismaster.setName) description.setName = ismaster.setName; + return description; +} + +/** + * Returns the last known ismaster document for this server + * @method + * @return {object} + */ +Server.prototype.lastIsMaster = function() { + return this.ismaster; +} + +/** + * Unref all connections belong to this server + * @method + */ +Server.prototype.unref = function() { + this.s.pool.unref(); +} + +/** + * Figure out if the server is connected + * @method + * @return {boolean} + */ +Server.prototype.isConnected = function() { + if(!this.s.pool) return false; + return this.s.pool.isConnected(); +} + +/** + * Figure out if the server instance was destroyed by calling destroy + * @method + * @return {boolean} + */ +Server.prototype.isDestroyed = function() { + if(!this.s.pool) return false; + return this.s.pool.isDestroyed(); +} + +function basicWriteValidations(self) { + if(!self.s.pool) return MongoError.create('server instance is not connected'); + if(self.s.pool.isDestroyed()) return MongoError.create('server instance pool was destroyed'); +} + +function basicReadValidations(self, options) { + basicWriteValidations(self, options); + + if(options.readPreference && !(options.readPreference instanceof ReadPreference)) { + throw new Error("readPreference must be an instance of ReadPreference"); + } +} + +/** + * Execute a command + * @method + * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) + * @param {object} cmd The command hash + * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it + * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. + * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. + * @param {Boolean} [options.fullResult=false] Return the full envelope instead of just the result document. + * @param {opResultCallback} callback A callback function + */ +Server.prototype.command = function(ns, cmd, options, callback) { + var self = this; + if(typeof options == 'function') callback = options, options = {}, options = options || {}; + var result = basicReadValidations(self, options); + if(result) return callback(result); + + // Debug log + if(self.s.logger.isDebug()) self.s.logger.debug(f('executing command [%s] against %s', JSON.stringify({ + ns: ns, cmd: cmd, options: debugOptions(debugFields, options) + }), self.name)); + + // If we are not connected or have a disconnectHandler specified + if(disconnectHandler(self, 'command', ns, cmd, options, callback)) return; + + // Check if we have collation support + if(this.ismaster && this.ismaster.maxWireVersion < 5 && cmd.collation) { + return callback(new MongoError(f('server %s does not support collation', this.name))); + } + + // Query options + var queryOptions = { + numberToSkip: 0, + numberToReturn: -1, + checkKeys: typeof options.checkKeys == 'boolean' ? options.checkKeys: false, + serializeFunctions: typeof options.serializeFunctions == 'boolean' ? options.serializeFunctions : false, + ignoreUndefined: typeof options.ignoreUndefined == 'boolean' ? options.ignoreUndefined : false + }; + + // Create a query instance + var query = new Query(self.s.bson, ns, cmd, queryOptions); + // Set slave OK of the query + query.slaveOk = options.readPreference ? options.readPreference.slaveOk() : false; + + // Write options + var writeOptions = { + raw: typeof options.raw == 'boolean' ? options.raw : false, + promoteLongs: typeof options.promoteLongs == 'boolean' ? options.promoteLongs : true, + promoteValues: typeof options.promoteValues == 'boolean' ? options.promoteValues : true, + promoteBuffers: typeof options.promoteBuffers == 'boolean' ? options.promoteBuffers : false, + command: true, + monitoring: typeof options.monitoring == 'boolean' ? options.monitoring : false, + fullResult: typeof options.fullResult == 'boolean' ? options.fullResult : false, + requestId: query.requestId, + socketTimeout: typeof options.socketTimeout == 'number' ? options.socketTimeout : null, + }; + + // Write the operation to the pool + self.s.pool.write(query, writeOptions, callback); +} + +/** + * Insert one or more documents + * @method + * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) + * @param {array} ops An array of documents to insert + * @param {boolean} [options.ordered=true] Execute in order or out of order + * @param {object} [options.writeConcern={}] Write concern for the operation + * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. + * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. + * @param {opResultCallback} callback A callback function + */ +Server.prototype.insert = function(ns, ops, options, callback) { + var self = this; + if(typeof options == 'function') callback = options, options = {}, options = options || {}; + var result = basicWriteValidations(self, options); + if(result) return callback(result); + + // If we are not connected or have a disconnectHandler specified + if(disconnectHandler(self, 'insert', ns, ops, options, callback)) return; + + // Setup the docs as an array + ops = Array.isArray(ops) ? ops : [ops]; + + // Execute write + return self.wireProtocolHandler.insert(self.s.pool, self.ismaster, ns, self.s.bson, ops, options, callback); +} + +/** + * Perform one or more update operations + * @method + * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) + * @param {array} ops An array of updates + * @param {boolean} [options.ordered=true] Execute in order or out of order + * @param {object} [options.writeConcern={}] Write concern for the operation + * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. + * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. + * @param {opResultCallback} callback A callback function + */ +Server.prototype.update = function(ns, ops, options, callback) { + var self = this; + if(typeof options == 'function') callback = options, options = {}, options = options || {}; + var result = basicWriteValidations(self, options); + if(result) return callback(result); + + // If we are not connected or have a disconnectHandler specified + if(disconnectHandler(self, 'update', ns, ops, options, callback)) return; + + // Check if we have collation support + if(this.ismaster && this.ismaster.maxWireVersion < 5 && options.collation) { + return callback(new MongoError(f('server %s does not support collation', this.name))); + } + + // Setup the docs as an array + ops = Array.isArray(ops) ? ops : [ops]; + // Execute write + return self.wireProtocolHandler.update(self.s.pool, self.ismaster, ns, self.s.bson, ops, options, callback); +} + +/** + * Perform one or more remove operations + * @method + * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) + * @param {array} ops An array of removes + * @param {boolean} [options.ordered=true] Execute in order or out of order + * @param {object} [options.writeConcern={}] Write concern for the operation + * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. + * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. + * @param {opResultCallback} callback A callback function + */ +Server.prototype.remove = function(ns, ops, options, callback) { + var self = this; + if(typeof options == 'function') callback = options, options = {}, options = options || {}; + var result = basicWriteValidations(self, options); + if(result) return callback(result); + + // If we are not connected or have a disconnectHandler specified + if(disconnectHandler(self, 'remove', ns, ops, options, callback)) return; + + // Check if we have collation support + if(this.ismaster && this.ismaster.maxWireVersion < 5 && options.collation) { + return callback(new MongoError(f('server %s does not support collation', this.name))); + } + + // Setup the docs as an array + ops = Array.isArray(ops) ? ops : [ops]; + // Execute write + return self.wireProtocolHandler.remove(self.s.pool, self.ismaster, ns, self.s.bson, ops, options, callback); +} + +/** + * Get a new cursor + * @method + * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) + * @param {{object}|{Long}} cmd Can be either a command returning a cursor or a cursorId + * @param {object} [options.batchSize=0] Batchsize for the operation + * @param {array} [options.documents=[]] Initial documents list for cursor + * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it + * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. + * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. + * @param {opResultCallback} callback A callback function + */ +Server.prototype.cursor = function(ns, cmd, cursorOptions) { + var s = this.s; + cursorOptions = cursorOptions || {}; + // Set up final cursor type + var FinalCursor = cursorOptions.cursorFactory || s.Cursor; + // Return the cursor + return new FinalCursor(s.bson, ns, cmd, cursorOptions, this, s.options); +} + +/** + * Logout from a database + * @method + * @param {string} db The db we are logging out from + * @param {authResultCallback} callback A callback function + */ +Server.prototype.logout = function(dbName, callback) { + this.s.pool.logout(dbName, callback); +} + +/** + * Authenticate using a specified mechanism + * @method + * @param {string} mechanism The Auth mechanism we are invoking + * @param {string} db The db we are invoking the mechanism against + * @param {...object} param Parameters for the specific mechanism + * @param {authResultCallback} callback A callback function + */ +Server.prototype.auth = function(mechanism, db) { + var self = this; + + // If we have the default mechanism we pick mechanism based on the wire + // protocol max version. If it's >= 3 then scram-sha1 otherwise mongodb-cr + if(mechanism == 'default' && self.ismaster && self.ismaster.maxWireVersion >= 3) { + mechanism = 'scram-sha-1'; + } else if(mechanism == 'default') { + mechanism = 'mongocr'; + } + + // Slice all the arguments off + var args = Array.prototype.slice.call(arguments, 0); + // Set the mechanism + args[0] = mechanism; + // Get the callback + var callback = args[args.length - 1]; + + // If we are not connected or have a disconnectHandler specified + if(disconnectHandler(self, 'auth', db, args, {}, callback)) { + return; + } + + // Do not authenticate if we are an arbiter + if(this.lastIsMaster() && this.lastIsMaster().arbiterOnly) { + return callback(null, true); + } + + // Apply the arguments to the pool + self.s.pool.auth.apply(self.s.pool, args); +} + +/** + * Compare two server instances + * @method + * @param {Server} server Server to compare equality against + * @return {boolean} + */ +Server.prototype.equals = function(server) { + if(typeof server == 'string') return this.name == server; + if(server.name) return this.name == server.name; + return false; +} + +/** + * All raw connections + * @method + * @return {Connection[]} + */ +Server.prototype.connections = function() { + return this.s.pool.allConnections(); +} + +/** + * Get server + * @method + * @return {Server} + */ +Server.prototype.getServer = function() { + return this; +} + +/** + * Get connection + * @method + * @return {Connection} + */ +Server.prototype.getConnection = function() { + return this.s.pool.get(); +} + +var listeners = ['close', 'error', 'timeout', 'parseError', 'connect']; + +/** + * Destroy the server connection + * @method + * @param {boolean} [options.emitClose=false] Emit close event on destroy + * @param {boolean} [options.emitDestroy=false] Emit destroy event on destroy + * @param {boolean} [options.force=false] Force destroy the pool + */ +Server.prototype.destroy = function(options) { + options = options || {}; + var self = this; + + // Set the connections + if(serverAccounting) delete servers[this.id]; + + // Destroy the monitoring process if any + if(this.monitoringProcessId) { + clearTimeout(this.monitoringProcessId); + } + + // Emit close event + if(options.emitClose) { + self.emit('close', self); + } + + // Emit destroy event + if(options.emitDestroy) { + self.emit('destroy', self); + } + + // Remove all listeners + listeners.forEach(function(event) { + self.s.pool.removeAllListeners(event); + }); + + // Emit opening server event + if(self.listeners('serverClosed').length > 0) self.emit('serverClosed', { + topologyId: self.s.topologyId != -1 ? self.s.topologyId : self.id, address: self.name + }); + + // Emit toplogy opening event if not in topology + if(self.listeners('topologyClosed').length > 0 && !self.s.inTopology) { + self.emit('topologyClosed', { topologyId: self.id }); + } + + if(self.s.logger.isDebug()) { + self.s.logger.debug(f('destroy called on server %s', self.name)); + } + + // Destroy the pool + this.s.pool.destroy(options.force); +} + +/** + * A server connect event, used to verify that the connection is up and running + * + * @event Server#connect + * @type {Server} + */ + +/** + * A server reconnect event, used to verify that the server topology has reconnected + * + * @event Server#reconnect + * @type {Server} + */ + +/** + * A server opening SDAM monitoring event + * + * @event Server#serverOpening + * @type {object} + */ + +/** + * A server closed SDAM monitoring event + * + * @event Server#serverClosed + * @type {object} + */ + +/** + * A server description SDAM change monitoring event + * + * @event Server#serverDescriptionChanged + * @type {object} + */ + +/** + * A topology open SDAM event + * + * @event Server#topologyOpening + * @type {object} + */ + +/** + * A topology closed SDAM event + * + * @event Server#topologyClosed + * @type {object} + */ + +/** + * A topology structure SDAM change event + * + * @event Server#topologyDescriptionChanged + * @type {object} + */ + +/** + * Server reconnect failed + * + * @event Server#reconnectFailed + * @type {Error} + */ + +/** + * Server connection pool closed + * + * @event Server#close + * @type {object} + */ + +/** + * Server connection pool caused an error + * + * @event Server#error + * @type {Error} + */ + +/** + * Server destroyed was called + * + * @event Server#destroy + * @type {Server} + */ + +module.exports = Server; diff --git a/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/shared.js b/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/shared.js new file mode 100644 index 0000000..8cc019c --- /dev/null +++ b/common/src/main/webapp/usageguide/appserver/node_modules/mongodb/node_modules/mongodb-core/lib/topologies/shared.js @@ -0,0 +1,225 @@ +"use strict" + +var os = require('os'), + f = require('util').format; + +/** + * Emit event if it exists + * @method + */ +function emitSDAMEvent(self, event, description) { + if(self.listeners(event).length > 0) { + self.emit(event, description); + } +} + +// Get package.json variable +var driverVersion = require(__dirname + '/../../package.json').version; +var nodejsversion = f('Node.js %s, %s', process.version, os.endianness()); +var type = os.type(); +var name = process.platform; +var architecture = process.arch; +var release = os.release(); + +function createClientInfo(options) { + // Build default client information + var clientInfo = options.clientInfo ? clone(options.clientInfo) : { + driver: { + name: "nodejs-core", + version: driverVersion + }, + os: { + type: type, + name: name, + architecture: architecture, + version: release + } + } + + // Is platform specified + if(clientInfo.platform && clientInfo.platform.indexOf('mongodb-core') == -1) { + clientInfo.platform = f('%s, mongodb-core: %s', clientInfo.platform, driverVersion); + } else if(!clientInfo.platform){ + clientInfo.platform = nodejsversion; + } + + // Do we have an application specific string + if(options.appname) { + // Cut at 128 bytes + var buffer = new Buffer(options.appname); + // Return the truncated appname + var appname = buffer.length > 128 ? buffer.slice(0, 128).toString('utf8') : options.appname; + // Add to the clientInfo + clientInfo.application = { name: appname }; + } + + return clientInfo; +} + +function clone(object) { + return JSON.parse(JSON.stringify(object)); +} + +var getPreviousDescription = function(self) { + if(!self.s.serverDescription) { + self.s.serverDescription = { + address: self.name, + arbiters: [], hosts: [], passives: [], type: 'Unknown' + } + } + + return self.s.serverDescription; +} + +var emitServerDescriptionChanged = function(self, description) { + if(self.listeners('serverDescriptionChanged').length > 0) { + // Emit the server description changed events + self.emit('serverDescriptionChanged', { + topologyId: self.s.topologyId != -1 ? self.s.topologyId : self.id, address: self.name, + previousDescription: getPreviousDescription(self), + newDescription: description + }); + + self.s.serverDescription = description; + } +} + +var getPreviousTopologyDescription = function(self) { + if(!self.s.topologyDescription) { + self.s.topologyDescription = { + topologyType: 'Unknown', + servers: [{ + address: self.name, arbiters: [], hosts: [], passives: [], type: 'Unknown' + }] + } + } + + return self.s.topologyDescription; +} + +var emitTopologyDescriptionChanged = function(self, description) { + if(self.listeners('topologyDescriptionChanged').length > 0) { + // Emit the server description changed events + self.emit('topologyDescriptionChanged', { + topologyId: self.s.topologyId != -1 ? self.s.topologyId : self.id, address: self.name, + previousDescription: getPreviousTopologyDescription(self), + newDescription: description + }); + + self.s.serverDescription = description; + } +} + +var changedIsMaster = function(self, currentIsmaster, ismaster) { + var currentType = getTopologyType(self, currentIsmaster); + var newType = getTopologyType(self, ismaster); + if(newType != currentType) return true; + return false; +} + +var getTopologyType = function(self, ismaster) { + if(!ismaster) { + ismaster = self.ismaster; + } + + if(!ismaster) return 'Unknown'; + if(ismaster.ismaster && !ismaster.hosts) return 'Standalone'; + if(ismaster.ismaster && ismaster.msg == 'isdbgrid') return 'Mongos'; + if(ismaster.ismaster) return 'RSPrimary'; + if(ismaster.secondary) return 'RSSecondary'; + if(ismaster.arbiterOnly) return 'RSArbiter'; + return 'Unknown'; +} + +var inquireServerState = function(self) { + return function(callback) { + if(self.s.state == 'destroyed') return; + // Record response time + var start = new Date().getTime(); + + // emitSDAMEvent + emitSDAMEvent(self, 'serverHeartbeatStarted', { connectionId: self.name }); + + // Attempt to execute ismaster command + self.command('admin.$cmd', { ismaster:true }, { monitoring:true }, function(err, r) { + if(!err) { + // Legacy event sender + self.emit('ismaster', r, self); + + // Calculate latencyMS + var latencyMS = new Date().getTime() - start; + + // Server heart beat event + emitSDAMEvent(self, 'serverHeartbeatSucceeded', { durationMS: latencyMS, reply: r.result, connectionId: self.name }); + + // Did the server change + if(changedIsMaster(self, self.s.ismaster, r.result)) { + // Emit server description changed if something listening + emitServerDescriptionChanged(self, { + address: self.name, arbiters: [], hosts: [], passives: [], type: !self.s.inTopology ? 'Standalone' : getTopologyType(self) + }); + } + + // Updat ismaster view + self.s.ismaster = r.result; + + // Set server response time + self.s.isMasterLatencyMS = latencyMS; + } else { + emitSDAMEvent(self, 'serverHeartbeatFailed', { durationMS: latencyMS, failure: err, connectionId: self.name }); + } + + // Peforming an ismaster monitoring callback operation + if(typeof callback == 'function') { + return callback(err, r); + } + + // Perform another sweep + self.s.inquireServerStateTimeout = setTimeout(inquireServerState(self), self.s.haInterval); + }); + }; +} + +// Object.assign method or polyfille +var assign = Object.assign ? Object.assign : function assign(target) { + if (target === undefined || target === null) { + throw new TypeError('Cannot convert first argument to object'); + } + + var to = Object(target); + for (var i = 1; i < arguments.length; i++) { + var nextSource = arguments[i]; + if (nextSource === undefined || nextSource === null) { + continue; + } + + var keysArray = Object.keys(Object(nextSource)); + for (var nextIndex = 0, len = keysArray.length; nextIndex < len; nextIndex++) { + var nextKey = keysArray[nextIndex]; + var desc = Object.getOwnPropertyDescriptor(nextSource, nextKey); + if (desc !== undefined && desc.enumerable) { + to[nextKey] = nextSource[nextKey]; + } + } + } + return to; +} + +// +// Clone the options +var cloneOptions = function(options) { + var opts = {}; + for(var name in options) { + opts[name] = options[name]; + } + return opts; +} + +module.exports.inquireServerState = inquireServerState +module.exports.getTopologyType = getTopologyType; +module.exports.emitServerDescriptionChanged = emitServerDescriptionChanged; +module.exports.emitTopologyDescriptionChanged = emitTopologyDescriptionChanged; +module.exports.cloneOptions = cloneOptions; +module.exports.assign = assign; +module.exports.createClientInfo = createClientInfo; +module.exports.clone = clone; |