aboutsummaryrefslogtreecommitdiffstats
path: root/dgbuilder/core_nodes/io/lib
diff options
context:
space:
mode:
authorTimoney, Daniel (dt5972) <dtimoney@att.com>2017-02-15 10:37:53 -0500
committerTimoney, Daniel (dt5972) <dtimoney@att.com>2017-02-15 10:40:37 -0500
commit324ee36fe31763e507b422ab0a88e4230045e205 (patch)
treed0b04520f6657601c918ce63fd27575977624187 /dgbuilder/core_nodes/io/lib
parentf0c97e8db427481e28c0a16b789bc73801b35e47 (diff)
Initial commit for OpenECOMP SDN-C OA&M
Change-Id: I7ab579fd0d206bf356f36d52dcdf4f71f1fa2680 Signed-off-by: Timoney, Daniel (dt5972) <dtimoney@att.com> Former-commit-id: 2a9f0edd09581f907e62ec4689b5ac94dd5382ba
Diffstat (limited to 'dgbuilder/core_nodes/io/lib')
-rw-r--r--dgbuilder/core_nodes/io/lib/mqtt.js254
-rw-r--r--dgbuilder/core_nodes/io/lib/mqttConnectionPool.js128
2 files changed, 382 insertions, 0 deletions
diff --git a/dgbuilder/core_nodes/io/lib/mqtt.js b/dgbuilder/core_nodes/io/lib/mqtt.js
new file mode 100644
index 00000000..141a8889
--- /dev/null
+++ b/dgbuilder/core_nodes/io/lib/mqtt.js
@@ -0,0 +1,254 @@
+/**
+ * Copyright 2013 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+var util = require("util");
+var mqtt = require("mqtt");
+var events = require("events");
+//var inspect = require("sys").inspect;
+
+//var Client = module.exports.Client = function(
+
+var port = 1883;
+var host = "localhost";
+
+function MQTTClient(port,host) {
+ this.port = port||1883;
+ this.host = host||"localhost";
+ this.messageId = 1;
+ this.pendingSubscriptions = {};
+ this.inboundMessages = {};
+ this.lastOutbound = (new Date()).getTime();
+ this.lastInbound = (new Date()).getTime();
+ this.connected = false;
+
+ this._nextMessageId = function() {
+ this.messageId += 1;
+ if (this.messageId > 0xFFFF) {
+ this.messageId = 1;
+ }
+ return this.messageId;
+ }
+ events.EventEmitter.call(this);
+}
+util.inherits(MQTTClient, events.EventEmitter);
+
+MQTTClient.prototype.connect = function(options) {
+ if (!this.connected) {
+ var self = this;
+ options = options||{};
+ self.options = options;
+ self.options.keepalive = options.keepalive||15;
+ self.options.clean = self.options.clean||true;
+ self.options.protocolId = 'MQIsdp';
+ self.options.protocolVersion = 3;
+
+ self.client = mqtt.createConnection(this.port,this.host,function(err,client) {
+ if (err) {
+ self.connected = false;
+ clearInterval(self.watchdog);
+ self.connectionError = true;
+ //util.log('[mqtt] ['+self.uid+'] connection error 1 : '+inspect(err));
+ self.emit('connectionlost',err);
+ return;
+ }
+ client.on('close',function(e) {
+ //util.log('[mqtt] ['+self.uid+'] on close');
+ clearInterval(self.watchdog);
+ if (!self.connectionError) {
+ if (self.connected) {
+ self.connected = false;
+ self.emit('connectionlost',e);
+ } else {
+ self.emit('disconnect');
+ }
+ }
+ });
+ client.on('error',function(e) {
+ //util.log('[mqtt] ['+self.uid+'] on error : '+inspect(e));
+ clearInterval(self.watchdog);
+ if (self.connected) {
+ self.connected = false;
+ self.emit('connectionlost',e);
+ }
+ });
+ client.on('connack',function(packet) {
+ if (packet.returnCode == 0) {
+ self.watchdog = setInterval(function(self) {
+ var now = (new Date()).getTime();
+
+ //util.log('[mqtt] ['+self.uid+'] watchdog '+inspect({connected:self.connected,connectionError:self.connectionError,pingOutstanding:self.pingOutstanding,now:now,lastOutbound:self.lastOutbound,lastInbound:self.lastInbound}));
+
+ if (now - self.lastOutbound > self.options.keepalive*500 || now - self.lastInbound > self.options.keepalive*500) {
+ if (self.pingOutstanding) {
+ //util.log('[mqtt] ['+self.uid+'] watchdog pingOustanding - disconnect');
+ try {
+ self.client.disconnect();
+ } catch (err) {
+ }
+ } else {
+ //util.log('[mqtt] ['+self.uid+'] watchdog pinging');
+ self.lastOutbound = (new Date()).getTime();
+ self.lastInbound = (new Date()).getTime();
+ self.pingOutstanding = true;
+ self.client.pingreq();
+ }
+ }
+
+ },self.options.keepalive*500,self);
+ self.pingOutstanding = false;
+ self.lastInbound = (new Date()).getTime()
+ self.lastOutbound = (new Date()).getTime()
+ self.connected = true;
+ self.connectionError = false;
+ self.emit('connect');
+ } else {
+ self.connected = false;
+ self.emit('connectionlost');
+ }
+ });
+ client.on('suback',function(packet) {
+ self.lastInbound = (new Date()).getTime()
+ var topic = self.pendingSubscriptions[packet.messageId];
+ self.emit('subscribe',topic,packet.granted[0]);
+ delete self.pendingSubscriptions[packet.messageId];
+ });
+ client.on('unsuback',function(packet) {
+ self.lastInbound = (new Date()).getTime()
+ var topic = self.pendingSubscriptions[packet.messageId];
+ self.emit('unsubscribe',topic,packet.granted[0]);
+ delete self.pendingSubscriptions[packet.messageId];
+ });
+ client.on('publish',function(packet) {
+ self.lastInbound = (new Date()).getTime()
+ if (packet.qos < 2) {
+ var p = packet;
+ self.emit('message',p.topic,p.payload,p.qos,p.retain);
+ } else {
+ self.inboundMessages[packet.messageId] = packet;
+ this.lastOutbound = (new Date()).getTime()
+ self.client.pubrec(packet);
+ }
+ if (packet.qos == 1) {
+ this.lastOutbound = (new Date()).getTime()
+ self.client.puback(packet);
+ }
+ });
+
+ client.on('pubrel',function(packet) {
+ self.lastInbound = (new Date()).getTime()
+ var p = self.inboundMessages[packet.messageId];
+ if (p) {
+ self.emit('message',p.topic,p.payload,p.qos,p.retain);
+ delete self.inboundMessages[packet.messageId];
+ }
+ self.lastOutbound = (new Date()).getTime()
+ self.client.pubcomp(packet);
+ });
+
+ client.on('puback',function(packet) {
+ self.lastInbound = (new Date()).getTime()
+ // outbound qos-1 complete
+ });
+
+ client.on('pubrec',function(packet) {
+ self.lastInbound = (new Date()).getTime()
+ self.lastOutbound = (new Date()).getTime()
+ self.client.pubrel(packet);
+ });
+ client.on('pubcomp',function(packet) {
+ self.lastInbound = (new Date()).getTime()
+ // outbound qos-2 complete
+ });
+ client.on('pingresp',function(packet) {
+ //util.log('[mqtt] ['+self.uid+'] received pingresp');
+ self.lastInbound = (new Date()).getTime()
+ self.pingOutstanding = false;
+ });
+
+ this.lastOutbound = (new Date()).getTime()
+ this.connectionError = false;
+ client.connect(self.options);
+ });
+ }
+}
+
+MQTTClient.prototype.subscribe = function(topic,qos) {
+ var self = this;
+ if (self.connected) {
+ var options = {
+ subscriptions:[{topic:topic,qos:qos}],
+ messageId: self._nextMessageId()
+ };
+ this.pendingSubscriptions[options.messageId] = topic;
+ this.lastOutbound = (new Date()).getTime()
+ self.client.subscribe(options);
+ }
+}
+MQTTClient.prototype.unsubscribe = function(topic) {
+ var self = this;
+ if (self.connected) {
+ var options = {
+ topic:topic,
+ messageId: self._nextMessageId()
+ };
+ this.pendingSubscriptions[options.messageId] = topic;
+ this.lastOutbound = (new Date()).getTime()
+ self.client.unsubscribe(options);
+ }
+}
+
+MQTTClient.prototype.publish = function(topic,payload,qos,retain) {
+ var self = this;
+ if (self.connected) {
+
+ if (!Buffer.isBuffer(payload)) {
+ if (typeof payload === "object") {
+ payload = JSON.stringify(payload);
+ } else if (typeof payload !== "string") {
+ payload = ""+payload;
+ }
+ }
+ var options = {
+ topic: topic,
+ payload: payload,
+ qos: qos||0,
+ retain:retain||false
+ };
+ if (options.qos != 0) {
+ options.messageId = self._nextMessageId();
+ }
+ this.lastOutbound = (new Date()).getTime()
+ self.client.publish(options);
+ }
+}
+
+MQTTClient.prototype.disconnect = function() {
+ var self = this;
+ if (this.connected) {
+ this.connected = false;
+ try {
+ this.client.disconnect();
+ } catch(err) {
+ }
+ }
+}
+MQTTClient.prototype.isConnected = function() {
+ return this.connected;
+}
+module.exports.createClient = function(port,host) {
+ var mqtt_client = new MQTTClient(port,host);
+ return mqtt_client;
+}
+
diff --git a/dgbuilder/core_nodes/io/lib/mqttConnectionPool.js b/dgbuilder/core_nodes/io/lib/mqttConnectionPool.js
new file mode 100644
index 00000000..d15f0fc7
--- /dev/null
+++ b/dgbuilder/core_nodes/io/lib/mqttConnectionPool.js
@@ -0,0 +1,128 @@
+/**
+ * Copyright 2013 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+var util = require("util");
+var mqtt = require("./mqtt");
+var settings = require(process.env.NODE_RED_HOME+"/red/red").settings;
+
+var connections = {};
+
+function matchTopic(ts,t) {
+ if (ts == "#") {
+ return true;
+ }
+ var re = new RegExp("^"+ts.replace(/([\[\]\?\(\)\\\\$\^\*\.|])/g,"\\$1").replace(/\+/g,"[^/]+").replace(/\/#$/,"(\/.*)?")+"$");
+ return re.test(t);
+}
+
+module.exports = {
+ get: function(broker,port,clientid,username,password,will) {
+ var id = "["+(username||"")+":"+(password||"")+"]["+(clientid||"")+"]@"+broker+":"+port;
+ if (!connections[id]) {
+ connections[id] = function() {
+ var uid = (1+Math.random()*4294967295).toString(16);
+ var client = mqtt.createClient(port,broker);
+ client.uid = uid;
+ client.setMaxListeners(0);
+ var options = {keepalive:15};
+ options.clientId = clientid || 'mqtt_' + (1+Math.random()*4294967295).toString(16);
+ options.username = username;
+ options.password = password;
+ options.will = will;
+ var queue = [];
+ var subscriptions = [];
+ var connecting = false;
+ var obj = {
+ _instances: 0,
+ publish: function(msg) {
+ if (client.isConnected()) {
+ client.publish(msg.topic,msg.payload,msg.qos,msg.retain);
+ } else {
+ if (!connecting) {
+ connecting = true;
+ client.connect(options);
+ }
+ queue.push(msg);
+ }
+ },
+ subscribe: function(topic,qos,callback) {
+ subscriptions.push({topic:topic,qos:qos,callback:callback});
+ client.on('message',function(mtopic,mpayload,mqos,mretain) {
+ if (matchTopic(topic,mtopic)) {
+ callback(mtopic,mpayload,mqos,mretain);
+ }
+ });
+ if (client.isConnected()) {
+ client.subscribe(topic,qos);
+ }
+ },
+ on: function(a,b){
+ client.on(a,b);
+ },
+ once: function(a,b){
+ client.once(a,b);
+ },
+ connect: function() {
+ if (client && !client.isConnected() && !connecting) {
+ connecting = true;
+ client.connect(options);
+ }
+ },
+ disconnect: function() {
+ this._instances -= 1;
+ if (this._instances == 0) {
+ client.disconnect();
+ client = null;
+ delete connections[id];
+ }
+ }
+ };
+ client.on('connect',function() {
+ if (client) {
+ util.log('[mqtt] ['+uid+'] connected to broker tcp://'+broker+':'+port);
+ connecting = false;
+ for (var s in subscriptions) {
+ var topic = subscriptions[s].topic;
+ var qos = subscriptions[s].qos;
+ var callback = subscriptions[s].callback;
+ client.subscribe(topic,qos);
+ }
+ //console.log("connected - publishing",queue.length,"messages");
+ while(queue.length) {
+ var msg = queue.shift();
+ //console.log(msg);
+ client.publish(msg.topic,msg.payload,msg.qos,msg.retain);
+ }
+ }
+ });
+ client.on('connectionlost', function(err) {
+ util.log('[mqtt] ['+uid+'] connection lost to broker tcp://'+broker+':'+port);
+ connecting = false;
+ setTimeout(function() {
+ obj.connect();
+ }, settings.mqttReconnectTime||5000);
+ });
+ client.on('disconnect', function() {
+ connecting = false;
+ util.log('[mqtt] ['+uid+'] disconnected from broker tcp://'+broker+':'+port);
+ });
+
+ return obj
+ }();
+ }
+ connections[id]._instances += 1;
+ return connections[id];
+ }
+};