var EventEmitter = require('events').EventEmitter, inherits = require('util').inherits; // Get prototypes var AggregationCursor = require('./aggregation_cursor'), CommandCursor = require('./command_cursor'), OrderedBulkOperation = require('./bulk/ordered').OrderedBulkOperation, UnorderedBulkOperation = require('./bulk/unordered').UnorderedBulkOperation, GridStore = require('./gridfs/grid_store'), Cursor = require('./cursor'), Collection = require('./collection'), Db = require('./db'); var basicOperationIdGenerator = { operationId: 1, next: function() { return this.operationId++; } } var basicTimestampGenerator = { current: function() { return new Date().getTime(); }, duration: function(start, end) { return end - start; } } var senstiveCommands = ['authenticate', 'saslStart', 'saslContinue', 'getnonce', 'createUser', 'updateUser', 'copydbgetnonce', 'copydbsaslstart', 'copydb']; var Instrumentation = function(core, options, callback) { options = options || {}; // Optional id generators var operationIdGenerator = options.operationIdGenerator || basicOperationIdGenerator; // Optional timestamp generator var timestampGenerator = options.timestampGenerator || basicTimestampGenerator; // Extend with event emitter functionality EventEmitter.call(this); // Contains all the instrumentation overloads this.overloads = []; // --------------------------------------------------------- // // Instrument prototype // // --------------------------------------------------------- var instrumentPrototype = function(callback) { var instrumentations = [] // Classes to support var classes = [GridStore, OrderedBulkOperation, UnorderedBulkOperation, CommandCursor, AggregationCursor, Cursor, Collection, Db]; // Add instrumentations to the available list for(var i = 0; i < classes.length; i++) { if(classes[i].define) { instrumentations.push(classes[i].define.generate()); } } // Return the list of instrumentation points callback(null, instrumentations); } // Did the user want to instrument the prototype if(typeof callback == 'function') { instrumentPrototype(callback); } // --------------------------------------------------------- // // Server // // --------------------------------------------------------- // Reference var self = this; // Names of methods we need to wrap var methods = ['command', 'insert', 'update', 'remove']; // Prototype var proto = core.Server.prototype; // Core server method we are going to wrap methods.forEach(function(x) { var func = proto[x]; // Add to overloaded methods self.overloads.push({proto: proto, name:x, func:func}); // The actual prototype proto[x] = function() { var requestId = core.Query.nextRequestId(); // Get the aruments var args = Array.prototype.slice.call(arguments, 0); var ns = args[0]; var commandObj = args[1]; var options = args[2] || {}; var keys = Object.keys(commandObj); var commandName = keys[0]; var db = ns.split('.')[0]; // Get the collection var col = ns.split('.'); col.shift(); col = col.join('.'); // Do we have a legacy insert/update/remove command if(x == 'insert') { //} && !this.lastIsMaster().maxWireVersion) { commandName = 'insert'; // Re-write the command commandObj = { insert: col, documents: commandObj } if(options.writeConcern && Object.keys(options.writeConcern).length > 0) { commandObj.writeConcern = options.writeConcern; } commandObj.ordered = options.ordered != undefined ? options.ordered : true; } else if(x == 'update') { // && !this.lastIsMaster().maxWireVersion) { commandName = 'update'; // Re-write the command commandObj = { update: col, updates: commandObj } if(options.writeConcern && Object.keys(options.writeConcern).length > 0) { commandObj.writeConcern = options.writeConcern; } commandObj.ordered = options.ordered != undefined ? options.ordered : true; } else if(x == 'remove') { //&& !this.lastIsMaster().maxWireVersion) { commandName = 'delete'; // Re-write the command commandObj = { delete: col, deletes: commandObj } if(options.writeConcern && Object.keys(options.writeConcern).length > 0) { commandObj.writeConcern = options.writeConcern; } commandObj.ordered = options.ordered != undefined ? options.ordered : true; } // Get the callback var callback = args.pop(); // Set current callback operation id from the current context or create // a new one var ourOpId = callback.operationId || operationIdGenerator.next(); // Get a connection reference for this server instance var connection = this.s.pool.get() // Emit the start event for the command var command = { // Returns the command. command: commandObj, // Returns the database name. databaseName: db, // Returns the command name. commandName: commandName, // Returns the driver generated request id. requestId: requestId, // Returns the driver generated operation id. // This is used to link events together such as bulk write operations. OPTIONAL. operationId: ourOpId, // Returns the connection id for the command. For languages that do not have this, // this MUST return the driver equivalent which MUST include the server address and port. // The name of this field is flexible to match the object that is returned from the driver. connectionId: connection }; // Filter out any sensitive commands if(senstiveCommands.indexOf(commandName.toLowerCase())) { command.commandObj = {}; command.commandObj[commandName] = true; } // Emit the started event self.emit('started', command) // Start time var startTime = timestampGenerator.current(); // Push our handler callback args.push(function(err, r) { var endTime = timestampGenerator.current(); var command = { duration: timestampGenerator.duration(startTime, endTime), commandName: commandName, requestId: requestId, operationId: ourOpId, connectionId: connection }; // If we have an error if(err || (r && r.result && r.result.ok == 0)) { command.failure = err || r.result.writeErrors || r.result; // Filter out any sensitive commands if(senstiveCommands.indexOf(commandName.toLowerCase())) { command.failure = {}; } self.emit('failed', command); } else if(commandObj && commandObj.writeConcern && commandObj.writeConcern.w == 0) { // If we have write concern 0 command.reply = {ok:1}; self.emit('succeeded', command); } else { command.reply = r && r.result ? r.result : r; // Filter out any sensitive commands if(senstiveCommands.indexOf(commandName.toLowerCase()) != -1) { command.reply = {}; } self.emit('succeeded', command); } // Return to caller callback(err, r); }); // Apply the call func.apply(this, args); } }); // --------------------------------------------------------- // // Bulk Operations // // --------------------------------------------------------- // Inject ourselves into the Bulk methods methods = ['execute']; var prototypes = [ require('./bulk/ordered').Bulk.prototype, require('./bulk/unordered').Bulk.prototype ] prototypes.forEach(function(proto) { // Core server method we are going to wrap methods.forEach(function(x) { var func = proto[x]; // Add to overloaded methods self.overloads.push({proto: proto, name:x, func:func}); // The actual prototype proto[x] = function() { // Get the aruments var args = Array.prototype.slice.call(arguments, 0); // Set an operation Id on the bulk object this.operationId = operationIdGenerator.next(); // Get the callback var callback = args.pop(); // If we have a callback use this if(typeof callback == 'function') { args.push(function(err, r) { // Return to caller callback(err, r); }); // Apply the call func.apply(this, args); } else { return func.apply(this, args); } } }); }); // --------------------------------------------------------- // // Cursor // // --------------------------------------------------------- // Inject ourselves into the Cursor methods methods = ['_find', '_getmore', '_killcursor']; prototypes = [ require('./cursor').prototype, require('./command_cursor').prototype, require('./aggregation_cursor').prototype ] // Command name translation var commandTranslation = { '_find': 'find', '_getmore': 'getMore', '_killcursor': 'killCursors', '_explain': 'explain' } prototypes.forEach(function(proto) { // Core server method we are going to wrap methods.forEach(function(x) { var func = proto[x]; // Add to overloaded methods self.overloads.push({proto: proto, name:x, func:func}); // The actual prototype proto[x] = function() { var cursor = this; var requestId = core.Query.nextRequestId(); var ourOpId = operationIdGenerator.next(); var parts = this.ns.split('.'); var db = parts[0]; // Get the collection parts.shift(); var collection = parts.join('.'); // Set the command var command = this.query; var cmd = this.s.cmd; // If we have a find method, set the operationId on the cursor if(x == '_find') { cursor.operationId = ourOpId; } // Do we have a find command rewrite it if(x == '_getmore') { command = { getMore: this.cursorState.cursorId, collection: collection, batchSize: cmd.batchSize } if(cmd.maxTimeMS) command.maxTimeMS = cmd.maxTimeMS; } else if(x == '_killcursor') { command = { killCursors: collection, cursors: [this.cursorState.cursorId] } } else if(cmd.find) { command = { find: collection, filter: cmd.query } if(cmd.sort) command.sort = cmd.sort; if(cmd.fields) command.projection = cmd.fields; if(cmd.limit && cmd.limit < 0) { command.limit = Math.abs(cmd.limit); command.singleBatch = true; } else if(cmd.limit) { command.limit = Math.abs(cmd.limit); } // Options if(cmd.skip) command.skip = cmd.skip; if(cmd.hint) command.hint = cmd.hint; if(cmd.batchSize) command.batchSize = cmd.batchSize; if(typeof cmd.returnKey == 'boolean') command.returnKey = cmd.returnKey; if(cmd.comment) command.comment = cmd.comment; if(cmd.min) command.min = cmd.min; if(cmd.max) command.max = cmd.max; if(cmd.maxScan) command.maxScan = cmd.maxScan; if(cmd.maxTimeMS) command.maxTimeMS = cmd.maxTimeMS; // Flags if(typeof cmd.awaitData == 'boolean') command.awaitData = cmd.awaitData; if(typeof cmd.snapshot == 'boolean') command.snapshot = cmd.snapshot; if(typeof cmd.tailable == 'boolean') command.tailable = cmd.tailable; if(typeof cmd.oplogReplay == 'boolean') command.oplogReplay = cmd.oplogReplay; if(typeof cmd.noCursorTimeout == 'boolean') command.noCursorTimeout = cmd.noCursorTimeout; if(typeof cmd.partial == 'boolean') command.partial = cmd.partial; if(typeof cmd.showDiskLoc == 'boolean') command.showRecordId = cmd.showDiskLoc; // Read Concern if(cmd.readConcern) command.readConcern = cmd.readConcern; // Override method if(cmd.explain) command.explain = cmd.explain; if(cmd.exhaust) command.exhaust = cmd.exhaust; // If we have a explain flag if(cmd.explain) { // Create fake explain command command = { explain: command, verbosity: 'allPlansExecution' } // Set readConcern on the command if available if(cmd.readConcern) command.readConcern = cmd.readConcern // Set up the _explain name for the command x = '_explain'; } } else { command = cmd; } // Set up the connection var connectionId = null; // Set local connection if(this.connection) connectionId = this.connection; if(!connectionId && this.server && this.server.getConnection) connectionId = this.server.getConnection(); // Get the command Name var commandName = x == '_find' ? Object.keys(command)[0] : commandTranslation[x]; // Emit the start event for the command command = { // Returns the command. command: command, // Returns the database name. databaseName: db, // Returns the command name. commandName: commandName, // Returns the driver generated request id. requestId: requestId, // Returns the driver generated operation id. // This is used to link events together such as bulk write operations. OPTIONAL. operationId: this.operationId, // Returns the connection id for the command. For languages that do not have this, // this MUST return the driver equivalent which MUST include the server address and port. // The name of this field is flexible to match the object that is returned from the driver. connectionId: connectionId }; // Get the aruments var args = Array.prototype.slice.call(arguments, 0); // Get the callback var callback = args.pop(); // We do not have a callback but a Promise if(typeof callback == 'function' || command.commandName == 'killCursors') { var startTime = timestampGenerator.current(); // Emit the started event self.emit('started', command) // Emit succeeded event with killcursor if we have a legacy protocol if(command.commandName == 'killCursors' && this.server.lastIsMaster() && this.server.lastIsMaster().maxWireVersion < 4) { // Emit the succeeded command command = { duration: timestampGenerator.duration(startTime, timestampGenerator.current()), commandName: commandName, requestId: requestId, operationId: cursor.operationId, connectionId: cursor.server.getConnection(), reply: [{ok:1}] }; // Apply callback to the list of args args.push(callback); // Apply the call func.apply(this, args); // Emit the command return self.emit('succeeded', command) } // Add our callback handler args.push(function(err, r) { if(err) { // Command var command = { duration: timestampGenerator.duration(startTime, timestampGenerator.current()), commandName: commandName, requestId: requestId, operationId: ourOpId, connectionId: cursor.server.getConnection(), failure: err }; // Emit the command self.emit('failed', command) } else { // Do we have a getMore if(commandName.toLowerCase() == 'getmore' && r == null) { r = { cursor: { id: cursor.cursorState.cursorId, ns: cursor.ns, nextBatch: cursor.cursorState.documents }, ok:1 } } else if(commandName.toLowerCase() == 'find' && r == null) { r = { cursor: { id: cursor.cursorState.cursorId, ns: cursor.ns, firstBatch: cursor.cursorState.documents }, ok:1 } } else if(commandName.toLowerCase() == 'killcursors' && r == null) { r = { cursorsUnknown:[cursor.cursorState.lastCursorId], ok:1 } } // cursor id is zero, we can issue success command command = { duration: timestampGenerator.duration(startTime, timestampGenerator.current()), commandName: commandName, requestId: requestId, operationId: cursor.operationId, connectionId: cursor.server.getConnection(), reply: r && r.result ? r.result : r }; // Emit the command self.emit('succeeded', command) } // Return if(!callback) return; // Return to caller callback(err, r); }); // Apply the call func.apply(this, args); } else { // Assume promise, push back the missing value args.push(callback); // Get the promise var promise = func.apply(this, args); // Return a new promise return new cursor.s.promiseLibrary(function(resolve, reject) { var startTime = timestampGenerator.current(); // Emit the started event self.emit('started', command) // Execute the function promise.then(function() { // cursor id is zero, we can issue success command var command = { duration: timestampGenerator.duration(startTime, timestampGenerator.current()), commandName: commandName, requestId: requestId, operationId: cursor.operationId, connectionId: cursor.server.getConnection(), reply: cursor.cursorState.documents }; // Emit the command self.emit('succeeded', command) }).catch(function(err) { // Command var command = { duration: timestampGenerator.duration(startTime, timestampGenerator.current()), commandName: commandName, requestId: requestId, operationId: ourOpId, connectionId: cursor.server.getConnection(), failure: err }; // Emit the command self.emit('failed', command) // reject the promise reject(err); }); }); } } }); }); } inherits(Instrumentation, EventEmitter); Instrumentation.prototype.uninstrument = function() { for(var i = 0; i < this.overloads.length; i++) { var obj = this.overloads[i]; obj.proto[obj.name] = obj.func; } // Remove all listeners this.removeAllListeners('started'); this.removeAllListeners('succeeded'); this.removeAllListeners('failed'); } module.exports = Instrumentation;