/* * Copyright (c) 2010 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ // Namespaces for the cometd implementation this.org = this.org || {}; org.cometd = {}; org.cometd.JSON = {}; org.cometd.JSON.toJSON = org.cometd.JSON.fromJSON = function(object) { throw 'Abstract'; }; org.cometd.Utils = {}; org.cometd.Utils.isString = function(value) { if (value === undefined || value === null) { return false; } return typeof value === 'string' || value instanceof String; }; org.cometd.Utils.isArray = function(value) { if (value === undefined || value === null) { return false; } return value instanceof Array; }; /** * Returns whether the given element is contained into the given array. * @param element the element to check presence for * @param array the array to check for the element presence * @return the index of the element, if present, or a negative index if the element is not present */ org.cometd.Utils.inArray = function(element, array) { for (var i = 0; i < array.length; ++i) { if (element === array[i]) { return i; } } return -1; }; org.cometd.Utils.setTimeout = function(cometd, funktion, delay) { return window.setTimeout(function() { try { funktion(); } catch (x) { cometd._debug('Exception invoking timed function', funktion, x); } }, delay); }; org.cometd.Utils.clearTimeout = function(timeoutHandle) { window.clearTimeout(timeoutHandle); }; /** * A registry for transports used by the Cometd object. 
*/ org.cometd.TransportRegistry = function() { var _types = []; var _transports = {}; this.getTransportTypes = function() { return _types.slice(0); }; this.findTransportTypes = function(version, crossDomain, url) { var result = []; for (var i = 0; i < _types.length; ++i) { var type = _types[i]; if (_transports[type].accept(version, crossDomain, url) === true) { result.push(type); } } return result; }; this.negotiateTransport = function(types, version, crossDomain, url) { for (var i = 0; i < _types.length; ++i) { var type = _types[i]; for (var j = 0; j < types.length; ++j) { if (type === types[j]) { var transport = _transports[type]; if (transport.accept(version, crossDomain, url) === true) { return transport; } } } } return null; }; this.add = function(type, transport, index) { var existing = false; for (var i = 0; i < _types.length; ++i) { if (_types[i] === type) { existing = true; break; } } if (!existing) { if (typeof index !== 'number') { _types.push(type); } else { _types.splice(index, 0, type); } _transports[type] = transport; } return !existing; }; this.find = function(type) { for (var i = 0; i < _types.length; ++i) { if (_types[i] === type) { return _transports[type]; } } return null; }; this.remove = function(type) { for (var i = 0; i < _types.length; ++i) { if (_types[i] === type) { _types.splice(i, 1); var transport = _transports[type]; delete _transports[type]; return transport; } } return null; }; this.clear = function() { _types = []; _transports = {}; }; this.reset = function() { for (var i = 0; i < _types.length; ++i) { _transports[_types[i]].reset(); } }; }; /** * Base object with the common functionality for transports. */ org.cometd.Transport = function() { var _type; var _cometd; /** * Function invoked just after a transport has been successfully registered. 
* @param type the type of transport (for example 'long-polling') * @param cometd the cometd object this transport has been registered to * @see #unregistered() */ this.registered = function(type, cometd) { _type = type; _cometd = cometd; }; /** * Function invoked just after a transport has been successfully unregistered. * @see #registered(type, cometd) */ this.unregistered = function() { _type = null; _cometd = null; }; this._debug = function() { _cometd._debug.apply(_cometd, arguments); }; this._mixin = function() { return _cometd._mixin.apply(_cometd, arguments); }; this.getConfiguration = function() { return _cometd.getConfiguration(); }; this.getAdvice = function() { return _cometd.getAdvice(); }; this.setTimeout = function(funktion, delay) { return org.cometd.Utils.setTimeout(_cometd, funktion, delay); }; this.clearTimeout = function(handle) { org.cometd.Utils.clearTimeout(handle); }; /** * Converts the given response into an array of bayeux messages * @param response the response to convert * @return an array of bayeux messages obtained by converting the response */ this.convertToMessages = function (response) { if (org.cometd.Utils.isString(response)) { try { return org.cometd.JSON.fromJSON(response); } catch(x) { this._debug('Could not convert to JSON the following string', '"' + response + '"'); throw x; } } if (org.cometd.Utils.isArray(response)) { return response; } if (response === undefined || response === null) { return []; } if (response instanceof Object) { return [response]; } throw 'Conversion Error ' + response + ', typeof ' + (typeof response); }; /** * Returns whether this transport can work for the given version and cross domain communication case. 
* @param version a string indicating the transport version * @param crossDomain a boolean indicating whether the communication is cross domain * @return true if this transport can work for the given version and cross domain communication case, * false otherwise */ this.accept = function(version, crossDomain, url) { throw 'Abstract'; }; /** * Returns the type of this transport. * @see #registered(type, cometd) */ this.getType = function() { return _type; }; this.send = function(envelope, metaConnect) { throw 'Abstract'; }; this.reset = function() { this._debug('Transport', _type, 'reset'); }; this.abort = function() { this._debug('Transport', _type, 'aborted'); }; this.toString = function() { return this.getType(); }; }; org.cometd.Transport.derive = function(baseObject) { function F() {} F.prototype = baseObject; return new F(); }; /** * Base object with the common functionality for transports based on requests. * The key responsibility is to allow at most 2 outstanding requests to the server, * to avoid that requests are sent behind a long poll. * To achieve this, we have one reserved request for the long poll, and all other * requests are serialized one after the other. 
*/ org.cometd.RequestTransport = function() { var _super = new org.cometd.Transport(); var _self = org.cometd.Transport.derive(_super); var _requestIds = 0; var _metaConnectRequest = null; var _requests = []; var _envelopes = []; function _coalesceEnvelopes(envelope) { while (_envelopes.length > 0) { var envelopeAndRequest = _envelopes[0]; var newEnvelope = envelopeAndRequest[0]; var newRequest = envelopeAndRequest[1]; if (newEnvelope.url === envelope.url && newEnvelope.sync === envelope.sync) { _envelopes.shift(); envelope.messages = envelope.messages.concat(newEnvelope.messages); this._debug('Coalesced', newEnvelope.messages.length, 'messages from request', newRequest.id); continue; } break; } } function _transportSend(envelope, request) { this.transportSend(envelope, request); request.expired = false; if (!envelope.sync) { var maxDelay = this.getConfiguration().maxNetworkDelay; var delay = maxDelay; if (request.metaConnect === true) { delay += this.getAdvice().timeout; } this._debug('Transport', this.getType(), 'waiting at most', delay, 'ms for the response, maxNetworkDelay', maxDelay); var self = this; request.timeout = this.setTimeout(function() { request.expired = true; if (request.xhr) { request.xhr.abort(); } var errorMessage = 'Request ' + request.id + ' of transport ' + self.getType() + ' exceeded ' + delay + ' ms max network delay'; self._debug(errorMessage); self.complete(request, false, request.metaConnect); envelope.onFailure(request.xhr, envelope.messages, 'timeout', errorMessage); }, delay); } } function _queueSend(envelope) { var requestId = ++_requestIds; var request = { id: requestId, metaConnect: false }; // Consider the metaConnect requests which should always be present if (_requests.length < this.getConfiguration().maxConnections - 1) { _requests.push(request); _transportSend.call(this, envelope, request); } else { this._debug('Transport', this.getType(), 'queueing request', requestId, 'envelope', envelope); _envelopes.push([envelope, 
request]); } } function _metaConnectComplete(request) { var requestId = request.id; this._debug('Transport', this.getType(), 'metaConnect complete, request', requestId); if (_metaConnectRequest !== null && _metaConnectRequest.id !== requestId) { throw 'Longpoll request mismatch, completing request ' + requestId; } // Reset metaConnect request _metaConnectRequest = null; } function _complete(request, success) { var index = org.cometd.Utils.inArray(request, _requests); // The index can be negative if the request has been aborted if (index >= 0) { _requests.splice(index, 1); } if (_envelopes.length > 0) { var envelopeAndRequest = _envelopes.shift(); var nextEnvelope = envelopeAndRequest[0]; var nextRequest = envelopeAndRequest[1]; this._debug('Transport dequeued request', nextRequest.id); if (success) { if (this.getConfiguration().autoBatch) { _coalesceEnvelopes.call(this, nextEnvelope); } _queueSend.call(this, nextEnvelope); this._debug('Transport completed request', request.id, nextEnvelope); } else { // Keep the semantic of calling response callbacks asynchronously after the request var self = this; this.setTimeout(function() { self.complete(nextRequest, false, nextRequest.metaConnect); nextEnvelope.onFailure(nextRequest.xhr, nextEnvelope.messages, 'error', 'Previous request failed'); }, 0); } } } _self.complete = function(request, success, metaConnect) { if (metaConnect) { _metaConnectComplete.call(this, request); } else { _complete.call(this, request, success); } }; /** * Performs the actual send depending on the transport type details. 
* @param envelope the envelope to send * @param request the request information */ _self.transportSend = function(envelope, request) { throw 'Abstract'; }; _self.transportSuccess = function(envelope, request, responses) { if (!request.expired) { this.clearTimeout(request.timeout); this.complete(request, true, request.metaConnect); if (responses && responses.length > 0) { envelope.onSuccess(responses); } else { envelope.onFailure(request.xhr, envelope.messages, 'Empty HTTP response'); } } }; _self.transportFailure = function(envelope, request, reason, exception) { if (!request.expired) { this.clearTimeout(request.timeout); this.complete(request, false, request.metaConnect); envelope.onFailure(request.xhr, envelope.messages, reason, exception); } }; function _metaConnectSend(envelope) { if (_metaConnectRequest !== null) { throw 'Concurrent metaConnect requests not allowed, request id=' + _metaConnectRequest.id + ' not yet completed'; } var requestId = ++_requestIds; this._debug('Transport', this.getType(), 'metaConnect send, request', requestId, 'envelope', envelope); var request = { id: requestId, metaConnect: true }; _transportSend.call(this, envelope, request); _metaConnectRequest = request; } _self.send = function(envelope, metaConnect) { if (metaConnect) { _metaConnectSend.call(this, envelope); } else { _queueSend.call(this, envelope); } }; _self.abort = function() { _super.abort(); for (var i = 0; i < _requests.length; ++i) { var request = _requests[i]; this._debug('Aborting request', request); if (request.xhr) { request.xhr.abort(); } } if (_metaConnectRequest) { this._debug('Aborting metaConnect request', _metaConnectRequest); if (_metaConnectRequest.xhr) { _metaConnectRequest.xhr.abort(); } } this.reset(); }; _self.reset = function() { _super.reset(); _metaConnectRequest = null; _requests = []; _envelopes = []; }; return _self; }; org.cometd.LongPollingTransport = function() { var _super = new org.cometd.RequestTransport(); var _self = 
org.cometd.Transport.derive(_super); // By default, support cross domain var _supportsCrossDomain = true; _self.accept = function(version, crossDomain, url) { return _supportsCrossDomain || !crossDomain; }; _self.xhrSend = function(packet) { throw 'Abstract'; }; _self.transportSend = function(envelope, request) { this._debug('Transport', this.getType(), 'sending request', request.id, 'envelope', envelope); var self = this; try { var sameStack = true; request.xhr = this.xhrSend({ transport: this, url: envelope.url, sync: envelope.sync, headers: this.getConfiguration().requestHeaders, body: org.cometd.JSON.toJSON(envelope.messages), onSuccess: function(response) { self._debug('Transport', self.getType(), 'received response', response); var success = false; try { var received = self.convertToMessages(response); if (received.length === 0) { _supportsCrossDomain = false; self.transportFailure(envelope, request, 'no response', null); } else { success = true; self.transportSuccess(envelope, request, received); } } catch(x) { self._debug(x); if (!success) { _supportsCrossDomain = false; self.transportFailure(envelope, request, 'bad response', x); } } }, onError: function(reason, exception) { _supportsCrossDomain = false; if (sameStack) { // Keep the semantic of calling response callbacks asynchronously after the request self.setTimeout(function() { self.transportFailure(envelope, request, reason, exception); }, 0); } else { self.transportFailure(envelope, request, reason, exception); } } }); sameStack = false; } catch (x) { _supportsCrossDomain = false; // Keep the semantic of calling response callbacks asynchronously after the request this.setTimeout(function() { self.transportFailure(envelope, request, 'error', x); }, 0); } }; _self.reset = function() { _super.reset(); _supportsCrossDomain = true; }; return _self; }; org.cometd.CallbackPollingTransport = function() { var _super = new org.cometd.RequestTransport(); var _self = org.cometd.Transport.derive(_super); var 
_maxLength = 2000; _self.accept = function(version, crossDomain, url) { return true; }; _self.jsonpSend = function(packet) { throw 'Abstract'; }; _self.transportSend = function(envelope, request) { var self = this; // Microsoft Internet Explorer has a 2083 URL max length // We must ensure that we stay within that length var start = 0; var length = envelope.messages.length; var lengths = []; while (length > 0) { // Encode the messages because all brackets, quotes, commas, colons, etc // present in the JSON will be URL encoded, taking many more characters var json = org.cometd.JSON.toJSON(envelope.messages.slice(start, start + length)); var urlLength = envelope.url.length + encodeURI(json).length; // Let's stay on the safe side and use 2000 instead of 2083 // also because we did not count few characters among which // the parameter name 'message' and the parameter 'jsonp', // which sum up to about 50 chars if (urlLength > _maxLength) { if (length === 1) { var x = 'Bayeux message too big (' + urlLength + ' bytes, max is ' + _maxLength + ') ' + 'for transport ' + this.getType(); // Keep the semantic of calling response callbacks asynchronously after the request this.setTimeout(function() { self.transportFailure(envelope, request, 'error', x); }, 0); return; } --length; continue; } lengths.push(length); start += length; length = envelope.messages.length - start; } // Here we are sure that the messages can be sent within the URL limit var envelopeToSend = envelope; if (lengths.length > 1) { var begin = 0; var end = lengths[0]; this._debug('Transport', this.getType(), 'split', envelope.messages.length, 'messages into', lengths.join(' + ')); envelopeToSend = this._mixin(false, {}, envelope); envelopeToSend.messages = envelope.messages.slice(begin, end); envelopeToSend.onSuccess = envelope.onSuccess; envelopeToSend.onFailure = envelope.onFailure; for (var i = 1; i < lengths.length; ++i) { var nextEnvelope = this._mixin(false, {}, envelope); begin = end; end += lengths[i]; 
nextEnvelope.messages = envelope.messages.slice(begin, end); nextEnvelope.onSuccess = envelope.onSuccess; nextEnvelope.onFailure = envelope.onFailure; this.send(nextEnvelope, request.metaConnect); } } this._debug('Transport', this.getType(), 'sending request', request.id, 'envelope', envelopeToSend); try { var sameStack = true; this.jsonpSend({ transport: this, url: envelopeToSend.url, sync: envelopeToSend.sync, headers: this.getConfiguration().requestHeaders, body: org.cometd.JSON.toJSON(envelopeToSend.messages), onSuccess: function(responses) { var success = false; try { var received = self.convertToMessages(responses); if (received.length === 0) { self.transportFailure(envelopeToSend, request, 'no response'); } else { success=true; self.transportSuccess(envelopeToSend, request, received); } } catch (x) { self._debug(x); if (!success) { self.transportFailure(envelopeToSend, request, 'bad response', x); } } }, onError: function(reason, exception) { if (sameStack) { // Keep the semantic of calling response callbacks asynchronously after the request self.setTimeout(function() { self.transportFailure(envelopeToSend, request, reason, exception); }, 0); } else { self.transportFailure(envelopeToSend, request, reason, exception); } } }); sameStack = false; } catch (xx) { // Keep the semantic of calling response callbacks asynchronously after the request this.setTimeout(function() { self.transportFailure(envelopeToSend, request, 'error', xx); }, 0); } }; return _self; }; org.cometd.WebSocketTransport = function() { var _super = new org.cometd.Transport(); var _self = org.cometd.Transport.derive(_super); var _cometd; // By default, support WebSocket var _supportsWebSocket = true; // Whether we were able to establish a WebSocket connection var _webSocketSupported = false; // Envelopes that have been sent var _envelopes = {}; // Timeouts for messages that have been sent var _timeouts = {}; var _webSocket = null; var _opened = false; var _connected = false; var 
_successCallback; function _websocketConnect() { // Mangle the URL, changing the scheme from 'http' to 'ws' var url = _cometd.getURL().replace(/^http/, 'ws'); this._debug('Transport', this.getType(), 'connecting to URL', url); var self = this; var connectTimer = null; var connectTimeout = _cometd.getConfiguration().connectTimeout; if (connectTimeout > 0) { connectTimer = this.setTimeout(function() { connectTimer = null; if (!_opened) { self._debug('Transport', self.getType(), 'timed out while connecting to URL', url, ':', connectTimeout, 'ms'); self.onClose(1002, 'Connect Timeout'); } }, connectTimeout); } var webSocket = new org.cometd.WebSocket(url); var onopen = function() { self._debug('WebSocket opened', webSocket); if (connectTimer) { self.clearTimeout(connectTimer); connectTimer = null; } if (webSocket !== _webSocket) { // It's possible that the onopen callback is invoked // with a delay so that we have already reconnected self._debug('Ignoring open event, WebSocket', _webSocket); return; } self.onOpen(); }; var onclose = function(event) { var code = event ? event.code : 1000; var reason = event ? 
event.reason : undefined; self._debug('WebSocket closed', code, '/', reason, webSocket); if (connectTimer) { self.clearTimeout(connectTimer); connectTimer = null; } if (webSocket !== _webSocket) { // The onclose callback may be invoked when the server sends // the close message reply, but after we have already reconnected self._debug('Ignoring close event, WebSocket', _webSocket); return; } self.onClose(code, reason); }; var onmessage = function(message) { self._debug('WebSocket message', message, webSocket); if (webSocket !== _webSocket) { self._debug('Ignoring message event, WebSocket', _webSocket); return; } self.onMessage(message); }; webSocket.onopen = onopen; webSocket.onclose = onclose; webSocket.onerror = function() { onclose({ code: 1002 }); }; webSocket.onmessage = onmessage; _webSocket = webSocket; this._debug('Transport', this.getType(), 'configured callbacks on', webSocket); } function _webSocketSend(envelope, metaConnect) { var json = org.cometd.JSON.toJSON(envelope.messages); _webSocket.send(json); this._debug('Transport', this.getType(), 'sent', envelope, 'metaConnect =', metaConnect); // Manage the timeout waiting for the response var maxDelay = this.getConfiguration().maxNetworkDelay; var delay = maxDelay; if (metaConnect) { delay += this.getAdvice().timeout; _connected = true; } var messageIds = []; for (var i = 0; i < envelope.messages.length; ++i) { var message = envelope.messages[i]; if (message.id) { messageIds.push(message.id); var self = this; var webSocket = _webSocket; _timeouts[message.id] = this.setTimeout(function() { if (webSocket) { webSocket.close(1000, 'Timeout'); } }, delay); } } this._debug('Transport', this.getType(), 'waiting at most', delay, 'ms for messages', messageIds, 'maxNetworkDelay', maxDelay, ', timeouts:', _timeouts); } function _send(envelope, metaConnect) { try { if (_webSocket === null) { _websocketConnect.call(this); } // We may have a non-null _webSocket, but not be open yet so // to avoid out of order 
deliveries, we check if we are open else if (_opened) { _webSocketSend.call(this, envelope, metaConnect); } } catch (x) { // Keep the semantic of calling response callbacks asynchronously after the request var webSocket = _webSocket; this.setTimeout(function() { envelope.onFailure(webSocket, envelope.messages, 'error', x); }, 0); } } _self.onOpen = function() { this._debug('Transport', this.getType(), 'opened', _webSocket); _opened = true; _webSocketSupported = true; this._debug('Sending pending messages', _envelopes); for (var key in _envelopes) { var element = _envelopes[key]; var envelope = element[0]; var metaConnect = element[1]; // Store the success callback, which is independent from the envelope, // so that it can be used to notify arrival of messages. _successCallback = envelope.onSuccess; _webSocketSend.call(this, envelope, metaConnect); } }; _self.onMessage = function(wsMessage) { this._debug('Transport', this.getType(), 'received websocket message', wsMessage, _webSocket); var close = false; var messages = this.convertToMessages(wsMessage.data); var messageIds = []; for (var i = 0; i < messages.length; ++i) { var message = messages[i]; // Detect if the message is a response to a request we made. 
// If it's a meta message, for sure it's a response; // otherwise it's a publish message and publish responses lack the data field if (/^\/meta\//.test(message.channel) || message.data === undefined) { if (message.id) { messageIds.push(message.id); var timeout = _timeouts[message.id]; if (timeout) { this.clearTimeout(timeout); delete _timeouts[message.id]; this._debug('Transport', this.getType(), 'removed timeout for message', message.id, ', timeouts', _timeouts); } } } if ('/meta/connect' === message.channel) { _connected = false; } if ('/meta/disconnect' === message.channel && !_connected) { close = true; } } // Remove the envelope corresponding to the messages var removed = false; for (var j = 0; j < messageIds.length; ++j) { var id = messageIds[j]; for (var key in _envelopes) { var ids = key.split(','); var index = org.cometd.Utils.inArray(id, ids); if (index >= 0) { removed = true; ids.splice(index, 1); var envelope = _envelopes[key][0]; var metaConnect = _envelopes[key][1]; delete _envelopes[key]; if (ids.length > 0) { _envelopes[ids.join(',')] = [envelope, metaConnect]; } break; } } } if (removed) { this._debug('Transport', this.getType(), 'removed envelope, envelopes', _envelopes); } _successCallback.call(this, messages); if (close) { _webSocket.close(1000, 'Disconnect'); } }; _self.onClose = function(code, reason) { this._debug('Transport', this.getType(), 'closed', code, reason, _webSocket); // Remember if we were able to connect // This close event could be due to server shutdown, and if it restarts we want to try websocket again _supportsWebSocket = _webSocketSupported; for (var id in _timeouts) { this.clearTimeout(_timeouts[id]); } _timeouts = {}; for (var key in _envelopes) { var envelope = _envelopes[key][0]; var metaConnect = _envelopes[key][1]; if (metaConnect) { _connected = false; } envelope.onFailure(_webSocket, envelope.messages, 'closed ' + code + '/' + reason); } _envelopes = {}; if (_webSocket !== null && _opened) { _webSocket.close(1000, 
'Close'); } _opened = false; _webSocket = null; }; _self.registered = function(type, cometd) { _super.registered(type, cometd); _cometd = cometd; }; _self.accept = function(version, crossDomain, url) { // Using !! to return a boolean (and not the WebSocket object) return _supportsWebSocket && !!org.cometd.WebSocket && _cometd.websocketEnabled !== false; }; _self.send = function(envelope, metaConnect) { this._debug('Transport', this.getType(), 'sending', envelope, 'metaConnect =', metaConnect); // Store the envelope in any case; if the websocket cannot be opened, we fail it in close() var messageIds = []; for (var i = 0; i < envelope.messages.length; ++i) { var message = envelope.messages[i]; if (message.id) { messageIds.push(message.id); } } _envelopes[messageIds.join(',')] = [envelope, metaConnect]; this._debug('Transport', this.getType(), 'stored envelope, envelopes', _envelopes); _send.call(this, envelope, metaConnect); }; _self.abort = function() { _super.abort(); if (_webSocket !== null) { try { _webSocket.close(1001); } catch (x) { // Firefox may throw, just ignore this._debug(x); } } this.reset(); }; _self.reset = function() { _super.reset(); if (_webSocket !== null && _opened) { _webSocket.close(1000, 'Reset'); } _supportsWebSocket = true; _webSocketSupported = false; _timeouts = {}; _envelopes = {}; _webSocket = null; _opened = false; _successCallback = null; }; return _self; }; /** * The constructor for a Cometd object, identified by an optional name. * The default name is the string 'default'. * In the rare case a page needs more than one Bayeux conversation, * a new instance can be created via: *
* var bayeuxUrl2 = ...; * * // Dojo style * var cometd2 = new dojox.Cometd('another_optional_name'); * * // jQuery style * var cometd2 = new $.Cometd('another_optional_name'); * * cometd2.init({url: bayeuxUrl2}); ** @param name the optional name of this cometd object */ // IMPLEMENTATION NOTES: // Be very careful in not changing the function order and pass this file every time through JSLint (http://jslint.com) // The only implied globals must be "dojo", "org" and "window", and check that there are no "unused" warnings // Failing to pass JSLint may result in shrinkers/minifiers to create an unusable file. org.cometd.Cometd = function(name) { var _cometd = this; var _name = name || 'default'; var _crossDomain = false; var _transports = new org.cometd.TransportRegistry(); var _transport; var _status = 'disconnected'; var _messageId = 0; var _clientId = null; var _batch = 0; var _messageQueue = []; var _internalBatch = false; var _listeners = {}; var _backoff = 0; var _scheduledSend = null; var _extensions = []; var _advice = {}; var _handshakeProps; var _publishCallbacks = {}; var _reestablish = false; var _connected = false; var _config = { connectTimeout: 0, maxConnections: 2, backoffIncrement: 1000, maxBackoff: 60000, logLevel: 'info', reverseIncomingExtensions: true, maxNetworkDelay: 10000, requestHeaders: {}, appendMessageTypeToURL: true, autoBatch: false, advice: { timeout: 60000, interval: 0, reconnect: 'retry' } }; /** * Mixes in the given objects into the target object by copying the properties. 
* @param deep if the copy must be deep * @param target the target object * @param objects the objects whose properties are copied into the target */ this._mixin = function(deep, target, objects) { var result = target || {}; // Skip first 2 parameters (deep and target), and loop over the others for (var i = 2; i < arguments.length; ++i) { var object = arguments[i]; if (object === undefined || object === null) { continue; } for (var propName in object) { var prop = object[propName]; var targ = result[propName]; // Avoid infinite loops if (prop === target) { continue; } // Do not mixin undefined values if (prop === undefined) { continue; } if (deep && typeof prop === 'object' && prop !== null) { if (prop instanceof Array) { result[propName] = this._mixin(deep, targ instanceof Array ? targ : [], prop); } else { var source = typeof targ === 'object' && !(targ instanceof Array) ? targ : {}; result[propName] = this._mixin(deep, source, prop); } } else { result[propName] = prop; } } } return result; }; function _isString(value) { return org.cometd.Utils.isString(value); } function _isFunction(value) { if (value === undefined || value === null) { return false; } return typeof value === 'function'; } function _log(level, args) { if (window.console) { var logger = window.console[level]; if (_isFunction(logger)) { logger.apply(window.console, args); } } } this._warn = function() { _log('warn', arguments); }; this._info = function() { if (_config.logLevel !== 'warn') { _log('info', arguments); } }; this._debug = function() { if (_config.logLevel === 'debug') { _log('debug', arguments); } }; /** * Returns whether the given hostAndPort is cross domain. * The default implementation checks against window.location.host * but this function can be overridden to make it work in non-browser * environments. 
* * @param hostAndPort the host and port in format host:port * @return whether the given hostAndPort is cross domain */ this._isCrossDomain = function(hostAndPort) { return hostAndPort && hostAndPort !== window.location.host; }; function _configure(configuration) { _cometd._debug('Configuring cometd object with', configuration); // Support old style param, where only the Bayeux server URL was passed if (_isString(configuration)) { configuration = { url: configuration }; } if (!configuration) { configuration = {}; } _config = _cometd._mixin(false, _config, configuration); if (!_config.url) { throw 'Missing required configuration parameter \'url\' specifying the Bayeux server URL'; } // Check if we're cross domain // [1] = protocol://, [2] = host:port, [3] = host, [4] = IPv6_host, [5] = IPv4_host, [6] = :port, [7] = port, [8] = uri, [9] = rest var urlParts = /(^https?:\/\/)?(((\[[^\]]+\])|([^:\/\?#]+))(:(\d+))?)?([^\?#]*)(.*)?/.exec(_config.url); var hostAndPort = urlParts[2]; var uri = urlParts[8]; var afterURI = urlParts[9]; _crossDomain = _cometd._isCrossDomain(hostAndPort); // Check if appending extra path is supported if (_config.appendMessageTypeToURL) { if (afterURI !== undefined && afterURI.length > 0) { _cometd._info('Appending message type to URI ' + uri + afterURI + ' is not supported, disabling \'appendMessageTypeToURL\' configuration'); _config.appendMessageTypeToURL = false; } else { var uriSegments = uri.split('/'); var lastSegmentIndex = uriSegments.length - 1; if (uri.match(/\/$/)) { lastSegmentIndex -= 1; } if (uriSegments[lastSegmentIndex].indexOf('.') >= 0) { // Very likely the CometD servlet's URL pattern is mapped to an extension, such as *.cometd // It will be difficult to add the extra path in this case _cometd._info('Appending message type to URI ' + uri + ' is not supported, disabling \'appendMessageTypeToURL\' configuration'); _config.appendMessageTypeToURL = false; } } } } function _clearSubscriptions() { for (var channel in _listeners) { 
var subscriptions = _listeners[channel]; for (var i = 0; i < subscriptions.length; ++i) { var subscription = subscriptions[i]; if (subscription && !subscription.listener) { delete subscriptions[i]; _cometd._debug('Removed subscription', subscription, 'for channel', channel); } } } } function _setStatus(newStatus) { if (_status !== newStatus) { _cometd._debug('Status', _status, '->', newStatus); _status = newStatus; } } function _isDisconnected() { return _status === 'disconnecting' || _status === 'disconnected'; } function _nextMessageId() { return ++_messageId; } function _applyExtension(scope, callback, name, message, outgoing) { try { return callback.call(scope, message); } catch (x) { _cometd._debug('Exception during execution of extension', name, x); var exceptionCallback = _cometd.onExtensionException; if (_isFunction(exceptionCallback)) { _cometd._debug('Invoking extension exception callback', name, x); try { exceptionCallback.call(_cometd, x, name, outgoing, message); } catch(xx) { _cometd._info('Exception during execution of exception callback in extension', name, xx); } } return message; } } function _applyIncomingExtensions(message) { for (var i = 0; i < _extensions.length; ++i) { if (message === undefined || message === null) { break; } var index = _config.reverseIncomingExtensions ? _extensions.length - 1 - i : i; var extension = _extensions[index]; var callback = extension.extension.incoming; if (_isFunction(callback)) { var result = _applyExtension(extension.extension, callback, extension.name, message, false); message = result === undefined ? 
message : result; } } return message; } function _applyOutgoingExtensions(message) { for (var i = 0; i < _extensions.length; ++i) { if (message === undefined || message === null) { break; } var extension = _extensions[i]; var callback = extension.extension.outgoing; if (_isFunction(callback)) { var result = _applyExtension(extension.extension, callback, extension.name, message, true); message = result === undefined ? message : result; } } return message; } function _notify(channel, message) { var subscriptions = _listeners[channel]; if (subscriptions && subscriptions.length > 0) { for (var i = 0; i < subscriptions.length; ++i) { var subscription = subscriptions[i]; // Subscriptions may come and go, so the array may have 'holes' if (subscription) { try { subscription.callback.call(subscription.scope, message); } catch (x) { _cometd._debug('Exception during notification', subscription, message, x); var listenerCallback = _cometd.onListenerException; if (_isFunction(listenerCallback)) { _cometd._debug('Invoking listener exception callback', subscription, x); try { listenerCallback.call(_cometd, x, subscription.handle, subscription.listener, message); } catch (xx) { _cometd._info('Exception during execution of listener callback', subscription, xx); } } } } } } } function _notifyListeners(channel, message) { // Notify direct listeners _notify(channel, message); // Notify the globbing listeners var channelParts = channel.split('/'); var last = channelParts.length - 1; for (var i = last; i > 0; --i) { var channelPart = channelParts.slice(0, i).join('/') + '/*'; // We don't want to notify /foo/* if the channel is /foo/bar/baz, // so we stop at the first non recursive globbing if (i === last) { _notify(channelPart, message); } // Add the recursive globber and notify channelPart += '*'; _notify(channelPart, message); } } function _cancelDelayedSend() { if (_scheduledSend !== null) { org.cometd.Utils.clearTimeout(_scheduledSend); } _scheduledSend = null; } function 
_delayedSend(operation) { _cancelDelayedSend(); var delay = _advice.interval + _backoff; _cometd._debug('Function scheduled in', delay, 'ms, interval =', _advice.interval, 'backoff =', _backoff, operation); _scheduledSend = org.cometd.Utils.setTimeout(_cometd, operation, delay); } // Needed to break cyclic dependencies between function definitions var _handleMessages; var _handleFailure; /** * Delivers the messages to the CometD server * @param messages the array of messages to send * @param longpoll true if this send is a long poll */ function _send(sync, messages, longpoll, extraPath) { // We must be sure that the messages have a clientId. // This is not guaranteed since the handshake may take time to return // (and hence the clientId is not known yet) and the application // may create other messages. for (var i = 0; i < messages.length; ++i) { var message = messages[i]; message.id = '' + _nextMessageId(); if (_clientId) { message.clientId = _clientId; } var callback = undefined; if (_isFunction(message._callback)) { callback = message._callback; // Remove the publish callback before calling the extensions delete message._callback; } message = _applyOutgoingExtensions(message); if (message !== undefined && message !== null) { messages[i] = message; if (callback) _publishCallbacks[message.id] = callback; } else { messages.splice(i--, 1); } } if (messages.length === 0) { return; } var url = _config.url; if (_config.appendMessageTypeToURL) { // If url does not end with '/', then append it if (!url.match(/\/$/)) { url = url + '/'; } if (extraPath) { url = url + extraPath; } } var envelope = { url: url, sync: sync, messages: messages, onSuccess: function(rcvdMessages) { try { _handleMessages.call(_cometd, rcvdMessages); } catch (x) { _cometd._debug('Exception during handling of messages', x); } }, onFailure: function(conduit, messages, reason, exception) { try { _handleFailure.call(_cometd, conduit, messages, reason, exception); } catch (x) { _cometd._debug('Exception 
during handling of failure', x); } } }; _cometd._debug('Send', envelope); _transport.send(envelope, longpoll); } function _queueSend(message) { if (_batch > 0 || _internalBatch === true) { _messageQueue.push(message); } else { _send(false, [message], false); } } /** * Sends a complete bayeux message. * This method is exposed as a public so that extensions may use it * to send bayeux message directly, for example in case of re-sending * messages that have already been sent but that for some reason must * be resent. */ this.send = _queueSend; function _resetBackoff() { _backoff = 0; } function _increaseBackoff() { if (_backoff < _config.maxBackoff) { _backoff += _config.backoffIncrement; } } /** * Starts a the batch of messages to be sent in a single request. * @see #_endBatch(sendMessages) */ function _startBatch() { ++_batch; } function _flushBatch() { var messages = _messageQueue; _messageQueue = []; if (messages.length > 0) { _send(false, messages, false); } } /** * Ends the batch of messages to be sent in a single request, * optionally sending messages present in the message queue depending * on the given argument. 
* @see #_startBatch() */ function _endBatch() { --_batch; if (_batch < 0) { throw 'Calls to startBatch() and endBatch() are not paired'; } if (_batch === 0 && !_isDisconnected() && !_internalBatch) { _flushBatch(); } } /** * Sends the connect message */ function _connect() { if (!_isDisconnected()) { var message = { channel: '/meta/connect', connectionType: _transport.getType() }; // In case of reload or temporary loss of connection // we want the next successful connect to return immediately // instead of being held by the server, so that connect listeners // can be notified that the connection has been re-established if (!_connected) { message.advice = { timeout: 0 }; } _setStatus('connecting'); _cometd._debug('Connect sent', message); _send(false, [message], true, 'connect'); _setStatus('connected'); } } function _delayedConnect() { _setStatus('connecting'); _delayedSend(function() { _connect(); }); } function _updateAdvice(newAdvice) { if (newAdvice) { _advice = _cometd._mixin(false, {}, _config.advice, newAdvice); _cometd._debug('New advice', _advice); } } function _disconnect(abort) { _cancelDelayedSend(); if (abort) { _transport.abort(); } _clientId = null; _setStatus('disconnected'); _batch = 0; _resetBackoff(); // Fail any existing queued message if (_messageQueue.length > 0) { _handleFailure.call(_cometd, undefined, _messageQueue, 'error', 'Disconnected'); _messageQueue = []; } } /** * Sends the initial handshake message */ function _handshake(handshakeProps) { _clientId = null; _clearSubscriptions(); // Reset the transports if we're not retrying the handshake if (_isDisconnected()) { _transports.reset(); _updateAdvice(_config.advice); } else { // We are retrying the handshake, either because another handshake failed // and we're backing off, or because the server timed us out and asks us to // re-handshake: in both cases, make sure that if the handshake succeeds // the next action is a connect. 
_updateAdvice(_cometd._mixin(false, _advice, {reconnect: 'retry'})); } _batch = 0; // Mark the start of an internal batch. // This is needed because handshake and connect are async. // It may happen that the application calls init() then subscribe() // and the subscribe message is sent before the connect message, if // the subscribe message is not held until the connect message is sent. // So here we start a batch to hold temporarily any message until // the connection is fully established. _internalBatch = true; // Save the properties provided by the user, so that // we can reuse them during automatic re-handshake _handshakeProps = handshakeProps; var version = '1.0'; // Figure out the transports to send to the server var transportTypes = _transports.findTransportTypes(version, _crossDomain, _config.url); var bayeuxMessage = { version: version, minimumVersion: '0.9', channel: '/meta/handshake', supportedConnectionTypes: transportTypes, advice: { timeout: _advice.timeout, interval: _advice.interval } }; // Do not allow the user to mess with the required properties, // so merge first the user properties and *then* the bayeux message var message = _cometd._mixin(false, {}, _handshakeProps, bayeuxMessage); // Pick up the first available transport as initial transport // since we don't know if the server supports it _transport = _transports.negotiateTransport(transportTypes, version, _crossDomain, _config.url); _cometd._debug('Initial transport is', _transport.getType()); // We started a batch to hold the application messages, // so here we must bypass it and send immediately. _setStatus('handshaking'); _cometd._debug('Handshake sent', message); _send(false, [message], false, 'handshake'); } function _delayedHandshake() { _setStatus('handshaking'); // We will call _handshake() which will reset _clientId, but we want to avoid // that between the end of this method and the call to _handshake() someone may // call publish() (or other methods that call _queueSend()). 
_internalBatch = true; _delayedSend(function() { _handshake(_handshakeProps); }); } function _failHandshake(message) { _notifyListeners('/meta/handshake', message); _notifyListeners('/meta/unsuccessful', message); // Only try again if we haven't been disconnected and // the advice permits us to retry the handshake var retry = !_isDisconnected() && _advice.reconnect !== 'none'; if (retry) { _increaseBackoff(); _delayedHandshake(); } else { _disconnect(false); } } function _handshakeResponse(message) { if (message.successful) { // Save clientId, figure out transport, then follow the advice to connect _clientId = message.clientId; var newTransport = _transports.negotiateTransport(message.supportedConnectionTypes, message.version, _crossDomain, _config.url); if (newTransport === null) { throw 'Could not negotiate transport with server; client ' + _transports.findTransportTypes(message.version, _crossDomain, _config.url) + ', server ' + message.supportedConnectionTypes; } else if (_transport !== newTransport) { _cometd._debug('Transport', _transport, '->', newTransport); _transport = newTransport; } // End the internal batch and allow held messages from the application // to go to the server (see _handshake() where we start the internal batch). _internalBatch = false; _flushBatch(); // Here the new transport is in place, as well as the clientId, so // the listeners can perform a publish() if they want. // Notify the listeners before the connect below. message.reestablish = _reestablish; _reestablish = true; _notifyListeners('/meta/handshake', message); var action = _isDisconnected() ? 
'none' : _advice.reconnect; switch (action) { case 'retry': _resetBackoff(); _delayedConnect(); break; case 'none': _disconnect(false); break; default: throw 'Unrecognized advice action ' + action; } } else { _failHandshake(message); } } function _handshakeFailure(xhr, message) { _failHandshake({ successful: false, failure: true, channel: '/meta/handshake', request: message, xhr: xhr, advice: { reconnect: 'retry', interval: _backoff } }); } function _failConnect(message) { // Notify the listeners after the status change but before the next action _notifyListeners('/meta/connect', message); _notifyListeners('/meta/unsuccessful', message); // This may happen when the server crashed, the current clientId // will be invalid, and the server will ask to handshake again // Listeners can call disconnect(), so check the state after they run var action = _isDisconnected() ? 'none' : _advice.reconnect; switch (action) { case 'retry': _delayedConnect(); _increaseBackoff(); break; case 'handshake': // The current transport may be failed (e.g. network disconnection) // Reset the transports so the new handshake picks up the right one _transports.reset(); _resetBackoff(); _delayedHandshake(); break; case 'none': _disconnect(false); break; default: throw 'Unrecognized advice action' + action; } } function _connectResponse(message) { _connected = message.successful; if (_connected) { _notifyListeners('/meta/connect', message); // Normally, the advice will say "reconnect: 'retry', interval: 0" // and the server will hold the request, so when a response returns // we immediately call the server again (long polling) // Listeners can call disconnect(), so check the state after they run var action = _isDisconnected() ? 
'none' : _advice.reconnect; switch (action) { case 'retry': _resetBackoff(); _delayedConnect(); break; case 'none': _disconnect(false); break; default: throw 'Unrecognized advice action ' + action; } } else { _failConnect(message); } } function _connectFailure(xhr, message) { _connected = false; _failConnect({ successful: false, failure: true, channel: '/meta/connect', request: message, xhr: xhr, advice: { reconnect: 'retry', interval: _backoff } }); } function _failDisconnect(message) { _disconnect(true); _notifyListeners('/meta/disconnect', message); _notifyListeners('/meta/unsuccessful', message); } function _disconnectResponse(message) { if (message.successful) { _disconnect(false); _notifyListeners('/meta/disconnect', message); } else { _failDisconnect(message); } } function _disconnectFailure(xhr, message) { _failDisconnect({ successful: false, failure: true, channel: '/meta/disconnect', request: message, xhr: xhr, advice: { reconnect: 'none', interval: 0 } }); } function _failSubscribe(message) { _notifyListeners('/meta/subscribe', message); _notifyListeners('/meta/unsuccessful', message); } function _subscribeResponse(message) { if (message.successful) { _notifyListeners('/meta/subscribe', message); } else { _failSubscribe(message); } } function _subscribeFailure(xhr, message) { _failSubscribe({ successful: false, failure: true, channel: '/meta/subscribe', request: message, xhr: xhr, advice: { reconnect: 'none', interval: 0 } }); } function _failUnsubscribe(message) { _notifyListeners('/meta/unsubscribe', message); _notifyListeners('/meta/unsuccessful', message); } function _unsubscribeResponse(message) { if (message.successful) { _notifyListeners('/meta/unsubscribe', message); } else { _failUnsubscribe(message); } } function _unsubscribeFailure(xhr, message) { _failUnsubscribe({ successful: false, failure: true, channel: '/meta/unsubscribe', request: message, xhr: xhr, advice: { reconnect: 'none', interval: 0 } }); } function 
_handlePublishCallback(message) { var callback = _publishCallbacks[message.id]; if (_isFunction(callback)) { delete _publishCallbacks[message.id]; callback.call(_cometd, message); } } function _failMessage(message) { _handlePublishCallback(message); _notifyListeners('/meta/publish', message); _notifyListeners('/meta/unsuccessful', message); } function _messageResponse(message) { if (message.successful === undefined) { if (message.data) { // It is a plain message, and not a bayeux meta message _notifyListeners(message.channel, message); } else { _cometd._debug('Unknown message', message); } } else { if (message.successful) { _handlePublishCallback(message); _notifyListeners('/meta/publish', message); } else { _failMessage(message); } } } function _messageFailure(xhr, message) { _failMessage({ successful: false, failure: true, channel: message.channel, request: message, xhr: xhr, advice: { reconnect: 'none', interval: 0 } }); } function _receive(message) { message = _applyIncomingExtensions(message); if (message === undefined || message === null) { return; } _updateAdvice(message.advice); var channel = message.channel; switch (channel) { case '/meta/handshake': _handshakeResponse(message); break; case '/meta/connect': _connectResponse(message); break; case '/meta/disconnect': _disconnectResponse(message); break; case '/meta/subscribe': _subscribeResponse(message); break; case '/meta/unsubscribe': _unsubscribeResponse(message); break; default: _messageResponse(message); break; } } /** * Receives a message. * This method is exposed as a public so that extensions may inject * messages simulating that they had been received. 
*/ this.receive = _receive; _handleMessages = function(rcvdMessages) { _cometd._debug('Received', rcvdMessages); for (var i = 0; i < rcvdMessages.length; ++i) { var message = rcvdMessages[i]; _receive(message); } }; _handleFailure = function(conduit, messages, reason, exception) { _cometd._debug('handleFailure', conduit, messages, reason, exception); for (var i = 0; i < messages.length; ++i) { var message = messages[i]; var channel = message.channel; switch (channel) { case '/meta/handshake': _handshakeFailure(conduit, message); break; case '/meta/connect': _connectFailure(conduit, message); break; case '/meta/disconnect': _disconnectFailure(conduit, message); break; case '/meta/subscribe': _subscribeFailure(conduit, message); break; case '/meta/unsubscribe': _unsubscribeFailure(conduit, message); break; default: _messageFailure(conduit, message); break; } } }; function _hasSubscriptions(channel) { var subscriptions = _listeners[channel]; if (subscriptions) { for (var i = 0; i < subscriptions.length; ++i) { if (subscriptions[i]) { return true; } } } return false; } function _resolveScopedCallback(scope, callback) { var delegate = { scope: scope, method: callback }; if (_isFunction(scope)) { delegate.scope = undefined; delegate.method = scope; } else { if (_isString(callback)) { if (!scope) { throw 'Invalid scope ' + scope; } delegate.method = scope[callback]; if (!_isFunction(delegate.method)) { throw 'Invalid callback ' + callback + ' for scope ' + scope; } } else if (!_isFunction(callback)) { throw 'Invalid callback ' + callback; } } return delegate; } function _addListener(channel, scope, callback, isListener) { // The data structure is a map
url
* of type string containing the URL of the Bayeux server.
* @param configuration the configuration object
*/
this.configure = function(configuration)
{
    // Hand the configuration over to the private _configure() function,
    // keeping this object as the receiver of the call.
    _configure.apply(this, [configuration]);
};
/**
 * Convenience method that configures this object and then starts the
 * Bayeux communication with the server.
 * It is equivalent to calling {@link #configure(configuration)} followed by
 * {@link #handshake(handshakeProps)}, in this order.
 * @param configuration the configuration object
 * @param handshakeProps an object to be merged with the handshake message
 * @see #configure(configuration)
 * @see #handshake(handshakeProps)
 */
this.init = function(configuration, handshakeProps)
{
    var cometd = this;
    // The configuration must be in place before the handshake is sent
    cometd.configure(configuration);
    cometd.handshake(handshakeProps);
};
/**
 * Starts the Bayeux communication with the Bayeux server by sending
 * a handshake message; a successful handshake is then followed by a connect.
 * @param handshakeProps an object to be merged with the handshake message
 */
this.handshake = function(handshakeProps)
{
    // An explicit handshake is a fresh start: the next successful
    // handshake reply must not be flagged as a re-establishment
    _reestablish = false;
    // Force the 'disconnected' status so that the private _handshake()
    // takes the "not retrying" path and resets the transports
    _setStatus('disconnected');
    _handshake(handshakeProps);
};
/**
 * Disconnects from the Bayeux server by sending a /meta/disconnect message.
 * Does nothing if this instance is already disconnecting or disconnected.
 * A synchronous disconnect may be requested, but only some transports can
 * honor it (for example, long-polling may support it, callback-polling
 * certainly does not).
 * The method may also be called as disconnect(disconnectProps): when the
 * second argument is omitted and the first one is not a boolean, the first
 * argument is taken as the disconnect properties and the disconnect is
 * asynchronous.
 * @param sync whether attempt to perform a synchronous disconnect
 * @param disconnectProps an object to be merged with the disconnect message
 */
this.disconnect = function(sync, disconnectProps)
{
    if (_isDisconnected())
    {
        return;
    }

    var synchronous = sync;
    var props = disconnectProps;
    if (props === undefined && typeof synchronous !== 'boolean')
    {
        // Single argument form: disconnect(disconnectProps)
        props = synchronous;
        synchronous = false;
    }

    // The user properties are merged first and the bayeux fields last
    var message = this._mixin(false, {}, props, {
        channel: '/meta/disconnect'
    });

    _setStatus('disconnecting');
    _send(synchronous === true, [message], false, 'disconnect');
};
/**
 * Opens a batch of application messages.
 * While at least one batch is open, messages are queued instead of being
 * sent, so that they can travel to the server in a single request and be
 * answered by a single response carrying (possibly) many reply messages.
 * Batches nest: every call to startBatch() must be matched by a call to
 * {@link #endBatch()}, and the queued messages are only sent when the
 * outermost batch is closed.
 * @see #endBatch()
 */
this.startBatch = function()
{
    // The batch depth counter lives in the private _startBatch()
    _startBatch();
};
/**
 * Closes a batch of application messages previously opened with
 * {@link #startBatch()}.
 * When the outermost batch is closed the queued messages are sent to the
 * server in a single request, unless this instance is disconnected or an
 * internal batch (such as the one held during the handshake) is in progress.
 * Throws if it is called more times than startBatch().
 * @see #startBatch()
 */
this.endBatch = function()
{
    // Pairing check and flush are performed by the private _endBatch()
    _endBatch();
};
/**
 * Executes the given callback in the given scope, surrounded by a {@link #startBatch()}
 * and {@link #endBatch()} calls.
 * The batch is closed exactly once, whether the callback returns normally or throws.
 * If the callback throws, the exception is logged at debug level, the batch is closed
 * and the exception is rethrown to the caller.
 * An invalid scope/callback pair is rejected before the batch is started.
 * @param scope the scope of the callback, may be omitted
 * @param callback the callback to be executed within {@link #startBatch()} and {@link #endBatch()} calls
 */
this.batch = function(scope, callback)
{
    var delegate = _resolveScopedCallback(scope, callback);
    this.startBatch();
    try
    {
        delegate.method.call(delegate.scope);
    }
    catch (x)
    {
        this._debug('Exception during execution of batch', x);
        this.endBatch();
        throw x;
    }
    // Close the batch outside of the try block: if endBatch() itself throws
    // (unpaired calls, or a failure while flushing the queued messages), it
    // must not be invoked a second time by the catch block above, because
    // that would decrement the batch counter twice and could replace the
    // real failure with a misleading "not paired" error.
    this.endBatch();
};
/**
 * Registers a local listener for bayeux messages: the given callback is
 * invoked, in the given scope, whenever a message for the given channel arrives.
 * @param channel the channel the listener is interested to
 * @param scope the scope of the callback, may be omitted
 * @param callback the callback to call when a message is sent to the channel
 * @returns the subscription handle to be passed to {@link #removeListener(object)}
 * @see #removeListener(subscription)
 */
this.addListener = function(channel, scope, callback)
{
    var argumentCount = arguments.length;
    if (argumentCount < 2)
    {
        throw 'Illegal arguments number: required 2, got ' + argumentCount;
    }
    if (_isString(channel))
    {
        // The last argument marks the registration as a listener
        // (as opposed to a subscription)
        return _addListener(channel, scope, callback, true);
    }
    throw 'Illegal argument type: channel must be a string';
};
/**
 * Removes a listener previously registered with
 * {@link #addListener(string, object, function)}.
 * @param subscription the subscription handle returned by addListener();
 * anything that is not an array is rejected with an exception
 * @see #addListener(channel, scope, callback)
 */
this.removeListener = function(subscription)
{
    var isHandle = org.cometd.Utils.isArray(subscription);
    if (!isHandle)
    {
        throw 'Invalid argument: expected subscription, not ' + subscription;
    }
    _removeListener(subscription);
};
/**
 * Forgets every callback registered either with
 * {@link #addListener(channel, scope, callback)} or with
 * {@link #subscribe(channel, scope, callback)}.
 * This is a purely local operation: the registry is replaced with an
 * empty one and no message is sent to the server.
 */
this.clearListeners = function()
{
    // Start over with an empty channel -> subscriptions registry
    _listeners = {};
};
/**
 * Subscribes to the given channel, performing the given callback in the given scope
 * when a message for the channel arrives.
 * The scope may be omitted, in which case the method is called as
 * subscribe(channel, callback, subscribeProps).
 * A /meta/subscribe message is sent to the server only when the channel had
 * no registration on this client before this call.
 * Throws if fewer than two arguments are given, if the channel is not a
 * string or if this instance is disconnecting or disconnected.
 * @param channel the channel to subscribe to
 * @param scope the scope of the callback, may be omitted
 * @param callback the callback to call when a message is sent to the channel
 * @param subscribeProps an object to be merged with the subscribe message
 * @return the subscription handle to be passed to {@link #unsubscribe(object)}
 */
this.subscribe = function(channel, scope, callback, subscribeProps)
{
    var argumentCount = arguments.length;
    if (argumentCount < 2)
    {
        throw 'Illegal arguments number: required 2, got ' + argumentCount;
    }
    if (!_isString(channel))
    {
        throw 'Illegal argument type: channel must be a string';
    }
    if (_isDisconnected())
    {
        throw 'Illegal state: already disconnected';
    }

    // Normalize arguments
    var callbackScope = scope;
    var listener = callback;
    var props = subscribeProps;
    if (_isFunction(scope))
    {
        // Scope omitted: subscribe(channel, callback, subscribeProps)
        props = callback;
        listener = scope;
        callbackScope = undefined;
    }

    // This must be evaluated before the registration below, otherwise
    // the channel would always appear as already subscribed
    var alreadySubscribed = _hasSubscriptions(channel);
    var subscription = _addListener(channel, callbackScope, listener, false);
    if (alreadySubscribed)
    {
        return subscription;
    }

    // Send the subscription message after the subscription registration to avoid
    // races where the server would send a message to the subscribers, but here
    // on the client the subscription has not been added yet to the data structures
    var message = this._mixin(false, {}, props, {
        channel: '/meta/subscribe',
        subscription: channel
    });
    _queueSend(message);
    return subscription;
};
/**
 * Unsubscribes the subscription obtained with a call to {@link #subscribe(string, object, function)}.
 * The local callback is always removed; a /meta/unsubscribe message is sent
 * to the server only when no registration is left on the channel.
 * Throws if called without arguments or if this instance is disconnecting
 * or disconnected.
 * @param subscription the subscription to unsubscribe.
 * @param unsubscribeProps an object to be merged with the unsubscribe message
 */
this.unsubscribe = function(subscription, unsubscribeProps)
{
    var argumentCount = arguments.length;
    if (argumentCount < 1)
    {
        throw 'Illegal arguments number: required 1, got ' + argumentCount;
    }
    if (_isDisconnected())
    {
        throw 'Illegal state: already disconnected';
    }

    // Remove the local listener before sending the message
    // This ensures that if the server fails, this client does not get notifications
    this.removeListener(subscription);

    // The first element of the subscription handle is the channel
    var channel = subscription[0];
    if (_hasSubscriptions(channel))
    {
        // Other registrations are still interested in the channel:
        // the server side subscription must be kept
        return;
    }

    var message = this._mixin(false, {}, unsubscribeProps, {
        channel: '/meta/unsubscribe',
        subscription: channel
    });
    _queueSend(message);
};
/**
 * Removes, locally, all the subscriptions added via
 * {@link #subscribe(channel, scope, callback, subscribeProps)}, leaving in place
 * the listeners added via {@link #addListener(channel, scope, callback)}.
 */
this.clearSubscriptions = function()
{
    // The private _clearSubscriptions() only drops the registrations
    // that are not flagged as listeners
    _clearSubscriptions();
};
/**
 * Publishes a message on the given channel, containing the given content.
 * Trailing arguments may be omitted, so the method can also be called as
 * publish(channel, publishCallback) or publish(channel, content, publishCallback).
 * Throws if called without arguments, if the channel is not a string or if
 * this instance is disconnecting or disconnected.
 * @param channel the channel to publish the message to
 * @param content the content of the message
 * @param publishProps an object to be merged with the publish message
 * @param publishCallback a function invoked with the publish reply message,
 * whether the publish was successful or not
 */
this.publish = function(channel, content, publishProps, publishCallback)
{
    var argumentCount = arguments.length;
    if (argumentCount < 1)
    {
        throw 'Illegal arguments number: required 1, got ' + argumentCount;
    }
    if (!_isString(channel))
    {
        throw 'Illegal argument type: channel must be a string';
    }
    if (_isDisconnected())
    {
        throw 'Illegal state: already disconnected';
    }

    // Normalize arguments: the callback is the first function found
    var data = content;
    var props = publishProps;
    var replyCallback = publishCallback;
    if (_isFunction(content))
    {
        // publish(channel, publishCallback)
        replyCallback = content;
        data = props = {};
    }
    else if (_isFunction(publishProps))
    {
        // publish(channel, content, publishCallback)
        replyCallback = publishProps;
        props = {};
    }

    // The _callback field is stripped from the message by the send logic
    // and invoked when the publish reply arrives
    var message = this._mixin(false, {}, props, {
        channel: channel,
        data: data,
        _callback: replyCallback
    });
    _queueSend(message);
};
/**
 * Returns the current status of the bayeux communication with the Bayeux server,
 * as a string such as 'disconnected', 'handshaking', 'connecting', 'connected'
 * or 'disconnecting'.
 * @return the current status string
 * @see #isDisconnected()
 */
this.getStatus = function()
{
    return _status;
};
/**
 * Returns whether this instance is disconnected or in the process of disconnecting.
 * This is a direct alias of the private _isDisconnected() function, which reports
 * true when the status is either 'disconnecting' or 'disconnected'.
 * @return true if the status is 'disconnecting' or 'disconnected', false otherwise
 * @see #getStatus()
 */
this.isDisconnected = _isDisconnected;
/**
 * Sets the amount, in milliseconds, by which the backoff time grows each time an
 * unsuccessful or failed message is retried (until the configured maximum backoff
 * is reached).
 * Default value is 1 second, which means if there is a persistent failure the retries
 * will happen after 1 second, then after 2 seconds, then after 3 seconds, etc.
 * So for example with 15 seconds of elapsed time, there will be 5 retries
 * (at 1, 3, 6, 10 and 15 seconds elapsed).
 * The value is stored as is, without validation.
 * @param period the backoff period to set
 * @see #getBackoffIncrement()
 */
this.setBackoffIncrement = function(period)
{
    // Takes effect from the next backoff increase
    _config.backoffIncrement = period;
};
/**
 * Returns the configured backoff increment, that is the amount by which the
 * backoff time grows each time an unsuccessful or failed message is retried.
 * @return the backoff increment currently stored in the configuration
 * @see #setBackoffIncrement(period)
 */
this.getBackoffIncrement = function()
{
    return _config.backoffIncrement;
};
/**
 * Returns the current backoff time, that is the extra delay (added to the
 * advice interval) to wait before retrying an unsuccessful or failed message.
 * @return the current backoff time
 * @see #getBackoffIncrement()
 */
this.getBackoffPeriod = function()
{
    return _backoff;
};
/**
 * Sets the log level for console logging.
 * Valid values are the strings 'error', 'warn', 'info' and 'debug', from
 * less verbose to more verbose.
 * The value is stored in the configuration as is, without validation.
 * @param level the log level string
 */
this.setLogLevel = function(level)
{
    // Only the configuration is updated here
    _config.logLevel = level;
};
/**
* Registers an extension whose callbacks are called for every incoming message
* (that comes from the server to this client implementation) and for every
* outgoing message (that originates from this client implementation for the
* server).
* The format of the extension object is the following:
* * { * incoming: function(message) { ... }, * outgoing: function(message) { ... } * } ** Both properties are optional, but if they are present they will be called * respectively for each incoming message and for each outgoing message. * @param name the name of the extension * @param extension the extension to register * @return true if the extension was registered, false otherwise * @see #unregisterExtension(name) */ this.registerExtension = function(name, extension) { if (arguments.length < 2) { throw 'Illegal arguments number: required 2, got ' + arguments.length; } if (!_isString(name)) { throw 'Illegal argument type: extension name must be a string'; } var existing = false; for (var i = 0; i < _extensions.length; ++i) { var existingExtension = _extensions[i]; if (existingExtension.name === name) { existing = true; break; } } if (!existing) { _extensions.push({ name: name, extension: extension }); this._debug('Registered extension', name); // Callback for extensions if (_isFunction(extension.registered)) { extension.registered(name, this); } return true; } else { this._info('Could not register extension with name', name, 'since another extension with the same name already exists'); return false; } }; /** * Unregister an extension previously registered with * {@link #registerExtension(name, extension)}. * @param name the name of the extension to unregister. 
* @return true if the extension was unregistered, false otherwise */ this.unregisterExtension = function(name) { if (!_isString(name)) { throw 'Illegal argument type: extension name must be a string'; } var unregistered = false; for (var i = 0; i < _extensions.length; ++i) { var extension = _extensions[i]; if (extension.name === name) { _extensions.splice(i, 1); unregistered = true; this._debug('Unregistered extension', name); // Callback for extensions var ext = extension.extension; if (_isFunction(ext.unregistered)) { ext.unregistered(); } break; } } return unregistered; }; /** * Find the extension registered with the given name. * @param name the name of the extension to find * @return the extension found or null if no extension with the given name has been registered */ this.getExtension = function(name) { for (var i = 0; i < _extensions.length; ++i) { var extension = _extensions[i]; if (extension.name === name) { return extension.extension; } } return null; }; /** * Returns the name assigned to this Cometd object, or the string 'default' * if no name has been explicitly passed as parameter to the constructor. */ this.getName = function() { return _name; }; /** * Returns the clientId assigned by the Bayeux server during handshake. */ this.getClientId = function() { return _clientId; }; /** * Returns the URL of the Bayeux server. */ this.getURL = function() { return _config.url; }; this.getTransport = function() { return _transport; }; this.getConfiguration = function() { return this._mixin(true, {}, _config); }; this.getAdvice = function() { return this._mixin(true, {}, _advice); }; // WebSocket handling for Firefox, which deploys WebSocket // under the name of MozWebSocket in Firefox 6, 7, 8 and 9 org.cometd.WebSocket = window.WebSocket; if (!org.cometd.WebSocket) { org.cometd.WebSocket = window.MozWebSocket; } }; if (typeof define === 'function' && define.amd) { define(function() { return org.cometd; }); }