diff options
Diffstat (limited to 'dgbuilder/core_nodes/io')
-rw-r--r-- | dgbuilder/core_nodes/io/10-mqtt.html | 157 | ||||
-rw-r--r-- | dgbuilder/core_nodes/io/10-mqtt.js | 119 | ||||
-rw-r--r-- | dgbuilder/core_nodes/io/21-httpin.html | 254 | ||||
-rw-r--r-- | dgbuilder/core_nodes/io/21-httpin.js | 241 | ||||
-rw-r--r-- | dgbuilder/core_nodes/io/22-websocket.html | 163 | ||||
-rw-r--r-- | dgbuilder/core_nodes/io/22-websocket.js | 185 | ||||
-rw-r--r-- | dgbuilder/core_nodes/io/23-watch.html | 57 | ||||
-rw-r--r-- | dgbuilder/core_nodes/io/23-watch.js | 51 | ||||
-rw-r--r-- | dgbuilder/core_nodes/io/25-serial.html | 265 | ||||
-rw-r--r-- | dgbuilder/core_nodes/io/25-serial.js | 310 | ||||
-rw-r--r-- | dgbuilder/core_nodes/io/31-tcpin.html | 299 | ||||
-rw-r--r-- | dgbuilder/core_nodes/io/31-tcpin.js | 472 | ||||
-rw-r--r-- | dgbuilder/core_nodes/io/32-udp.html | 212 | ||||
-rw-r--r-- | dgbuilder/core_nodes/io/32-udp.js | 171 | ||||
-rw-r--r-- | dgbuilder/core_nodes/io/lib/mqtt.js | 254 | ||||
-rw-r--r-- | dgbuilder/core_nodes/io/lib/mqttConnectionPool.js | 128 |
16 files changed, 3338 insertions, 0 deletions
diff --git a/dgbuilder/core_nodes/io/10-mqtt.html b/dgbuilder/core_nodes/io/10-mqtt.html new file mode 100644 index 00000000..2ff5eb29 --- /dev/null +++ b/dgbuilder/core_nodes/io/10-mqtt.html @@ -0,0 +1,157 @@ +<!-- + Copyright 2013,2014 IBM Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<script type="text/x-red" data-template-name="mqtt in"> + <div class="form-row"> + <label for="node-input-broker"><i class="fa fa-globe"></i> Broker</label> + <input type="text" id="node-input-broker"> + </div> + <div class="form-row"> + <label for="node-input-topic"><i class="fa fa-tasks"></i> Topic</label> + <input type="text" id="node-input-topic" placeholder="Topic"> + </div> + <div class="form-row"> + <label for="node-input-name"><i class="fa fa-tag"></i> Name</label> + <input type="text" id="node-input-name" placeholder="Name"> + </div> +</script> + +<script type="text/x-red" data-help-name="mqtt in"> + <p>MQTT input node. Connects to a broker and subscribes to the specified topic. The topic may contain MQTT wildcards.</p> + <p>Outputs an object called <b>msg</b> containing <b>msg.topic, msg.payload, msg.qos</b> and <b>msg.retain</b>.</p> + <p><b>msg.payload</b> is a String.</p> +</script> + +<script type="text/javascript"> + RED.nodes.registerType('mqtt in',{ + category: 'input', + defaults: { + name: {value:""}, + topic: {value:"",required:true}, + broker: {type:"mqtt-broker", required:true} + }, + color:"#d8bfd8", + inputs:0, + outputs:1, + icon: "bridge.png", + label: function() { + return this.name||this.topic||"mqtt"; + }, + labelStyle: function() { + return this.name?"node_label_italic":""; + } + }); +</script> + +<script type="text/x-red" data-template-name="mqtt out"> + <div class="form-row"> + <label for="node-input-broker"><i class="fa fa-globe"></i> Broker</label> + <input type="text" id="node-input-broker"> + </div> + <div class="form-row"> + <label for="node-input-topic"><i class="fa fa-tasks"></i> Topic</label> + <input type="text" id="node-input-topic" placeholder="Topic"> + </div> + <div class="form-row"> + <label for="node-input-qos"><i class="fa fa-empire"></i> QoS</label> + <select id="node-input-qos" style="width:125px !important"> + <option value=""></option> + <option value="0">0</option> + <option value="1">1</option> + <option value="2">2</option> + </select> + <i class="fa fa-history"></i> Retain <select id="node-input-retain" style="width:125px !important"> + <option value=""></option> + <option value="false">false</option> + <option value="true">true</option> + </select> + </div> + <div class="form-row"> + <label for="node-input-name"><i class="fa fa-tag"></i> Name</label> + <input type="text" id="node-input-name" placeholder="Name"> + </div> + <div class="form-tips">Tip: Leave topic, qos or retain blank if you want to set them via msg properties.</div> +</script> + +<script type="text/x-red" data-help-name="mqtt out"> + <p>Connects to a MQTT broker and publishes <b>msg.payload</b> either to the <b>msg.topic</b> or to the topic specified in the edit window. The value in the edit window has precedence.</p> + <p>Likewise QoS and/or retain values in the edit panel will overwrite any <b>msg.qos</b> and <b>msg.retain</b> properties. If nothing is set they default to <i>0</i> and <i>false</i> respectively.</p> + <p>If <b>msg.payload</b> contains an object it will be stringified before being sent.</p> +</script> + +<script type="text/javascript"> + RED.nodes.registerType('mqtt out',{ + category: 'output', + defaults: { + name: {value:""}, + topic: {value:""}, + qos: {value:""}, + retain: {value:""}, + broker: {type:"mqtt-broker", required:true} + }, + color:"#d8bfd8", + inputs:1, + outputs:0, + icon: "bridge.png", + align: "right", + label: function() { + return this.name||this.topic||"mqtt"; + }, + labelStyle: function() { + return this.name?"node_label_italic":""; + } + }); +</script> + +<script type="text/x-red" data-template-name="mqtt-broker"> + <div class="form-row node-input-broker"> + <label for="node-config-input-broker"><i class="fa fa-globe"></i> Broker</label> + <input class="input-append-left" type="text" id="node-config-input-broker" placeholder="localhost" style="width: 40%;" > + <label for="node-config-input-port" style="margin-left: 10px; width: 35px; "> Port</label> + <input type="text" id="node-config-input-port" placeholder="Port" style="width:45px"> + </div> + <div class="form-row"> + <label for="node-config-input-clientid"><i class="fa fa-tag"></i> Client ID</label> + <input type="text" id="node-config-input-clientid" placeholder="Leave blank for auto generated"> + </div> + <div class="form-row"> + <label for="node-config-input-user"><i class="fa fa-user"></i> Username</label> + <input type="text" id="node-config-input-user"> + </div> + <div class="form-row"> + <label for="node-config-input-password"><i class="fa fa-lock"></i> Password</label> + <input type="password" id="node-config-input-password"> + </div> +</script> + +<script type="text/javascript"> + RED.nodes.registerType('mqtt-broker',{ + category: 'config', + defaults: { + broker: {value:"",required:true}, + port: {value:1883,required:true,validate:RED.validators.number()}, + clientid: { value:"" } + }, + credentials: { + user: {type:"text"}, + password: {type: "password"} + }, + label: function() { + if (this.broker == "") { this.broker = "localhost"; } + return (this.clientid?this.clientid+"@":"")+this.broker+":"+this.port; + } + }); +</script> diff --git a/dgbuilder/core_nodes/io/10-mqtt.js b/dgbuilder/core_nodes/io/10-mqtt.js new file mode 100644 index 00000000..c8bc4901 --- /dev/null +++ b/dgbuilder/core_nodes/io/10-mqtt.js @@ -0,0 +1,119 @@ +/** + * Copyright 2013,2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +module.exports = function(RED) { + "use strict"; + var connectionPool = require("./lib/mqttConnectionPool"); + + function MQTTBrokerNode(n) { + RED.nodes.createNode(this,n); + this.broker = n.broker; + this.port = n.port; + this.clientid = n.clientid; + if (this.credentials) { + this.username = this.credentials.user; + this.password = this.credentials.password; + } + } + RED.nodes.registerType("mqtt-broker",MQTTBrokerNode,{ + credentials: { + user: {type:"text"}, + password: {type: "password"} + } + }); + + function MQTTInNode(n) { + RED.nodes.createNode(this,n); + this.topic = n.topic; + this.broker = n.broker; + this.brokerConfig = RED.nodes.getNode(this.broker); + if (this.brokerConfig) { + this.status({fill:"red",shape:"ring",text:"disconnected"}); + this.client = connectionPool.get(this.brokerConfig.broker,this.brokerConfig.port,this.brokerConfig.clientid,this.brokerConfig.username,this.brokerConfig.password); + var node = this; + this.client.subscribe(this.topic,2,function(topic,payload,qos,retain) { + var msg = {topic:topic,payload:payload,qos:qos,retain:retain}; + if ((node.brokerConfig.broker == "localhost")||(node.brokerConfig.broker == "127.0.0.1")) { + msg._topic = topic; + } + node.send(msg); + }); + this.client.on("connectionlost",function() { + node.status({fill:"red",shape:"ring",text:"disconnected"}); + }); + this.client.on("connect",function() { + node.status({fill:"green",shape:"dot",text:"connected"}); + }); + this.client.connect(); + } else { + this.error("missing broker configuration"); + } + this.on('close', function() { + if (this.client) { + this.client.disconnect(); + } + }); + } + RED.nodes.registerType("mqtt in",MQTTInNode); + + function MQTTOutNode(n) { + RED.nodes.createNode(this,n); + this.topic = n.topic; + this.qos = n.qos || null; + this.retain = n.retain; + this.broker = n.broker; + this.brokerConfig = RED.nodes.getNode(this.broker); + + if (this.brokerConfig) { + this.status({fill:"red",shape:"ring",text:"disconnected"},true); + this.client = connectionPool.get(this.brokerConfig.broker,this.brokerConfig.port,this.brokerConfig.clientid,this.brokerConfig.username,this.brokerConfig.password); + var node = this; + this.on("input",function(msg) { + if (msg.qos) { + msg.qos = parseInt(msg.qos); + if ((msg.qos !== 0) && (msg.qos !== 1) && (msg.qos !== 2)) { + msg.qos = null; + } + } + msg.qos = Number(node.qos || msg.qos || 0); + msg.retain = node.retain || msg.retain || false; + msg.retain = ((msg.retain === true) || (msg.retain === "true")) || false; + if (node.topic) { + msg.topic = node.topic; + } + if ((msg.hasOwnProperty("topic")) && (typeof msg.topic === "string") && (msg.topic !== "")) { // topic must exist + this.client.publish(msg); // send the message + } + else { node.warn("Invalid topic specified"); } + }); + this.client.on("connectionlost",function() { + node.status({fill:"red",shape:"ring",text:"disconnected"}); + }); + this.client.on("connect",function() { + node.status({fill:"green",shape:"dot",text:"connected"}); + }); + this.client.connect(); + } else { + this.error("missing broker configuration"); + } + this.on('close', function() { + if (this.client) { + this.client.disconnect(); + } + }); + } + RED.nodes.registerType("mqtt out",MQTTOutNode); +} diff --git a/dgbuilder/core_nodes/io/21-httpin.html b/dgbuilder/core_nodes/io/21-httpin.html new file mode 100644 index 00000000..059b8596 --- /dev/null +++ b/dgbuilder/core_nodes/io/21-httpin.html @@ -0,0 +1,254 @@ +<!-- + Copyright 2013 IBM Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<script type="text/x-red" data-template-name="http in"> + <div class="form-row"> + <label for="node-input-method"><i class="fa fa-tasks"></i> Method</label> + <select type="text" id="node-input-method" style="width:72%;"> + <option value="get">GET</option> + <option value="post">POST</option> + <option value="put">PUT</option> + <option value="delete">DELETE</option> + </select> + </div> + <div class="form-row"> + <label for="node-input-url"><i class="fa fa-globe"></i> url</label> + <input type="text" id="node-input-url" placeholder="/url"> + </div> + <div class="form-row"> + <label for="node-input-name"><i class="fa fa-tag"></i> Name</label> + <input type="text" id="node-input-name" placeholder="Name"> + </div> + <div id="node-input-tip" class="form-tips">The url will be relative to <code><span id="node-input-path"></span></code>.</div> +</script> + +<script type="text/x-red" data-help-name="http in"> + <p>Provides an input node for http requests, allowing the creation of simple web services.</p> + <p>The resulting message has the following properties: + <ul> + <li>msg.req : <a href="http://expressjs.com/api.html#req">http request</a></li> + <li>msg.res : <a href="http://expressjs.com/api.html#res">http response</a></li> + </ul> + </p> + <p>For POST/PUT requests, the body is available under <code>msg.req.body</code>. This + uses the <a href="http://expressjs.com/api.html#bodyParser">Express bodyParser middleware</a> to parse the content to a JSON object. + </p> + <p> + By default, this expects the body of the request to be url encoded: + <pre>foo=bar&this=that</pre> + </p> + <p> + To send JSON encoded data to the node, the content-type header of the request must be set to + <code>application/json</code>. + </p> + <p> + <b>Note: </b>This node does not send any response to the http request. This should be done with + a subsequent HTTP Response node, or Function node. + In the case of a Function node, the <a href="http://expressjs.com/api.html#res">Express response documentation</a> + describes how this should be done. For example: + <pre>msg.res.send(200, 'Thanks for the request ');<br/>return msg;</pre> + </p> + +</script> + +<script type="text/x-red" data-template-name="http response"> + <div class="form-row"> + <label for="node-input-name"><i class="fa fa-tag"></i> Name</label> + <input type="text" id="node-input-name" placeholder="Name"> + </div> + <div class="form-tips">The messages sent to this node <b>must</b> originate from an <i>http input</i> node</div> +</script> + +<script type="text/x-red" data-help-name="http response"> + <p>Sends responses back to http requests received from an HTTP Input node.</p> + <p>The response can be customised using the following message properties:</p> + <ul> + <li><code>payload</code> is sent as the body of the response</li> + <li><code>statusCode</code>, if set, is used as the response status code (default: 200)</li> + <li><code>headers</code>, if set, should be an object containing field/value + pairs to be added as response headers.</li> + </ul> +</script> + +<script type="text/x-red" data-template-name="http request"> + <div class="form-row"> + <label for="node-input-method"><i class="fa fa-tasks"></i> Method</label> + <select type="text" id="node-input-method" style="width:72%;"> + <option value="GET">GET</option> + <option value="POST">POST</option> + <option value="PUT">PUT</option> + <option value="DELETE">DELETE</option> + </select> + </div> + <div class="form-row"> + <label for="node-input-url"><i class="fa fa-globe"></i> URL</label> + <input type="text" id="node-input-url" placeholder="http://"> + </div> + <div class="form-row"> + <label> </label> + <input type="checkbox" id="node-input-useAuth" style="display: inline-block; width: auto; vertical-align: top;"> + <label for="node-input-useAuth" style="width: 70%;">Use basic authentication?</label> + </div> + <div class="form-row node-input-useAuth-row"> + <label for="node-input-user"><i class="fa fa-user"></i> Username</label> + <input type="text" id="node-input-user"> + </div> + <div class="form-row node-input-useAuth-row"> + <label for="node-input-password"><i class="fa fa-lock"></i> Password</label> + <input type="password" id="node-input-password"> + </div> + <div class="form-row"> + <label for="node-input-name"><i class="fa fa-tag"></i> Name</label> + <input type="text" id="node-input-name" placeholder="Name"> + </div> +</script> + +<script type="text/x-red" data-help-name="http request"> + <p>Provides a node for making http requests.</p> + <p>The URL and HTTP method can be configured in the node, but also + overridden by the incoming message: + <ul> + <li><code>url</code>, if set, is used as the url of the request. Must start with http: or https:</li> + <li><code>method</code>, if set, is used as the HTTP method of the request. + Must be one of <code>GET</code>, <code>PUT</code>, <code>POST</code> or <code>DELETE</code> (default: GET)</li> + <li><code>headers</code>, if set, should be an object containing field/value + pairs to be added as request headers</li> + <li><code>payload</code> is sent as the body of the request</li> + </ul> + <p>When configured within the node, the URL property can contain <a href="http://mustache.github.io/mustache.5.html" target="_new">mustache-style</a> tags. These allow the + url to be constructed using values of the incoming message. For example, if the url is set to + <code>example.com/{{topic}}</code>, it will have the value of <code>msg.topic</code> automatically inserted.</p> + <p> + The output message contains the following properties: + <ul> + <li><code>payload</code> is the body of the response</li> + <li><code>statusCode</code> is the status code of the response, or the error code if the request could not be completed</li> + <li><code>headers</code> is an object containing the response headers</li> + </ul> +</script> + +<script type="text/javascript"> + RED.nodes.registerType('http in',{ + category: 'input', + color:"rgb(231, 231, 174)", + defaults: { + name: {value:""}, + url: {value:"",required:true}, + method: {value:"get",required:true} + }, + inputs:0, + outputs:1, + icon: "white-globe.png", + label: function() { + if (this.name) { + return this.name; + } else if (this.url) { + var root = RED.settings.httpNodeRoot; + if (root.slice(-1) != "/") { + root = root+"/"; + } + if (this.url.charAt(0) == "/") { + root += this.url.slice(1); + } else { + root += this.url; + } + return "["+this.method+"] "+root; + } else { + return "http"; + } + }, + labelStyle: function() { + return this.name?"node_label_italic":""; + }, + oneditprepare: function() { + var root = RED.settings.httpNodeRoot; + if (root.slice(-1) == "/") { + root = root.slice(0,-1); + } + if (root == "") { + $("#node-input-tip").hide(); + } else { + $("#node-input-path").html(root); + $("#node-input-tip").show(); + } + //document.getElementById("node-config-wsdocpath").innerHTML= + } + + }); + + RED.nodes.registerType('http response',{ + category: 'output', + color:"rgb(231, 231, 174)", + defaults: { + name: {value:""} + }, + inputs:1, + outputs:0, + align: "right", + icon: "white-globe.png", + label: function() { + return this.name||"http"; + }, + labelStyle: function() { + return this.name?"node_label_italic":""; + } + }); + + RED.nodes.registerType('http request',{ + category: 'function', + color:"rgb(231, 231, 174)", + defaults: { + name: {value:""}, + method:{value:"GET"}, + url:{value:""}, + //user -> credentials + //pass -> credentials + }, + credentials: { + user: {type:"text"}, + password: {type: "password"} + }, + inputs:1, + outputs:1, + align: "right", + icon: "white-globe.png", + label: function() { + return this.name||"http request"; + }, + labelStyle: function() { + return this.name?"node_label_italic":""; + }, + oneditprepare: function() { + if (this.credentials.user || this.credentials.has_password) { + $('#node-input-useAuth').prop('checked', true); + $(".node-input-useAuth-row").show(); + } else { + $('#node-input-useAuth').prop('checked', false); + $(".node-input-useAuth-row").hide(); + } + + $("#node-input-useAuth").change(function() { + if ($(this).is(":checked")) { + $(".node-input-useAuth-row").show(); + } else { + $(".node-input-useAuth-row").hide(); + $('#node-input-user').val(''); + $('#node-input-password').val(''); + } + }); + }, + }); +</script> diff --git a/dgbuilder/core_nodes/io/21-httpin.js b/dgbuilder/core_nodes/io/21-httpin.js new file mode 100644 index 00000000..877ccc09 --- /dev/null +++ b/dgbuilder/core_nodes/io/21-httpin.js @@ -0,0 +1,241 @@ +/** + * Copyright 2013,2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +module.exports = function(RED) { + "use strict"; + var http = require("follow-redirects").http; + var https = require("follow-redirects").https; + var urllib = require("url"); + var express = require("express"); + var getBody = require('raw-body'); + var mustache = require("mustache"); + var querystring = require("querystring"); + + var cors = require('cors'); + var jsonParser = express.json(); + var urlencParser = express.urlencoded(); + + function rawBodyParser(req, res, next) { + if (req._body) { return next(); } + req.body = ""; + req._body = true; + getBody(req, { + limit: '1mb', + length: req.headers['content-length'], + encoding: 'utf8' + }, function (err, buf) { + if (err) { return next(err); } + req.body = buf; + next(); + }); + } + + + function HTTPIn(n) { + RED.nodes.createNode(this,n); + if (RED.settings.httpNodeRoot !== false) { + + this.url = n.url; + this.method = n.method; + + var node = this; + + this.errorHandler = function(err,req,res,next) { + node.warn(err); + res.send(500); + }; + + this.callback = function(req,res) { + if (node.method == "post") { + node.send({req:req,res:res,payload:req.body}); + } else if (node.method == "get") { + node.send({req:req,res:res,payload:req.query}); + } else { + node.send({req:req,res:res}); + } + } + + var corsHandler = function(req,res,next) { next(); } + + if (RED.settings.httpNodeCors) { + corsHandler = cors(RED.settings.httpNodeCors); + RED.httpNode.options(this.url,corsHandler); + } + + if (this.method == "get") { + RED.httpNode.get(this.url,corsHandler,this.callback,this.errorHandler); + } else if (this.method == "post") { + RED.httpNode.post(this.url,corsHandler,jsonParser,urlencParser,rawBodyParser,this.callback,this.errorHandler); + } else if (this.method == "put") { + RED.httpNode.put(this.url,corsHandler,jsonParser,urlencParser,rawBodyParser,this.callback,this.errorHandler); + } else if (this.method == "delete") { + RED.httpNode.delete(this.url,corsHandler,this.callback,this.errorHandler); + } + + this.on("close",function() { + var routes = RED.httpNode.routes[this.method]; + for (var i = 0; i<routes.length; i++) { + if (routes[i].path == this.url) { + routes.splice(i,1); + //break; + } + } + if (RED.settings.httpNodeCors) { + var route = RED.httpNode.route['options']; + for (var j = 0; j<route.length; j++) { + if (route[j].path == this.url) { + route.splice(j,1); + //break; + } + } + } + }); + } else { + this.warn("Cannot create http-in node when httpNodeRoot set to false"); + } + } + RED.nodes.registerType("http in",HTTPIn); + + + function HTTPOut(n) { + RED.nodes.createNode(this,n); + var node = this; + this.on("input",function(msg) { + if (msg.res) { + if (msg.headers) { + msg.res.set(msg.headers); + } + var statusCode = msg.statusCode || 200; + if (typeof msg.payload == "object" && !Buffer.isBuffer(msg.payload)) { + msg.res.jsonp(statusCode,msg.payload); + } else { + if (msg.res.get('content-length') == null) { + var len; + if (msg.payload == null) { + len = 0; + } else if (typeof msg.payload == "number") { + len = Buffer.byteLength(""+msg.payload); + } else { + len = Buffer.byteLength(msg.payload); + } + msg.res.set('content-length', len); + } + msg.res.send(statusCode,msg.payload); + } + } else { + node.warn("No response object"); + } + }); + } + RED.nodes.registerType("http response",HTTPOut); + + function HTTPRequest(n) { + RED.nodes.createNode(this,n); + var nodeUrl = n.url; + var isTemplatedUrl = (nodeUrl||"").indexOf("{{") != -1; + var nodeMethod = n.method || "GET"; + var node = this; + this.on("input",function(msg) { + node.status({fill:"blue",shape:"dot",text:"requesting"}); + var url; + if (msg.url) { + url = msg.url; + } else if (isTemplatedUrl) { + url = mustache.render(nodeUrl,msg); + } else { + url = nodeUrl; + } + // url must start http:// or https:// so assume http:// if not set + if (!((url.indexOf("http://")===0) || (url.indexOf("https://")===0))) { + url = "http://"+url; + } + + var method = (msg.method||nodeMethod).toUpperCase(); + //node.log(method+" : "+url); + var opts = urllib.parse(url); + opts.method = method; + opts.headers = {}; + if (msg.headers) { + for (var v in msg.headers) { + if (msg.headers.hasOwnProperty(v)) { + var name = v.toLowerCase(); + if (name !== "content-type" && name !== "content-length") { + // only normalise the known headers used later in this + // function. Otherwise leave them alone. + name = v; + } + opts.headers[name] = msg.headers[v]; + } + } + } + if (this.credentials && this.credentials.user) { + opts.auth = this.credentials.user+":"+(this.credentials.password||""); + } + var payload = null; + + if (msg.payload && (method == "POST" || method == "PUT") ) { + if (typeof msg.payload === "string" || Buffer.isBuffer(msg.payload)) { + payload = msg.payload; + } else if (typeof msg.payload == "number") { + payload = msg.payload+""; + } else { + if (opts.headers['content-type'] == 'application/x-www-form-urlencoded') { + payload = querystring.stringify(msg.payload); + } else { + payload = JSON.stringify(msg.payload); + if (opts.headers['content-type'] == null) { + opts.headers['content-type'] = "application/json"; + } + } + } + if (opts.headers['content-length'] == null) { + opts.headers['content-length'] = Buffer.byteLength(payload); + } + } + + var req = ((/^https/.test(url))?https:http).request(opts,function(res) { + res.setEncoding('utf8'); + msg.statusCode = res.statusCode; + msg.headers = res.headers; + msg.payload = ""; + res.on('data',function(chunk) { + msg.payload += chunk; + }); + res.on('end',function() { + node.send(msg); + node.status({}); + }); + }); + req.on('error',function(err) { + msg.payload = err.toString() + " : " + url; + msg.statusCode = err.code; + node.send(msg); + node.status({fill:"red",shape:"ring",text:err.code}); + }); + if (payload) { + req.write(payload); + } + req.end(); + }); + } + + RED.nodes.registerType("http request",HTTPRequest,{ + credentials: { + user: {type:"text"}, + password: {type: "password"} + } + }); +} diff --git a/dgbuilder/core_nodes/io/22-websocket.html b/dgbuilder/core_nodes/io/22-websocket.html new file mode 100644 index 00000000..ff6ed742 --- /dev/null +++ b/dgbuilder/core_nodes/io/22-websocket.html @@ -0,0 +1,163 @@ +<!--
+ Copyright 2013 IBM Corp.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- WebSocket Input Node -->
+<script type="text/x-red" data-template-name="websocket in">
+ <div class="form-row">
+ <label for="node-input-server"><i class="fa fa-bookmark"></i> Path</label>
+ <input type="text" id="node-input-server">
+ </div>
+ <div class="form-row">
+ <label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
+ <input type="text" id="node-input-name" placeholder="Name">
+ </div>
+</script>
+
+<script type="text/x-red" data-help-name="websocket in">
+ <p>WebSocket input node.</p>
+ <p>By default, the data received from the WebSocket will be in <b>msg.payload</b>.
+ The listener can be configured to expect a properly formed JSON string, in which
+ case it will parse the JSON and send on the resulting object as the entire message.</p>
+</script>
+
+<script type="text/javascript">
+ RED.nodes.registerType('websocket in',{
+ category: 'input',
+ defaults: {
+ name: {value:""},
+ server: {type:"websocket-listener"}
+ },
+ color:"rgb(215, 215, 160)",
+ inputs:0,
+ outputs:1,
+ icon: "white-globe.png",
+ label: function() {
+ var wsNode = RED.nodes.node(this.server);
+ return this.name||(wsNode?"[ws] "+wsNode.label():"websocket");
+ },
+ labelStyle: function() {
+ return this.name?"node_label_italic":"";
+ }
+ });
+</script>
+
+<!-- WebSocket out Node -->
+<script type="text/x-red" data-template-name="websocket out">
+ <div class="form-row">
+ <label for="node-input-server"><i class="fa fa-bookmark"></i> Path</label>
+ <input type="text" id="node-input-server">
+ </div>
+ <div class="form-row">
+ <label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
+ <input type="text" id="node-input-name" placeholder="Name">
+ </div>
+</script>
+
+<script type="text/x-red" data-help-name="websocket out">
+ <p>WebSocket out node.</p>
+ <p>By default, <b>msg.payload</b> will be sent over the WebSocket. The listener
+ can be configured to encode the entire message object as a JSON string and send that
+ over the WebSocket.</p>
+
+ <p>If the message arriving at this node started at a WebSocket In node, the message
+ will be sent back to the client that triggered the flow. Otherwise, the message
+ will be broadcast to all connected clients.</p>
+ <p>If you want to broadcast a message that started at a WebSocket In node, you
+ should delete the <b>msg._session</b> property within the flow</p>.
+</script>
+
+<script type="text/javascript">
+ RED.nodes.registerType('websocket out',{
+ category: 'output',
+ defaults: {
+ name: {value:""},
+ server: {type:"websocket-listener", required:true}
+ },
+ color:"rgb(215, 215, 160)",
+ inputs:1,
+ outputs:0,
+ icon: "white-globe.png",
+ align: "right",
+ label: function() {
+ var wsNode = RED.nodes.node(this.server);
+ return this.name||(wsNode?"[ws] "+wsNode.label():"websocket");
+ },
+ labelStyle: function() {
+ return this.name?"node_label_italic":"";
+ }
+ });
+</script>
+
+<!-- WebSocket Server configuration node -->
+<script type="text/x-red" data-template-name="websocket-listener">
+ <div class="form-row">
+ <label for="node-config-input-path"><i class="fa fa-bookmark"></i> Path</label>
+ <input type="text" id="node-config-input-path" placeholder="/ws/example">
+ </div>
+ <div class="form-row">
+ <label for="node-config-input-wholemsg"> </label>
+ <select type="text" id="node-config-input-wholemsg" style="width: 70%;">
+ <option value="false">Send/Receive payload</option>
+ <option value="true">Send/Receive entire message</option>
+ </select>
+ </div>
+ <div class="form-tips">
+ Be default, <code>payload</code> will contain the data to be sent over, or received from a websocket.
+ The listener can be configured to send or receive the entire message object as a JSON formatted string.
+ <p id="node-config-ws-tip">This path will be relative to <code><span id="node-config-ws-path"></span></code>.</p>
+ </div>
+</script>
+
+<script type="text/x-red" data-help-name="websocket-listener">
+ <p>This configuration node creates a WebSocket Server using the specified path</p>
+</script>
+
+<script type="text/javascript">
+ RED.nodes.registerType('websocket-listener',{
+ category: 'config',
+ defaults: {
+ path: {value:"",required:true,validate:RED.validators.regex(/^((?!\/debug\/ws).)*$/) },
+ wholemsg: {value:"false"}
+ },
+ inputs:0,
+ outputs:0,
+ label: function() {
+ var root = RED.settings.httpNodeRoot;
+ if (root.slice(-1) != "/") {
+ root = root+"/";
+ }
+ if (this.path.charAt(0) == "/") {
+ root += this.path.slice(1);
+ } else {
+ root += this.path;
+ }
+ return root;
+ },
+ oneditprepare: function() {
+ var root = RED.settings.httpNodeRoot;
+ if (root.slice(-1) == "/") {
+ root = root.slice(0,-1);
+ }
+ if (root == "") {
+ $("#node-config-ws-tip").hide();
+ } else {
+ $("#node-config-ws-path").html(root);
+ $("#node-config-ws-tip").show();
+ }
+ //document.getElementById("node-config-wsdocpath").innerHTML=
+ }
+ });
+</script>
diff --git a/dgbuilder/core_nodes/io/22-websocket.js b/dgbuilder/core_nodes/io/22-websocket.js new file mode 100644 index 00000000..72eda502 --- /dev/null +++ b/dgbuilder/core_nodes/io/22-websocket.js @@ -0,0 +1,185 @@ +/**
+ * Copyright 2013 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+module.exports = function(RED) {
+ "use strict";
+ var ws = require("ws"),
+ inspect = require("sys").inspect;
+
+ // A node red node that sets up a local websocket server
+ function WebSocketListenerNode(n) {
+ // Create a RED node
+ RED.nodes.createNode(this,n);
+
+ var node = this;
+
+ // Store local copies of the node configuration (as defined in the .html)
+ node.path = n.path;
+ node.wholemsg = (n.wholemsg === "true");
+
+ node._inputNodes = []; // collection of nodes that want to receive events
+
+ var path = RED.settings.httpNodeRoot || "/";
+ path = path + (path.slice(-1) == "/" ? "":"/") + (node.path.charAt(0) == "/" ? node.path.substring(1) : node.path);
+
+ // Workaround https://github.com/einaros/ws/pull/253
+ // Listen for 'newListener' events from RED.server
+ node._serverListeners = {};
+
+ var storeListener = function(/*String*/event,/*function*/listener){
+ if(event == "error" || event == "upgrade" || event == "listening"){
+ node._serverListeners[event] = listener;
+ }
+ }
+
+ node._clients = {};
+
+ RED.server.addListener('newListener',storeListener);
+
+ // Create a WebSocket Server
+ node.server = new ws.Server({server:RED.server,path:path});
+
+ // Workaround https://github.com/einaros/ws/pull/253
+ // Stop listening for new listener events
+ RED.server.removeListener('newListener',storeListener);
+
+ node.server.on('connection', function(socket){
+ var id = (1+Math.random()*4294967295).toString(16);
+ node._clients[id] = socket;
+ socket.on('close',function() {
+ delete node._clients[id];
+ });
+ socket.on('message',function(data,flags){
+ node.handleEvent(id,socket,'message',data,flags);
+ });
+ socket.on('error', function(err) {
+ node.warn("An error occured on the ws connection: "+inspect(err));
+ });
+ });
+
+ node.on("close", function() {
+ // Workaround https://github.com/einaros/ws/pull/253
+ // Remove listeners from RED.server
+ var listener = null;
+ for(var event in node._serverListeners) {
+ if (node._serverListeners.hasOwnProperty(event)) {
+ listener = node._serverListeners[event];
+ if(typeof listener === "function"){
+ RED.server.removeListener(event,listener);
+ }
+ }
+ }
+ node._serverListeners = {};
+ node.server.close();
+ node._inputNodes = [];
+ });
+ }
+ RED.nodes.registerType("websocket-listener",WebSocketListenerNode);
+
+ WebSocketListenerNode.prototype.registerInputNode = function(/*Node*/handler){
+ this._inputNodes.push(handler);
+ }
+
+ WebSocketListenerNode.prototype.handleEvent = function(id,/*socket*/socket,/*String*/event,/*Object*/data,/*Object*/flags){
+ var msg;
+ if (this.wholemsg) {
+ try {
+ msg = JSON.parse(data);
+ }
+ catch(err) {
+ msg = { payload:data };
+ }
+ } else {
+ msg = {
+ payload:data
+ };
+ }
+ msg._session = {type:"websocket",id:id};
+
+ for (var i = 0; i < this._inputNodes.length; i++) {
+ this._inputNodes[i].send(msg);
+ }
+ }
+
+ WebSocketListenerNode.prototype.broadcast = function(data){
+ try {
+ for (var i = 0; i < this.server.clients.length; i++) {
+ this.server.clients[i].send(data);
+ }
+ }
+ catch(e) { // swallow any errors
+ this.warn("ws:"+i+" : "+e);
+ }
+ }
+
+ WebSocketListenerNode.prototype.send = function(id,data) {
+ var session = this._clients[id];
+ if (session) {
+ try {
+ session.send(data);
+ }
+ catch(e) { // swallow any errors
+ }
+ }
+ }
+
+ function WebSocketInNode(n) {
+ RED.nodes.createNode(this,n);
+ this.server = n.server;
+ var node = this;
+ this.serverConfig = RED.nodes.getNode(this.server);
+ if (this.serverConfig) {
+ this.serverConfig.registerInputNode(this);
+ } else {
+ this.error("Missing server configuration");
+ }
+ }
+ RED.nodes.registerType("websocket in",WebSocketInNode);
+
+ function WebSocketOutNode(n) {
+ RED.nodes.createNode(this,n);
+ var node = this;
+ this.server = n.server;
+ this.serverConfig = RED.nodes.getNode(this.server);
+ if (!this.serverConfig) {
+ this.error("Missing server configuration");
+ }
+ this.on("input", function(msg) {
+ var payload;
+ if (this.serverConfig.wholemsg) {
+ delete msg._session;
+ payload = JSON.stringify(msg);
+ } else {
+ if (!Buffer.isBuffer(msg.payload)) { // if it's not a buffer make sure it's a string.
+ payload = RED.util.ensureString(msg.payload);
+ }
+ else {
+ payload = msg.payload;
+ }
+ }
+ if (msg._session && msg._session.type == "websocket") {
+ node.serverConfig.send(msg._session.id,payload);
+ } else {
+ node.serverConfig.broadcast(payload,function(error){
+ if (!!error) {
+ node.warn("An error occurred while sending:" + inspect(error));
+ }
+ });
+ }
+ });
+ }
+ RED.nodes.registerType("websocket out",WebSocketOutNode);
+}
diff --git a/dgbuilder/core_nodes/io/23-watch.html b/dgbuilder/core_nodes/io/23-watch.html new file mode 100644 index 00000000..8bf22be5 --- /dev/null +++ b/dgbuilder/core_nodes/io/23-watch.html @@ -0,0 +1,57 @@ +<!-- + Copyright 2013 IBM Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<script type="text/x-red" data-template-name="watch"> + <div class="form-row node-input-filename"> + <label for="node-input-files"><i class="fa fa-file"></i> File(s)</label> + <input type="text" id="node-input-files" placeholder="File(s) or Directory"> + </div> + <div class="form-row"> + <label for="node-input-name"><i class="fa fa-tag"></i> Name</label> + <input type="text" id="node-input-name" placeholder="Name"> + </div> + <div id="node-input-tip" class="form-tips">On Windows you must use double slashes \\ in any directory names.</div> +</script> + +<script type="text/x-red" data-help-name="watch"> + <p>Watches a directory or file for any changes.</p> + <p>You can enter a list of comma separated directories or files if you like. You will need to put " around any that have spaces in.</p> + <p>On Windows you must use double slashes \\ in any directory names.</p> + <p>The full filename of the file that actually changed is put into <b>msg.payload</b>, while a stringified version of the watched criteria is returned in <b>msg.topic</b>.</p> + <p><b>msg.file</b> contains just the short filename of the file that changed.</p> + <p>Of course in Linux, <i>everything</i> could be a file and thus watched...</p> + <p><b>Note: </b>The directory or file must exist in order to be watched. If the file or directory gets deleted it may no longer be monitored even if it gets re-created.</p> +</script> + +<script type="text/javascript"> + RED.nodes.registerType('watch',{ + category: 'advanced-input', + defaults: { + name: {value:""}, + files: {value:"",required:true} + }, + color:"BurlyWood", + inputs:0, + outputs:1, + icon: "watch.png", + label: function() { + return this.name||this.files; + }, + labelStyle: function() { + return this.name?"node_label_italic":""; + } + }); +</script> diff --git a/dgbuilder/core_nodes/io/23-watch.js b/dgbuilder/core_nodes/io/23-watch.js new file mode 100644 index 00000000..8a17f5ac --- /dev/null +++ b/dgbuilder/core_nodes/io/23-watch.js @@ -0,0 +1,51 @@ +/** + * Copyright 2013 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +module.exports = function(RED) { + "use strict"; + var Notify = require("fs.notify"); + var fs = require("fs"); + var sep = require("path").sep; + + function WatchNode(n) { + RED.nodes.createNode(this,n); + + this.files = n.files.split(","); + for (var f =0; f < this.files.length; f++) { + this.files[f] = this.files[f].trim(); + } + this.p = (this.files.length == 1) ? this.files[0] : JSON.stringify(this.files); + var node = this; + + var notifications = new Notify(node.files); + notifications.on('change', function (file, event, path) { + try { + if (fs.statSync(path).isDirectory()) { path = path + sep + file; } + } catch(e) { } + var msg = { payload: path, topic: node.p, file: file }; + node.send(msg); + }); + + notifications.on('error', function (error, path) { + node.warn(error); + }); + + this.close = function() { + notifications.close(); + } + } + RED.nodes.registerType("watch",WatchNode); +} diff --git a/dgbuilder/core_nodes/io/25-serial.html b/dgbuilder/core_nodes/io/25-serial.html new file mode 100644 index 00000000..225e4dc3 --- /dev/null +++ b/dgbuilder/core_nodes/io/25-serial.html @@ -0,0 +1,265 @@ +<!-- + Copyright 2013,2014 IBM Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<script type="text/x-red" data-template-name="serial in"> + <div class="form-row node-input-serial"> + <label for="node-input-serial"><i class="fa fa-random"></i> Serial Port</label> + <input type="text" id="node-input-serial"> + </div> + <div class="form-row"> + <label for="node-input-name"><i class="fa fa-tag"></i> Name</label> + <input type="text" id="node-input-name" placeholder="Name"> + </div> +</script> + +<script type="text/x-red" data-help-name="serial in"> + <p>Reads data from a local serial port.</p> + <p>Can either <ul><li>wait for a "split" character (default \n). Also accepts hex notation (0x0a).</li> + <li>Wait for a timeout in milliseconds for the first character received</li> + <li>Wait to fill a fixed sized buffer</li></ul></p> + <p>It then outputs <b>msg.payload</b> as either a UTF8 ascii string or a binary Buffer object.</p> + <p>If no split character is specified, or a timeout or buffer size of 0, then a stream of single characters is sent - again either as ascii chars or size 1 binary buffers.</p> +</script> + +<script type="text/javascript"> + RED.nodes.registerType('serial in',{ + category: 'input', + defaults: { + name: {name:""}, + serial: {type:"serial-port",required:true} + }, + color:"BurlyWood", + inputs:0, + outputs:1, + icon: "serial.png", + label: function() { + var serialNode = RED.nodes.node(this.serial); + return this.name||(serialNode?serialNode.label().split(":")[0]:"serial"); + }, + labelStyle: function() { + return this.name?"node_label_italic":""; + } + }); +</script> + +<script type="text/x-red" data-template-name="serial out"> + <div class="form-row node-input-serial"> + <label for="node-input-serial"><i class="fa fa-random"></i> Serial Port</label> + <input type="text" id="node-input-serial"> + </div> + <div class="form-row"> + <label for="node-input-name"><i class="fa fa-tag"></i> Name</label> + <input type="text" id="node-input-name" placeholder="Name"> + </div> +</script> + +<script type="text/x-red" data-help-name="serial out"> + <p>Provides a connection to an outbound serial port.</p> + <p>Only the <b>msg.payload</b> is sent.</p> + <p>Optionally the new line character used to split the input can be appended to every message sent out to the serial port.</p> +</script> + +<script type="text/javascript"> + RED.nodes.registerType('serial out',{ + category: 'output', + defaults: { + name: {name:""}, + serial: {type:"serial-port",required:true} + }, + color:"BurlyWood", + inputs:1, + outputs:0, + icon: "serial.png", + align: "right", + label: function() { + var serialNode = RED.nodes.node(this.serial); + return this.name||(serialNode?serialNode.label().split(":")[0]:"serial"); + }, + labelStyle: function() { + return this.name?"node_label_italic":""; + } + }); +</script> + + +<script type="text/x-red" data-template-name="serial-port"> + <div class="form-row"> + <label for="node-config-input-serialport"><i class="fa fa-random"></i> Serial Port</label> + <input type="text" id="node-config-input-serialport" style="width:60%;" placeholder="/dev/ttyUSB0"/> + <a id="node-config-lookup-serial" class="btn"><i id="node-config-lookup-serial-icon" class="fa fa-search"></i></a> + </div> + <div class="form-row"> + <table><tr> + <td width = "102px"><i class="fa fa-wrench"></i> Settings</td> + <td width = "100px">Baud Rate</td> + <td width = "80px">Data Bits</td> + <td width = "80px">Parity</td> + <td width = "80px">Stop Bits</td> + </tr><tr><td> </td> + <td> + <select type="text" id="node-config-input-serialbaud" style="width: 100px;"> + <option value="115200">115200</option> + <option value="57600">57600</option> + <option value="38400">38400</option> + <option value="19200">19200</option> + <option value="9600">9600</option> + <option value="4800">4800</option> + <option value="2400">2400</option> + <option value="1800">1800</option> + <option value="1200">1200</option> + <option value="600">600</option> + <option value="300">300</option> + <option value="200">200</option> + <option value="150">150</option> + <option value="134">134</option> + <option value="110">110</option> + <option value="75">75</option> + <option value="50">50</option> + </select> + </td><td> + <select type="text" id="node-config-input-databits" style="width: 80px;"> + <option value="8">8</option> + <option value="7">7</option> + <option value="6">6</option> + <option value="5">5</option> + </select> + </td><td> + <select type="text" id="node-config-input-parity" style="width: 80px;"> + <option value="none">None</option> + <option value="even">Even</option> + <option value="mark">Mark</option> + <option value="odd">Odd</option> + <option value="space">Space</option> + </select> + </td><td> + <select type="text" id="node-config-input-stopbits" style="width: 80px;"> + <option value="2">2</option> + <option value="1">1</option> + </select> + </td> + </tr></table><br/> + + <div class="form-row"> + <label for="node-config-input-out"><i class="fa fa-cut"></i> Split input</label> + <select type="text" id="node-config-input-out" style="width:52%;"> + <option value="char">when character received is</option> + <option value="time">after a fixed timeout of</option> + <option value="count">a fixed number of characters</option> + </select> + <input type="text" id="node-config-input-newline" style="width:50px;"> + <span id="node-units"></span> + </div> + + <div class="form-row"> + <label for="node-config-input-bin"><i class="fa fa-sign-in"></i> and deliver</label> + <select type="text" id="node-config-input-bin" style="width: 77%;"> + <option value="false">ascii strings</option> + <option value="bin">binary buffers</option> + </select> + </div> + <br/> + <div class="form-row" id="node-config-addchar"> + <label for="node-config-input-addchar"><i class="fa fa-sign-out"></i> On output</label> + <select type="text" id="node-config-input-addchar" style="width: 77%;"> + <option value="false">don't add 'split' character to output messages</option> + <option value="true">add 'split' character to output messages</option> + </select> + </div> + <div class="form-tips" id="tip-split">Tip: the "Split on" character is used to split the input into separate messages. It can also be added to every message sent out to the serial port.</div> + <div class="form-tips" id="tip-bin" hidden>Tip: In timeout mode timeout starts from arrival of first character.</div> + <script> + var previous = null; + $("#node-config-input-out").on('focus', function () { previous = this.value; }).change(function() { + if (previous == null) { previous = $("#node-config-input-out").val(); } + if ($("#node-config-input-out").val() == "char") { + if (previous != "char") { $("#node-config-input-newline").val("\\n"); } + $("#node-units").text(""); + $("#node-config-addchar").show(); + $("#tip-split").show(); + $("#tip-bin").hide(); + } + else if ($("#node-config-input-out").val() == "time") { + if (previous != "time") { $("#node-config-input-newline").val("0"); } + $("#node-units").text("ms"); + $("#node-config-addchar").hide(); + $("#node-config-input-addchar").val("false"); + $("#tip-split").hide(); + $("#tip-bin").show(); + } + else { + if (previous != "count") { $("#node-config-input-newline").val("12"); } + $("#node-units").text("chars"); + $("#node-config-addchar").hide(); + $("#node-config-input-addchar").val("false"); + $("#tip-split").hide(); + $("#tip-bin").hide(); + } + }); + + </script> +</script> + +<script type="text/javascript"> + RED.nodes.registerType('serial-port',{ + category: 'config', + defaults: { + //name: {value:""}, + serialport: {value:"",required:true}, + serialbaud: {value:57600,required:true}, + databits: {value:8,required:true}, + parity: {value:"none",required:true}, + stopbits: {value:1,required:true}, + newline: {value:"\\n"}, + bin: {value:""}, + out: {value:""}, + addchar: {value:false} + }, + label: function() { + this.serialbaud = this.serialbaud || 57600; + this.databits = this.databits || 8; + this.parity = this.parity || 'none'; + this.stopbits = this.stopbits || 1; + return this.serialport+":"+this.serialbaud+"-"+this.databits+this.parity.charAt(0).toUpperCase()+this.stopbits; + }, + oneditprepare: function() { + try { + $("#node-config-input-serialport").autocomplete( "destroy" ); + } catch(err) { + } + $("#node-config-lookup-serial").click(function() { + //$("#node-config-lookup-serial-icon").removeClass('fa fa-search'); + //$("#node-config-lookup-serial-icon").addClass('fa fa-spinner'); + $("#node-config-lookup-serial").addClass('disabled'); + $.getJSON('serialports',function(data) { + //$("#node-config-lookup-serial-icon").addClass('fa fa-search'); + //$("#node-config-lookup-serial-icon").removeClass('fa fa-spinner'); + $("#node-config-lookup-serial").removeClass('disabled'); + var ports = []; + $.each(data, function(i, port){ + ports.push(port.comName); + }); + $("#node-config-input-serialport").autocomplete({ + source:ports, + minLength:0, + close: function( event, ui ) { + $("#node-config-input-serialport").autocomplete( "destroy" ); + } + }).autocomplete("search",""); + }); + }); + } + }); +</script> diff --git a/dgbuilder/core_nodes/io/25-serial.js b/dgbuilder/core_nodes/io/25-serial.js new file mode 100644 index 00000000..96e4aca6 --- /dev/null +++ b/dgbuilder/core_nodes/io/25-serial.js @@ -0,0 +1,310 @@ +/** +* Copyright 2013,2014 IBM Corp. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +**/ + +module.exports = function(RED) { + "use strict"; + var settings = RED.settings; + var events = require("events"); + var util = require("util"); + var serialp = require("serialport"); + var bufMaxSize = 32768; // Max serial buffer size, for inputs... + + // TODO: 'serialPool' should be encapsulated in SerialPortNode + + function SerialPortNode(n) { + RED.nodes.createNode(this,n); + this.serialport = n.serialport; + this.newline = n.newline; + this.addchar = n.addchar || "false"; + this.serialbaud = parseInt(n.serialbaud) || 57600; + this.databits = parseInt(n.databits) || 8; + this.parity = n.parity || "none"; + this.stopbits = parseInt(n.stopbits) || 1; + this.bin = n.bin || "false"; + this.out = n.out || "char"; + } + RED.nodes.registerType("serial-port",SerialPortNode); + + function SerialOutNode(n) { + RED.nodes.createNode(this,n); + this.serial = n.serial; + this.serialConfig = RED.nodes.getNode(this.serial); + + if (this.serialConfig) { + var node = this; + node.port = serialPool.get(this.serialConfig.serialport, + this.serialConfig.serialbaud, + this.serialConfig.databits, + this.serialConfig.parity, + this.serialConfig.stopbits, + this.serialConfig.newline); + node.addCh = ""; + if (node.serialConfig.addchar == "true") { + node.addCh = this.serialConfig.newline.replace("\\n","\n").replace("\\r","\r").replace("\\t","\t").replace("\\e","\e").replace("\\f","\f").replace("\\0","\0"); + } + node.on("input",function(msg) { + var payload = msg.payload; + if (!Buffer.isBuffer(payload)) { + if (typeof payload === "object") { + payload = JSON.stringify(payload); + } else { + payload = payload.toString(); + } + payload += node.addCh; + } else if (node.addCh !== "") { + payload = Buffer.concat([payload,new Buffer(node.addCh)]); + } + node.port.write(payload,function(err,res) { + if (err) { + node.error(err); + } + }); + }); + node.port.on('ready', function() { + node.status({fill:"green",shape:"dot",text:"connected"}); + }); + node.port.on('closed', function() { + node.status({fill:"red",shape:"ring",text:"not connected"}); + }); + } else { + this.error("missing serial config"); + } + + this.on("close", function(done) { + if (this.serialConfig) { + serialPool.close(this.serialConfig.serialport,done); + } else { + done(); + } + }); + } + RED.nodes.registerType("serial out",SerialOutNode); + + + function SerialInNode(n) { + RED.nodes.createNode(this,n); + this.serial = n.serial; + this.serialConfig = RED.nodes.getNode(this.serial); + + if (this.serialConfig) { + var node = this; + node.tout = null; + var buf; + if (node.serialConfig.out != "count") { buf = new Buffer(bufMaxSize); } + else { buf = new Buffer(Number(node.serialConfig.newline)); } + var i = 0; + node.status({fill:"grey",shape:"dot",text:"unknown"}); + node.port = serialPool.get(this.serialConfig.serialport, + this.serialConfig.serialbaud, + this.serialConfig.databits, + this.serialConfig.parity, + this.serialConfig.stopbits, + this.serialConfig.newline + ); + + var splitc; + if (node.serialConfig.newline.substr(0,2) == "0x") { + splitc = new Buffer([parseInt(node.serialConfig.newline)]); + } else { + splitc = new Buffer(node.serialConfig.newline.replace("\\n","\n").replace("\\r","\r").replace("\\t","\t").replace("\\e","\e").replace("\\f","\f").replace("\\0","\0")); + } + + this.port.on('data', function(msg) { + // single char buffer + if ((node.serialConfig.newline === 0)||(node.serialConfig.newline === "")) { + if (node.serialConfig.bin !== "bin") { node.send({"payload": String.fromCharCode(msg)}); } + else { node.send({"payload": new Buffer([msg])}); } + } + else { + // do the timer thing + if (node.serialConfig.out === "time") { + if (node.tout) { + i += 1; + buf[i] = msg; + } + else { + node.tout = setTimeout(function () { + node.tout = null; + var m = new Buffer(i+1); + buf.copy(m,0,0,i+1); + if (node.serialConfig.bin !== "bin") { m = m.toString(); } + node.send({"payload": m}); + m = null; + }, node.serialConfig.newline); + i = 0; + buf[0] = msg; + } + } + // count bytes into a buffer... + else if (node.serialConfig.out === "count") { + buf[i] = msg; + i += 1; + if ( i >= parseInt(node.serialConfig.newline)) { + var m = new Buffer(i); + buf.copy(m,0,0,i); + if (node.serialConfig.bin !== "bin") { m = m.toString(); } + node.send({"payload":m}); + m = null; + i = 0; + } + } + // look to match char... + else if (node.serialConfig.out === "char") { + buf[i] = msg; + i += 1; + if ((msg === splitc[0]) || (i === bufMaxSize)) { + var m = new Buffer(i); + buf.copy(m,0,0,i); + if (node.serialConfig.bin !== "bin") { m = m.toString(); } + node.send({"payload":m}); + m = null; + i = 0; + } + } + else { console.log("Should never get here"); } + } + }); + this.port.on('ready', function() { + node.status({fill:"green",shape:"dot",text:"connected"}); + }); + this.port.on('closed', function() { + node.status({fill:"red",shape:"ring",text:"not connected"}); + }); + } else { + this.error("missing serial config"); + } + + this.on("close", function(done) { + if (this.serialConfig) { + serialPool.close(this.serialConfig.serialport,done); + } else { + done(); + } + }); + } + RED.nodes.registerType("serial in",SerialInNode); + + + var serialPool = function() { + var connections = {}; + return { + get:function(port,baud,databits,parity,stopbits,newline,callback) { + var id = port; + if (!connections[id]) { + connections[id] = function() { + var obj = { + _emitter: new events.EventEmitter(), + serial: null, + _closing: false, + tout: null, + on: function(a,b) { this._emitter.on(a,b); }, + close: function(cb) { this.serial.close(cb); }, + write: function(m,cb) { this.serial.write(m,cb); }, + } + //newline = newline.replace("\\n","\n").replace("\\r","\r"); + var setupSerial = function() { + //if (newline == "") { + obj.serial = new serialp.SerialPort(port,{ + baudrate: baud, + databits: databits, + parity: parity, + stopbits: stopbits, + parser: serialp.parsers.raw + },true, function(err, results) { if (err) { obj.serial.emit('error',err); } }); + //} + //else { + // obj.serial = new serialp.SerialPort(port,{ + // baudrate: baud, + // databits: databits, + // parity: parity, + // stopbits: stopbits, + // parser: serialp.parsers.readline(newline) + // },true, function(err, results) { if (err) obj.serial.emit('error',err); }); + //} + obj.serial.on('error', function(err) { + util.log("[serial] serial port "+port+" error "+err); + obj._emitter.emit('closed'); + obj.tout = setTimeout(function() { + setupSerial(); + }, settings.serialReconnectTime); + }); + obj.serial.on('close', function() { + if (!obj._closing) { + util.log("[serial] serial port "+port+" closed unexpectedly"); + obj._emitter.emit('closed'); + obj.tout = setTimeout(function() { + setupSerial(); + }, settings.serialReconnectTime); + } + }); + obj.serial.on('open',function() { + util.log("[serial] serial port "+port+" opened at "+baud+" baud "+databits+""+parity.charAt(0).toUpperCase()+stopbits); + if (obj.tout) { clearTimeout(obj.tout); } + //obj.serial.flush(); + obj._emitter.emit('ready'); + }); + obj.serial.on('data',function(d) { + //console.log(Buffer.isBuffer(d),d.length,d); + //if (typeof d !== "string") { + // //d = d.toString(); + for (var z=0; z<d.length; z++) { + obj._emitter.emit('data',d[z]); + } + //} + //else { + // obj._emitter.emit('data',d); + //} + }); + obj.serial.on("disconnect",function() { + util.log("[serial] serial port "+port+" gone away"); + }); + } + setupSerial(); + return obj; + }(); + } + return connections[id]; + }, + close: function(port,done) { + if (connections[port]) { + if (connections[port].tout != null) { + clearTimeout(connections[port].tout); + } + connections[port]._closing = true; + try { + connections[port].close(function() { + util.log("[serial] serial port closed"); + done(); + }); + } + catch(err) { } + delete connections[port]; + } else { + done(); + } + } + } + }(); + + RED.httpAdmin.get("/serialports",function(req,res) { + serialp.list(function (err, ports) { + //console.log(JSON.stringify(ports)); + res.writeHead(200, {'Content-Type': 'text/plain'}); + res.write(JSON.stringify(ports)); + res.end(); + }); + }); +} diff --git a/dgbuilder/core_nodes/io/31-tcpin.html b/dgbuilder/core_nodes/io/31-tcpin.html new file mode 100644 index 00000000..c8cec599 --- /dev/null +++ b/dgbuilder/core_nodes/io/31-tcpin.html @@ -0,0 +1,299 @@ +<!-- + Copyright 2013 IBM Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<script type="text/x-red" data-template-name="tcp in"> + <div class="form-row"> + <label for="node-input-server"><i class="fa fa-dot-circle-o"></i> Type</label> + <select id="node-input-server" style="width:120px; margin-right:5px;"> + <option value="server">Listen on</option> + <option value="client">Connect to</option> + </select> + port <input type="text" id="node-input-port" style="width: 50px"> + </div> + <div class="form-row hidden" id="node-input-host-row" style="padding-left: 110px;"> + at host <input type="text" id="node-input-host" placeholder="localhost" style="width: 60%;"> + </div> + + <div class="form-row"> + <label><i class="fa fa-sign-out"></i> Output</label> + a + <select id="node-input-datamode" style="width:110px;"> + <option value="stream">stream of</option> + <option value="single">single</option> + </select> + <select id="node-input-datatype" style="width:140px;"> + <option value="buffer">Buffer</option> + <option value="utf8">String</option> + <option value="base64">Base64 String</option> + </select> + payload<span id="node-input-datamode-plural">s</span> + </div> + + <div id="node-row-newline" class="form-row hidden" style="padding-left: 110px;"> + delimited by <input type="text" id="node-input-newline" style="width: 110px;"> + </div> + + <div class="form-row"> + <label for="node-input-topic"><i class="fa fa-tasks"></i> Topic</label> + <input type="text" id="node-input-topic" placeholder="Topic"> + </div> + <div class="form-row"> + <label for="node-input-name"><i class="fa fa-tag"></i> Name</label> + <input type="text" id="node-input-name" placeholder="Name"> + </div> +</script> + +<script type="text/x-red" data-help-name="tcp in"> + <p>Provides a choice of tcp inputs. Can either connect to a remote tcp port, + or accept incoming connections.</p> +</script> + +<script type="text/javascript"> + RED.nodes.registerType('tcp in',{ + category: 'input', + color:"Silver", + defaults: { + server: {value:"server",required:true}, + host: {value:"",validate:function(v) { return (this.server == "server")||v.length > 0;} }, + port: {value:"",required:true,validate:RED.validators.number()}, + datamode:{value:"stream"}, + datatype:{value:"buffer"}, + newline:{value:""}, + topic: {value:""}, + name: {value:""}, + base64: {/*deprecated*/ value:false,required:true} + }, + inputs:0, + outputs:1, + icon: "bridge-dash.png", + label: function() { + return this.name || "tcp:"+(this.host?this.host+":":"")+this.port; + }, + labelStyle: function() { + return this.name?"node_label_italic":""; + }, + oneditprepare: function() { + var updateOptions = function() { + var sockettype = $("#node-input-server option:selected").val(); + if (sockettype == "client") { + $("#node-input-host-row").show(); + } else { + $("#node-input-host-row").hide(); + } + var datamode = $("#node-input-datamode option:selected").val(); + var datatype = $("#node-input-datatype option:selected").val(); + if (datamode == "stream") { + $("#node-input-datamode-plural").show(); + if (datatype == "utf8") { + $("#node-row-newline").show(); + } else { + $("#node-row-newline").hide(); + } + } else { + $("#node-input-datamode-plural").hide(); + $("#node-row-newline").hide(); + } + }; + updateOptions(); + $("#node-input-server").change(updateOptions); + $("#node-input-datatype").change(updateOptions); + $("#node-input-datamode").change(updateOptions); + } + }); +</script> + + +<script type="text/x-red" data-template-name="tcp out"> + <div class="form-row"> + <label for="node-input-beserver"><i class="fa fa-dot-circle-o"></i> Type</label> + <select id="node-input-beserver" style="width:150px; margin-right:5px;"> + <option value="server">Listen on</option> + <option value="client">Connect to</option> + <option value="reply">Reply to TCP</option> + </select> + <span id="node-input-port-row">port <input type="text" id="node-input-port" style="width: 50px"></span> + </div> + + <div class="form-row hidden" id="node-input-host-row" style="padding-left: 110px;"> + at host <input type="text" id="node-input-host" placeholder="localhost" style="width: 60%;"> + </div> + + <div class="form-row hidden" id="node-input-end-row"> + <label> </label> + <input type="checkbox" id="node-input-end" style="display: inline-block; width: auto; vertical-align: top;"> + <label for="node-input-end" style="width: 70%;">Close connection after each message is sent ?</label> + </div> + + <div class="form-row"> + <label> </label> + <input type="checkbox" id="node-input-base64" placeholder="base64" style="display: inline-block; width: auto; vertical-align: top;"> + <label for="node-input-base64" style="width: 70%;">Decode Base64 message ?</label> + </div> + + <div class="form-row"> + <label for="node-input-name"><i class="fa fa-tag"></i> Name</label> + <input type="text" id="node-input-name" placeholder="Name"> + </div> + + <div class="form-tips hidden" id="fin-tip"> + <b>Note:</b> Closing the connection after each message is generally not a good thing - but is useful to indicate an end-of-file for example. + </div> + <div class="form-tips hidden" id="fin-tip2"> + <b>Note:</b> Closing the connection after each message is generally not a good thing - but is useful to indicate an end-of-file for example. The receiving client will need to reconnect. + </div> +</script> + +<script type="text/x-red" data-help-name="tcp out"> + <p>Provides a choice of tcp outputs. Can either connect to a remote tcp port, + accept incoming connections, or reply to messages received from a TCP In node.</p> + <p>Only <b>msg.payload</b> is sent.</p> + <p>If <b>msg.payload</b> is a string containing a Base64 encoding of binary + data, the Base64 decoding option will cause it to be converted back to binary + before being sent.</p> +</script> + +<script type="text/javascript"> + RED.nodes.registerType('tcp out',{ + category: 'output', + color:"Silver", + defaults: { + host: {value:"",validate:function(v) { return (this.beserver != "client")||v.length > 0;} }, + port: {value:"",validate:function(v) { return (this.beserver == "reply")||RED.validators.number()(v) } }, + beserver: {value:"client",required:true}, + base64: {value:false,required:true}, + end: {value:false,required:true}, + name: {value:""} + }, + inputs:1, + outputs:0, + icon: "bridge-dash.png", + align: "right", + label: function() { + return this.name || "tcp:"+(this.host?this.host+":":"")+this.port; + }, + labelStyle: function() { + return (this.name)?"node_label_italic":""; + }, + oneditprepare: function() { + var updateOptions = function() { + var sockettype = $("#node-input-beserver option:selected").val(); + if (sockettype == "reply") { + $("#node-input-port-row").hide(); + $("#node-input-host-row").hide(); + $("#node-input-end-row").hide(); + } else { + $("#node-input-port-row").show(); + $("#node-input-end-row").show(); + } + + if (sockettype == "client") { + $("#node-input-host-row").show(); + $("#fin-tip").show(); + } else { + $("#node-input-host-row").hide(); + $("#fin-tip").hide(); + } + + if (sockettype == "server") { + $("#fin-tip2").show(); + } + else { + $("#fin-tip2").hide(); + } + + }; + updateOptions(); + $("#node-input-beserver").change(updateOptions); + } + }); +</script> + + +<script type="text/x-red" data-template-name="tcp request"> + <div class="form-row"> + <label for="node-input-server"><i class="fa fa-globe"></i> Server</label> + <input type="text" id="node-input-server" placeholder="ip.address" style="width:50%"> + port <input type="text" id="node-input-port" style="width:50px"> + </div> + <div class="form-row"> + <label for="node-input-out"><i class="fa fa-sign-out"></i> Return</label> + <select type="text" id="node-input-out" style="width:52%;"> + <option value="time">after a fixed timeout of</option> + <option value="char">when character received is</option> + <option value="count">a fixed number of characters</option> + <option value="sit">never. Keep connection open</option> + </select> + <input type="text" id="node-input-splitc" style="width:50px;"> + <span id="node-units"></span> + </div> + <div class="form-row"> + <label for="node-input-name"><i class="fa fa-tag"></i> Name</label> + <input type="text" id="node-input-name" placeholder="Name"> + </div> + <div class="form-tips"><b>Tip:</b> outputs a binary <b>Buffer</b>, so you may want to .toString() it.</div> + <script> + var previous = null; + $("#node-input-out").on('focus', function () { previous = this.value; }).change(function() { + if (previous == null) { previous = $("#node-input-out").val(); } + if ($("#node-input-out").val() == "char") { + if (previous != "char") $("#node-input-splitc").val("\\n"); + $("#node-units").text(""); + } + else if ($("#node-input-out").val() == "time") { + if (previous != "time") $("#node-input-splitc").val("0"); + $("#node-units").text("ms"); + } + else if ($("#node-input-out").val() == "count") { + if (previous != "count") $("#node-input-splitc").val("12"); + $("#node-units").text("chars"); + } + else { + if (previous != "sit") $("#node-input-splitc").val("0"); + $("#node-units").text(""); + } + }); +</script> + +<script type="text/x-red" data-help-name="tcp request"> + <p>A simple TCP request node - sends the <b>msg.payload</b> to a server tcp port and expects a response.</p> + <p>Connects, sends the "request", reads the "response". It can either count a number of + returned characters into a fixed buffer, match a specified character before returning, + wait a fixed timeout from first reply and then return, or just sit and wait for data.</p> + <p>The response will be output in <b>msg.payload</b> as a buffer, so you may want to .toString() it.</p> +</script> + +<script type="text/javascript"> + RED.nodes.registerType('tcp request',{ + category: 'function', + color:"Silver", + defaults: { + server: {value:"",required:true}, + port: {value:"",required:true,validate:RED.validators.number()}, + out: {value:"time",required:true}, + splitc: {value:"0",required:true}, + name: {value:""} + }, + inputs:1, + outputs:1, + icon: "bridge-dash.png", + label: function() { + return this.name || "tcp:"+(this.server?this.server+":":"")+this.port; + }, + labelStyle: function() { + return this.name?"node_label_italic":""; + } + }); +</script> diff --git a/dgbuilder/core_nodes/io/31-tcpin.js b/dgbuilder/core_nodes/io/31-tcpin.js new file mode 100644 index 00000000..2e4e5e7b --- /dev/null +++ b/dgbuilder/core_nodes/io/31-tcpin.js @@ -0,0 +1,472 @@ +/** + * Copyright 2013,2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +module.exports = function(RED) { + "use strict"; + var reconnectTime = RED.settings.socketReconnectTime||10000; + var socketTimeout = RED.settings.socketTimeout||null; + var net = require('net'); + + var connectionPool = {}; + + function TcpIn(n) { + RED.nodes.createNode(this,n); + this.host = n.host; + this.port = n.port * 1; + this.topic = n.topic; + this.stream = (!n.datamode||n.datamode=='stream'); /* stream,single*/ + this.datatype = n.datatype||'buffer'; /* buffer,utf8,base64 */ + this.newline = (n.newline||"").replace("\\n","\n").replace("\\r","\r"); + this.base64 = n.base64; + this.server = (typeof n.server == 'boolean')?n.server:(n.server == "server"); + this.closing = false; + var node = this; + var count = 0; + + if (!node.server) { + var buffer = null; + var client; + var reconnectTimeout; + var end = false; + var setupTcpClient = function() { + node.log("connecting to "+node.host+":"+node.port); + node.status({fill:"grey",shape:"dot",text:"connecting"}); + var id = (1+Math.random()*4294967295).toString(16); + client = net.connect(node.port, node.host, function() { + buffer = (node.datatype == 'buffer')? new Buffer(0):""; + node.log("connected to "+node.host+":"+node.port); + node.status({fill:"green",shape:"dot",text:"connected"}); + }); + connectionPool[id] = client; + + client.on('data', function (data) { + if (node.datatype != 'buffer') { + data = data.toString(node.datatype); + } + if (node.stream) { + if ((node.datatype) === "utf8" && node.newline != "") { + buffer = buffer+data; + var parts = buffer.split(node.newline); + for (var i = 0;i<parts.length-1;i+=1) { + var msg = {topic:node.topic, payload:parts[i]}; + msg._session = {type:"tcp",id:id}; + node.send(msg); + } + buffer = parts[parts.length-1]; + } else { + var msg = {topic:node.topic, payload:data}; + msg._session = {type:"tcp",id:id}; + node.send(msg); + } + } else { + if ((typeof data) === "string") { + buffer = buffer+data; + } else { + buffer = Buffer.concat([buffer,data],buffer.length+data.length); + } + } + }); + client.on('end', function() { + if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) { + var msg = {topic:node.topic,payload:buffer}; + msg._session = {type:"tcp",id:id}; + if (buffer.length !== 0) { + end = true; // only ask for fast re-connect if we actually got something + node.send(msg); + } + buffer = null; + } + }); + client.on('close', function() { + delete connectionPool[id]; + node.status({fill:"red",shape:"ring",text:"disconnected"}); + if (!node.closing) { + if (end) { // if we were asked to close then try to reconnect once very quick. + end = false; + reconnectTimeout = setTimeout(setupTcpClient, 20); + } + else { + node.log("connection lost to "+node.host+":"+node.port); + reconnectTimeout = setTimeout(setupTcpClient, reconnectTime); + } + } + }); + client.on('error', function(err) { + node.log(err); + }); + } + setupTcpClient(); + + this.on('close', function() { + this.closing = true; + client.end(); + clearTimeout(reconnectTimeout); + }); + } else { + var server = net.createServer(function (socket) { + if (socketTimeout !== null) { socket.setTimeout(socketTimeout); } + var id = (1+Math.random()*4294967295).toString(16); + connectionPool[id] = socket; + node.status({text:++count+" connections"}); + + var buffer = (node.datatype == 'buffer')? new Buffer(0):""; + socket.on('data', function (data) { + if (node.datatype != 'buffer') { + data = data.toString(node.datatype); + } + if (node.stream) { + if ((typeof data) === "string" && node.newline != "") { + buffer = buffer+data; + var parts = buffer.split(node.newline); + for (var i = 0;i<parts.length-1;i+=1) { + var msg = {topic:node.topic, payload:parts[i],ip:socket.remoteAddress,port:socket.remotePort}; + msg._session = {type:"tcp",id:id}; + node.send(msg); + } + buffer = parts[parts.length-1]; + } else { + var msg = {topic:node.topic, payload:data}; + msg._session = {type:"tcp",id:id}; + node.send(msg); + } + } else { + if ((typeof data) === "string") { + buffer = buffer+data; + } else { + buffer = Buffer.concat([buffer,data],buffer.length+data.length); + } + } + }); + socket.on('end', function() { + if (!node.stream || (node.datatype === "utf8" && node.newline !== "")) { + if (buffer.length > 0) { + var msg = {topic:node.topic,payload:buffer}; + msg._session = {type:"tcp",id:id}; + node.send(msg); + } + buffer = null; + } + }); + socket.on('timeout', function() { + node.log('timeout closed socket port '+node.port); + socket.end(); + }); + socket.on('close', function() { + delete connectionPool[id]; + node.status({text:--count+" connections"}); + }); + socket.on('error',function(err) { + node.log(err); + }); + }); + server.on('error', function(err) { + if (err) { + node.error('unable to listen on port '+node.port+' : '+err); + } + }); + server.listen(node.port, function(err) { + if (err) { + node.error('unable to listen on port '+node.port+' : '+err); + } else { + node.log('listening on port '+node.port); + + node.on('close', function() { + node.closing = true; + server.close(); + node.log('stopped listening on port '+node.port); + }); + } + }); + } + + } + RED.nodes.registerType("tcp in",TcpIn); + + function TcpOut(n) { + RED.nodes.createNode(this,n); + this.host = n.host; + this.port = n.port * 1; + this.base64 = n.base64; + this.doend = n.end || false; + this.beserver = n.beserver; + this.name = n.name; + this.closing = false; + var node = this; + + if (!node.beserver||node.beserver=="client") { + var reconnectTimeout; + var client = null; + var connected = false; + var end = false; + + var setupTcpClient = function() { + node.log("connecting to "+node.host+":"+node.port); + node.status({fill:"grey",shape:"dot",text:"connecting"}); + client = net.connect(node.port, node.host, function() { + connected = true; + node.log("connected to "+node.host+":"+node.port); + node.status({fill:"green",shape:"dot",text:"connected"}); + }); + client.on('error', function (err) { + node.log('error : '+err); + }); + client.on('end', function (err) { + }); + client.on('close', function() { + node.status({fill:"red",shape:"ring",text:"disconnected"}); + connected = false; + client.destroy(); + if (!node.closing) { + if (end) { + end = false; + reconnectTimeout = setTimeout(setupTcpClient,20); + } + else { + node.log("connection lost to "+node.host+":"+node.port); + reconnectTimeout = setTimeout(setupTcpClient,reconnectTime); + } + } + }); + } + setupTcpClient(); + + node.on("input", function(msg) { + if (connected && msg.payload != null) { + if (Buffer.isBuffer(msg.payload)) { + client.write(msg.payload); + } else if (typeof msg.payload === "string" && node.base64) { + client.write(new Buffer(msg.payload,'base64')); + } else { + client.write(new Buffer(""+msg.payload)); + } + if (node.doend === true) { + end = true; + client.end(); + } + } + }); + + node.on("close", function() { + this.closing = true; + client.end(); + clearTimeout(reconnectTimeout); + }); + + } else if (node.beserver == "reply") { + node.on("input",function(msg) { + if (msg._session && msg._session.type == "tcp") { + var client = connectionPool[msg._session.id]; + if (client) { + if (Buffer.isBuffer(msg.payload)) { + client.write(msg.payload); + } else if (typeof msg.payload === "string" && node.base64) { + client.write(new Buffer(msg.payload,'base64')); + } else { + client.write(new Buffer(""+msg.payload)); + } + } + } + }); + } else { + var connectedSockets = []; + node.status({text:"0 connections"}); + var server = net.createServer(function (socket) { + if (socketTimeout !== null) { socket.setTimeout(socketTimeout); } + var remoteDetails = socket.remoteAddress+":"+socket.remotePort; + node.log("connection from "+remoteDetails); + connectedSockets.push(socket); + node.status({text:connectedSockets.length+" connections"}); + socket.on('timeout', function() { + node.log('timeout closed socket port '+node.port); + socket.end(); + }); + socket.on('close',function() { + node.log("connection closed from "+remoteDetails); + connectedSockets.splice(connectedSockets.indexOf(socket),1); + node.status({text:connectedSockets.length+" connections"}); + }); + socket.on('error',function() { + node.log("socket error from "+remoteDetails); + connectedSockets.splice(connectedSockets.indexOf(socket),1); + node.status({text:connectedSockets.length+" connections"}); + }); + }); + + node.on("input", function(msg) { + if (msg.payload != null) { + var buffer; + if (Buffer.isBuffer(msg.payload)) { + buffer = msg.payload; + } else if (typeof msg.payload === "string" && node.base64) { + buffer = new Buffer(msg.payload,'base64'); + } else { + buffer = new Buffer(""+msg.payload); + } + for (var i = 0; i<connectedSockets.length;i+=1) { + if (node.doend === true) { connectedSockets[i].end(buffer); } + else { connectedSockets[i].write(buffer); } + } + } + }); + + server.on('error', function(err) { + if (err) { + node.error('unable to listen on port '+node.port+' : '+err); + } + }); + + server.listen(node.port, function(err) { + if (err) { + node.error('unable to listen on port '+node.port+' : '+err); + } else { + node.log('listening on port '+node.port); + node.on('close', function() { + server.close(); + node.log('stopped listening on port '+node.port); + }); + } + }); + } + } + RED.nodes.registerType("tcp out",TcpOut); + + function TcpGet(n) { + RED.nodes.createNode(this,n); + this.server = n.server; + this.port = Number(n.port); + this.out = n.out; + this.splitc = n.splitc; + + if (this.out != "char") { this.splitc = Number(this.splitc); } + else { this.splitc.replace("\\n","\n").replace("\\r","\r").replace("\\t","\t").replace("\\e","\e").replace("\\f","\f").replace("\\0","\0"); } + + var buf; + if (this.out == "count") { buf = new Buffer(this.splitc); } + else { buf = new Buffer(32768); } // set it to 32k... hopefully big enough for most.... but only hopefully + + this.connected = false; + var node = this; + var client; + + this.on("input", function(msg) { + var i = 0; + if ((!Buffer.isBuffer(msg.payload)) && (typeof msg.payload !== "string")) { + msg.payload = msg.payload.toString(); + } + if (!node.connected) { + client = net.Socket(); + client.setTimeout(socketTimeout); + node.status({}); + client.connect(node.port, node.server, function() { + //node.log('client connected'); + node.status({fill:"green",shape:"dot",text:"connected"}); + node.connected = true; + client.write(msg.payload); + }); + + client.on('data', function(data) { + //node.log("data:"+ data.length+":"+ data); + if (node.splitc === 0) { + node.send({"payload": data}); + } + else if (node.out === "sit") { // if we are staying connected just send the buffer + node.send({"payload": data}); + } + else { + for (var j = 0; j < data.length; j++ ) { + if (node.out === "time") { + // do the timer thing + if (node.tout) { + i += 1; + buf[i] = data[j]; + } + else { + node.tout = setTimeout(function () { + node.tout = null; + var m = new Buffer(i+1); + buf.copy(m,0,0,i+1); + node.send({"payload": m}); + client.end(); + m = null; + }, node.splitc); + i = 0; + buf[0] = data[j]; + } + } + // count bytes into a buffer... + else if (node.out == "count") { + buf[i] = data[j]; + i += 1; + if ( i >= node.serialConfig.count) { + node.send({"payload": buf}); + client.end(); + i = 0; + } + } + // look for a char + else { + buf[i] = data[j]; + i += 1; + if (data[j] == node.splitc) { + var m = new Buffer(i); + buf.copy(m,0,0,i); + node.send({"payload": m}); + client.end(); + m = null; + i = 0; + } + } + } + } + }); + + client.on('end', function() { + //node.log('client disconnected'); + node.connected = false; + node.status({}); + client = null; + }); + + client.on('error', function() { + node.log('connect failed'); + node.status({fill:"red",shape:"ring",text:"error"}); + if (client) { client.end(); } + }); + + client.on('timeout',function() { + node.log('connect timeout'); + if (client) { + client.end(); + setTimeout(function() { + client.connect(node.port, node.server, function() { + //node.log('client connected'); + node.connected = true; + client.write(msg.payload); + }); + },reconnectTime); + } + }); + } + else { client.write(msg.payload); } + }); + + this.on("close", function() { + if (client) { buf = null; client.end(); } + }); + + } + RED.nodes.registerType("tcp request",TcpGet); +} diff --git a/dgbuilder/core_nodes/io/32-udp.html b/dgbuilder/core_nodes/io/32-udp.html new file mode 100644 index 00000000..1c2eed57 --- /dev/null +++ b/dgbuilder/core_nodes/io/32-udp.html @@ -0,0 +1,212 @@ +<!-- + Copyright 2013 IBM Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- The Input Node --> +<script type="text/x-red" data-template-name="udp in"> + <div class="form-row"> + <label for="node-input-port"><i class="fa fa-sign-in"></i> Listen</label> + on port <input type="text" id="node-input-port" placeholder="Port" style="width: 45px"> + for <select id="node-input-multicast" style='width:40%'> + <option value="false">udp messages</option> + <option value="true">multicast messages</option> + </select> + </div> + <div class="form-row node-input-group"> + <label for="node-input-group"><i class="fa fa-list"></i> Group</label> + <input type="text" id="node-input-group" placeholder="225.0.18.83"> + </div> + <div class="form-row node-input-iface"> + <label for="node-input-iface"><i class="fa fa-random"></i> Interface</label> + <input type="text" id="node-input-iface" placeholder="(optional) ip address of eth0"> + </div> + <div class="form-row"> + <label for="node-input-datatype"><i class="fa fa-sign-out"></i> Output</label> + <select id="node-input-datatype" style="width: 70%;"> + <option value="buffer">a Buffer</option> + <option value="utf8">a String</option> + <option value="base64">a Base64 encoded string</option> + </select> + </div> + <div class="form-row"> + <label for="node-input-name"><i class="fa fa-tag"></i> Name</label> + <input type="text" id="node-input-name" placeholder="Name"> + </div> + <div class="form-tips">Tip: Make sure your firewall will allow the data in.</div> + <script> + $("#node-input-multicast").change(function() { + var id = $("#node-input-multicast option:selected").val(); + if (id == "false") { + $(".node-input-group").hide(); + $(".node-input-iface").hide(); + } + else { + $(".node-input-group").show(); + $(".node-input-iface").show(); + } + }); + </script> +</script> + +<script type="text/x-red" data-help-name="udp in"> + <p>A udp input node, that produces a <b>msg.payload</b> containing a <i>BUFFER</i>, string, or base64 encoded string. Supports multicast.</p> + <p>It also provides <b>msg.ip</b> and <b>msg.port</b> to the ip address and port from which the message was received.</p> + <p>On some systems you may need to be root to use ports below 1024 and/or broadcast.</p> +</script> + +<script type="text/javascript"> + RED.nodes.registerType('udp in',{ + category: 'input', + color:"Silver", + defaults: { + name: {value:""}, + iface: {value:""}, + port: {value:"",required:true,validate:RED.validators.number()}, + datatype: {value:"buffer",required:true}, + multicast: {value:"false"}, + group: {value:"",validate:function(v) { return (this.multicast !== "true")||v.length > 0;} } + }, + inputs:0, + outputs:1, + icon: "bridge-dash.png", + label: function() { + if (this.multicast=="false") { + return this.name||"udp "+this.port; + } + else return this.name||"udp "+(this.group+":"+this.port); + }, + labelStyle: function() { + return this.name?"node_label_italic":""; + } + }); +</script> + + +<!-- The Output Node --> +<script type="text/x-red" data-template-name="udp out"> + <div class="form-row"> + <label for="node-input-port"><i class="fa fa-envelope"></i> Send a</label> + <select id="node-input-multicast" style='width:40%'> + <option value="false">udp message</option> + <option value="broad">broadcast message</option> + <option value="multi">multicast message</option> + </select> + to port <input type="text" id="node-input-port" placeholder="port" style="width: 70px"> + </div> + <div class="form-row node-input-addr"> + <label for="node-input-addr" id="node-input-addr-label"><i class="fa fa-list"></i> Address</label> + <input type="text" id="node-input-addr" placeholder="destination ip" style="width: 70%;"> + </div> + <div class="form-row node-input-iface"> + <label for="node-input-iface"><i class="fa fa-random"></i> Interface</label> + <input type="text" id="node-input-iface" placeholder="(optional) ip address of eth0"> + </div> + <div class="form-row"> + <label for="node-input-outport-type"> </label> + <select id="node-input-outport-type"> + <option id="node-input-outport-type-random" value="random">use random local port</option> + <option value="fixed">bind to local port</option> + </select> + <input type="text" id="node-input-outport" style="width: 70px;" placeholder="port"> + </div> + <div class="form-row"> + <label> </label> + <input type="checkbox" id="node-input-base64" style="display: inline-block; width: auto; vertical-align: top;"> + <label for="node-input-base64" style="width: 70%;">Decode Base64 encoded payload ?</label> + </div> + <div class="form-row"> + <label for="node-input-name"><i class="fa fa-tag"></i> Name</label> + <input type="text" id="node-input-name" placeholder="Name"> + </div> + <div class="form-tips">Tip: leave address and port blank if you want to set using <b>msg.ip</b> and <b>msg.port</b>.</div> + <script> + $("#node-input-multicast").change(function() { + var id = $("#node-input-multicast option:selected").val(); + if (id !== "multi") { + $(".node-input-iface").hide(); + $("#node-input-addr-label").html('<i class="fa fa-list"></i> Address'); + $("#node-input-addr")[0].placeholder = 'destination ip'; + } + else { + $(".node-input-iface").show(); + $("#node-input-addr-label").html('<i class="fa fa-list"></i> Group'); + $("#node-input-addr")[0].placeholder = '225.0.18.83'; + } + if (id === "broad") { + $("#node-input-addr")[0].placeholder = '255.255.255.255'; + } + }); + </script> +</script> + +<script type="text/x-red" data-help-name="udp out"> + <p>This node sends <b>msg.payload</b> to the designated udp host and port. Supports multicast.</p> + <p>You may also use <b>msg.ip</b> and <b>msg.port</b> to set the destination values.<br/><b>Note</b>: the statically configured values have precedence.</p> + <p>If you select broadcast either set the address to the local broadcast ip address, or maybe try 255.255.255.255, which is the global broadcast address.</p> + <p>On some systems you may need to be root to use ports below 1024 and/or broadcast.</p> +</script> + +<script type="text/javascript"> + RED.nodes.registerType('udp out',{ + category: 'output', + color:"Silver", + defaults: { + name: {value:""}, + addr: {value:""}, + iface: {value:""}, + port: {value:""}, + outport: {value:""}, + base64: {value:false,required:true}, + multicast: {value:"false"} + }, + inputs:1, + outputs:0, + icon: "bridge-dash.png", + align: "right", + label: function() { + return this.name||"udp "+(this.addr+":"+this.port); + }, + labelStyle: function() { + return this.name?"node_label_italic":""; + }, + oneditprepare: function() { + var type = this.outport==""?"random":"fixed"; + $("#node-input-outport-type option").filter(function() { + return $(this).val() == type; + }).attr('selected',true); + + $("#node-input-outport-type").change(function() { + var type = $(this).children("option:selected").val(); + if (type == "random") { + $("#node-input-outport").val("").hide(); + } else { + $("#node-input-outport").show(); + } + }); + + $("#node-input-outport-type").change(); + + $("#node-input-multicast").change(function() { + var type = $(this).children("option:selected").val(); + if (type == "false") { + $("#node-input-outport-type-random").html("bind to random local port"); + } else { + $("#node-input-outport-type-random").html("bind to target port"); + } + }); + $("#node-input-multicast").change(); + } + }); +</script> diff --git a/dgbuilder/core_nodes/io/32-udp.js b/dgbuilder/core_nodes/io/32-udp.js new file mode 100644 index 00000000..a7968e3a --- /dev/null +++ b/dgbuilder/core_nodes/io/32-udp.js @@ -0,0 +1,171 @@ +/** + * Copyright 2013 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +module.exports = function(RED) { + "use strict"; + var dgram = require('dgram'); + + // The Input Node + function UDPin(n) { + RED.nodes.createNode(this,n); + this.group = n.group; + this.port = n.port; + this.datatype = n.datatype; + this.iface = n.iface || null; + this.multicast = n.multicast; + var node = this; + + var server = dgram.createSocket('udp4'); + + server.on("error", function (err) { + if ((err.code == "EACCES") && (node.port < 1024)) { + node.error("UDP access error, you may need root access for ports below 1024"); + } else { + node.error("UDP error : "+err.code); + } + server.close(); + }); + + server.on('message', function (message, remote) { + var msg; + if (node.datatype =="base64") { + msg = { payload:message.toString('base64'), fromip:remote.address+':'+remote.port }; + } else if (node.datatype =="utf8") { + msg = { payload:message.toString('utf8'), fromip:remote.address+':'+remote.port }; + } else { + msg = { payload:message, fromip:remote.address+':'+remote.port, ip:remote.address, port:remote.port }; + } + node.send(msg); + }); + + server.on('listening', function () { + var address = server.address(); + node.log('udp listener at ' + address.address + ":" + address.port); + if (node.multicast == "true") { + server.setBroadcast(true); + try { + server.setMulticastTTL(128); + server.addMembership(node.group,node.iface); + node.log("udp multicast group "+node.group); + } catch (e) { + if (e.errno == "EINVAL") { + node.error("Bad Multicast Address"); + } else if (e.errno == "ENODEV") { + node.error("Must be ip address of the required interface"); + } else { + node.error("Error :"+e.errno); + } + } + } + }); + + node.on("close", function() { + try { + server.close(); + node.log('udp listener stopped'); + } catch (err) { + node.error(err); + } + }); + + server.bind(node.port,node.iface); + } + RED.nodes.registerType("udp in",UDPin); + + + // The Output Node + function UDPout(n) { + RED.nodes.createNode(this,n); + //this.group = n.group; + this.port = n.port; + this.outport = n.outport||""; + this.base64 = n.base64; + this.addr = n.addr; + this.iface = n.iface || null; + this.multicast = n.multicast; + var node = this; + + var sock = dgram.createSocket('udp4'); // only use ipv4 for now + + if (node.multicast != "false") { + if (node.outport == "") { node.outport = node.port; } + sock.bind(node.outport, function() { // have to bind before you can enable broadcast... + sock.setBroadcast(true); // turn on broadcast + if (node.multicast == "multi") { + try { + sock.setMulticastTTL(128); + sock.addMembership(node.addr,node.iface); // Add to the multicast group + node.log('udp multicast ready : '+node.outport+' -> '+node.addr+":"+node.port); + } catch (e) { + if (e.errno == "EINVAL") { + node.error("Bad Multicast Address"); + } else if (e.errno == "ENODEV") { + node.error("Must be ip address of the required interface"); + } else { + node.error("Error :"+e.errno); + } + } + } else { + node.log('udp broadcast ready : '+node.outport+' -> '+node.addr+":"+node.port); + } + }); + } else if (node.outport != "") { + sock.bind(node.outport); + node.log('udp ready : '+node.outport+' -> '+node.addr+":"+node.port); + } else { + node.log('udp ready : '+node.addr+":"+node.port); + } + + node.on("input", function(msg) { + if (msg.payload != null) { + var add = node.addr || msg.ip || ""; + var por = node.port || msg.port || 0; + if (add == "") { + node.warn("udp: ip address not set"); + } else if (por == 0) { + node.warn("udp: port not set"); + } else if (isNaN(por) || (por < 1) || (por > 65535)) { + node.warn("udp: port number not valid"); + } else { + var message; + if (node.base64) { + message = new Buffer(msg.payload, 'base64'); + } else if (msg.payload instanceof Buffer) { + message = msg.payload; + } else { + message = new Buffer(""+msg.payload); + } + sock.send(message, 0, message.length, por, add, function(err, bytes) { + if (err) { + node.error("udp : "+err); + } + message = null; + }); + } + } + }); + + node.on("close", function() { + try { + sock.close(); + node.log('udp output stopped'); + } catch (err) { + node.error(err); + } + }); + } + RED.nodes.registerType("udp out",UDPout); +} diff --git a/dgbuilder/core_nodes/io/lib/mqtt.js b/dgbuilder/core_nodes/io/lib/mqtt.js new file mode 100644 index 00000000..141a8889 --- /dev/null +++ b/dgbuilder/core_nodes/io/lib/mqtt.js @@ -0,0 +1,254 @@ +/** + * Copyright 2013 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +var util = require("util"); +var mqtt = require("mqtt"); +var events = require("events"); +//var inspect = require("sys").inspect; + +//var Client = module.exports.Client = function( + +var port = 1883; +var host = "localhost"; + +function MQTTClient(port,host) { + this.port = port||1883; + this.host = host||"localhost"; + this.messageId = 1; + this.pendingSubscriptions = {}; + this.inboundMessages = {}; + this.lastOutbound = (new Date()).getTime(); + this.lastInbound = (new Date()).getTime(); + this.connected = false; + + this._nextMessageId = function() { + this.messageId += 1; + if (this.messageId > 0xFFFF) { + this.messageId = 1; + } + return this.messageId; + } + events.EventEmitter.call(this); +} +util.inherits(MQTTClient, events.EventEmitter); + +MQTTClient.prototype.connect = function(options) { + if (!this.connected) { + var self = this; + options = options||{}; + self.options = options; + self.options.keepalive = options.keepalive||15; + self.options.clean = self.options.clean||true; + self.options.protocolId = 'MQIsdp'; + self.options.protocolVersion = 3; + + self.client = mqtt.createConnection(this.port,this.host,function(err,client) { + if (err) { + self.connected = false; + clearInterval(self.watchdog); + self.connectionError = true; + //util.log('[mqtt] ['+self.uid+'] connection error 1 : '+inspect(err)); + self.emit('connectionlost',err); + return; + } + client.on('close',function(e) { + //util.log('[mqtt] ['+self.uid+'] on close'); + clearInterval(self.watchdog); + if (!self.connectionError) { + if (self.connected) { + self.connected = false; + self.emit('connectionlost',e); + } else { + self.emit('disconnect'); + } + } + }); + client.on('error',function(e) { + //util.log('[mqtt] ['+self.uid+'] on error : '+inspect(e)); + clearInterval(self.watchdog); + if (self.connected) { + self.connected = false; + self.emit('connectionlost',e); + } + }); + client.on('connack',function(packet) { + if (packet.returnCode == 0) { + self.watchdog = setInterval(function(self) { + var now = (new Date()).getTime(); + + //util.log('[mqtt] ['+self.uid+'] watchdog '+inspect({connected:self.connected,connectionError:self.connectionError,pingOutstanding:self.pingOutstanding,now:now,lastOutbound:self.lastOutbound,lastInbound:self.lastInbound})); + + if (now - self.lastOutbound > self.options.keepalive*500 || now - self.lastInbound > self.options.keepalive*500) { + if (self.pingOutstanding) { + //util.log('[mqtt] ['+self.uid+'] watchdog pingOustanding - disconnect'); + try { + self.client.disconnect(); + } catch (err) { + } + } else { + //util.log('[mqtt] ['+self.uid+'] watchdog pinging'); + self.lastOutbound = (new Date()).getTime(); + self.lastInbound = (new Date()).getTime(); + self.pingOutstanding = true; + self.client.pingreq(); + } + } + + },self.options.keepalive*500,self); + self.pingOutstanding = false; + self.lastInbound = (new Date()).getTime() + self.lastOutbound = (new Date()).getTime() + self.connected = true; + self.connectionError = false; + self.emit('connect'); + } else { + self.connected = false; + self.emit('connectionlost'); + } + }); + client.on('suback',function(packet) { + self.lastInbound = (new Date()).getTime() + var topic = self.pendingSubscriptions[packet.messageId]; + self.emit('subscribe',topic,packet.granted[0]); + delete self.pendingSubscriptions[packet.messageId]; + }); + client.on('unsuback',function(packet) { + self.lastInbound = (new Date()).getTime() + var topic = self.pendingSubscriptions[packet.messageId]; + self.emit('unsubscribe',topic,packet.granted[0]); + delete self.pendingSubscriptions[packet.messageId]; + }); + client.on('publish',function(packet) { + self.lastInbound = (new Date()).getTime() + if (packet.qos < 2) { + var p = packet; + self.emit('message',p.topic,p.payload,p.qos,p.retain); + } else { + self.inboundMessages[packet.messageId] = packet; + this.lastOutbound = (new Date()).getTime() + self.client.pubrec(packet); + } + if (packet.qos == 1) { + this.lastOutbound = (new Date()).getTime() + self.client.puback(packet); + } + }); + + client.on('pubrel',function(packet) { + self.lastInbound = (new Date()).getTime() + var p = self.inboundMessages[packet.messageId]; + if (p) { + self.emit('message',p.topic,p.payload,p.qos,p.retain); + delete self.inboundMessages[packet.messageId]; + } + self.lastOutbound = (new Date()).getTime() + self.client.pubcomp(packet); + }); + + client.on('puback',function(packet) { + self.lastInbound = (new Date()).getTime() + // outbound qos-1 complete + }); + + client.on('pubrec',function(packet) { + self.lastInbound = (new Date()).getTime() + self.lastOutbound = (new Date()).getTime() + self.client.pubrel(packet); + }); + client.on('pubcomp',function(packet) { + self.lastInbound = (new Date()).getTime() + // outbound qos-2 complete + }); + client.on('pingresp',function(packet) { + //util.log('[mqtt] ['+self.uid+'] received pingresp'); + self.lastInbound = (new Date()).getTime() + self.pingOutstanding = false; + }); + + this.lastOutbound = (new Date()).getTime() + this.connectionError = false; + client.connect(self.options); + }); + } +} + +MQTTClient.prototype.subscribe = function(topic,qos) { + var self = this; + if (self.connected) { + var options = { + subscriptions:[{topic:topic,qos:qos}], + messageId: self._nextMessageId() + }; + this.pendingSubscriptions[options.messageId] = topic; + this.lastOutbound = (new Date()).getTime() + self.client.subscribe(options); + } +} +MQTTClient.prototype.unsubscribe = function(topic) { + var self = this; + if (self.connected) { + var options = { + topic:topic, + messageId: self._nextMessageId() + }; + this.pendingSubscriptions[options.messageId] = topic; + this.lastOutbound = (new Date()).getTime() + self.client.unsubscribe(options); + } +} + +MQTTClient.prototype.publish = function(topic,payload,qos,retain) { + var self = this; + if (self.connected) { + + if (!Buffer.isBuffer(payload)) { + if (typeof payload === "object") { + payload = JSON.stringify(payload); + } else if (typeof payload !== "string") { + payload = ""+payload; + } + } + var options = { + topic: topic, + payload: payload, + qos: qos||0, + retain:retain||false + }; + if (options.qos != 0) { + options.messageId = self._nextMessageId(); + } + this.lastOutbound = (new Date()).getTime() + self.client.publish(options); + } +} + +MQTTClient.prototype.disconnect = function() { + var self = this; + if (this.connected) { + this.connected = false; + try { + this.client.disconnect(); + } catch(err) { + } + } +} +MQTTClient.prototype.isConnected = function() { + return this.connected; +} +module.exports.createClient = function(port,host) { + var mqtt_client = new MQTTClient(port,host); + return mqtt_client; +} + diff --git a/dgbuilder/core_nodes/io/lib/mqttConnectionPool.js b/dgbuilder/core_nodes/io/lib/mqttConnectionPool.js new file mode 100644 index 00000000..d15f0fc7 --- /dev/null +++ b/dgbuilder/core_nodes/io/lib/mqttConnectionPool.js @@ -0,0 +1,128 @@ +/** + * Copyright 2013 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +var util = require("util"); +var mqtt = require("./mqtt"); +var settings = require(process.env.NODE_RED_HOME+"/red/red").settings; + +var connections = {}; + +function matchTopic(ts,t) { + if (ts == "#") { + return true; + } + var re = new RegExp("^"+ts.replace(/([\[\]\?\(\)\\\\$\^\*\.|])/g,"\\$1").replace(/\+/g,"[^/]+").replace(/\/#$/,"(\/.*)?")+"$"); + return re.test(t); +} + +module.exports = { + get: function(broker,port,clientid,username,password,will) { + var id = "["+(username||"")+":"+(password||"")+"]["+(clientid||"")+"]@"+broker+":"+port; + if (!connections[id]) { + connections[id] = function() { + var uid = (1+Math.random()*4294967295).toString(16); + var client = mqtt.createClient(port,broker); + client.uid = uid; + client.setMaxListeners(0); + var options = {keepalive:15}; + options.clientId = clientid || 'mqtt_' + (1+Math.random()*4294967295).toString(16); + options.username = username; + options.password = password; + options.will = will; + var queue = []; + var subscriptions = []; + var connecting = false; + var obj = { + _instances: 0, + publish: function(msg) { + if (client.isConnected()) { + client.publish(msg.topic,msg.payload,msg.qos,msg.retain); + } else { + if (!connecting) { + connecting = true; + client.connect(options); + } + queue.push(msg); + } + }, + subscribe: function(topic,qos,callback) { + subscriptions.push({topic:topic,qos:qos,callback:callback}); + client.on('message',function(mtopic,mpayload,mqos,mretain) { + if (matchTopic(topic,mtopic)) { + callback(mtopic,mpayload,mqos,mretain); + } + }); + if (client.isConnected()) { + client.subscribe(topic,qos); + } + }, + on: function(a,b){ + client.on(a,b); + }, + once: function(a,b){ + client.once(a,b); + }, + connect: function() { + if (client && !client.isConnected() && !connecting) { + connecting = true; + client.connect(options); + } + }, + disconnect: function() { + this._instances -= 1; + if (this._instances == 0) { + client.disconnect(); + client = null; + delete connections[id]; + } + } + }; + client.on('connect',function() { + if (client) { + util.log('[mqtt] ['+uid+'] connected to broker tcp://'+broker+':'+port); + connecting = false; + for (var s in subscriptions) { + var topic = subscriptions[s].topic; + var qos = subscriptions[s].qos; + var callback = subscriptions[s].callback; + client.subscribe(topic,qos); + } + //console.log("connected - publishing",queue.length,"messages"); + while(queue.length) { + var msg = queue.shift(); + //console.log(msg); + client.publish(msg.topic,msg.payload,msg.qos,msg.retain); + } + } + }); + client.on('connectionlost', function(err) { + util.log('[mqtt] ['+uid+'] connection lost to broker tcp://'+broker+':'+port); + connecting = false; + setTimeout(function() { + obj.connect(); + }, settings.mqttReconnectTime||5000); + }); + client.on('disconnect', function() { + connecting = false; + util.log('[mqtt] ['+uid+'] disconnected from broker tcp://'+broker+':'+port); + }); + + return obj + }(); + } + connections[id]._instances += 1; + return connections[id]; + } +}; |