From 93e29ad7d5c9ba017252cb7b25aca94657023dd7 Mon Sep 17 00:00:00 2001 From: nancylizi
+ * var bayeuxUrl2 = ...; + * + * // Dojo style + * var cometd2 = new dojox.Cometd('another_optional_name'); + * + * // jQuery style + * var cometd2 = new $.Cometd('another_optional_name'); + * + * cometd2.init({url: bayeuxUrl2}); + *+ * @param name the optional name of this cometd object + */ +// IMPLEMENTATION NOTES: +// Be very careful in not changing the function order and pass this file every time through JSLint (http://jslint.com) +// The only implied globals must be "dojo", "org" and "window", and check that there are no "unused" warnings +// Failing to pass JSLint may result in shrinkers/minifiers to create an unusable file. +org.cometd.Cometd = function(name) +{ + var _cometd = this; + var _name = name || 'default'; + var _crossDomain = false; + var _transports = new org.cometd.TransportRegistry(); + var _transport; + var _status = 'disconnected'; + var _messageId = 0; + var _clientId = null; + var _batch = 0; + var _messageQueue = []; + var _internalBatch = false; + var _listeners = {}; + var _backoff = 0; + var _scheduledSend = null; + var _extensions = []; + var _advice = {}; + var _handshakeProps; + var _publishCallbacks = {}; + var _reestablish = false; + var _connected = false; + var _config = { + connectTimeout: 0, + maxConnections: 2, + backoffIncrement: 1000, + maxBackoff: 60000, + logLevel: 'info', + reverseIncomingExtensions: true, + maxNetworkDelay: 10000, + requestHeaders: {}, + appendMessageTypeToURL: true, + autoBatch: false, + advice: { + timeout: 60000, + interval: 0, + reconnect: 'retry' + } + }; + + /** + * Mixes in the given objects into the target object by copying the properties. + * @param deep if the copy must be deep + * @param target the target object + * @param objects the objects whose properties are copied into the target + */ + this._mixin = function(deep, target, objects) + { + var result = target || {}; + + // Skip first 2 parameters (deep and target), and loop over the others + for (var i = 2; i < arguments.length; ++i) + { + var object = arguments[i]; + + if (object === undefined || object === null) + { + continue; + } + + for (var propName in object) + { + var prop = object[propName]; + var targ = result[propName]; + + // Avoid infinite loops + if (prop === target) + { + continue; + } + // Do not mixin undefined values + if (prop === undefined) + { + continue; + } + + if (deep && typeof prop === 'object' && prop !== null) + { + if (prop instanceof Array) + { + result[propName] = this._mixin(deep, targ instanceof Array ? targ : [], prop); + } + else + { + var source = typeof targ === 'object' && !(targ instanceof Array) ? targ : {}; + result[propName] = this._mixin(deep, source, prop); + } + } + else + { + result[propName] = prop; + } + } + } + + return result; + }; + + function _isString(value) + { + return org.cometd.Utils.isString(value); + } + + function _isFunction(value) + { + if (value === undefined || value === null) + { + return false; + } + return typeof value === 'function'; + } + + function _log(level, args) + { + if (window.console) + { + var logger = window.console[level]; + if (_isFunction(logger)) + { + logger.apply(window.console, args); + } + } + } + + this._warn = function() + { + _log('warn', arguments); + }; + + this._info = function() + { + if (_config.logLevel !== 'warn') + { + _log('info', arguments); + } + }; + + this._debug = function() + { + if (_config.logLevel === 'debug') + { + _log('debug', arguments); + } + }; + + /** + * Returns whether the given hostAndPort is cross domain. + * The default implementation checks against window.location.host + * but this function can be overridden to make it work in non-browser + * environments. + * + * @param hostAndPort the host and port in format host:port + * @return whether the given hostAndPort is cross domain + */ + this._isCrossDomain = function(hostAndPort) + { + return hostAndPort && hostAndPort !== window.location.host; + }; + + function _configure(configuration) + { + _cometd._debug('Configuring cometd object with', configuration); + // Support old style param, where only the Bayeux server URL was passed + if (_isString(configuration)) + { + configuration = { url: configuration }; + } + if (!configuration) + { + configuration = {}; + } + + _config = _cometd._mixin(false, _config, configuration); + + if (!_config.url) + { + throw 'Missing required configuration parameter \'url\' specifying the Bayeux server URL'; + } + + // Check if we're cross domain + // [1] = protocol://, [2] = host:port, [3] = host, [4] = IPv6_host, [5] = IPv4_host, [6] = :port, [7] = port, [8] = uri, [9] = rest + var urlParts = /(^https?:\/\/)?(((\[[^\]]+\])|([^:\/\?#]+))(:(\d+))?)?([^\?#]*)(.*)?/.exec(_config.url); + var hostAndPort = urlParts[2]; + var uri = urlParts[8]; + var afterURI = urlParts[9]; + _crossDomain = _cometd._isCrossDomain(hostAndPort); + + // Check if appending extra path is supported + if (_config.appendMessageTypeToURL) + { + if (afterURI !== undefined && afterURI.length > 0) + { + _cometd._info('Appending message type to URI ' + uri + afterURI + ' is not supported, disabling \'appendMessageTypeToURL\' configuration'); + _config.appendMessageTypeToURL = false; + } + else + { + var uriSegments = uri.split('/'); + var lastSegmentIndex = uriSegments.length - 1; + if (uri.match(/\/$/)) + { + lastSegmentIndex -= 1; + } + if (uriSegments[lastSegmentIndex].indexOf('.') >= 0) + { + // Very likely the CometD servlet's URL pattern is mapped to an extension, such as *.cometd + // It will be difficult to add the extra path in this case + _cometd._info('Appending message type to URI ' + uri + ' is not supported, disabling \'appendMessageTypeToURL\' configuration'); + _config.appendMessageTypeToURL = false; + } + } + } + } + + function _clearSubscriptions() + { + for (var channel in _listeners) + { + var subscriptions = _listeners[channel]; + for (var i = 0; i < subscriptions.length; ++i) + { + var subscription = subscriptions[i]; + if (subscription && !subscription.listener) + { + delete subscriptions[i]; + _cometd._debug('Removed subscription', subscription, 'for channel', channel); + } + } + } + } + + function _setStatus(newStatus) + { + if (_status !== newStatus) + { + _cometd._debug('Status', _status, '->', newStatus); + _status = newStatus; + } + } + + function _isDisconnected() + { + return _status === 'disconnecting' || _status === 'disconnected'; + } + + function _nextMessageId() + { + return ++_messageId; + } + + function _applyExtension(scope, callback, name, message, outgoing) + { + try + { + return callback.call(scope, message); + } + catch (x) + { + _cometd._debug('Exception during execution of extension', name, x); + var exceptionCallback = _cometd.onExtensionException; + if (_isFunction(exceptionCallback)) + { + _cometd._debug('Invoking extension exception callback', name, x); + try + { + exceptionCallback.call(_cometd, x, name, outgoing, message); + } + catch(xx) + { + _cometd._info('Exception during execution of exception callback in extension', name, xx); + } + } + return message; + } + } + + function _applyIncomingExtensions(message) + { + for (var i = 0; i < _extensions.length; ++i) + { + if (message === undefined || message === null) + { + break; + } + + var index = _config.reverseIncomingExtensions ? _extensions.length - 1 - i : i; + var extension = _extensions[index]; + var callback = extension.extension.incoming; + if (_isFunction(callback)) + { + var result = _applyExtension(extension.extension, callback, extension.name, message, false); + message = result === undefined ? message : result; + } + } + return message; + } + + function _applyOutgoingExtensions(message) + { + for (var i = 0; i < _extensions.length; ++i) + { + if (message === undefined || message === null) + { + break; + } + + var extension = _extensions[i]; + var callback = extension.extension.outgoing; + if (_isFunction(callback)) + { + var result = _applyExtension(extension.extension, callback, extension.name, message, true); + message = result === undefined ? message : result; + } + } + return message; + } + + function _notify(channel, message) + { + var subscriptions = _listeners[channel]; + if (subscriptions && subscriptions.length > 0) + { + for (var i = 0; i < subscriptions.length; ++i) + { + var subscription = subscriptions[i]; + // Subscriptions may come and go, so the array may have 'holes' + if (subscription) + { + try + { + subscription.callback.call(subscription.scope, message); + } + catch (x) + { + _cometd._debug('Exception during notification', subscription, message, x); + var listenerCallback = _cometd.onListenerException; + if (_isFunction(listenerCallback)) + { + _cometd._debug('Invoking listener exception callback', subscription, x); + try + { + listenerCallback.call(_cometd, x, subscription.handle, subscription.listener, message); + } + catch (xx) + { + _cometd._info('Exception during execution of listener callback', subscription, xx); + } + } + } + } + } + } + } + + function _notifyListeners(channel, message) + { + // Notify direct listeners + _notify(channel, message); + + // Notify the globbing listeners + var channelParts = channel.split('/'); + var last = channelParts.length - 1; + for (var i = last; i > 0; --i) + { + var channelPart = channelParts.slice(0, i).join('/') + '/*'; + // We don't want to notify /foo/* if the channel is /foo/bar/baz, + // so we stop at the first non recursive globbing + if (i === last) + { + _notify(channelPart, message); + } + // Add the recursive globber and notify + channelPart += '*'; + _notify(channelPart, message); + } + } + + function _cancelDelayedSend() + { + if (_scheduledSend !== null) + { + org.cometd.Utils.clearTimeout(_scheduledSend); + } + _scheduledSend = null; + } + + function _delayedSend(operation) + { + _cancelDelayedSend(); + var delay = _advice.interval + _backoff; + _cometd._debug('Function scheduled in', delay, 'ms, interval =', _advice.interval, 'backoff =', _backoff, operation); + _scheduledSend = org.cometd.Utils.setTimeout(_cometd, operation, delay); + } + + // Needed to break cyclic dependencies between function definitions + var _handleMessages; + var _handleFailure; + + /** + * Delivers the messages to the CometD server + * @param messages the array of messages to send + * @param longpoll true if this send is a long poll + */ + function _send(sync, messages, longpoll, extraPath) + { + // We must be sure that the messages have a clientId. + // This is not guaranteed since the handshake may take time to return + // (and hence the clientId is not known yet) and the application + // may create other messages. + for (var i = 0; i < messages.length; ++i) + { + var message = messages[i]; + message.id = '' + _nextMessageId(); + + if (_clientId) + { + message.clientId = _clientId; + } + + var callback = undefined; + if (_isFunction(message._callback)) + { + callback = message._callback; + // Remove the publish callback before calling the extensions + delete message._callback; + } + + message = _applyOutgoingExtensions(message); + if (message !== undefined && message !== null) + { + messages[i] = message; + if (callback) + _publishCallbacks[message.id] = callback; + } + else + { + messages.splice(i--, 1); + } + } + + if (messages.length === 0) + { + return; + } + + var url = _config.url; + if (_config.appendMessageTypeToURL) + { + // If url does not end with '/', then append it + if (!url.match(/\/$/)) + { + url = url + '/'; + } + if (extraPath) + { + url = url + extraPath; + } + } + + var envelope = { + url: url, + sync: sync, + messages: messages, + onSuccess: function(rcvdMessages) + { + try + { + _handleMessages.call(_cometd, rcvdMessages); + } + catch (x) + { + _cometd._debug('Exception during handling of messages', x); + } + }, + onFailure: function(conduit, messages, reason, exception) + { + try + { + _handleFailure.call(_cometd, conduit, messages, reason, exception); + } + catch (x) + { + _cometd._debug('Exception during handling of failure', x); + } + } + }; + _cometd._debug('Send', envelope); + _transport.send(envelope, longpoll); + } + + function _queueSend(message) + { + if (_batch > 0 || _internalBatch === true) + { + _messageQueue.push(message); + } + else + { + _send(false, [message], false); + } + } + + /** + * Sends a complete bayeux message. + * This method is exposed as a public so that extensions may use it + * to send bayeux message directly, for example in case of re-sending + * messages that have already been sent but that for some reason must + * be resent. + */ + this.send = _queueSend; + + function _resetBackoff() + { + _backoff = 0; + } + + function _increaseBackoff() + { + if (_backoff < _config.maxBackoff) + { + _backoff += _config.backoffIncrement; + } + } + + /** + * Starts a the batch of messages to be sent in a single request. + * @see #_endBatch(sendMessages) + */ + function _startBatch() + { + ++_batch; + } + + function _flushBatch() + { + var messages = _messageQueue; + _messageQueue = []; + if (messages.length > 0) + { + _send(false, messages, false); + } + } + + /** + * Ends the batch of messages to be sent in a single request, + * optionally sending messages present in the message queue depending + * on the given argument. + * @see #_startBatch() + */ + function _endBatch() + { + --_batch; + if (_batch < 0) + { + throw 'Calls to startBatch() and endBatch() are not paired'; + } + + if (_batch === 0 && !_isDisconnected() && !_internalBatch) + { + _flushBatch(); + } + } + + /** + * Sends the connect message + */ + function _connect() + { + if (!_isDisconnected()) + { + var message = { + channel: '/meta/connect', + connectionType: _transport.getType() + }; + + // In case of reload or temporary loss of connection + // we want the next successful connect to return immediately + // instead of being held by the server, so that connect listeners + // can be notified that the connection has been re-established + if (!_connected) + { + message.advice = { timeout: 0 }; + } + + _setStatus('connecting'); + _cometd._debug('Connect sent', message); + _send(false, [message], true, 'connect'); + _setStatus('connected'); + } + } + + function _delayedConnect() + { + _setStatus('connecting'); + _delayedSend(function() + { + _connect(); + }); + } + + function _updateAdvice(newAdvice) + { + if (newAdvice) + { + _advice = _cometd._mixin(false, {}, _config.advice, newAdvice); + _cometd._debug('New advice', _advice); + } + } + + function _disconnect(abort) + { + _cancelDelayedSend(); + if (abort) + { + _transport.abort(); + } + _clientId = null; + _setStatus('disconnected'); + _batch = 0; + _resetBackoff(); + + // Fail any existing queued message + if (_messageQueue.length > 0) + { + _handleFailure.call(_cometd, undefined, _messageQueue, 'error', 'Disconnected'); + _messageQueue = []; + } + } + + /** + * Sends the initial handshake message + */ + function _handshake(handshakeProps) + { + _clientId = null; + + _clearSubscriptions(); + + // Reset the transports if we're not retrying the handshake + if (_isDisconnected()) + { + _transports.reset(); + _updateAdvice(_config.advice); + } + else + { + // We are retrying the handshake, either because another handshake failed + // and we're backing off, or because the server timed us out and asks us to + // re-handshake: in both cases, make sure that if the handshake succeeds + // the next action is a connect. + _updateAdvice(_cometd._mixin(false, _advice, {reconnect: 'retry'})); + } + + _batch = 0; + + // Mark the start of an internal batch. + // This is needed because handshake and connect are async. + // It may happen that the application calls init() then subscribe() + // and the subscribe message is sent before the connect message, if + // the subscribe message is not held until the connect message is sent. + // So here we start a batch to hold temporarily any message until + // the connection is fully established. + _internalBatch = true; + + // Save the properties provided by the user, so that + // we can reuse them during automatic re-handshake + _handshakeProps = handshakeProps; + + var version = '1.0'; + + // Figure out the transports to send to the server + var transportTypes = _transports.findTransportTypes(version, _crossDomain, _config.url); + + var bayeuxMessage = { + version: version, + minimumVersion: '0.9', + channel: '/meta/handshake', + supportedConnectionTypes: transportTypes, + advice: { + timeout: _advice.timeout, + interval: _advice.interval + } + }; + // Do not allow the user to mess with the required properties, + // so merge first the user properties and *then* the bayeux message + var message = _cometd._mixin(false, {}, _handshakeProps, bayeuxMessage); + + // Pick up the first available transport as initial transport + // since we don't know if the server supports it + _transport = _transports.negotiateTransport(transportTypes, version, _crossDomain, _config.url); + _cometd._debug('Initial transport is', _transport.getType()); + + // We started a batch to hold the application messages, + // so here we must bypass it and send immediately. + _setStatus('handshaking'); + _cometd._debug('Handshake sent', message); + _send(false, [message], false, 'handshake'); + } + + function _delayedHandshake() + { + _setStatus('handshaking'); + + // We will call _handshake() which will reset _clientId, but we want to avoid + // that between the end of this method and the call to _handshake() someone may + // call publish() (or other methods that call _queueSend()). + _internalBatch = true; + + _delayedSend(function() + { + _handshake(_handshakeProps); + }); + } + + function _failHandshake(message) + { + _notifyListeners('/meta/handshake', message); + _notifyListeners('/meta/unsuccessful', message); + + // Only try again if we haven't been disconnected and + // the advice permits us to retry the handshake + var retry = !_isDisconnected() && _advice.reconnect !== 'none'; + if (retry) + { + _increaseBackoff(); + _delayedHandshake(); + } + else + { + _disconnect(false); + } + } + + function _handshakeResponse(message) + { + if (message.successful) + { + // Save clientId, figure out transport, then follow the advice to connect + _clientId = message.clientId; + + var newTransport = _transports.negotiateTransport(message.supportedConnectionTypes, message.version, _crossDomain, _config.url); + if (newTransport === null) + { + throw 'Could not negotiate transport with server; client ' + + _transports.findTransportTypes(message.version, _crossDomain, _config.url) + + ', server ' + message.supportedConnectionTypes; + } + else if (_transport !== newTransport) + { + _cometd._debug('Transport', _transport, '->', newTransport); + _transport = newTransport; + } + + // End the internal batch and allow held messages from the application + // to go to the server (see _handshake() where we start the internal batch). + _internalBatch = false; + _flushBatch(); + + // Here the new transport is in place, as well as the clientId, so + // the listeners can perform a publish() if they want. + // Notify the listeners before the connect below. + message.reestablish = _reestablish; + _reestablish = true; + _notifyListeners('/meta/handshake', message); + + var action = _isDisconnected() ? 'none' : _advice.reconnect; + switch (action) + { + case 'retry': + _resetBackoff(); + _delayedConnect(); + break; + case 'none': + _disconnect(false); + break; + default: + throw 'Unrecognized advice action ' + action; + } + } + else + { + _failHandshake(message); + } + } + + function _handshakeFailure(xhr, message) + { + _failHandshake({ + successful: false, + failure: true, + channel: '/meta/handshake', + request: message, + xhr: xhr, + advice: { + reconnect: 'retry', + interval: _backoff + } + }); + } + + function _failConnect(message) + { + // Notify the listeners after the status change but before the next action + _notifyListeners('/meta/connect', message); + _notifyListeners('/meta/unsuccessful', message); + + // This may happen when the server crashed, the current clientId + // will be invalid, and the server will ask to handshake again + // Listeners can call disconnect(), so check the state after they run + var action = _isDisconnected() ? 'none' : _advice.reconnect; + switch (action) + { + case 'retry': + _delayedConnect(); + _increaseBackoff(); + break; + case 'handshake': + // The current transport may be failed (e.g. network disconnection) + // Reset the transports so the new handshake picks up the right one + _transports.reset(); + _resetBackoff(); + _delayedHandshake(); + break; + case 'none': + _disconnect(false); + break; + default: + throw 'Unrecognized advice action' + action; + } + } + + function _connectResponse(message) + { + _connected = message.successful; + + if (_connected) + { + _notifyListeners('/meta/connect', message); + + // Normally, the advice will say "reconnect: 'retry', interval: 0" + // and the server will hold the request, so when a response returns + // we immediately call the server again (long polling) + // Listeners can call disconnect(), so check the state after they run + var action = _isDisconnected() ? 'none' : _advice.reconnect; + switch (action) + { + case 'retry': + _resetBackoff(); + _delayedConnect(); + break; + case 'none': + _disconnect(false); + break; + default: + throw 'Unrecognized advice action ' + action; + } + } + else + { + _failConnect(message); + } + } + + function _connectFailure(xhr, message) + { + _connected = false; + _failConnect({ + successful: false, + failure: true, + channel: '/meta/connect', + request: message, + xhr: xhr, + advice: { + reconnect: 'retry', + interval: _backoff + } + }); + } + + function _failDisconnect(message) + { + _disconnect(true); + _notifyListeners('/meta/disconnect', message); + _notifyListeners('/meta/unsuccessful', message); + } + + function _disconnectResponse(message) + { + if (message.successful) + { + _disconnect(false); + _notifyListeners('/meta/disconnect', message); + } + else + { + _failDisconnect(message); + } + } + + function _disconnectFailure(xhr, message) + { + _failDisconnect({ + successful: false, + failure: true, + channel: '/meta/disconnect', + request: message, + xhr: xhr, + advice: { + reconnect: 'none', + interval: 0 + } + }); + } + + function _failSubscribe(message) + { + _notifyListeners('/meta/subscribe', message); + _notifyListeners('/meta/unsuccessful', message); + } + + function _subscribeResponse(message) + { + if (message.successful) + { + _notifyListeners('/meta/subscribe', message); + } + else + { + _failSubscribe(message); + } + } + + function _subscribeFailure(xhr, message) + { + _failSubscribe({ + successful: false, + failure: true, + channel: '/meta/subscribe', + request: message, + xhr: xhr, + advice: { + reconnect: 'none', + interval: 0 + } + }); + } + + function _failUnsubscribe(message) + { + _notifyListeners('/meta/unsubscribe', message); + _notifyListeners('/meta/unsuccessful', message); + } + + function _unsubscribeResponse(message) + { + if (message.successful) + { + _notifyListeners('/meta/unsubscribe', message); + } + else + { + _failUnsubscribe(message); + } + } + + function _unsubscribeFailure(xhr, message) + { + _failUnsubscribe({ + successful: false, + failure: true, + channel: '/meta/unsubscribe', + request: message, + xhr: xhr, + advice: { + reconnect: 'none', + interval: 0 + } + }); + } + + function _handlePublishCallback(message) + { + var callback = _publishCallbacks[message.id]; + if (_isFunction(callback)) + { + delete _publishCallbacks[message.id]; + callback.call(_cometd, message); + } + } + + function _failMessage(message) + { + _handlePublishCallback(message); + _notifyListeners('/meta/publish', message); + _notifyListeners('/meta/unsuccessful', message); + } + + function _messageResponse(message) + { + if (message.successful === undefined) + { + if (message.data) + { + // It is a plain message, and not a bayeux meta message + _notifyListeners(message.channel, message); + } + else + { + _cometd._debug('Unknown message', message); + } + } + else + { + if (message.successful) + { + _handlePublishCallback(message); + _notifyListeners('/meta/publish', message); + } + else + { + _failMessage(message); + } + } + } + + function _messageFailure(xhr, message) + { + _failMessage({ + successful: false, + failure: true, + channel: message.channel, + request: message, + xhr: xhr, + advice: { + reconnect: 'none', + interval: 0 + } + }); + } + + function _receive(message) + { + message = _applyIncomingExtensions(message); + if (message === undefined || message === null) + { + return; + } + + _updateAdvice(message.advice); + + var channel = message.channel; + switch (channel) + { + case '/meta/handshake': + _handshakeResponse(message); + break; + case '/meta/connect': + _connectResponse(message); + break; + case '/meta/disconnect': + _disconnectResponse(message); + break; + case '/meta/subscribe': + _subscribeResponse(message); + break; + case '/meta/unsubscribe': + _unsubscribeResponse(message); + break; + default: + _messageResponse(message); + break; + } + } + + /** + * Receives a message. + * This method is exposed as a public so that extensions may inject + * messages simulating that they had been received. + */ + this.receive = _receive; + + _handleMessages = function(rcvdMessages) + { + _cometd._debug('Received', rcvdMessages); + + for (var i = 0; i < rcvdMessages.length; ++i) + { + var message = rcvdMessages[i]; + _receive(message); + } + }; + + _handleFailure = function(conduit, messages, reason, exception) + { + _cometd._debug('handleFailure', conduit, messages, reason, exception); + + for (var i = 0; i < messages.length; ++i) + { + var message = messages[i]; + var channel = message.channel; + switch (channel) + { + case '/meta/handshake': + _handshakeFailure(conduit, message); + break; + case '/meta/connect': + _connectFailure(conduit, message); + break; + case '/meta/disconnect': + _disconnectFailure(conduit, message); + break; + case '/meta/subscribe': + _subscribeFailure(conduit, message); + break; + case '/meta/unsubscribe': + _unsubscribeFailure(conduit, message); + break; + default: + _messageFailure(conduit, message); + break; + } + } + }; + + function _hasSubscriptions(channel) + { + var subscriptions = _listeners[channel]; + if (subscriptions) + { + for (var i = 0; i < subscriptions.length; ++i) + { + if (subscriptions[i]) + { + return true; + } + } + } + return false; + } + + function _resolveScopedCallback(scope, callback) + { + var delegate = { + scope: scope, + method: callback + }; + if (_isFunction(scope)) + { + delegate.scope = undefined; + delegate.method = scope; + } + else + { + if (_isString(callback)) + { + if (!scope) + { + throw 'Invalid scope ' + scope; + } + delegate.method = scope[callback]; + if (!_isFunction(delegate.method)) + { + throw 'Invalid callback ' + callback + ' for scope ' + scope; + } + } + else if (!_isFunction(callback)) + { + throw 'Invalid callback ' + callback; + } + } + return delegate; + } + + function _addListener(channel, scope, callback, isListener) + { + // The data structure is a map
url
+ * of type string containing the URL of the Bayeux server.
+ * @param configuration the configuration object
+ */
+ this.configure = function(configuration)
+ {
+ _configure.call(this, configuration);
+ };
+
+ /**
+ * Configures and establishes the Bayeux communication with the Bayeux server
+ * via a handshake and a subsequent connect.
+ * @param configuration the configuration object
+ * @param handshakeProps an object to be merged with the handshake message
+ * @see #configure(configuration)
+ * @see #handshake(handshakeProps)
+ */
+ this.init = function(configuration, handshakeProps)
+ {
+ this.configure(configuration);
+ this.handshake(handshakeProps);
+ };
+
+ /**
+ * Establishes the Bayeux communication with the Bayeux server
+ * via a handshake and a subsequent connect.
+ * @param handshakeProps an object to be merged with the handshake message
+ */
+ this.handshake = function(handshakeProps)
+ {
+ _setStatus('disconnected');
+ _reestablish = false;
+ _handshake(handshakeProps);
+ };
+
+ /**
+ * Disconnects from the Bayeux server.
+ * It is possible to suggest to attempt a synchronous disconnect, but this feature
+ * may only be available in certain transports (for example, long-polling may support
+ * it, callback-polling certainly does not).
+ * @param sync whether attempt to perform a synchronous disconnect
+ * @param disconnectProps an object to be merged with the disconnect message
+ */
+ this.disconnect = function(sync, disconnectProps)
+ {
+ if (_isDisconnected())
+ {
+ return;
+ }
+
+ if (disconnectProps === undefined)
+ {
+ if (typeof sync !== 'boolean')
+ {
+ disconnectProps = sync;
+ sync = false;
+ }
+ }
+
+ var bayeuxMessage = {
+ channel: '/meta/disconnect'
+ };
+ var message = this._mixin(false, {}, disconnectProps, bayeuxMessage);
+ _setStatus('disconnecting');
+ _send(sync === true, [message], false, 'disconnect');
+ };
+
+ /**
+ * Marks the start of a batch of application messages to be sent to the server
+ * in a single request, obtaining a single response containing (possibly) many
+ * application reply messages.
+ * Messages are held in a queue and not sent until {@link #endBatch()} is called.
+ * If startBatch() is called multiple times, then an equal number of endBatch()
+ * calls must be made to close and send the batch of messages.
+ * @see #endBatch()
+ */
+ this.startBatch = function()
+ {
+ _startBatch();
+ };
+
+ /**
+ * Marks the end of a batch of application messages to be sent to the server
+ * in a single request.
+ * @see #startBatch()
+ */
+ this.endBatch = function()
+ {
+ _endBatch();
+ };
+
+ /**
+ * Executes the given callback in the given scope, surrounded by a {@link #startBatch()}
+ * and {@link #endBatch()} calls.
+ * @param scope the scope of the callback, may be omitted
+ * @param callback the callback to be executed within {@link #startBatch()} and {@link #endBatch()} calls
+ */
+ this.batch = function(scope, callback)
+ {
+ var delegate = _resolveScopedCallback(scope, callback);
+ this.startBatch();
+ try
+ {
+ delegate.method.call(delegate.scope);
+ this.endBatch();
+ }
+ catch (x)
+ {
+ this._debug('Exception during execution of batch', x);
+ this.endBatch();
+ throw x;
+ }
+ };
+
+ /**
+ * Adds a listener for bayeux messages, performing the given callback in the given scope
+ * when a message for the given channel arrives.
+ * @param channel the channel the listener is interested to
+ * @param scope the scope of the callback, may be omitted
+ * @param callback the callback to call when a message is sent to the channel
+ * @returns the subscription handle to be passed to {@link #removeListener(object)}
+ * @see #removeListener(subscription)
+ */
+ this.addListener = function(channel, scope, callback)
+ {
+ if (arguments.length < 2)
+ {
+ throw 'Illegal arguments number: required 2, got ' + arguments.length;
+ }
+ if (!_isString(channel))
+ {
+ throw 'Illegal argument type: channel must be a string';
+ }
+
+ return _addListener(channel, scope, callback, true);
+ };
+
+ /**
+ * Removes the subscription obtained with a call to {@link #addListener(string, object, function)}.
+ * @param subscription the subscription to unsubscribe.
+ * @see #addListener(channel, scope, callback)
+ */
+ this.removeListener = function(subscription)
+ {
+ if (!org.cometd.Utils.isArray(subscription))
+ {
+ throw 'Invalid argument: expected subscription, not ' + subscription;
+ }
+
+ _removeListener(subscription);
+ };
+
+ /**
+ * Removes all listeners registered with {@link #addListener(channel, scope, callback)} or
+ * {@link #subscribe(channel, scope, callback)}.
+ */
+ this.clearListeners = function()
+ {
+ _listeners = {};
+ };
+
+ /**
+ * Subscribes to the given channel, performing the given callback in the given scope
+ * when a message for the channel arrives.
+ * @param channel the channel to subscribe to
+ * @param scope the scope of the callback, may be omitted
+ * @param callback the callback to call when a message is sent to the channel
+ * @param subscribeProps an object to be merged with the subscribe message
+ * @return the subscription handle to be passed to {@link #unsubscribe(object)}
+ */
+ this.subscribe = function(channel, scope, callback, subscribeProps)
+ {
+ if (arguments.length < 2)
+ {
+ throw 'Illegal arguments number: required 2, got ' + arguments.length;
+ }
+ if (!_isString(channel))
+ {
+ throw 'Illegal argument type: channel must be a string';
+ }
+ if (_isDisconnected())
+ {
+ throw 'Illegal state: already disconnected';
+ }
+
+ // Normalize arguments
+ if (_isFunction(scope))
+ {
+ subscribeProps = callback;
+ callback = scope;
+ scope = undefined;
+ }
+
+ // Only send the message to the server if this client has not yet subscribed to the channel
+ var send = !_hasSubscriptions(channel);
+
+ var subscription = _addListener(channel, scope, callback, false);
+
+ if (send)
+ {
+ // Send the subscription message after the subscription registration to avoid
+ // races where the server would send a message to the subscribers, but here
+ // on the client the subscription has not been added yet to the data structures
+ var bayeuxMessage = {
+ channel: '/meta/subscribe',
+ subscription: channel
+ };
+ var message = this._mixin(false, {}, subscribeProps, bayeuxMessage);
+ _queueSend(message);
+ }
+
+ return subscription;
+ };
+
+ /**
+ * Unsubscribes the subscription obtained with a call to {@link #subscribe(string, object, function)}.
+ * @param subscription the subscription to unsubscribe.
+ */
+ this.unsubscribe = function(subscription, unsubscribeProps)
+ {
+ if (arguments.length < 1)
+ {
+ throw 'Illegal arguments number: required 1, got ' + arguments.length;
+ }
+ if (_isDisconnected())
+ {
+ throw 'Illegal state: already disconnected';
+ }
+
+ // Remove the local listener before sending the message
+ // This ensures that if the server fails, this client does not get notifications
+ this.removeListener(subscription);
+
+ var channel = subscription[0];
+ // Only send the message to the server if this client unsubscribes the last subscription
+ if (!_hasSubscriptions(channel))
+ {
+ var bayeuxMessage = {
+ channel: '/meta/unsubscribe',
+ subscription: channel
+ };
+ var message = this._mixin(false, {}, unsubscribeProps, bayeuxMessage);
+ _queueSend(message);
+ }
+ };
+
+ /**
+ * Removes all subscriptions added via {@link #subscribe(channel, scope, callback, subscribeProps)},
+ * but does not remove the listeners added via {@link addListener(channel, scope, callback)}.
+ */
+ this.clearSubscriptions = function()
+ {
+ _clearSubscriptions();
+ };
+
+ /**
+ * Publishes a message on the given channel, containing the given content.
+ * @param channel the channel to publish the message to
+ * @param content the content of the message
+ * @param publishProps an object to be merged with the publish message
+ */
+ this.publish = function(channel, content, publishProps, publishCallback)
+ {
+ if (arguments.length < 1)
+ {
+ throw 'Illegal arguments number: required 1, got ' + arguments.length;
+ }
+ if (!_isString(channel))
+ {
+ throw 'Illegal argument type: channel must be a string';
+ }
+ if (_isDisconnected())
+ {
+ throw 'Illegal state: already disconnected';
+ }
+
+ if (_isFunction(content))
+ {
+ publishCallback = content;
+ content = publishProps = {};
+ }
+ else if (_isFunction(publishProps))
+ {
+ publishCallback = publishProps;
+ publishProps = {};
+ }
+
+ var bayeuxMessage = {
+ channel: channel,
+ data: content,
+ _callback: publishCallback
+ };
+ var message = this._mixin(false, {}, publishProps, bayeuxMessage);
+ _queueSend(message);
+ };
+
+ /**
+ * Returns a string representing the status of the bayeux communication with the Bayeux server.
+ */
+ this.getStatus = function()
+ {
+ return _status;
+ };
+
+ /**
+ * Returns whether this instance has been disconnected.
+ */
+ this.isDisconnected = _isDisconnected;
+
+ /**
+ * Sets the backoff period used to increase the backoff time when retrying an unsuccessful or failed message.
+ * Default value is 1 second, which means if there is a persistent failure the retries will happen
+ * after 1 second, then after 2 seconds, then after 3 seconds, etc. So for example with 15 seconds of
+ * elapsed time, there will be 5 retries (at 1, 3, 6, 10 and 15 seconds elapsed).
+ * @param period the backoff period to set
+ * @see #getBackoffIncrement()
+ */
+ this.setBackoffIncrement = function(period)
+ {
+ _config.backoffIncrement = period;
+ };
+
+ /**
+ * Returns the backoff period used to increase the backoff time when retrying an unsuccessful or failed message.
+ * @see #setBackoffIncrement(period)
+ */
+ this.getBackoffIncrement = function()
+ {
+ return _config.backoffIncrement;
+ };
+
+ /**
+ * Returns the backoff period to wait before retrying an unsuccessful or failed message.
+ */
+ this.getBackoffPeriod = function()
+ {
+ return _backoff;
+ };
+
+ /**
+ * Sets the log level for console logging.
+ * Valid values are the strings 'error', 'warn', 'info' and 'debug', from
+ * less verbose to more verbose.
+ * @param level the log level string
+ */
+ this.setLogLevel = function(level)
+ {
+ _config.logLevel = level;
+ };
+
+ /**
+ * Registers an extension whose callbacks are called for every incoming message
+ * (that comes from the server to this client implementation) and for every
+ * outgoing message (that originates from this client implementation for the
+ * server).
+ * The format of the extension object is the following:
+ * + * { + * incoming: function(message) { ... }, + * outgoing: function(message) { ... } + * } + *+ * Both properties are optional, but if they are present they will be called + * respectively for each incoming message and for each outgoing message. + * @param name the name of the extension + * @param extension the extension to register + * @return true if the extension was registered, false otherwise + * @see #unregisterExtension(name) + */ + this.registerExtension = function(name, extension) + { + if (arguments.length < 2) + { + throw 'Illegal arguments number: required 2, got ' + arguments.length; + } + if (!_isString(name)) + { + throw 'Illegal argument type: extension name must be a string'; + } + + var existing = false; + for (var i = 0; i < _extensions.length; ++i) + { + var existingExtension = _extensions[i]; + if (existingExtension.name === name) + { + existing = true; + break; + } + } + if (!existing) + { + _extensions.push({ + name: name, + extension: extension + }); + this._debug('Registered extension', name); + + // Callback for extensions + if (_isFunction(extension.registered)) + { + extension.registered(name, this); + } + + return true; + } + else + { + this._info('Could not register extension with name', name, 'since another extension with the same name already exists'); + return false; + } + }; + + /** + * Unregister an extension previously registered with + * {@link #registerExtension(name, extension)}. + * @param name the name of the extension to unregister. + * @return true if the extension was unregistered, false otherwise + */ + this.unregisterExtension = function(name) + { + if (!_isString(name)) + { + throw 'Illegal argument type: extension name must be a string'; + } + + var unregistered = false; + for (var i = 0; i < _extensions.length; ++i) + { + var extension = _extensions[i]; + if (extension.name === name) + { + _extensions.splice(i, 1); + unregistered = true; + this._debug('Unregistered extension', name); + + // Callback for extensions + var ext = extension.extension; + if (_isFunction(ext.unregistered)) + { + ext.unregistered(); + } + + break; + } + } + return unregistered; + }; + + /** + * Find the extension registered with the given name. + * @param name the name of the extension to find + * @return the extension found or null if no extension with the given name has been registered + */ + this.getExtension = function(name) + { + for (var i = 0; i < _extensions.length; ++i) + { + var extension = _extensions[i]; + if (extension.name === name) + { + return extension.extension; + } + } + return null; + }; + + /** + * Returns the name assigned to this Cometd object, or the string 'default' + * if no name has been explicitly passed as parameter to the constructor. + */ + this.getName = function() + { + return _name; + }; + + /** + * Returns the clientId assigned by the Bayeux server during handshake. + */ + this.getClientId = function() + { + return _clientId; + }; + + /** + * Returns the URL of the Bayeux server. + */ + this.getURL = function() + { + return _config.url; + }; + + this.getTransport = function() + { + return _transport; + }; + + this.getConfiguration = function() + { + return this._mixin(true, {}, _config); + }; + + this.getAdvice = function() + { + return this._mixin(true, {}, _advice); + }; + + // WebSocket handling for Firefox, which deploys WebSocket + // under the name of MozWebSocket in Firefox 6, 7, 8 and 9 + org.cometd.WebSocket = window.WebSocket; + if (!org.cometd.WebSocket) + { + org.cometd.WebSocket = window.MozWebSocket; + } +}; + +if (typeof define === 'function' && define.amd) +{ + define(function() + { + return org.cometd; + }); +} + -- cgit 1.2.3-korg