aboutsummaryrefslogtreecommitdiffstats
path: root/dgbuilder/core_nodes/io/lib
diff options
context:
space:
mode:
Diffstat (limited to 'dgbuilder/core_nodes/io/lib')
-rw-r--r--dgbuilder/core_nodes/io/lib/mqtt.js254
-rw-r--r--dgbuilder/core_nodes/io/lib/mqttConnectionPool.js128
2 files changed, 0 insertions, 382 deletions
diff --git a/dgbuilder/core_nodes/io/lib/mqtt.js b/dgbuilder/core_nodes/io/lib/mqtt.js
deleted file mode 100644
index 141a8889..00000000
--- a/dgbuilder/core_nodes/io/lib/mqtt.js
+++ /dev/null
@@ -1,254 +0,0 @@
-/**
- * Copyright 2013 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-var util = require("util");
-var mqtt = require("mqtt");
-var events = require("events");
-//var inspect = require("sys").inspect;
-
-//var Client = module.exports.Client = function(
-
-var port = 1883;
-var host = "localhost";
-
-function MQTTClient(port,host) {
- this.port = port||1883;
- this.host = host||"localhost";
- this.messageId = 1;
- this.pendingSubscriptions = {};
- this.inboundMessages = {};
- this.lastOutbound = (new Date()).getTime();
- this.lastInbound = (new Date()).getTime();
- this.connected = false;
-
- this._nextMessageId = function() {
- this.messageId += 1;
- if (this.messageId > 0xFFFF) {
- this.messageId = 1;
- }
- return this.messageId;
- }
- events.EventEmitter.call(this);
-}
-util.inherits(MQTTClient, events.EventEmitter);
-
-MQTTClient.prototype.connect = function(options) {
- if (!this.connected) {
- var self = this;
- options = options||{};
- self.options = options;
- self.options.keepalive = options.keepalive||15;
- self.options.clean = self.options.clean||true;
- self.options.protocolId = 'MQIsdp';
- self.options.protocolVersion = 3;
-
- self.client = mqtt.createConnection(this.port,this.host,function(err,client) {
- if (err) {
- self.connected = false;
- clearInterval(self.watchdog);
- self.connectionError = true;
- //util.log('[mqtt] ['+self.uid+'] connection error 1 : '+inspect(err));
- self.emit('connectionlost',err);
- return;
- }
- client.on('close',function(e) {
- //util.log('[mqtt] ['+self.uid+'] on close');
- clearInterval(self.watchdog);
- if (!self.connectionError) {
- if (self.connected) {
- self.connected = false;
- self.emit('connectionlost',e);
- } else {
- self.emit('disconnect');
- }
- }
- });
- client.on('error',function(e) {
- //util.log('[mqtt] ['+self.uid+'] on error : '+inspect(e));
- clearInterval(self.watchdog);
- if (self.connected) {
- self.connected = false;
- self.emit('connectionlost',e);
- }
- });
- client.on('connack',function(packet) {
- if (packet.returnCode == 0) {
- self.watchdog = setInterval(function(self) {
- var now = (new Date()).getTime();
-
- //util.log('[mqtt] ['+self.uid+'] watchdog '+inspect({connected:self.connected,connectionError:self.connectionError,pingOutstanding:self.pingOutstanding,now:now,lastOutbound:self.lastOutbound,lastInbound:self.lastInbound}));
-
- if (now - self.lastOutbound > self.options.keepalive*500 || now - self.lastInbound > self.options.keepalive*500) {
- if (self.pingOutstanding) {
- //util.log('[mqtt] ['+self.uid+'] watchdog pingOustanding - disconnect');
- try {
- self.client.disconnect();
- } catch (err) {
- }
- } else {
- //util.log('[mqtt] ['+self.uid+'] watchdog pinging');
- self.lastOutbound = (new Date()).getTime();
- self.lastInbound = (new Date()).getTime();
- self.pingOutstanding = true;
- self.client.pingreq();
- }
- }
-
- },self.options.keepalive*500,self);
- self.pingOutstanding = false;
- self.lastInbound = (new Date()).getTime()
- self.lastOutbound = (new Date()).getTime()
- self.connected = true;
- self.connectionError = false;
- self.emit('connect');
- } else {
- self.connected = false;
- self.emit('connectionlost');
- }
- });
- client.on('suback',function(packet) {
- self.lastInbound = (new Date()).getTime()
- var topic = self.pendingSubscriptions[packet.messageId];
- self.emit('subscribe',topic,packet.granted[0]);
- delete self.pendingSubscriptions[packet.messageId];
- });
- client.on('unsuback',function(packet) {
- self.lastInbound = (new Date()).getTime()
- var topic = self.pendingSubscriptions[packet.messageId];
- self.emit('unsubscribe',topic,packet.granted[0]);
- delete self.pendingSubscriptions[packet.messageId];
- });
- client.on('publish',function(packet) {
- self.lastInbound = (new Date()).getTime()
- if (packet.qos < 2) {
- var p = packet;
- self.emit('message',p.topic,p.payload,p.qos,p.retain);
- } else {
- self.inboundMessages[packet.messageId] = packet;
- this.lastOutbound = (new Date()).getTime()
- self.client.pubrec(packet);
- }
- if (packet.qos == 1) {
- this.lastOutbound = (new Date()).getTime()
- self.client.puback(packet);
- }
- });
-
- client.on('pubrel',function(packet) {
- self.lastInbound = (new Date()).getTime()
- var p = self.inboundMessages[packet.messageId];
- if (p) {
- self.emit('message',p.topic,p.payload,p.qos,p.retain);
- delete self.inboundMessages[packet.messageId];
- }
- self.lastOutbound = (new Date()).getTime()
- self.client.pubcomp(packet);
- });
-
- client.on('puback',function(packet) {
- self.lastInbound = (new Date()).getTime()
- // outbound qos-1 complete
- });
-
- client.on('pubrec',function(packet) {
- self.lastInbound = (new Date()).getTime()
- self.lastOutbound = (new Date()).getTime()
- self.client.pubrel(packet);
- });
- client.on('pubcomp',function(packet) {
- self.lastInbound = (new Date()).getTime()
- // outbound qos-2 complete
- });
- client.on('pingresp',function(packet) {
- //util.log('[mqtt] ['+self.uid+'] received pingresp');
- self.lastInbound = (new Date()).getTime()
- self.pingOutstanding = false;
- });
-
- this.lastOutbound = (new Date()).getTime()
- this.connectionError = false;
- client.connect(self.options);
- });
- }
-}
-
-MQTTClient.prototype.subscribe = function(topic,qos) {
- var self = this;
- if (self.connected) {
- var options = {
- subscriptions:[{topic:topic,qos:qos}],
- messageId: self._nextMessageId()
- };
- this.pendingSubscriptions[options.messageId] = topic;
- this.lastOutbound = (new Date()).getTime()
- self.client.subscribe(options);
- }
-}
-MQTTClient.prototype.unsubscribe = function(topic) {
- var self = this;
- if (self.connected) {
- var options = {
- topic:topic,
- messageId: self._nextMessageId()
- };
- this.pendingSubscriptions[options.messageId] = topic;
- this.lastOutbound = (new Date()).getTime()
- self.client.unsubscribe(options);
- }
-}
-
-MQTTClient.prototype.publish = function(topic,payload,qos,retain) {
- var self = this;
- if (self.connected) {
-
- if (!Buffer.isBuffer(payload)) {
- if (typeof payload === "object") {
- payload = JSON.stringify(payload);
- } else if (typeof payload !== "string") {
- payload = ""+payload;
- }
- }
- var options = {
- topic: topic,
- payload: payload,
- qos: qos||0,
- retain:retain||false
- };
- if (options.qos != 0) {
- options.messageId = self._nextMessageId();
- }
- this.lastOutbound = (new Date()).getTime()
- self.client.publish(options);
- }
-}
-
-MQTTClient.prototype.disconnect = function() {
- var self = this;
- if (this.connected) {
- this.connected = false;
- try {
- this.client.disconnect();
- } catch(err) {
- }
- }
-}
-MQTTClient.prototype.isConnected = function() {
- return this.connected;
-}
-module.exports.createClient = function(port,host) {
- var mqtt_client = new MQTTClient(port,host);
- return mqtt_client;
-}
-
diff --git a/dgbuilder/core_nodes/io/lib/mqttConnectionPool.js b/dgbuilder/core_nodes/io/lib/mqttConnectionPool.js
deleted file mode 100644
index d15f0fc7..00000000
--- a/dgbuilder/core_nodes/io/lib/mqttConnectionPool.js
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Copyright 2013 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-var util = require("util");
-var mqtt = require("./mqtt");
-var settings = require(process.env.NODE_RED_HOME+"/red/red").settings;
-
-var connections = {};
-
-function matchTopic(ts,t) {
- if (ts == "#") {
- return true;
- }
- var re = new RegExp("^"+ts.replace(/([\[\]\?\(\)\\\\$\^\*\.|])/g,"\\$1").replace(/\+/g,"[^/]+").replace(/\/#$/,"(\/.*)?")+"$");
- return re.test(t);
-}
-
-module.exports = {
- get: function(broker,port,clientid,username,password,will) {
- var id = "["+(username||"")+":"+(password||"")+"]["+(clientid||"")+"]@"+broker+":"+port;
- if (!connections[id]) {
- connections[id] = function() {
- var uid = (1+Math.random()*4294967295).toString(16);
- var client = mqtt.createClient(port,broker);
- client.uid = uid;
- client.setMaxListeners(0);
- var options = {keepalive:15};
- options.clientId = clientid || 'mqtt_' + (1+Math.random()*4294967295).toString(16);
- options.username = username;
- options.password = password;
- options.will = will;
- var queue = [];
- var subscriptions = [];
- var connecting = false;
- var obj = {
- _instances: 0,
- publish: function(msg) {
- if (client.isConnected()) {
- client.publish(msg.topic,msg.payload,msg.qos,msg.retain);
- } else {
- if (!connecting) {
- connecting = true;
- client.connect(options);
- }
- queue.push(msg);
- }
- },
- subscribe: function(topic,qos,callback) {
- subscriptions.push({topic:topic,qos:qos,callback:callback});
- client.on('message',function(mtopic,mpayload,mqos,mretain) {
- if (matchTopic(topic,mtopic)) {
- callback(mtopic,mpayload,mqos,mretain);
- }
- });
- if (client.isConnected()) {
- client.subscribe(topic,qos);
- }
- },
- on: function(a,b){
- client.on(a,b);
- },
- once: function(a,b){
- client.once(a,b);
- },
- connect: function() {
- if (client && !client.isConnected() && !connecting) {
- connecting = true;
- client.connect(options);
- }
- },
- disconnect: function() {
- this._instances -= 1;
- if (this._instances == 0) {
- client.disconnect();
- client = null;
- delete connections[id];
- }
- }
- };
- client.on('connect',function() {
- if (client) {
- util.log('[mqtt] ['+uid+'] connected to broker tcp://'+broker+':'+port);
- connecting = false;
- for (var s in subscriptions) {
- var topic = subscriptions[s].topic;
- var qos = subscriptions[s].qos;
- var callback = subscriptions[s].callback;
- client.subscribe(topic,qos);
- }
- //console.log("connected - publishing",queue.length,"messages");
- while(queue.length) {
- var msg = queue.shift();
- //console.log(msg);
- client.publish(msg.topic,msg.payload,msg.qos,msg.retain);
- }
- }
- });
- client.on('connectionlost', function(err) {
- util.log('[mqtt] ['+uid+'] connection lost to broker tcp://'+broker+':'+port);
- connecting = false;
- setTimeout(function() {
- obj.connect();
- }, settings.mqttReconnectTime||5000);
- });
- client.on('disconnect', function() {
- connecting = false;
- util.log('[mqtt] ['+uid+'] disconnected from broker tcp://'+broker+':'+port);
- });
-
- return obj
- }();
- }
- connections[id]._instances += 1;
- return connections[id];
- }
-};