summaryrefslogtreecommitdiffstats
path: root/dgbuilder/core_nodes/io
diff options
context:
space:
mode:
authorChinthakayala, Sheshashailavas (sc2914) <sc2914@us.att.com>2017-08-28 05:25:46 -0900
committerChinthakayala, Sheshashailavas (sc2914) <sc2914@att.com>2017-08-28 05:36:52 -0900
commitd1569975bb18f4359fac18aa98f55b69c248a3ad (patch)
treec8681eeac12dca8673ccf841705daac88bf01ca6 /dgbuilder/core_nodes/io
parenta016ea661ff5767a3539734c4c07ef974a6e4614 (diff)
[CCSDK-28] populated the seed code for dgbuilder
updated the code to point to the new package name for sli Change-Id: I3b5a1d05dc5193664fd4a667afdcd0b2354010a4 Issue-ID:{CCSDK-28} Signed-off-by: Chinthakayala, Sheshashailavas (sc2914) <sc2914@att.com> Signed-off-by: Chinthakayala, Sheshashailavas (sc2914) <sc2914@att.com>
Diffstat (limited to 'dgbuilder/core_nodes/io')
-rw-r--r--dgbuilder/core_nodes/io/10-mqtt.html157
-rw-r--r--dgbuilder/core_nodes/io/10-mqtt.js119
-rw-r--r--dgbuilder/core_nodes/io/21-httpin.html254
-rw-r--r--dgbuilder/core_nodes/io/21-httpin.js241
-rw-r--r--dgbuilder/core_nodes/io/22-websocket.html163
-rw-r--r--dgbuilder/core_nodes/io/22-websocket.js185
-rw-r--r--dgbuilder/core_nodes/io/23-watch.html57
-rw-r--r--dgbuilder/core_nodes/io/23-watch.js51
-rw-r--r--dgbuilder/core_nodes/io/25-serial.html265
-rw-r--r--dgbuilder/core_nodes/io/25-serial.js310
-rw-r--r--dgbuilder/core_nodes/io/31-tcpin.html299
-rw-r--r--dgbuilder/core_nodes/io/31-tcpin.js472
-rw-r--r--dgbuilder/core_nodes/io/32-udp.html212
-rw-r--r--dgbuilder/core_nodes/io/32-udp.js171
-rw-r--r--dgbuilder/core_nodes/io/lib/mqtt.js254
-rw-r--r--dgbuilder/core_nodes/io/lib/mqttConnectionPool.js128
16 files changed, 3338 insertions, 0 deletions
diff --git a/dgbuilder/core_nodes/io/10-mqtt.html b/dgbuilder/core_nodes/io/10-mqtt.html
new file mode 100644
index 00000000..2ff5eb29
--- /dev/null
+++ b/dgbuilder/core_nodes/io/10-mqtt.html
@@ -0,0 +1,157 @@
+<!--
+ Copyright 2013,2014 IBM Corp.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script type="text/x-red" data-template-name="mqtt in">
+ <div class="form-row">
+ <label for="node-input-broker"><i class="fa fa-globe"></i> Broker</label>
+ <input type="text" id="node-input-broker">
+ </div>
+ <div class="form-row">
+ <label for="node-input-topic"><i class="fa fa-tasks"></i> Topic</label>
+ <input type="text" id="node-input-topic" placeholder="Topic">
+ </div>
+ <div class="form-row">
+ <label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
+ <input type="text" id="node-input-name" placeholder="Name">
+ </div>
+</script>
+
+<script type="text/x-red" data-help-name="mqtt in">
+ <p>MQTT input node. Connects to a broker and subscribes to the specified topic. The topic may contain MQTT wildcards.</p>
+ <p>Outputs an object called <b>msg</b> containing <b>msg.topic, msg.payload, msg.qos</b> and <b>msg.retain</b>.</p>
+ <p><b>msg.payload</b> is a String.</p>
+</script>
+
+<script type="text/javascript">
+ RED.nodes.registerType('mqtt in',{
+ category: 'input',
+ defaults: {
+ name: {value:""},
+ topic: {value:"",required:true},
+ broker: {type:"mqtt-broker", required:true}
+ },
+ color:"#d8bfd8",
+ inputs:0,
+ outputs:1,
+ icon: "bridge.png",
+ label: function() {
+ return this.name||this.topic||"mqtt";
+ },
+ labelStyle: function() {
+ return this.name?"node_label_italic":"";
+ }
+ });
+</script>
+
+<script type="text/x-red" data-template-name="mqtt out">
+ <div class="form-row">
+ <label for="node-input-broker"><i class="fa fa-globe"></i> Broker</label>
+ <input type="text" id="node-input-broker">
+ </div>
+ <div class="form-row">
+ <label for="node-input-topic"><i class="fa fa-tasks"></i> Topic</label>
+ <input type="text" id="node-input-topic" placeholder="Topic">
+ </div>
+ <div class="form-row">
+ <label for="node-input-qos"><i class="fa fa-empire"></i> QoS</label>
+ <select id="node-input-qos" style="width:125px !important">
+ <option value=""></option>
+ <option value="0">0</option>
+ <option value="1">1</option>
+ <option value="2">2</option>
+ </select>
+ &nbsp;&nbsp;<i class="fa fa-history"></i>&nbsp;Retain &nbsp;<select id="node-input-retain" style="width:125px !important">
+ <option value=""></option>
+ <option value="false">false</option>
+ <option value="true">true</option>
+ </select>
+ </div>
+ <div class="form-row">
+ <label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
+ <input type="text" id="node-input-name" placeholder="Name">
+ </div>
+ <div class="form-tips">Tip: Leave topic, qos or retain blank if you want to set them via msg properties.</div>
+</script>
+
+<script type="text/x-red" data-help-name="mqtt out">
+ <p>Connects to a MQTT broker and publishes <b>msg.payload</b> either to the <b>msg.topic</b> or to the topic specified in the edit window. The value in the edit window has precedence.</p>
+ <p>Likewise QoS and/or retain values in the edit panel will overwrite any <b>msg.qos</b> and <b>msg.retain</b> properties. If nothing is set they default to <i>0</i> and <i>false</i> respectively.</p>
+ <p>If <b>msg.payload</b> contains an object it will be stringified before being sent.</p>
+</script>
+
+<script type="text/javascript">
+ RED.nodes.registerType('mqtt out',{
+ category: 'output',
+ defaults: {
+ name: {value:""},
+ topic: {value:""},
+ qos: {value:""},
+ retain: {value:""},
+ broker: {type:"mqtt-broker", required:true}
+ },
+ color:"#d8bfd8",
+ inputs:1,
+ outputs:0,
+ icon: "bridge.png",
+ align: "right",
+ label: function() {
+ return this.name||this.topic||"mqtt";
+ },
+ labelStyle: function() {
+ return this.name?"node_label_italic":"";
+ }
+ });
+</script>
+
+<script type="text/x-red" data-template-name="mqtt-broker">
+ <div class="form-row node-input-broker">
+ <label for="node-config-input-broker"><i class="fa fa-globe"></i> Broker</label>
+ <input class="input-append-left" type="text" id="node-config-input-broker" placeholder="localhost" style="width: 40%;" >
+ <label for="node-config-input-port" style="margin-left: 10px; width: 35px; "> Port</label>
+ <input type="text" id="node-config-input-port" placeholder="Port" style="width:45px">
+ </div>
+ <div class="form-row">
+ <label for="node-config-input-clientid"><i class="fa fa-tag"></i> Client ID</label>
+ <input type="text" id="node-config-input-clientid" placeholder="Leave blank for auto generated">
+ </div>
+ <div class="form-row">
+ <label for="node-config-input-user"><i class="fa fa-user"></i> Username</label>
+ <input type="text" id="node-config-input-user">
+ </div>
+ <div class="form-row">
+ <label for="node-config-input-password"><i class="fa fa-lock"></i> Password</label>
+ <input type="password" id="node-config-input-password">
+ </div>
+</script>
+
+<script type="text/javascript">
+ RED.nodes.registerType('mqtt-broker',{
+ category: 'config',
+ defaults: {
+ broker: {value:"",required:true},
+ port: {value:1883,required:true,validate:RED.validators.number()},
+ clientid: { value:"" }
+ },
+ credentials: {
+ user: {type:"text"},
+ password: {type: "password"}
+ },
+ label: function() {
+ if (this.broker == "") { this.broker = "localhost"; }
+ return (this.clientid?this.clientid+"@":"")+this.broker+":"+this.port;
+ }
+ });
+</script>
diff --git a/dgbuilder/core_nodes/io/10-mqtt.js b/dgbuilder/core_nodes/io/10-mqtt.js
new file mode 100644
index 00000000..c8bc4901
--- /dev/null
+++ b/dgbuilder/core_nodes/io/10-mqtt.js
@@ -0,0 +1,119 @@
+/**
+ * Copyright 2013,2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+module.exports = function(RED) {
+ "use strict";
+ var connectionPool = require("./lib/mqttConnectionPool");
+
+ function MQTTBrokerNode(n) {
+ RED.nodes.createNode(this,n);
+ this.broker = n.broker;
+ this.port = n.port;
+ this.clientid = n.clientid;
+ if (this.credentials) {
+ this.username = this.credentials.user;
+ this.password = this.credentials.password;
+ }
+ }
+ RED.nodes.registerType("mqtt-broker",MQTTBrokerNode,{
+ credentials: {
+ user: {type:"text"},
+ password: {type: "password"}
+ }
+ });
+
+ function MQTTInNode(n) {
+ RED.nodes.createNode(this,n);
+ this.topic = n.topic;
+ this.broker = n.broker;
+ this.brokerConfig = RED.nodes.getNode(this.broker);
+ if (this.brokerConfig) {
+ this.status({fill:"red",shape:"ring",text:"disconnected"});
+ this.client = connectionPool.get(this.brokerConfig.broker,this.brokerConfig.port,this.brokerConfig.clientid,this.brokerConfig.username,this.brokerConfig.password);
+ var node = this;
+ this.client.subscribe(this.topic,2,function(topic,payload,qos,retain) {
+ var msg = {topic:topic,payload:payload,qos:qos,retain:retain};
+ if ((node.brokerConfig.broker == "localhost")||(node.brokerConfig.broker == "127.0.0.1")) {
+ msg._topic = topic;
+ }
+ node.send(msg);
+ });
+ this.client.on("connectionlost",function() {
+ node.status({fill:"red",shape:"ring",text:"disconnected"});
+ });
+ this.client.on("connect",function() {
+ node.status({fill:"green",shape:"dot",text:"connected"});
+ });
+ this.client.connect();
+ } else {
+ this.error("missing broker configuration");
+ }
+ this.on('close', function() {
+ if (this.client) {
+ this.client.disconnect();
+ }
+ });
+ }
+ RED.nodes.registerType("mqtt in",MQTTInNode);
+
+ function MQTTOutNode(n) {
+ RED.nodes.createNode(this,n);
+ this.topic = n.topic;
+ this.qos = n.qos || null;
+ this.retain = n.retain;
+ this.broker = n.broker;
+ this.brokerConfig = RED.nodes.getNode(this.broker);
+
+ if (this.brokerConfig) {
+ this.status({fill:"red",shape:"ring",text:"disconnected"},true);
+ this.client = connectionPool.get(this.brokerConfig.broker,this.brokerConfig.port,this.brokerConfig.clientid,this.brokerConfig.username,this.brokerConfig.password);
+ var node = this;
+ this.on("input",function(msg) {
+ if (msg.qos) {
+ msg.qos = parseInt(msg.qos);
+ if ((msg.qos !== 0) && (msg.qos !== 1) && (msg.qos !== 2)) {
+ msg.qos = null;
+ }
+ }
+ msg.qos = Number(node.qos || msg.qos || 0);
+ msg.retain = node.retain || msg.retain || false;
+ msg.retain = ((msg.retain === true) || (msg.retain === "true")) || false;
+ if (node.topic) {
+ msg.topic = node.topic;
+ }
+ if ((msg.hasOwnProperty("topic")) && (typeof msg.topic === "string") && (msg.topic !== "")) { // topic must exist
+ this.client.publish(msg); // send the message
+ }
+ else { node.warn("Invalid topic specified"); }
+ });
+ this.client.on("connectionlost",function() {
+ node.status({fill:"red",shape:"ring",text:"disconnected"});
+ });
+ this.client.on("connect",function() {
+ node.status({fill:"green",shape:"dot",text:"connected"});
+ });
+ this.client.connect();
+ } else {
+ this.error("missing broker configuration");
+ }
+ this.on('close', function() {
+ if (this.client) {
+ this.client.disconnect();
+ }
+ });
+ }
+ RED.nodes.registerType("mqtt out",MQTTOutNode);
+}
diff --git a/dgbuilder/core_nodes/io/21-httpin.html b/dgbuilder/core_nodes/io/21-httpin.html
new file mode 100644
index 00000000..059b8596
--- /dev/null
+++ b/dgbuilder/core_nodes/io/21-httpin.html
@@ -0,0 +1,254 @@
+<!--
+ Copyright 2013 IBM Corp.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script type="text/x-red" data-template-name="http in">
+ <div class="form-row">
+ <label for="node-input-method"><i class="fa fa-tasks"></i> Method</label>
+ <select type="text" id="node-input-method" style="width:72%;">
+ <option value="get">GET</option>
+ <option value="post">POST</option>
+ <option value="put">PUT</option>
+ <option value="delete">DELETE</option>
+ </select>
+ </div>
+ <div class="form-row">
+ <label for="node-input-url"><i class="fa fa-globe"></i> url</label>
+ <input type="text" id="node-input-url" placeholder="/url">
+ </div>
+ <div class="form-row">
+ <label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
+ <input type="text" id="node-input-name" placeholder="Name">
+ </div>
+ <div id="node-input-tip" class="form-tips">The url will be relative to <code><span id="node-input-path"></span></code>.</div>
+</script>
+
+<script type="text/x-red" data-help-name="http in">
+ <p>Provides an input node for http requests, allowing the creation of simple web services.</p>
+ <p>The resulting message has the following properties:
+ <ul>
+ <li>msg.req : <a href="http://expressjs.com/api.html#req">http request</a></li>
+ <li>msg.res : <a href="http://expressjs.com/api.html#res">http response</a></li>
+ </ul>
+ </p>
+ <p>For POST/PUT requests, the body is available under <code>msg.req.body</code>. This
+ uses the <a href="http://expressjs.com/api.html#bodyParser">Express bodyParser middleware</a> to parse the content to a JSON object.
+ </p>
+ <p>
+ By default, this expects the body of the request to be url encoded:
+ <pre>foo=bar&amp;this=that</pre>
+ </p>
+ <p>
+ To send JSON encoded data to the node, the content-type header of the request must be set to
+ <code>application/json</code>.
+ </p>
+ <p>
+ <b>Note: </b>This node does not send any response to the http request. This should be done with
+ a subsequent HTTP Response node, or Function node.
+ In the case of a Function node, the <a href="http://expressjs.com/api.html#res">Express response documentation</a>
+ describes how this should be done. For example:
+ <pre>msg.res.send(200, 'Thanks for the request ');<br/>return msg;</pre>
+ </p>
+
+</script>
+
+<script type="text/x-red" data-template-name="http response">
+ <div class="form-row">
+ <label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
+ <input type="text" id="node-input-name" placeholder="Name">
+ </div>
+ <div class="form-tips">The messages sent to this node <b>must</b> originate from an <i>http input</i> node</div>
+</script>
+
+<script type="text/x-red" data-help-name="http response">
+ <p>Sends responses back to http requests received from an HTTP Input node.</p>
+ <p>The response can be customised using the following message properties:</p>
+ <ul>
+ <li><code>payload</code> is sent as the body of the response</li>
+ <li><code>statusCode</code>, if set, is used as the response status code (default: 200)</li>
+ <li><code>headers</code>, if set, should be an object containing field/value
+ pairs to be added as response headers.</li>
+ </ul>
+</script>
+
+<script type="text/x-red" data-template-name="http request">
+ <div class="form-row">
+ <label for="node-input-method"><i class="fa fa-tasks"></i> Method</label>
+ <select type="text" id="node-input-method" style="width:72%;">
+ <option value="GET">GET</option>
+ <option value="POST">POST</option>
+ <option value="PUT">PUT</option>
+ <option value="DELETE">DELETE</option>
+ </select>
+ </div>
+ <div class="form-row">
+ <label for="node-input-url"><i class="fa fa-globe"></i> URL</label>
+ <input type="text" id="node-input-url" placeholder="http://">
+ </div>
+ <div class="form-row">
+ <label>&nbsp;</label>
+ <input type="checkbox" id="node-input-useAuth" style="display: inline-block; width: auto; vertical-align: top;">
+ <label for="node-input-useAuth" style="width: 70%;">Use basic authentication?</label>
+ </div>
+ <div class="form-row node-input-useAuth-row">
+ <label for="node-input-user"><i class="fa fa-user"></i> Username</label>
+ <input type="text" id="node-input-user">
+ </div>
+ <div class="form-row node-input-useAuth-row">
+ <label for="node-input-password"><i class="fa fa-lock"></i> Password</label>
+ <input type="password" id="node-input-password">
+ </div>
+ <div class="form-row">
+ <label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
+ <input type="text" id="node-input-name" placeholder="Name">
+ </div>
+</script>
+
+<script type="text/x-red" data-help-name="http request">
+ <p>Provides a node for making http requests.</p>
+ <p>The URL and HTTP method can be configured in the node, but also
+ overridden by the incoming message:
+ <ul>
+ <li><code>url</code>, if set, is used as the url of the request. Must start with http: or https:</li>
+ <li><code>method</code>, if set, is used as the HTTP method of the request.
+ Must be one of <code>GET</code>, <code>PUT</code>, <code>POST</code> or <code>DELETE</code> (default: GET)</li>
+ <li><code>headers</code>, if set, should be an object containing field/value
+ pairs to be added as request headers</li>
+ <li><code>payload</code> is sent as the body of the request</li>
+ </ul>
+ <p>When configured within the node, the URL property can contain <a href="http://mustache.github.io/mustache.5.html" target="_new">mustache-style</a> tags. These allow the
+ url to be constructed using values of the incoming message. For example, if the url is set to
+ <code>example.com/{{topic}}</code>, it will have the value of <code>msg.topic</code> automatically inserted.</p>
+ <p>
+ The output message contains the following properties:
+ <ul>
+ <li><code>payload</code> is the body of the response</li>
+ <li><code>statusCode</code> is the status code of the response, or the error code if the request could not be completed</li>
+ <li><code>headers</code> is an object containing the response headers</li>
+ </ul>
+</script>
+
+<script type="text/javascript">
+ RED.nodes.registerType('http in',{
+ category: 'input',
+ color:"rgb(231, 231, 174)",
+ defaults: {
+ name: {value:""},
+ url: {value:"",required:true},
+ method: {value:"get",required:true}
+ },
+ inputs:0,
+ outputs:1,
+ icon: "white-globe.png",
+ label: function() {
+ if (this.name) {
+ return this.name;
+ } else if (this.url) {
+ var root = RED.settings.httpNodeRoot;
+ if (root.slice(-1) != "/") {
+ root = root+"/";
+ }
+ if (this.url.charAt(0) == "/") {
+ root += this.url.slice(1);
+ } else {
+ root += this.url;
+ }
+ return "["+this.method+"] "+root;
+ } else {
+ return "http";
+ }
+ },
+ labelStyle: function() {
+ return this.name?"node_label_italic":"";
+ },
+ oneditprepare: function() {
+ var root = RED.settings.httpNodeRoot;
+ if (root.slice(-1) == "/") {
+ root = root.slice(0,-1);
+ }
+ if (root == "") {
+ $("#node-input-tip").hide();
+ } else {
+ $("#node-input-path").html(root);
+ $("#node-input-tip").show();
+ }
+ //document.getElementById("node-config-wsdocpath").innerHTML=
+ }
+
+ });
+
+ RED.nodes.registerType('http response',{
+ category: 'output',
+ color:"rgb(231, 231, 174)",
+ defaults: {
+ name: {value:""}
+ },
+ inputs:1,
+ outputs:0,
+ align: "right",
+ icon: "white-globe.png",
+ label: function() {
+ return this.name||"http";
+ },
+ labelStyle: function() {
+ return this.name?"node_label_italic":"";
+ }
+ });
+
+ RED.nodes.registerType('http request',{
+ category: 'function',
+ color:"rgb(231, 231, 174)",
+ defaults: {
+ name: {value:""},
+ method:{value:"GET"},
+ url:{value:""},
+ //user -> credentials
+ //pass -> credentials
+ },
+ credentials: {
+ user: {type:"text"},
+ password: {type: "password"}
+ },
+ inputs:1,
+ outputs:1,
+ align: "right",
+ icon: "white-globe.png",
+ label: function() {
+ return this.name||"http request";
+ },
+ labelStyle: function() {
+ return this.name?"node_label_italic":"";
+ },
+ oneditprepare: function() {
+ if (this.credentials.user || this.credentials.has_password) {
+ $('#node-input-useAuth').prop('checked', true);
+ $(".node-input-useAuth-row").show();
+ } else {
+ $('#node-input-useAuth').prop('checked', false);
+ $(".node-input-useAuth-row").hide();
+ }
+
+ $("#node-input-useAuth").change(function() {
+ if ($(this).is(":checked")) {
+ $(".node-input-useAuth-row").show();
+ } else {
+ $(".node-input-useAuth-row").hide();
+ $('#node-input-user').val('');
+ $('#node-input-password').val('');
+ }
+ });
+ },
+ });
+</script>
diff --git a/dgbuilder/core_nodes/io/21-httpin.js b/dgbuilder/core_nodes/io/21-httpin.js
new file mode 100644
index 00000000..877ccc09
--- /dev/null
+++ b/dgbuilder/core_nodes/io/21-httpin.js
@@ -0,0 +1,241 @@
+/**
+ * Copyright 2013,2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+module.exports = function(RED) {
+ "use strict";
+ var http = require("follow-redirects").http;
+ var https = require("follow-redirects").https;
+ var urllib = require("url");
+ var express = require("express");
+ var getBody = require('raw-body');
+ var mustache = require("mustache");
+ var querystring = require("querystring");
+
+ var cors = require('cors');
+ var jsonParser = express.json();
+ var urlencParser = express.urlencoded();
+
+ function rawBodyParser(req, res, next) {
+ if (req._body) { return next(); }
+ req.body = "";
+ req._body = true;
+ getBody(req, {
+ limit: '1mb',
+ length: req.headers['content-length'],
+ encoding: 'utf8'
+ }, function (err, buf) {
+ if (err) { return next(err); }
+ req.body = buf;
+ next();
+ });
+ }
+
+
+ function HTTPIn(n) {
+ RED.nodes.createNode(this,n);
+ if (RED.settings.httpNodeRoot !== false) {
+
+ this.url = n.url;
+ this.method = n.method;
+
+ var node = this;
+
+ this.errorHandler = function(err,req,res,next) {
+ node.warn(err);
+ res.send(500);
+ };
+
+ this.callback = function(req,res) {
+ if (node.method == "post") {
+ node.send({req:req,res:res,payload:req.body});
+ } else if (node.method == "get") {
+ node.send({req:req,res:res,payload:req.query});
+ } else {
+ node.send({req:req,res:res});
+ }
+ }
+
+ var corsHandler = function(req,res,next) { next(); }
+
+ if (RED.settings.httpNodeCors) {
+ corsHandler = cors(RED.settings.httpNodeCors);
+ RED.httpNode.options(this.url,corsHandler);
+ }
+
+ if (this.method == "get") {
+ RED.httpNode.get(this.url,corsHandler,this.callback,this.errorHandler);
+ } else if (this.method == "post") {
+ RED.httpNode.post(this.url,corsHandler,jsonParser,urlencParser,rawBodyParser,this.callback,this.errorHandler);
+ } else if (this.method == "put") {
+ RED.httpNode.put(this.url,corsHandler,jsonParser,urlencParser,rawBodyParser,this.callback,this.errorHandler);
+ } else if (this.method == "delete") {
+ RED.httpNode.delete(this.url,corsHandler,this.callback,this.errorHandler);
+ }
+
+ this.on("close",function() {
+ var routes = RED.httpNode.routes[this.method];
+ for (var i = 0; i<routes.length; i++) {
+ if (routes[i].path == this.url) {
+ routes.splice(i,1);
+ //break;
+ }
+ }
+ if (RED.settings.httpNodeCors) {
+ var route = RED.httpNode.route['options'];
+ for (var j = 0; j<route.length; j++) {
+ if (route[j].path == this.url) {
+ route.splice(j,1);
+ //break;
+ }
+ }
+ }
+ });
+ } else {
+ this.warn("Cannot create http-in node when httpNodeRoot set to false");
+ }
+ }
+ RED.nodes.registerType("http in",HTTPIn);
+
+
+ function HTTPOut(n) {
+ RED.nodes.createNode(this,n);
+ var node = this;
+ this.on("input",function(msg) {
+ if (msg.res) {
+ if (msg.headers) {
+ msg.res.set(msg.headers);
+ }
+ var statusCode = msg.statusCode || 200;
+ if (typeof msg.payload == "object" && !Buffer.isBuffer(msg.payload)) {
+ msg.res.jsonp(statusCode,msg.payload);
+ } else {
+ if (msg.res.get('content-length') == null) {
+ var len;
+ if (msg.payload == null) {
+ len = 0;
+ } else if (typeof msg.payload == "number") {
+ len = Buffer.byteLength(""+msg.payload);
+ } else {
+ len = Buffer.byteLength(msg.payload);
+ }
+ msg.res.set('content-length', len);
+ }
+ msg.res.send(statusCode,msg.payload);
+ }
+ } else {
+ node.warn("No response object");
+ }
+ });
+ }
+ RED.nodes.registerType("http response",HTTPOut);
+
+ function HTTPRequest(n) {
+ RED.nodes.createNode(this,n);
+ var nodeUrl = n.url;
+ var isTemplatedUrl = (nodeUrl||"").indexOf("{{") != -1;
+ var nodeMethod = n.method || "GET";
+ var node = this;
+ this.on("input",function(msg) {
+ node.status({fill:"blue",shape:"dot",text:"requesting"});
+ var url;
+ if (msg.url) {
+ url = msg.url;
+ } else if (isTemplatedUrl) {
+ url = mustache.render(nodeUrl,msg);
+ } else {
+ url = nodeUrl;
+ }
+ // url must start http:// or https:// so assume http:// if not set
+ if (!((url.indexOf("http://")===0) || (url.indexOf("https://")===0))) {
+ url = "http://"+url;
+ }
+
+ var method = (msg.method||nodeMethod).toUpperCase();
+ //node.log(method+" : "+url);
+ var opts = urllib.parse(url);
+ opts.method = method;
+ opts.headers = {};
+ if (msg.headers) {
+ for (var v in msg.headers) {
+ if (msg.headers.hasOwnProperty(v)) {
+ var name = v.toLowerCase();
+ if (name !== "content-type" && name !== "content-length") {
+ // only normalise the known headers used later in this
+ // function. Otherwise leave them alone.
+ name = v;
+ }
+ opts.headers[name] = msg.headers[v];
+ }
+ }
+ }
+ if (this.credentials && this.credentials.user) {
+ opts.auth = this.credentials.user+":"+(this.credentials.password||"");
+ }
+ var payload = null;
+
+ if (msg.payload && (method == "POST" || method == "PUT") ) {
+ if (typeof msg.payload === "string" || Buffer.isBuffer(msg.payload)) {
+ payload = msg.payload;
+ } else if (typeof msg.payload == "number") {
+ payload = msg.payload+"";
+ } else {
+ if (opts.headers['content-type'] == 'application/x-www-form-urlencoded') {
+ payload = querystring.stringify(msg.payload);
+ } else {
+ payload = JSON.stringify(msg.payload);
+ if (opts.headers['content-type'] == null) {
+ opts.headers['content-type'] = "application/json";
+ }
+ }
+ }
+ if (opts.headers['content-length'] == null) {
+ opts.headers['content-length'] = Buffer.byteLength(payload);
+ }
+ }
+
+ var req = ((/^https/.test(url))?https:http).request(opts,function(res) {
+ res.setEncoding('utf8');
+ msg.statusCode = res.statusCode;
+ msg.headers = res.headers;
+ msg.payload = "";
+ res.on('data',function(chunk) {
+ msg.payload += chunk;
+ });
+ res.on('end',function() {
+ node.send(msg);
+ node.status({});
+ });
+ });
+ req.on('error',function(err) {
+ msg.payload = err.toString() + " : " + url;
+ msg.statusCode = err.code;
+ node.send(msg);
+ node.status({fill:"red",shape:"ring",text:err.code});
+ });
+ if (payload) {
+ req.write(payload);
+ }
+ req.end();
+ });
+ }
+
+ RED.nodes.registerType("http request",HTTPRequest,{
+ credentials: {
+ user: {type:"text"},
+ password: {type: "password"}
+ }
+ });
+}
diff --git a/dgbuilder/core_nodes/io/22-websocket.html b/dgbuilder/core_nodes/io/22-websocket.html
new file mode 100644
index 00000000..ff6ed742
--- /dev/null
+++ b/dgbuilder/core_nodes/io/22-websocket.html
@@ -0,0 +1,163 @@
+<!--
+ Copyright 2013 IBM Corp.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- WebSocket Input Node -->
+<script type="text/x-red" data-template-name="websocket in">
+ <div class="form-row">
+ <label for="node-input-server"><i class="fa fa-bookmark"></i> Path</label>
+ <input type="text" id="node-input-server">
+ </div>
+ <div class="form-row">
+ <label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
+ <input type="text" id="node-input-name" placeholder="Name">
+ </div>
+</script>
+
+<script type="text/x-red" data-help-name="websocket in">
+ <p>WebSocket input node.</p>
+ <p>By default, the data received from the WebSocket will be in <b>msg.payload</b>.
+ The listener can be configured to expect a properly formed JSON string, in which
+ case it will parse the JSON and send on the resulting object as the entire message.</p>
+</script>
+
+<script type="text/javascript">
+ RED.nodes.registerType('websocket in',{
+ category: 'input',
+ defaults: {
+ name: {value:""},
+ server: {type:"websocket-listener"}
+ },
+ color:"rgb(215, 215, 160)",
+ inputs:0,
+ outputs:1,
+ icon: "white-globe.png",
+ label: function() {
+ var wsNode = RED.nodes.node(this.server);
+ return this.name||(wsNode?"[ws] "+wsNode.label():"websocket");
+ },
+ labelStyle: function() {
+ return this.name?"node_label_italic":"";
+ }
+ });
+</script>
+
+<!-- WebSocket out Node -->
+<script type="text/x-red" data-template-name="websocket out">
+ <div class="form-row">
+ <label for="node-input-server"><i class="fa fa-bookmark"></i> Path</label>
+ <input type="text" id="node-input-server">
+ </div>
+ <div class="form-row">
+ <label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
+ <input type="text" id="node-input-name" placeholder="Name">
+ </div>
+</script>
+
+<script type="text/x-red" data-help-name="websocket out">
+ <p>WebSocket out node.</p>
+ <p>By default, <b>msg.payload</b> will be sent over the WebSocket. The listener
+ can be configured to encode the entire message object as a JSON string and send that
+ over the WebSocket.</p>
+
+ <p>If the message arriving at this node started at a WebSocket In node, the message
+ will be sent back to the client that triggered the flow. Otherwise, the message
+ will be broadcast to all connected clients.</p>
+ <p>If you want to broadcast a message that started at a WebSocket In node, you
+ should delete the <b>msg._session</b> property within the flow</p>.
+</script>
+
+<script type="text/javascript">
+ RED.nodes.registerType('websocket out',{
+ category: 'output',
+ defaults: {
+ name: {value:""},
+ server: {type:"websocket-listener", required:true}
+ },
+ color:"rgb(215, 215, 160)",
+ inputs:1,
+ outputs:0,
+ icon: "white-globe.png",
+ align: "right",
+ label: function() {
+ var wsNode = RED.nodes.node(this.server);
+ return this.name||(wsNode?"[ws] "+wsNode.label():"websocket");
+ },
+ labelStyle: function() {
+ return this.name?"node_label_italic":"";
+ }
+ });
+</script>
+
+<!-- WebSocket Server configuration node -->
+<script type="text/x-red" data-template-name="websocket-listener">
+ <div class="form-row">
+ <label for="node-config-input-path"><i class="fa fa-bookmark"></i> Path</label>
+ <input type="text" id="node-config-input-path" placeholder="/ws/example">
+ </div>
+ <div class="form-row">
+ <label for="node-config-input-wholemsg">&nbsp;</label>
+ <select type="text" id="node-config-input-wholemsg" style="width: 70%;">
+ <option value="false">Send/Receive payload</option>
+ <option value="true">Send/Receive entire message</option>
+ </select>
+ </div>
+ <div class="form-tips">
+ Be default, <code>payload</code> will contain the data to be sent over, or received from a websocket.
+ The listener can be configured to send or receive the entire message object as a JSON formatted string.
+ <p id="node-config-ws-tip">This path will be relative to <code><span id="node-config-ws-path"></span></code>.</p>
+ </div>
+</script>
+
+<script type="text/x-red" data-help-name="websocket-listener">
+ <p>This configuration node creates a WebSocket Server using the specified path</p>
+</script>
+
+<script type="text/javascript">
+ RED.nodes.registerType('websocket-listener',{
+ category: 'config',
+ defaults: {
+ path: {value:"",required:true,validate:RED.validators.regex(/^((?!\/debug\/ws).)*$/) },
+ wholemsg: {value:"false"}
+ },
+ inputs:0,
+ outputs:0,
+ label: function() {
+ var root = RED.settings.httpNodeRoot;
+ if (root.slice(-1) != "/") {
+ root = root+"/";
+ }
+ if (this.path.charAt(0) == "/") {
+ root += this.path.slice(1);
+ } else {
+ root += this.path;
+ }
+ return root;
+ },
+ oneditprepare: function() {
+ var root = RED.settings.httpNodeRoot;
+ if (root.slice(-1) == "/") {
+ root = root.slice(0,-1);
+ }
+ if (root == "") {
+ $("#node-config-ws-tip").hide();
+ } else {
+ $("#node-config-ws-path").html(root);
+ $("#node-config-ws-tip").show();
+ }
+ //document.getElementById("node-config-wsdocpath").innerHTML=
+ }
+ });
+</script>
diff --git a/dgbuilder/core_nodes/io/22-websocket.js b/dgbuilder/core_nodes/io/22-websocket.js
new file mode 100644
index 00000000..72eda502
--- /dev/null
+++ b/dgbuilder/core_nodes/io/22-websocket.js
@@ -0,0 +1,185 @@
+/**
+ * Copyright 2013 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+module.exports = function(RED) {
+ "use strict";
+ var ws = require("ws"),
+ inspect = require("sys").inspect;
+
+ // A node red node that sets up a local websocket server
+ function WebSocketListenerNode(n) {
+ // Create a RED node
+ RED.nodes.createNode(this,n);
+
+ var node = this;
+
+ // Store local copies of the node configuration (as defined in the .html)
+ node.path = n.path;
+ node.wholemsg = (n.wholemsg === "true");
+
+ node._inputNodes = []; // collection of nodes that want to receive events
+
+ var path = RED.settings.httpNodeRoot || "/";
+ path = path + (path.slice(-1) == "/" ? "":"/") + (node.path.charAt(0) == "/" ? node.path.substring(1) : node.path);
+
+ // Workaround https://github.com/einaros/ws/pull/253
+ // Listen for 'newListener' events from RED.server
+ node._serverListeners = {};
+
+ var storeListener = function(/*String*/event,/*function*/listener){
+ if(event == "error" || event == "upgrade" || event == "listening"){
+ node._serverListeners[event] = listener;
+ }
+ }
+
+ node._clients = {};
+
+ RED.server.addListener('newListener',storeListener);
+
+ // Create a WebSocket Server
+ node.server = new ws.Server({server:RED.server,path:path});
+
+ // Workaround https://github.com/einaros/ws/pull/253
+ // Stop listening for new listener events
+ RED.server.removeListener('newListener',storeListener);
+
+ node.server.on('connection', function(socket){
+ var id = (1+Math.random()*4294967295).toString(16);
+ node._clients[id] = socket;
+ socket.on('close',function() {
+ delete node._clients[id];
+ });
+ socket.on('message',function(data,flags){
+ node.handleEvent(id,socket,'message',data,flags);
+ });
+ socket.on('error', function(err) {
+ node.warn("An error occured on the ws connection: "+inspect(err));
+ });
+ });
+
+ node.on("close", function() {
+ // Workaround https://github.com/einaros/ws/pull/253
+ // Remove listeners from RED.server
+ var listener = null;
+ for(var event in node._serverListeners) {
+ if (node._serverListeners.hasOwnProperty(event)) {
+ listener = node._serverListeners[event];
+ if(typeof listener === "function"){
+ RED.server.removeListener(event,listener);
+ }
+ }
+ }
+ node._serverListeners = {};
+ node.server.close();
+ node._inputNodes = [];
+ });
+ }
+ RED.nodes.registerType("websocket-listener",WebSocketListenerNode);
+
+ WebSocketListenerNode.prototype.registerInputNode = function(/*Node*/handler){
+ this._inputNodes.push(handler);
+ }
+
+ WebSocketListenerNode.prototype.handleEvent = function(id,/*socket*/socket,/*String*/event,/*Object*/data,/*Object*/flags){
+ var msg;
+ if (this.wholemsg) {
+ try {
+ msg = JSON.parse(data);
+ }
+ catch(err) {
+ msg = { payload:data };
+ }
+ } else {
+ msg = {
+ payload:data
+ };
+ }
+ msg._session = {type:"websocket",id:id};
+
+ for (var i = 0; i < this._inputNodes.length; i++) {
+ this._inputNodes[i].send(msg);
+ }
+ }
+
+ WebSocketListenerNode.prototype.broadcast = function(data){
+ try {
+ for (var i = 0; i < this.server.clients.length; i++) {
+ this.server.clients[i].send(data);
+ }
+ }
+ catch(e) { // swallow any errors
+ this.warn("ws:"+i+" : "+e);
+ }
+ }
+
+ WebSocketListenerNode.prototype.send = function(id,data) {
+ var session = this._clients[id];
+ if (session) {
+ try {
+ session.send(data);
+ }
+ catch(e) { // swallow any errors
+ }
+ }
+ }
+
+ function WebSocketInNode(n) {
+ RED.nodes.createNode(this,n);
+ this.server = n.server;
+ var node = this;
+ this.serverConfig = RED.nodes.getNode(this.server);
+ if (this.serverConfig) {
+ this.serverConfig.registerInputNode(this);
+ } else {
+ this.error("Missing server configuration");
+ }
+ }
+ RED.nodes.registerType("websocket in",WebSocketInNode);
+
+ function WebSocketOutNode(n) {
+ RED.nodes.createNode(this,n);
+ var node = this;
+ this.server = n.server;
+ this.serverConfig = RED.nodes.getNode(this.server);
+ if (!this.serverConfig) {
+ this.error("Missing server configuration");
+ }
+ this.on("input", function(msg) {
+ var payload;
+ if (this.serverConfig.wholemsg) {
+ delete msg._session;
+ payload = JSON.stringify(msg);
+ } else {
+ if (!Buffer.isBuffer(msg.payload)) { // if it's not a buffer make sure it's a string.
+ payload = RED.util.ensureString(msg.payload);
+ }
+ else {
+ payload = msg.payload;
+ }
+ }
+ if (msg._session && msg._session.type == "websocket") {
+ node.serverConfig.send(msg._session.id,payload);
+ } else {
+ node.serverConfig.broadcast(payload,function(error){
+ if (!!error) {
+ node.warn("An error occurred while sending:" + inspect(error));
+ }
+ });
+ }
+ });
+ }
+ RED.nodes.registerType("websocket out",WebSocketOutNode);
+}
diff --git a/dgbuilder/core_nodes/io/23-watch.html b/dgbuilder/core_nodes/io/23-watch.html
new file mode 100644
index 00000000..8bf22be5
--- /dev/null
+++ b/dgbuilder/core_nodes/io/23-watch.html
@@ -0,0 +1,57 @@
+<!--
+ Copyright 2013 IBM Corp.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script type="text/x-red" data-template-name="watch">
+ <div class="form-row node-input-filename">
+ <label for="node-input-files"><i class="fa fa-file"></i> File(s)</label>
+ <input type="text" id="node-input-files" placeholder="File(s) or Directory">
+ </div>
+ <div class="form-row">
+ <label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
+ <input type="text" id="node-input-name" placeholder="Name">
+ </div>
+ <div id="node-input-tip" class="form-tips">On Windows you must use double slashes \\ in any directory names.</div>
+</script>
+
+<script type="text/x-red" data-help-name="watch">
+ <p>Watches a directory or file for any changes.</p>
+ <p>You can enter a list of comma separated directories or files if you like. You will need to put " around any that have spaces in.</p>
+ <p>On Windows you must use double slashes \\ in any directory names.</p>
+ <p>The full filename of the file that actually changed is put into <b>msg.payload</b>, while a stringified version of the watched criteria is returned in <b>msg.topic</b>.</p>
+ <p><b>msg.file</b> contains just the short filename of the file that changed.</p>
+ <p>Of course in Linux, <i>everything</i> could be a file and thus watched...</p>
+ <p><b>Note: </b>The directory or file must exist in order to be watched. If the file or directory gets deleted it may no longer be monitored even if it gets re-created.</p>
+</script>
+
+<script type="text/javascript">
+ RED.nodes.registerType('watch',{
+ category: 'advanced-input',
+ defaults: {
+ name: {value:""},
+ files: {value:"",required:true}
+ },
+ color:"BurlyWood",
+ inputs:0,
+ outputs:1,
+ icon: "watch.png",
+ label: function() {
+ return this.name||this.files;
+ },
+ labelStyle: function() {
+ return this.name?"node_label_italic":"";
+ }
+ });
+</script>
diff --git a/dgbuilder/core_nodes/io/23-watch.js b/dgbuilder/core_nodes/io/23-watch.js
new file mode 100644
index 00000000..8a17f5ac
--- /dev/null
+++ b/dgbuilder/core_nodes/io/23-watch.js
@@ -0,0 +1,51 @@
+/**
+ * Copyright 2013 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+module.exports = function(RED) {
+ "use strict";
+ var Notify = require("fs.notify");
+ var fs = require("fs");
+ var sep = require("path").sep;
+
+ function WatchNode(n) {
+ RED.nodes.createNode(this,n);
+
+ this.files = n.files.split(",");
+ for (var f =0; f < this.files.length; f++) {
+ this.files[f] = this.files[f].trim();
+ }
+ this.p = (this.files.length == 1) ? this.files[0] : JSON.stringify(this.files);
+ var node = this;
+
+ var notifications = new Notify(node.files);
+ notifications.on('change', function (file, event, path) {
+ try {
+ if (fs.statSync(path).isDirectory()) { path = path + sep + file; }
+ } catch(e) { }
+ var msg = { payload: path, topic: node.p, file: file };
+ node.send(msg);
+ });
+
+ notifications.on('error', function (error, path) {
+ node.warn(error);
+ });
+
+ this.close = function() {
+ notifications.close();
+ }
+ }
+ RED.nodes.registerType("watch",WatchNode);
+}
diff --git a/dgbuilder/core_nodes/io/25-serial.html b/dgbuilder/core_nodes/io/25-serial.html
new file mode 100644
index 00000000..225e4dc3
--- /dev/null
+++ b/dgbuilder/core_nodes/io/25-serial.html
@@ -0,0 +1,265 @@
+<!--
+ Copyright 2013,2014 IBM Corp.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script type="text/x-red" data-template-name="serial in">
+ <div class="form-row node-input-serial">
+ <label for="node-input-serial"><i class="fa fa-random"></i> Serial Port</label>
+ <input type="text" id="node-input-serial">
+ </div>
+ <div class="form-row">
+ <label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
+ <input type="text" id="node-input-name" placeholder="Name">
+ </div>
+</script>
+
+<script type="text/x-red" data-help-name="serial in">
+ <p>Reads data from a local serial port.</p>
+ <p>Can either <ul><li>wait for a "split" character (default \n). Also accepts hex notation (0x0a).</li>
+ <li>Wait for a timeout in milliseconds for the first character received</li>
+ <li>Wait to fill a fixed sized buffer</li></ul></p>
+ <p>It then outputs <b>msg.payload</b> as either a UTF8 ascii string or a binary Buffer object.</p>
+ <p>If no split character is specified, or a timeout or buffer size of 0, then a stream of single characters is sent - again either as ascii chars or size 1 binary buffers.</p>
+</script>
+
+<script type="text/javascript">
+ RED.nodes.registerType('serial in',{
+ category: 'input',
+ defaults: {
+ name: {name:""},
+ serial: {type:"serial-port",required:true}
+ },
+ color:"BurlyWood",
+ inputs:0,
+ outputs:1,
+ icon: "serial.png",
+ label: function() {
+ var serialNode = RED.nodes.node(this.serial);
+ return this.name||(serialNode?serialNode.label().split(":")[0]:"serial");
+ },
+ labelStyle: function() {
+ return this.name?"node_label_italic":"";
+ }
+ });
+</script>
+
+<script type="text/x-red" data-template-name="serial out">
+ <div class="form-row node-input-serial">
+ <label for="node-input-serial"><i class="fa fa-random"></i> Serial Port</label>
+ <input type="text" id="node-input-serial">
+ </div>
+ <div class="form-row">
+ <label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
+ <input type="text" id="node-input-name" placeholder="Name">
+ </div>
+</script>
+
+<script type="text/x-red" data-help-name="serial out">
+ <p>Provides a connection to an outbound serial port.</p>
+ <p>Only the <b>msg.payload</b> is sent.</p>
+ <p>Optionally the new line character used to split the input can be appended to every message sent out to the serial port.</p>
+</script>
+
+<script type="text/javascript">
+ RED.nodes.registerType('serial out',{
+ category: 'output',
+ defaults: {
+ name: {name:""},
+ serial: {type:"serial-port",required:true}
+ },
+ color:"BurlyWood",
+ inputs:1,
+ outputs:0,
+ icon: "serial.png",
+ align: "right",
+ label: function() {
+ var serialNode = RED.nodes.node(this.serial);
+ return this.name||(serialNode?serialNode.label().split(":")[0]:"serial");
+ },
+ labelStyle: function() {
+ return this.name?"node_label_italic":"";
+ }
+ });
+</script>
+
+
+<script type="text/x-red" data-template-name="serial-port">
+ <div class="form-row">
+ <label for="node-config-input-serialport"><i class="fa fa-random"></i> Serial Port</label>
+ <input type="text" id="node-config-input-serialport" style="width:60%;" placeholder="/dev/ttyUSB0"/>
+ <a id="node-config-lookup-serial" class="btn"><i id="node-config-lookup-serial-icon" class="fa fa-search"></i></a>
+ </div>
+ <div class="form-row">
+ <table><tr>
+ <td width = "102px"><i class="fa fa-wrench"></i> Settings</td>
+ <td width = "100px">Baud Rate</td>
+ <td width = "80px">Data Bits</td>
+ <td width = "80px">Parity</td>
+ <td width = "80px">Stop Bits</td>
+ </tr><tr><td>&nbsp;</td>
+ <td>
+ <select type="text" id="node-config-input-serialbaud" style="width: 100px;">
+ <option value="115200">115200</option>
+ <option value="57600">57600</option>
+ <option value="38400">38400</option>
+ <option value="19200">19200</option>
+ <option value="9600">9600</option>
+ <option value="4800">4800</option>
+ <option value="2400">2400</option>
+ <option value="1800">1800</option>
+ <option value="1200">1200</option>
+ <option value="600">600</option>
+ <option value="300">300</option>
+ <option value="200">200</option>
+ <option value="150">150</option>
+ <option value="134">134</option>
+ <option value="110">110</option>
+ <option value="75">75</option>
+ <option value="50">50</option>
+ </select>
+ </td><td>
+ <select type="text" id="node-config-input-databits" style="width: 80px;">
+ <option value="8">8</option>
+ <option value="7">7</option>
+ <option value="6">6</option>
+ <option value="5">5</option>
+ </select>
+ </td><td>
+ <select type="text" id="node-config-input-parity" style="width: 80px;">
+ <option value="none">None</option>
+ <option value="even">Even</option>
+ <option value="mark">Mark</option>
+ <option value="odd">Odd</option>
+ <option value="space">Space</option>
+ </select>
+ </td><td>
+ <select type="text" id="node-config-input-stopbits" style="width: 80px;">
+ <option value="2">2</option>
+ <option value="1">1</option>
+ </select>
+ </td>
+ </tr></table><br/>
+
+ <div class="form-row">
+ <label for="node-config-input-out"><i class="fa fa-cut"></i> Split input</label>
+ <select type="text" id="node-config-input-out" style="width:52%;">
+ <option value="char">when character received is</option>
+ <option value="time">after a fixed timeout of</option>
+ <option value="count">a fixed number of characters</option>
+ </select>
+ <input type="text" id="node-config-input-newline" style="width:50px;">
+ <span id="node-units"></span>
+ </div>
+
+ <div class="form-row">
+ <label for="node-config-input-bin"><i class="fa fa-sign-in"></i> and deliver</label>
+ <select type="text" id="node-config-input-bin" style="width: 77%;">
+ <option value="false">ascii strings</option>
+ <option value="bin">binary buffers</option>
+ </select>
+ </div>
+ <br/>
+ <div class="form-row" id="node-config-addchar">
+ <label for="node-config-input-addchar"><i class="fa fa-sign-out"></i> On output</label>
+ <select type="text" id="node-config-input-addchar" style="width: 77%;">
+ <option value="false">don't add 'split' character to output messages</option>
+ <option value="true">add 'split' character to output messages</option>
+ </select>
+ </div>
+ <div class="form-tips" id="tip-split">Tip: the "Split on" character is used to split the input into separate messages. It can also be added to every message sent out to the serial port.</div>
+ <div class="form-tips" id="tip-bin" hidden>Tip: In timeout mode timeout starts from arrival of first character.</div>
+ <script>
+ var previous = null;
+ $("#node-config-input-out").on('focus', function () { previous = this.value; }).change(function() {
+ if (previous == null) { previous = $("#node-config-input-out").val(); }
+ if ($("#node-config-input-out").val() == "char") {
+ if (previous != "char") { $("#node-config-input-newline").val("\\n"); }
+ $("#node-units").text("");
+ $("#node-config-addchar").show();
+ $("#tip-split").show();
+ $("#tip-bin").hide();
+ }
+ else if ($("#node-config-input-out").val() == "time") {
+ if (previous != "time") { $("#node-config-input-newline").val("0"); }
+ $("#node-units").text("ms");
+ $("#node-config-addchar").hide();
+ $("#node-config-input-addchar").val("false");
+ $("#tip-split").hide();
+ $("#tip-bin").show();
+ }
+ else {
+ if (previous != "count") { $("#node-config-input-newline").val("12"); }
+ $("#node-units").text("chars");
+ $("#node-config-addchar").hide();
+ $("#node-config-input-addchar").val("false");
+ $("#tip-split").hide();
+ $("#tip-bin").hide();
+ }
+ });
+
+ </script>
+</script>
+
+<script type="text/javascript">
+ RED.nodes.registerType('serial-port',{
+ category: 'config',
+ defaults: {
+ //name: {value:""},
+ serialport: {value:"",required:true},
+ serialbaud: {value:57600,required:true},
+ databits: {value:8,required:true},
+ parity: {value:"none",required:true},
+ stopbits: {value:1,required:true},
+ newline: {value:"\\n"},
+ bin: {value:""},
+ out: {value:""},
+ addchar: {value:false}
+ },
+ label: function() {
+ this.serialbaud = this.serialbaud || 57600;
+ this.databits = this.databits || 8;
+ this.parity = this.parity || 'none';
+ this.stopbits = this.stopbits || 1;
+ return this.serialport+":"+this.serialbaud+"-"+this.databits+this.parity.charAt(0).toUpperCase()+this.stopbits;
+ },
+ oneditprepare: function() {
+ try {
+ $("#node-config-input-serialport").autocomplete( "destroy" );
+ } catch(err) {
+ }
+ $("#node-config-lookup-serial").click(function() {
+ //$("#node-config-lookup-serial-icon").removeClass('fa fa-search');
+ //$("#node-config-lookup-serial-icon").addClass('fa fa-spinner');
+ $("#node-config-lookup-serial").addClass('disabled');
+ $.getJSON('serialports',function(data) {
+ //$("#node-config-lookup-serial-icon").addClass('fa fa-search');
+ //$("#node-config-lookup-serial-icon").removeClass('fa fa-spinner');
+ $("#node-config-lookup-serial").removeClass('disabled');
+ var ports = [];
+ $.each(data, function(i, port){
+ ports.push(port.comName);
+ });
+ $("#node-config-input-serialport").autocomplete({
+ source:ports,
+ minLength:0,
+ close: function( event, ui ) {
+ $("#node-config-input-serialport").autocomplete( "destroy" );
+ }
+ }).autocomplete("search","");
+ });
+ });
+ }
+ });
+</script>
diff --git a/dgbuilder/core_nodes/io/25-serial.js b/dgbuilder/core_nodes/io/25-serial.js
new file mode 100644
index 00000000..96e4aca6
--- /dev/null
+++ b/dgbuilder/core_nodes/io/25-serial.js
@@ -0,0 +1,310 @@
+/**
+* Copyright 2013,2014 IBM Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+**/
+
+module.exports = function(RED) {
+ "use strict";
+ var settings = RED.settings;
+ var events = require("events");
+ var util = require("util");
+ var serialp = require("serialport");
+ var bufMaxSize = 32768; // Max serial buffer size, for inputs...
+
+ // TODO: 'serialPool' should be encapsulated in SerialPortNode
+
+ function SerialPortNode(n) {
+ RED.nodes.createNode(this,n);
+ this.serialport = n.serialport;
+ this.newline = n.newline;
+ this.addchar = n.addchar || "false";
+ this.serialbaud = parseInt(n.serialbaud) || 57600;
+ this.databits = parseInt(n.databits) || 8;
+ this.parity = n.parity || "none";
+ this.stopbits = parseInt(n.stopbits) || 1;
+ this.bin = n.bin || "false";
+ this.out = n.out || "char";
+ }
+ RED.nodes.registerType("serial-port",SerialPortNode);
+
+ function SerialOutNode(n) {
+ RED.nodes.createNode(this,n);
+ this.serial = n.serial;
+ this.serialConfig = RED.nodes.getNode(this.serial);
+
+ if (this.serialConfig) {
+ var node = this;
+ node.port = serialPool.get(this.serialConfig.serialport,
+ this.serialConfig.serialbaud,
+ this.serialConfig.databits,
+ this.serialConfig.parity,
+ this.serialConfig.stopbits,
+ this.serialConfig.newline);
+ node.addCh = "";
+ if (node.serialConfig.addchar == "true") {
+ node.addCh = this.serialConfig.newline.replace("\\n","\n").replace("\\r","\r").replace("\\t","\t").replace("\\e","\e").replace("\\f","\f").replace("\\0","\0");
+ }
+ node.on("input",function(msg) {
+ var payload = msg.payload;
+ if (!Buffer.isBuffer(payload)) {
+ if (typeof payload === "object") {
+ payload = JSON.stringify(payload);
+ } else {
+ payload = payload.toString();
+ }
+ payload += node.addCh;
+ } else if (node.addCh !== "") {
+ payload = Buffer.concat([payload,new Buffer(node.addCh)]);
+ }
+ node.port.write(payload,function(err,res) {
+ if (err) {
+ node.error(err);
+ }
+ });
+ });
+ node.port.on('ready', function() {
+ node.status({fill:"green",shape:"dot",text:"connected"});
+ });
+ node.port.on('closed', function() {
+ node.status({fill:"red",shape:"ring",text:"not connected"});
+ });
+ } else {
+ this.error("missing serial config");
+ }
+
+ this.on("close", function(done) {
+ if (this.serialConfig) {
+ serialPool.close(this.serialConfig.serialport,done);
+ } else {
+ done();
+ }
+ });
+ }
+ RED.nodes.registerType("serial out",SerialOutNode);
+
+
+ function SerialInNode(n) {
+ RED.nodes.createNode(this,n);
+ this.serial = n.serial;
+ this.serialConfig = RED.nodes.getNode(this.serial);
+
+ if (this.serialConfig) {
+ var node = this;
+ node.tout = null;
+ var buf;
+ if (node.serialConfig.out != "count") { buf = new Buffer(bufMaxSize); }
+ else { buf = new Buffer(Number(node.serialConfig.newline)); }
+ var i = 0;
+ node.status({fill:"grey",shape:"dot",text:"unknown"});
+ node.port = serialPool.get(this.serialConfig.serialport,
+ this.serialConfig.serialbaud,
+ this.serialConfig.databits,
+ this.serialConfig.parity,
+ this.serialConfig.stopbits,
+ this.serialConfig.newline
+ );
+
+ var splitc;
+ if (node.serialConfig.newline.substr(0,2) == "0x") {
+ splitc = new Buffer([parseInt(node.serialConfig.newline)]);
+ } else {
+ splitc = new Buffer(node.serialConfig.newline.replace("\\n","\n").replace("\\r","\r").replace("\\t","\t").replace("\\e","\e").replace("\\f","\f").replace("\\0","\0"));
+ }
+
+ this.port.on('data', function(msg) {
+ // single char buffer
+ if ((node.serialConfig.newline === 0)||(node.serialConfig.newline === "")) {
+ if (node.serialConfig.bin !== "bin") { node.send({"payload": String.fromCharCode(msg)}); }
+ else { node.send({"payload": new Buffer([msg])}); }
+ }
+ else {
+ // do the timer thing
+ if (node.serialConfig.out === "time") {
+ if (node.tout) {
+ i += 1;
+ buf[i] = msg;
+ }
+ else {
+ node.tout = setTimeout(function () {
+ node.tout = null;
+ var m = new Buffer(i+1);
+ buf.copy(m,0,0,i+1);
+ if (node.serialConfig.bin !== "bin") { m = m.toString(); }
+ node.send({"payload": m});
+ m = null;
+ }, node.serialConfig.newline);
+ i = 0;
+ buf[0] = msg;
+ }
+ }
+ // count bytes into a buffer...
+ else if (node.serialConfig.out === "count") {
+ buf[i] = msg;
+ i += 1;
+ if ( i >= parseInt(node.serialConfig.newline)) {
+ var m = new Buffer(i);
+ buf.copy(m,0,0,i);
+ if (node.serialConfig.bin !== "bin") { m = m.toString(); }
+ node.send({"payload":m});
+ m = null;
+ i = 0;
+ }
+ }
+ // look to match char...
+ else if (node.serialConfig.out === "char") {
+ buf[i] = msg;
+ i += 1;
+ if ((msg === splitc[0]) || (i === bufMaxSize)) {
+ var m = new Buffer(i);
+ buf.copy(m,0,0,i);
+ if (node.serialConfig.bin !== "bin") { m = m.toString(); }
+ node.send({"payload":m});
+ m = null;
+ i = 0;
+ }
+ }
+ else { console.log("Should never get here"); }
+ }
+ });
+ this.port.on('ready', function() {
+ node.status({fill:"green",shape:"dot",text:"connected"});
+ });
+ this.port.on('closed', function() {
+ node.status({fill:"red",shape:"ring",text:"not connected"});
+ });
+ } else {
+ this.error("missing serial config");
+ }
+
+ this.on("close", function(done) {
+ if (this.serialConfig) {
+ serialPool.close(this.serialConfig.serialport,done);
+ } else {
+ done();
+ }
+ });
+ }
+ RED.nodes.registerType("serial in",SerialInNode);
+
+
+ var serialPool = function() {
+ var connections = {};
+ return {
+ get:function(port,baud,databits,parity,stopbits,newline,callback) {
+ var id = port;
+ if (!connections[id]) {
+ connections[id] = function() {
+ var obj = {
+ _emitter: new events.EventEmitter(),
+ serial: null,
+ _closing: false,
+ tout: null,
+ on: function(a,b) { this._emitter.on(a,b); },
+ close: function(cb) { this.serial.close(cb); },
+ write: function(m,cb) { this.serial.write(m,cb); },
+ }
+ //newline = newline.replace("\\n","\n").replace("\\r","\r");
+ var setupSerial = function() {
+ //if (newline == "") {
+ obj.serial = new serialp.SerialPort(port,{
+ baudrate: baud,
+ databits: databits,
+ parity: parity,
+ stopbits: stopbits,
+ parser: serialp.parsers.raw
+ },true, function(err, results) { if (err) { obj.serial.emit('error',err); } });
+ //}
+ //else {
+ // obj.serial = new serialp.SerialPort(port,{
+ // baudrate: baud,
+ // databits: databits,
+ // parity: parity,
+ // stopbits: stopbits,
+ // parser: serialp.parsers.readline(newline)
+ // },true, function(err, results) { if (err) obj.serial.emit('error',err); });
+ //}
+ obj.serial.on('error', function(err) {
+ util.log("[serial] serial port "+port+" error "+err);
+ obj._emitter.emit('closed');
+ obj.tout = setTimeout(function() {
+ setupSerial();
+ }, settings.serialReconnectTime);
+ });
+ obj.serial.on('close', function() {
+ if (!obj._closing) {
+ util.log("[serial] serial port "+port+" closed unexpectedly");
+ obj._emitter.emit('closed');
+ obj.tout = setTimeout(function() {
+ setupSerial();
+ }, settings.serialReconnectTime);
+ }
+ });
+ obj.serial.on('open',function() {
+ util.log("[serial] serial port "+port+" opened at "+baud+" baud "+databits+""+parity.charAt(0).toUpperCase()+stopbits);
+ if (obj.tout) { clearTimeout(obj.tout); }
+ //obj.serial.flush();
+ obj._emitter.emit('ready');
+ });
+ obj.serial.on('data',function(d) {
+ //console.log(Buffer.isBuffer(d),d.length,d);
+ //if (typeof d !== "string") {
+ // //d = d.toString();
+ for (var z=0; z<d.length; z++) {
+ obj._emitter.emit('data',d[z]);
+ }
+ //}
+ //else {
+ // obj._emitter.emit('data',d);
+ //}
+ });
+ obj.serial.on("disconnect",function() {
+ util.log("[serial] serial port "+port+" gone away");
+ });
+ }
+ setupSerial();
+ return obj;
+ }();
+ }
+ return connections[id];
+ },
+ close: function(port,done) {
+ if (connections[port]) {
+ if (connections[port].tout != null) {
+ clearTimeout(connections[port].tout);
+ }
+ connections[port]._closing = true;
+ try {
+ connections[port].close(function() {
+ util.log("[serial] serial port closed");
+ done();
+ });
+ }
+ catch(err) { }
+ delete connections[port];
+ } else {
+ done();
+ }
+ }
+ }
+ }();
+
+ RED.httpAdmin.get("/serialports",function(req,res) {
+ serialp.list(function (err, ports) {
+ //console.log(JSON.stringify(ports));
+ res.writeHead(200, {'Content-Type': 'text/plain'});
+ res.write(JSON.stringify(ports));
+ res.end();
+ });
+ });
+}
diff --git a/dgbuilder/core_nodes/io/31-tcpin.html b/dgbuilder/core_nodes/io/31-tcpin.html
new file mode 100644
index 00000000..c8cec599
--- /dev/null
+++ b/dgbuilder/core_nodes/io/31-tcpin.html
@@ -0,0 +1,299 @@
+<!--
+ Copyright 2013 IBM Corp.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script type="text/x-red" data-template-name="tcp in">
+ <div class="form-row">
+ <label for="node-input-server"><i class="fa fa-dot-circle-o"></i> Type</label>
+ <select id="node-input-server" style="width:120px; margin-right:5px;">
+ <option value="server">Listen on</option>
+ <option value="client">Connect to</option>
+ </select>
+ port <input type="text" id="node-input-port" style="width: 50px">
+ </div>
+ <div class="form-row hidden" id="node-input-host-row" style="padding-left: 110px;">
+ at host <input type="text" id="node-input-host" placeholder="localhost" style="width: 60%;">
+ </div>
+
+ <div class="form-row">
+ <label><i class="fa fa-sign-out"></i> Output</label>
+ a
+ <select id="node-input-datamode" style="width:110px;">
+ <option value="stream">stream of</option>
+ <option value="single">single</option>
+ </select>
+ <select id="node-input-datatype" style="width:140px;">
+ <option value="buffer">Buffer</option>
+ <option value="utf8">String</option>
+ <option value="base64">Base64 String</option>
+ </select>
+ payload<span id="node-input-datamode-plural">s</span>
+ </div>
+
+ <div id="node-row-newline" class="form-row hidden" style="padding-left: 110px;">
+ delimited by <input type="text" id="node-input-newline" style="width: 110px;">
+ </div>
+
+ <div class="form-row">
+ <label for="node-input-topic"><i class="fa fa-tasks"></i> Topic</label>
+ <input type="text" id="node-input-topic" placeholder="Topic">
+ </div>
+ <div class="form-row">
+ <label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
+ <input type="text" id="node-input-name" placeholder="Name">
+ </div>
+</script>
+
+<script type="text/x-red" data-help-name="tcp in">
+ <p>Provides a choice of tcp inputs. Can either connect to a remote tcp port,
+ or accept incoming connections.</p>
+</script>
+
+<script type="text/javascript">
+ RED.nodes.registerType('tcp in',{
+ category: 'input',
+ color:"Silver",
+ defaults: {
+ server: {value:"server",required:true},
+ host: {value:"",validate:function(v) { return (this.server == "server")||v.length > 0;} },
+ port: {value:"",required:true,validate:RED.validators.number()},
+ datamode:{value:"stream"},
+ datatype:{value:"buffer"},
+ newline:{value:""},
+ topic: {value:""},
+ name: {value:""},
+ base64: {/*deprecated*/ value:false,required:true}
+ },
+ inputs:0,
+ outputs:1,
+ icon: "bridge-dash.png",
+ label: function() {
+ return this.name || "tcp:"+(this.host?this.host+":":"")+this.port;
+ },
+ labelStyle: function() {
+ return this.name?"node_label_italic":"";
+ },
+ oneditprepare: function() {
+ var updateOptions = function() {
+ var sockettype = $("#node-input-server option:selected").val();
+ if (sockettype == "client") {
+ $("#node-input-host-row").show();
+ } else {
+ $("#node-input-host-row").hide();
+ }
+ var datamode = $("#node-input-datamode option:selected").val();
+ var datatype = $("#node-input-datatype option:selected").val();
+ if (datamode == "stream") {
+ $("#node-input-datamode-plural").show();
+ if (datatype == "utf8") {
+ $("#node-row-newline").show();
+ } else {
+ $("#node-row-newline").hide();
+ }
+ } else {
+ $("#node-input-datamode-plural").hide();
+ $("#node-row-newline").hide();
+ }
+ };
+ updateOptions();
+ $("#node-input-server").change(updateOptions);
+ $("#node-input-datatype").change(updateOptions);
+ $("#node-input-datamode").change(updateOptions);
+ }
+ });
+</script>
+
+
+<script type="text/x-red" data-template-name="tcp out">
+ <div class="form-row">
+ <label for="node-input-beserver"><i class="fa fa-dot-circle-o"></i> Type</label>
+ <select id="node-input-beserver" style="width:150px; margin-right:5px;">
+ <option value="server">Listen on</option>
+ <option value="client">Connect to</option>
+ <option value="reply">Reply to TCP</option>
+ </select>
+ <span id="node-input-port-row">port <input type="text" id="node-input-port" style="width: 50px"></span>
+ </div>
+
+ <div class="form-row hidden" id="node-input-host-row" style="padding-left: 110px;">
+ at host <input type="text" id="node-input-host" placeholder="localhost" style="width: 60%;">
+ </div>
+
+ <div class="form-row hidden" id="node-input-end-row">
+ <label>&nbsp;</label>
+ <input type="checkbox" id="node-input-end" style="display: inline-block; width: auto; vertical-align: top;">
+ <label for="node-input-end" style="width: 70%;">Close connection after each message is sent ?</label>
+ </div>
+
+ <div class="form-row">
+ <label>&nbsp;</label>
+ <input type="checkbox" id="node-input-base64" placeholder="base64" style="display: inline-block; width: auto; vertical-align: top;">
+ <label for="node-input-base64" style="width: 70%;">Decode Base64 message ?</label>
+ </div>
+
+ <div class="form-row">
+ <label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
+ <input type="text" id="node-input-name" placeholder="Name">
+ </div>
+
+ <div class="form-tips hidden" id="fin-tip">
+ <b>Note:</b> Closing the connection after each message is generally not a good thing - but is useful to indicate an end-of-file for example.
+ </div>
+ <div class="form-tips hidden" id="fin-tip2">
+ <b>Note:</b> Closing the connection after each message is generally not a good thing - but is useful to indicate an end-of-file for example. The receiving client will need to reconnect.
+ </div>
+</script>
+
+<script type="text/x-red" data-help-name="tcp out">
+ <p>Provides a choice of tcp outputs. Can either connect to a remote tcp port,
+ accept incoming connections, or reply to messages received from a TCP In node.</p>
+ <p>Only <b>msg.payload</b> is sent.</p>
+ <p>If <b>msg.payload</b> is a string containing a Base64 encoding of binary
+ data, the Base64 decoding option will cause it to be converted back to binary
+ before being sent.</p>
+</script>
+
+<script type="text/javascript">
+ RED.nodes.registerType('tcp out',{
+ category: 'output',
+ color:"Silver",
+ defaults: {
+ host: {value:"",validate:function(v) { return (this.beserver != "client")||v.length > 0;} },
+ port: {value:"",validate:function(v) { return (this.beserver == "reply")||RED.validators.number()(v) } },
+ beserver: {value:"client",required:true},
+ base64: {value:false,required:true},
+ end: {value:false,required:true},
+ name: {value:""}
+ },
+ inputs:1,
+ outputs:0,
+ icon: "bridge-dash.png",
+ align: "right",
+ label: function() {
+ return this.name || "tcp:"+(this.host?this.host+":":"")+this.port;
+ },
+ labelStyle: function() {
+ return (this.name)?"node_label_italic":"";
+ },
+ oneditprepare: function() {
+ var updateOptions = function() {
+ var sockettype = $("#node-input-beserver option:selected").val();
+ if (sockettype == "reply") {
+ $("#node-input-port-row").hide();
+ $("#node-input-host-row").hide();
+ $("#node-input-end-row").hide();
+ } else {
+ $("#node-input-port-row").show();
+ $("#node-input-end-row").show();
+ }
+
+ if (sockettype == "client") {
+ $("#node-input-host-row").show();
+ $("#fin-tip").show();
+ } else {
+ $("#node-input-host-row").hide();
+ $("#fin-tip").hide();
+ }
+
+ if (sockettype == "server") {
+ $("#fin-tip2").show();
+ }
+ else {
+ $("#fin-tip2").hide();
+ }
+
+ };
+ updateOptions();
+ $("#node-input-beserver").change(updateOptions);
+ }
+ });
+</script>
+
+
+<script type="text/x-red" data-template-name="tcp request">
+ <div class="form-row">
+ <label for="node-input-server"><i class="fa fa-globe"></i> Server</label>
+ <input type="text" id="node-input-server" placeholder="ip.address" style="width:50%">
+ &nbsp;port <input type="text" id="node-input-port" style="width:50px">
+ </div>
+ <div class="form-row">
+ <label for="node-input-out"><i class="fa fa-sign-out"></i> Return</label>
+ <select type="text" id="node-input-out" style="width:52%;">
+ <option value="time">after a fixed timeout of</option>
+ <option value="char">when character received is</option>
+ <option value="count">a fixed number of characters</option>
+ <option value="sit">never. Keep connection open</option>
+ </select>
+ <input type="text" id="node-input-splitc" style="width:50px;">
+ <span id="node-units"></span>
+ </div>
+ <div class="form-row">
+ <label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
+ <input type="text" id="node-input-name" placeholder="Name">
+ </div>
+ <div class="form-tips"><b>Tip:</b> outputs a binary <b>Buffer</b>, so you may want to .toString() it.</div>
+ <script>
+ var previous = null;
+ $("#node-input-out").on('focus', function () { previous = this.value; }).change(function() {
+ if (previous == null) { previous = $("#node-input-out").val(); }
+ if ($("#node-input-out").val() == "char") {
+ if (previous != "char") $("#node-input-splitc").val("\\n");
+ $("#node-units").text("");
+ }
+ else if ($("#node-input-out").val() == "time") {
+ if (previous != "time") $("#node-input-splitc").val("0");
+ $("#node-units").text("ms");
+ }
+ else if ($("#node-input-out").val() == "count") {
+ if (previous != "count") $("#node-input-splitc").val("12");
+ $("#node-units").text("chars");
+ }
+ else {
+ if (previous != "sit") $("#node-input-splitc").val("0");
+ $("#node-units").text("");
+ }
+ });
+</script>
+
+<script type="text/x-red" data-help-name="tcp request">
+ <p>A simple TCP request node - sends the <b>msg.payload</b> to a server tcp port and expects a response.</p>
+ <p>Connects, sends the "request", reads the "response". It can either count a number of
+ returned characters into a fixed buffer, match a specified character before returning,
+ wait a fixed timeout from first reply and then return, or just sit and wait for data.</p>
+ <p>The response will be output in <b>msg.payload</b> as a buffer, so you may want to .toString() it.</p>
+</script>
+
+<script type="text/javascript">
+ RED.nodes.registerType('tcp request',{
+ category: 'function',
+ color:"Silver",
+ defaults: {
+ server: {value:"",required:true},
+ port: {value:"",required:true,validate:RED.validators.number()},
+ out: {value:"time",required:true},
+ splitc: {value:"0",required:true},
+ name: {value:""}
+ },
+ inputs:1,
+ outputs:1,
+ icon: "bridge-dash.png",
+ label: function() {
+ return this.name || "tcp:"+(this.server?this.server+":":"")+this.port;
+ },
+ labelStyle: function() {
+ return this.name?"node_label_italic":"";
+ }
+ });
+</script>
diff --git a/dgbuilder/core_nodes/io/31-tcpin.js b/dgbuilder/core_nodes/io/31-tcpin.js
new file mode 100644
index 00000000..2e4e5e7b
--- /dev/null
+++ b/dgbuilder/core_nodes/io/31-tcpin.js
@@ -0,0 +1,472 @@
+/**
+ * Copyright 2013,2014 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+module.exports = function(RED) {
+ "use strict";
+ var reconnectTime = RED.settings.socketReconnectTime||10000;
+ var socketTimeout = RED.settings.socketTimeout||null;
+ var net = require('net');
+
+ var connectionPool = {};
+
+ function TcpIn(n) {
+ RED.nodes.createNode(this,n);
+ this.host = n.host;
+ this.port = n.port * 1;
+ this.topic = n.topic;
+ this.stream = (!n.datamode||n.datamode=='stream'); /* stream,single*/
+ this.datatype = n.datatype||'buffer'; /* buffer,utf8,base64 */
+ this.newline = (n.newline||"").replace("\\n","\n").replace("\\r","\r");
+ this.base64 = n.base64;
+ this.server = (typeof n.server == 'boolean')?n.server:(n.server == "server");
+ this.closing = false;
+ var node = this;
+ var count = 0;
+
+ if (!node.server) {
+ var buffer = null;
+ var client;
+ var reconnectTimeout;
+ var end = false;
+ var setupTcpClient = function() {
+ node.log("connecting to "+node.host+":"+node.port);
+ node.status({fill:"grey",shape:"dot",text:"connecting"});
+ var id = (1+Math.random()*4294967295).toString(16);
+ client = net.connect(node.port, node.host, function() {
+ buffer = (node.datatype == 'buffer')? new Buffer(0):"";
+ node.log("connected to "+node.host+":"+node.port);
+ node.status({fill:"green",shape:"dot",text:"connected"});
+ });
+ connectionPool[id] = client;
+
+ client.on('data', function (data) {
+ if (node.datatype != 'buffer') {
+ data = data.toString(node.datatype);
+ }
+ if (node.stream) {
+ if ((node.datatype) === "utf8" && node.newline != "") {
+ buffer = buffer+data;
+ var parts = buffer.split(node.newline);
+ for (var i = 0;i<parts.length-1;i+=1) {
+ var msg = {topic:node.topic, payload:parts[i]};
+ msg._session = {type:"tcp",id:id};
+ node.send(msg);
+ }
+ buffer = parts[parts.length-1];
+ } else {
+ var msg = {topic:node.topic, payload:data};
+ msg._session = {type:"tcp",id:id};
+ node.send(msg);
+ }
+ } else {
+ if ((typeof data) === "string") {
+ buffer = buffer+data;
+ } else {
+ buffer = Buffer.concat([buffer,data],buffer.length+data.length);
+ }
+ }
+ });
+ client.on('end', function() {
+ if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) {
+ var msg = {topic:node.topic,payload:buffer};
+ msg._session = {type:"tcp",id:id};
+ if (buffer.length !== 0) {
+ end = true; // only ask for fast re-connect if we actually got something
+ node.send(msg);
+ }
+ buffer = null;
+ }
+ });
+ client.on('close', function() {
+ delete connectionPool[id];
+ node.status({fill:"red",shape:"ring",text:"disconnected"});
+ if (!node.closing) {
+ if (end) { // if we were asked to close then try to reconnect once very quick.
+ end = false;
+ reconnectTimeout = setTimeout(setupTcpClient, 20);
+ }
+ else {
+ node.log("connection lost to "+node.host+":"+node.port);
+ reconnectTimeout = setTimeout(setupTcpClient, reconnectTime);
+ }
+ }
+ });
+ client.on('error', function(err) {
+ node.log(err);
+ });
+ }
+ setupTcpClient();
+
+ this.on('close', function() {
+ this.closing = true;
+ client.end();
+ clearTimeout(reconnectTimeout);
+ });
+ } else {
+ var server = net.createServer(function (socket) {
+ if (socketTimeout !== null) { socket.setTimeout(socketTimeout); }
+ var id = (1+Math.random()*4294967295).toString(16);
+ connectionPool[id] = socket;
+ node.status({text:++count+" connections"});
+
+ var buffer = (node.datatype == 'buffer')? new Buffer(0):"";
+ socket.on('data', function (data) {
+ if (node.datatype != 'buffer') {
+ data = data.toString(node.datatype);
+ }
+ if (node.stream) {
+ if ((typeof data) === "string" && node.newline != "") {
+ buffer = buffer+data;
+ var parts = buffer.split(node.newline);
+ for (var i = 0;i<parts.length-1;i+=1) {
+ var msg = {topic:node.topic, payload:parts[i],ip:socket.remoteAddress,port:socket.remotePort};
+ msg._session = {type:"tcp",id:id};
+ node.send(msg);
+ }
+ buffer = parts[parts.length-1];
+ } else {
+ var msg = {topic:node.topic, payload:data};
+ msg._session = {type:"tcp",id:id};
+ node.send(msg);
+ }
+ } else {
+ if ((typeof data) === "string") {
+ buffer = buffer+data;
+ } else {
+ buffer = Buffer.concat([buffer,data],buffer.length+data.length);
+ }
+ }
+ });
+ socket.on('end', function() {
+ if (!node.stream || (node.datatype === "utf8" && node.newline !== "")) {
+ if (buffer.length > 0) {
+ var msg = {topic:node.topic,payload:buffer};
+ msg._session = {type:"tcp",id:id};
+ node.send(msg);
+ }
+ buffer = null;
+ }
+ });
+ socket.on('timeout', function() {
+ node.log('timeout closed socket port '+node.port);
+ socket.end();
+ });
+ socket.on('close', function() {
+ delete connectionPool[id];
+ node.status({text:--count+" connections"});
+ });
+ socket.on('error',function(err) {
+ node.log(err);
+ });
+ });
+ server.on('error', function(err) {
+ if (err) {
+ node.error('unable to listen on port '+node.port+' : '+err);
+ }
+ });
+ server.listen(node.port, function(err) {
+ if (err) {
+ node.error('unable to listen on port '+node.port+' : '+err);
+ } else {
+ node.log('listening on port '+node.port);
+
+ node.on('close', function() {
+ node.closing = true;
+ server.close();
+ node.log('stopped listening on port '+node.port);
+ });
+ }
+ });
+ }
+
+ }
+ RED.nodes.registerType("tcp in",TcpIn);
+
+ function TcpOut(n) {
+ RED.nodes.createNode(this,n);
+ this.host = n.host;
+ this.port = n.port * 1;
+ this.base64 = n.base64;
+ this.doend = n.end || false;
+ this.beserver = n.beserver;
+ this.name = n.name;
+ this.closing = false;
+ var node = this;
+
+ if (!node.beserver||node.beserver=="client") {
+ var reconnectTimeout;
+ var client = null;
+ var connected = false;
+ var end = false;
+
+ var setupTcpClient = function() {
+ node.log("connecting to "+node.host+":"+node.port);
+ node.status({fill:"grey",shape:"dot",text:"connecting"});
+ client = net.connect(node.port, node.host, function() {
+ connected = true;
+ node.log("connected to "+node.host+":"+node.port);
+ node.status({fill:"green",shape:"dot",text:"connected"});
+ });
+ client.on('error', function (err) {
+ node.log('error : '+err);
+ });
+ client.on('end', function (err) {
+ });
+ client.on('close', function() {
+ node.status({fill:"red",shape:"ring",text:"disconnected"});
+ connected = false;
+ client.destroy();
+ if (!node.closing) {
+ if (end) {
+ end = false;
+ reconnectTimeout = setTimeout(setupTcpClient,20);
+ }
+ else {
+ node.log("connection lost to "+node.host+":"+node.port);
+ reconnectTimeout = setTimeout(setupTcpClient,reconnectTime);
+ }
+ }
+ });
+ }
+ setupTcpClient();
+
+ node.on("input", function(msg) {
+ if (connected && msg.payload != null) {
+ if (Buffer.isBuffer(msg.payload)) {
+ client.write(msg.payload);
+ } else if (typeof msg.payload === "string" && node.base64) {
+ client.write(new Buffer(msg.payload,'base64'));
+ } else {
+ client.write(new Buffer(""+msg.payload));
+ }
+ if (node.doend === true) {
+ end = true;
+ client.end();
+ }
+ }
+ });
+
+ node.on("close", function() {
+ this.closing = true;
+ client.end();
+ clearTimeout(reconnectTimeout);
+ });
+
+ } else if (node.beserver == "reply") {
+ node.on("input",function(msg) {
+ if (msg._session && msg._session.type == "tcp") {
+ var client = connectionPool[msg._session.id];
+ if (client) {
+ if (Buffer.isBuffer(msg.payload)) {
+ client.write(msg.payload);
+ } else if (typeof msg.payload === "string" && node.base64) {
+ client.write(new Buffer(msg.payload,'base64'));
+ } else {
+ client.write(new Buffer(""+msg.payload));
+ }
+ }
+ }
+ });
+ } else {
+ var connectedSockets = [];
+ node.status({text:"0 connections"});
+ var server = net.createServer(function (socket) {
+ if (socketTimeout !== null) { socket.setTimeout(socketTimeout); }
+ var remoteDetails = socket.remoteAddress+":"+socket.remotePort;
+ node.log("connection from "+remoteDetails);
+ connectedSockets.push(socket);
+ node.status({text:connectedSockets.length+" connections"});
+ socket.on('timeout', function() {
+ node.log('timeout closed socket port '+node.port);
+ socket.end();
+ });
+ socket.on('close',function() {
+ node.log("connection closed from "+remoteDetails);
+ connectedSockets.splice(connectedSockets.indexOf(socket),1);
+ node.status({text:connectedSockets.length+" connections"});
+ });
+ socket.on('error',function() {
+ node.log("socket error from "+remoteDetails);
+ connectedSockets.splice(connectedSockets.indexOf(socket),1);
+ node.status({text:connectedSockets.length+" connections"});
+ });
+ });
+
+ node.on("input", function(msg) {
+ if (msg.payload != null) {
+ var buffer;
+ if (Buffer.isBuffer(msg.payload)) {
+ buffer = msg.payload;
+ } else if (typeof msg.payload === "string" && node.base64) {
+ buffer = new Buffer(msg.payload,'base64');
+ } else {
+ buffer = new Buffer(""+msg.payload);
+ }
+ for (var i = 0; i<connectedSockets.length;i+=1) {
+ if (node.doend === true) { connectedSockets[i].end(buffer); }
+ else { connectedSockets[i].write(buffer); }
+ }
+ }
+ });
+
+ server.on('error', function(err) {
+ if (err) {
+ node.error('unable to listen on port '+node.port+' : '+err);
+ }
+ });
+
+ server.listen(node.port, function(err) {
+ if (err) {
+ node.error('unable to listen on port '+node.port+' : '+err);
+ } else {
+ node.log('listening on port '+node.port);
+ node.on('close', function() {
+ server.close();
+ node.log('stopped listening on port '+node.port);
+ });
+ }
+ });
+ }
+ }
+ RED.nodes.registerType("tcp out",TcpOut);
+
+ function TcpGet(n) {
+ RED.nodes.createNode(this,n);
+ this.server = n.server;
+ this.port = Number(n.port);
+ this.out = n.out;
+ this.splitc = n.splitc;
+
+ if (this.out != "char") { this.splitc = Number(this.splitc); }
+ else { this.splitc.replace("\\n","\n").replace("\\r","\r").replace("\\t","\t").replace("\\e","\e").replace("\\f","\f").replace("\\0","\0"); }
+
+ var buf;
+ if (this.out == "count") { buf = new Buffer(this.splitc); }
+ else { buf = new Buffer(32768); } // set it to 32k... hopefully big enough for most.... but only hopefully
+
+ this.connected = false;
+ var node = this;
+ var client;
+
+ this.on("input", function(msg) {
+ var i = 0;
+ if ((!Buffer.isBuffer(msg.payload)) && (typeof msg.payload !== "string")) {
+ msg.payload = msg.payload.toString();
+ }
+ if (!node.connected) {
+ client = net.Socket();
+ client.setTimeout(socketTimeout);
+ node.status({});
+ client.connect(node.port, node.server, function() {
+ //node.log('client connected');
+ node.status({fill:"green",shape:"dot",text:"connected"});
+ node.connected = true;
+ client.write(msg.payload);
+ });
+
+ client.on('data', function(data) {
+ //node.log("data:"+ data.length+":"+ data);
+ if (node.splitc === 0) {
+ node.send({"payload": data});
+ }
+ else if (node.out === "sit") { // if we are staying connected just send the buffer
+ node.send({"payload": data});
+ }
+ else {
+ for (var j = 0; j < data.length; j++ ) {
+ if (node.out === "time") {
+ // do the timer thing
+ if (node.tout) {
+ i += 1;
+ buf[i] = data[j];
+ }
+ else {
+ node.tout = setTimeout(function () {
+ node.tout = null;
+ var m = new Buffer(i+1);
+ buf.copy(m,0,0,i+1);
+ node.send({"payload": m});
+ client.end();
+ m = null;
+ }, node.splitc);
+ i = 0;
+ buf[0] = data[j];
+ }
+ }
+ // count bytes into a buffer...
+ else if (node.out == "count") {
+ buf[i] = data[j];
+ i += 1;
+ if ( i >= node.serialConfig.count) {
+ node.send({"payload": buf});
+ client.end();
+ i = 0;
+ }
+ }
+ // look for a char
+ else {
+ buf[i] = data[j];
+ i += 1;
+ if (data[j] == node.splitc) {
+ var m = new Buffer(i);
+ buf.copy(m,0,0,i);
+ node.send({"payload": m});
+ client.end();
+ m = null;
+ i = 0;
+ }
+ }
+ }
+ }
+ });
+
+ client.on('end', function() {
+ //node.log('client disconnected');
+ node.connected = false;
+ node.status({});
+ client = null;
+ });
+
+ client.on('error', function() {
+ node.log('connect failed');
+ node.status({fill:"red",shape:"ring",text:"error"});
+ if (client) { client.end(); }
+ });
+
+ client.on('timeout',function() {
+ node.log('connect timeout');
+ if (client) {
+ client.end();
+ setTimeout(function() {
+ client.connect(node.port, node.server, function() {
+ //node.log('client connected');
+ node.connected = true;
+ client.write(msg.payload);
+ });
+ },reconnectTime);
+ }
+ });
+ }
+ else { client.write(msg.payload); }
+ });
+
+ this.on("close", function() {
+ if (client) { buf = null; client.end(); }
+ });
+
+ }
+ RED.nodes.registerType("tcp request",TcpGet);
+}
diff --git a/dgbuilder/core_nodes/io/32-udp.html b/dgbuilder/core_nodes/io/32-udp.html
new file mode 100644
index 00000000..1c2eed57
--- /dev/null
+++ b/dgbuilder/core_nodes/io/32-udp.html
@@ -0,0 +1,212 @@
+<!--
+ Copyright 2013 IBM Corp.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- The Input Node -->
+<script type="text/x-red" data-template-name="udp in">
+ <div class="form-row">
+ <label for="node-input-port"><i class="fa fa-sign-in"></i> Listen</label>
+ on port <input type="text" id="node-input-port" placeholder="Port" style="width: 45px">
+ for <select id="node-input-multicast" style='width:40%'>
+ <option value="false">udp messages</option>
+ <option value="true">multicast messages</option>
+ </select>
+ </div>
+ <div class="form-row node-input-group">
+ <label for="node-input-group"><i class="fa fa-list"></i> Group</label>
+ <input type="text" id="node-input-group" placeholder="225.0.18.83">
+ </div>
+ <div class="form-row node-input-iface">
+ <label for="node-input-iface"><i class="fa fa-random"></i> Interface</label>
+ <input type="text" id="node-input-iface" placeholder="(optional) ip address of eth0">
+ </div>
+ <div class="form-row">
+ <label for="node-input-datatype"><i class="fa fa-sign-out"></i> Output</label>
+ <select id="node-input-datatype" style="width: 70%;">
+ <option value="buffer">a Buffer</option>
+ <option value="utf8">a String</option>
+ <option value="base64">a Base64 encoded string</option>
+ </select>
+ </div>
+ <div class="form-row">
+ <label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
+ <input type="text" id="node-input-name" placeholder="Name">
+ </div>
+ <div class="form-tips">Tip: Make sure your firewall will allow the data in.</div>
+ <script>
+ $("#node-input-multicast").change(function() {
+ var id = $("#node-input-multicast option:selected").val();
+ if (id == "false") {
+ $(".node-input-group").hide();
+ $(".node-input-iface").hide();
+ }
+ else {
+ $(".node-input-group").show();
+ $(".node-input-iface").show();
+ }
+ });
+ </script>
+</script>
+
+<script type="text/x-red" data-help-name="udp in">
+ <p>A udp input node, that produces a <b>msg.payload</b> containing a <i>BUFFER</i>, string, or base64 encoded string. Supports multicast.</p>
+ <p>It also provides <b>msg.ip</b> and <b>msg.port</b> to the ip address and port from which the message was received.</p>
+ <p>On some systems you may need to be root to use ports below 1024 and/or broadcast.</p>
+</script>
+
+<script type="text/javascript">
+ RED.nodes.registerType('udp in',{
+ category: 'input',
+ color:"Silver",
+ defaults: {
+ name: {value:""},
+ iface: {value:""},
+ port: {value:"",required:true,validate:RED.validators.number()},
+ datatype: {value:"buffer",required:true},
+ multicast: {value:"false"},
+ group: {value:"",validate:function(v) { return (this.multicast !== "true")||v.length > 0;} }
+ },
+ inputs:0,
+ outputs:1,
+ icon: "bridge-dash.png",
+ label: function() {
+ if (this.multicast=="false") {
+ return this.name||"udp "+this.port;
+ }
+ else return this.name||"udp "+(this.group+":"+this.port);
+ },
+ labelStyle: function() {
+ return this.name?"node_label_italic":"";
+ }
+ });
+</script>
+
+
+<!-- The Output Node -->
+<script type="text/x-red" data-template-name="udp out">
+ <div class="form-row">
+ <label for="node-input-port"><i class="fa fa-envelope"></i> Send a</label>
+ <select id="node-input-multicast" style='width:40%'>
+ <option value="false">udp message</option>
+ <option value="broad">broadcast message</option>
+ <option value="multi">multicast message</option>
+ </select>
+ to port <input type="text" id="node-input-port" placeholder="port" style="width: 70px">
+ </div>
+ <div class="form-row node-input-addr">
+ <label for="node-input-addr" id="node-input-addr-label"><i class="fa fa-list"></i> Address</label>
+ <input type="text" id="node-input-addr" placeholder="destination ip" style="width: 70%;">
+ </div>
+ <div class="form-row node-input-iface">
+ <label for="node-input-iface"><i class="fa fa-random"></i> Interface</label>
+ <input type="text" id="node-input-iface" placeholder="(optional) ip address of eth0">
+ </div>
+ <div class="form-row">
+ <label for="node-input-outport-type">&nbsp;</label>
+ <select id="node-input-outport-type">
+ <option id="node-input-outport-type-random" value="random">use random local port</option>
+ <option value="fixed">bind to local port</option>
+ </select>
+ <input type="text" id="node-input-outport" style="width: 70px;" placeholder="port">
+ </div>
+ <div class="form-row">
+ <label>&nbsp;</label>
+ <input type="checkbox" id="node-input-base64" style="display: inline-block; width: auto; vertical-align: top;">
+ <label for="node-input-base64" style="width: 70%;">Decode Base64 encoded payload ?</label>
+ </div>
+ <div class="form-row">
+ <label for="node-input-name"><i class="fa fa-tag"></i> Name</label>
+ <input type="text" id="node-input-name" placeholder="Name">
+ </div>
+ <div class="form-tips">Tip: leave address and port blank if you want to set using <b>msg.ip</b> and <b>msg.port</b>.</div>
+ <script>
+ $("#node-input-multicast").change(function() {
+ var id = $("#node-input-multicast option:selected").val();
+ if (id !== "multi") {
+ $(".node-input-iface").hide();
+ $("#node-input-addr-label").html('<i class="fa fa-list"></i> Address');
+ $("#node-input-addr")[0].placeholder = 'destination ip';
+ }
+ else {
+ $(".node-input-iface").show();
+ $("#node-input-addr-label").html('<i class="fa fa-list"></i> Group');
+ $("#node-input-addr")[0].placeholder = '225.0.18.83';
+ }
+ if (id === "broad") {
+ $("#node-input-addr")[0].placeholder = '255.255.255.255';
+ }
+ });
+ </script>
+</script>
+
+<script type="text/x-red" data-help-name="udp out">
+ <p>This node sends <b>msg.payload</b> to the designated udp host and port. Supports multicast.</p>
+ <p>You may also use <b>msg.ip</b> and <b>msg.port</b> to set the destination values.<br/><b>Note</b>: the statically configured values have precedence.</p>
+ <p>If you select broadcast either set the address to the local broadcast ip address, or maybe try 255.255.255.255, which is the global broadcast address.</p>
+ <p>On some systems you may need to be root to use ports below 1024 and/or broadcast.</p>
+</script>
+
+<script type="text/javascript">
+ RED.nodes.registerType('udp out',{
+ category: 'output',
+ color:"Silver",
+ defaults: {
+ name: {value:""},
+ addr: {value:""},
+ iface: {value:""},
+ port: {value:""},
+ outport: {value:""},
+ base64: {value:false,required:true},
+ multicast: {value:"false"}
+ },
+ inputs:1,
+ outputs:0,
+ icon: "bridge-dash.png",
+ align: "right",
+ label: function() {
+ return this.name||"udp "+(this.addr+":"+this.port);
+ },
+ labelStyle: function() {
+ return this.name?"node_label_italic":"";
+ },
+ oneditprepare: function() {
+ var type = this.outport==""?"random":"fixed";
+ $("#node-input-outport-type option").filter(function() {
+ return $(this).val() == type;
+ }).attr('selected',true);
+
+ $("#node-input-outport-type").change(function() {
+ var type = $(this).children("option:selected").val();
+ if (type == "random") {
+ $("#node-input-outport").val("").hide();
+ } else {
+ $("#node-input-outport").show();
+ }
+ });
+
+ $("#node-input-outport-type").change();
+
+ $("#node-input-multicast").change(function() {
+ var type = $(this).children("option:selected").val();
+ if (type == "false") {
+ $("#node-input-outport-type-random").html("bind to random local port");
+ } else {
+ $("#node-input-outport-type-random").html("bind to target port");
+ }
+ });
+ $("#node-input-multicast").change();
+ }
+ });
+</script>
diff --git a/dgbuilder/core_nodes/io/32-udp.js b/dgbuilder/core_nodes/io/32-udp.js
new file mode 100644
index 00000000..a7968e3a
--- /dev/null
+++ b/dgbuilder/core_nodes/io/32-udp.js
@@ -0,0 +1,171 @@
+/**
+ * Copyright 2013 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+module.exports = function(RED) {
+ "use strict";
+ var dgram = require('dgram');
+
+ // The Input Node
+ function UDPin(n) {
+ RED.nodes.createNode(this,n);
+ this.group = n.group;
+ this.port = n.port;
+ this.datatype = n.datatype;
+ this.iface = n.iface || null;
+ this.multicast = n.multicast;
+ var node = this;
+
+ var server = dgram.createSocket('udp4');
+
+ server.on("error", function (err) {
+ if ((err.code == "EACCES") && (node.port < 1024)) {
+ node.error("UDP access error, you may need root access for ports below 1024");
+ } else {
+ node.error("UDP error : "+err.code);
+ }
+ server.close();
+ });
+
+ server.on('message', function (message, remote) {
+ var msg;
+ if (node.datatype =="base64") {
+ msg = { payload:message.toString('base64'), fromip:remote.address+':'+remote.port };
+ } else if (node.datatype =="utf8") {
+ msg = { payload:message.toString('utf8'), fromip:remote.address+':'+remote.port };
+ } else {
+ msg = { payload:message, fromip:remote.address+':'+remote.port, ip:remote.address, port:remote.port };
+ }
+ node.send(msg);
+ });
+
+ server.on('listening', function () {
+ var address = server.address();
+ node.log('udp listener at ' + address.address + ":" + address.port);
+ if (node.multicast == "true") {
+ server.setBroadcast(true);
+ try {
+ server.setMulticastTTL(128);
+ server.addMembership(node.group,node.iface);
+ node.log("udp multicast group "+node.group);
+ } catch (e) {
+ if (e.errno == "EINVAL") {
+ node.error("Bad Multicast Address");
+ } else if (e.errno == "ENODEV") {
+ node.error("Must be ip address of the required interface");
+ } else {
+ node.error("Error :"+e.errno);
+ }
+ }
+ }
+ });
+
+ node.on("close", function() {
+ try {
+ server.close();
+ node.log('udp listener stopped');
+ } catch (err) {
+ node.error(err);
+ }
+ });
+
+ server.bind(node.port,node.iface);
+ }
+ RED.nodes.registerType("udp in",UDPin);
+
+
+ // The Output Node
+ function UDPout(n) {
+ RED.nodes.createNode(this,n);
+ //this.group = n.group;
+ this.port = n.port;
+ this.outport = n.outport||"";
+ this.base64 = n.base64;
+ this.addr = n.addr;
+ this.iface = n.iface || null;
+ this.multicast = n.multicast;
+ var node = this;
+
+ var sock = dgram.createSocket('udp4'); // only use ipv4 for now
+
+ if (node.multicast != "false") {
+ if (node.outport == "") { node.outport = node.port; }
+ sock.bind(node.outport, function() { // have to bind before you can enable broadcast...
+ sock.setBroadcast(true); // turn on broadcast
+ if (node.multicast == "multi") {
+ try {
+ sock.setMulticastTTL(128);
+ sock.addMembership(node.addr,node.iface); // Add to the multicast group
+ node.log('udp multicast ready : '+node.outport+' -> '+node.addr+":"+node.port);
+ } catch (e) {
+ if (e.errno == "EINVAL") {
+ node.error("Bad Multicast Address");
+ } else if (e.errno == "ENODEV") {
+ node.error("Must be ip address of the required interface");
+ } else {
+ node.error("Error :"+e.errno);
+ }
+ }
+ } else {
+ node.log('udp broadcast ready : '+node.outport+' -> '+node.addr+":"+node.port);
+ }
+ });
+ } else if (node.outport != "") {
+ sock.bind(node.outport);
+ node.log('udp ready : '+node.outport+' -> '+node.addr+":"+node.port);
+ } else {
+ node.log('udp ready : '+node.addr+":"+node.port);
+ }
+
+ node.on("input", function(msg) {
+ if (msg.payload != null) {
+ var add = node.addr || msg.ip || "";
+ var por = node.port || msg.port || 0;
+ if (add == "") {
+ node.warn("udp: ip address not set");
+ } else if (por == 0) {
+ node.warn("udp: port not set");
+ } else if (isNaN(por) || (por < 1) || (por > 65535)) {
+ node.warn("udp: port number not valid");
+ } else {
+ var message;
+ if (node.base64) {
+ message = new Buffer(msg.payload, 'base64');
+ } else if (msg.payload instanceof Buffer) {
+ message = msg.payload;
+ } else {
+ message = new Buffer(""+msg.payload);
+ }
+ sock.send(message, 0, message.length, por, add, function(err, bytes) {
+ if (err) {
+ node.error("udp : "+err);
+ }
+ message = null;
+ });
+ }
+ }
+ });
+
+ node.on("close", function() {
+ try {
+ sock.close();
+ node.log('udp output stopped');
+ } catch (err) {
+ node.error(err);
+ }
+ });
+ }
+ RED.nodes.registerType("udp out",UDPout);
+}
diff --git a/dgbuilder/core_nodes/io/lib/mqtt.js b/dgbuilder/core_nodes/io/lib/mqtt.js
new file mode 100644
index 00000000..141a8889
--- /dev/null
+++ b/dgbuilder/core_nodes/io/lib/mqtt.js
@@ -0,0 +1,254 @@
+/**
+ * Copyright 2013 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+var util = require("util");
+var mqtt = require("mqtt");
+var events = require("events");
+//var inspect = require("sys").inspect;
+
+//var Client = module.exports.Client = function(
+
+var port = 1883;
+var host = "localhost";
+
+function MQTTClient(port,host) {
+ this.port = port||1883;
+ this.host = host||"localhost";
+ this.messageId = 1;
+ this.pendingSubscriptions = {};
+ this.inboundMessages = {};
+ this.lastOutbound = (new Date()).getTime();
+ this.lastInbound = (new Date()).getTime();
+ this.connected = false;
+
+ this._nextMessageId = function() {
+ this.messageId += 1;
+ if (this.messageId > 0xFFFF) {
+ this.messageId = 1;
+ }
+ return this.messageId;
+ }
+ events.EventEmitter.call(this);
+}
+util.inherits(MQTTClient, events.EventEmitter);
+
+MQTTClient.prototype.connect = function(options) {
+ if (!this.connected) {
+ var self = this;
+ options = options||{};
+ self.options = options;
+ self.options.keepalive = options.keepalive||15;
+ self.options.clean = self.options.clean||true;
+ self.options.protocolId = 'MQIsdp';
+ self.options.protocolVersion = 3;
+
+ self.client = mqtt.createConnection(this.port,this.host,function(err,client) {
+ if (err) {
+ self.connected = false;
+ clearInterval(self.watchdog);
+ self.connectionError = true;
+ //util.log('[mqtt] ['+self.uid+'] connection error 1 : '+inspect(err));
+ self.emit('connectionlost',err);
+ return;
+ }
+ client.on('close',function(e) {
+ //util.log('[mqtt] ['+self.uid+'] on close');
+ clearInterval(self.watchdog);
+ if (!self.connectionError) {
+ if (self.connected) {
+ self.connected = false;
+ self.emit('connectionlost',e);
+ } else {
+ self.emit('disconnect');
+ }
+ }
+ });
+ client.on('error',function(e) {
+ //util.log('[mqtt] ['+self.uid+'] on error : '+inspect(e));
+ clearInterval(self.watchdog);
+ if (self.connected) {
+ self.connected = false;
+ self.emit('connectionlost',e);
+ }
+ });
+ client.on('connack',function(packet) {
+ if (packet.returnCode == 0) {
+ self.watchdog = setInterval(function(self) {
+ var now = (new Date()).getTime();
+
+ //util.log('[mqtt] ['+self.uid+'] watchdog '+inspect({connected:self.connected,connectionError:self.connectionError,pingOutstanding:self.pingOutstanding,now:now,lastOutbound:self.lastOutbound,lastInbound:self.lastInbound}));
+
+ if (now - self.lastOutbound > self.options.keepalive*500 || now - self.lastInbound > self.options.keepalive*500) {
+ if (self.pingOutstanding) {
+ //util.log('[mqtt] ['+self.uid+'] watchdog pingOustanding - disconnect');
+ try {
+ self.client.disconnect();
+ } catch (err) {
+ }
+ } else {
+ //util.log('[mqtt] ['+self.uid+'] watchdog pinging');
+ self.lastOutbound = (new Date()).getTime();
+ self.lastInbound = (new Date()).getTime();
+ self.pingOutstanding = true;
+ self.client.pingreq();
+ }
+ }
+
+ },self.options.keepalive*500,self);
+ self.pingOutstanding = false;
+ self.lastInbound = (new Date()).getTime()
+ self.lastOutbound = (new Date()).getTime()
+ self.connected = true;
+ self.connectionError = false;
+ self.emit('connect');
+ } else {
+ self.connected = false;
+ self.emit('connectionlost');
+ }
+ });
+ client.on('suback',function(packet) {
+ self.lastInbound = (new Date()).getTime()
+ var topic = self.pendingSubscriptions[packet.messageId];
+ self.emit('subscribe',topic,packet.granted[0]);
+ delete self.pendingSubscriptions[packet.messageId];
+ });
+ client.on('unsuback',function(packet) {
+ self.lastInbound = (new Date()).getTime()
+ var topic = self.pendingSubscriptions[packet.messageId];
+ self.emit('unsubscribe',topic,packet.granted[0]);
+ delete self.pendingSubscriptions[packet.messageId];
+ });
+ client.on('publish',function(packet) {
+ self.lastInbound = (new Date()).getTime()
+ if (packet.qos < 2) {
+ var p = packet;
+ self.emit('message',p.topic,p.payload,p.qos,p.retain);
+ } else {
+ self.inboundMessages[packet.messageId] = packet;
+ this.lastOutbound = (new Date()).getTime()
+ self.client.pubrec(packet);
+ }
+ if (packet.qos == 1) {
+ this.lastOutbound = (new Date()).getTime()
+ self.client.puback(packet);
+ }
+ });
+
+ client.on('pubrel',function(packet) {
+ self.lastInbound = (new Date()).getTime()
+ var p = self.inboundMessages[packet.messageId];
+ if (p) {
+ self.emit('message',p.topic,p.payload,p.qos,p.retain);
+ delete self.inboundMessages[packet.messageId];
+ }
+ self.lastOutbound = (new Date()).getTime()
+ self.client.pubcomp(packet);
+ });
+
+ client.on('puback',function(packet) {
+ self.lastInbound = (new Date()).getTime()
+ // outbound qos-1 complete
+ });
+
+ client.on('pubrec',function(packet) {
+ self.lastInbound = (new Date()).getTime()
+ self.lastOutbound = (new Date()).getTime()
+ self.client.pubrel(packet);
+ });
+ client.on('pubcomp',function(packet) {
+ self.lastInbound = (new Date()).getTime()
+ // outbound qos-2 complete
+ });
+ client.on('pingresp',function(packet) {
+ //util.log('[mqtt] ['+self.uid+'] received pingresp');
+ self.lastInbound = (new Date()).getTime()
+ self.pingOutstanding = false;
+ });
+
+ this.lastOutbound = (new Date()).getTime()
+ this.connectionError = false;
+ client.connect(self.options);
+ });
+ }
+}
+
+MQTTClient.prototype.subscribe = function(topic,qos) {
+ var self = this;
+ if (self.connected) {
+ var options = {
+ subscriptions:[{topic:topic,qos:qos}],
+ messageId: self._nextMessageId()
+ };
+ this.pendingSubscriptions[options.messageId] = topic;
+ this.lastOutbound = (new Date()).getTime()
+ self.client.subscribe(options);
+ }
+}
+MQTTClient.prototype.unsubscribe = function(topic) {
+ var self = this;
+ if (self.connected) {
+ var options = {
+ topic:topic,
+ messageId: self._nextMessageId()
+ };
+ this.pendingSubscriptions[options.messageId] = topic;
+ this.lastOutbound = (new Date()).getTime()
+ self.client.unsubscribe(options);
+ }
+}
+
+MQTTClient.prototype.publish = function(topic,payload,qos,retain) {
+ var self = this;
+ if (self.connected) {
+
+ if (!Buffer.isBuffer(payload)) {
+ if (typeof payload === "object") {
+ payload = JSON.stringify(payload);
+ } else if (typeof payload !== "string") {
+ payload = ""+payload;
+ }
+ }
+ var options = {
+ topic: topic,
+ payload: payload,
+ qos: qos||0,
+ retain:retain||false
+ };
+ if (options.qos != 0) {
+ options.messageId = self._nextMessageId();
+ }
+ this.lastOutbound = (new Date()).getTime()
+ self.client.publish(options);
+ }
+}
+
+MQTTClient.prototype.disconnect = function() {
+ var self = this;
+ if (this.connected) {
+ this.connected = false;
+ try {
+ this.client.disconnect();
+ } catch(err) {
+ }
+ }
+}
+MQTTClient.prototype.isConnected = function() {
+ return this.connected;
+}
+module.exports.createClient = function(port,host) {
+ var mqtt_client = new MQTTClient(port,host);
+ return mqtt_client;
+}
+
diff --git a/dgbuilder/core_nodes/io/lib/mqttConnectionPool.js b/dgbuilder/core_nodes/io/lib/mqttConnectionPool.js
new file mode 100644
index 00000000..d15f0fc7
--- /dev/null
+++ b/dgbuilder/core_nodes/io/lib/mqttConnectionPool.js
@@ -0,0 +1,128 @@
+/**
+ * Copyright 2013 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+var util = require("util");
+var mqtt = require("./mqtt");
+var settings = require(process.env.NODE_RED_HOME+"/red/red").settings;
+
+var connections = {};
+
+function matchTopic(ts,t) {
+ if (ts == "#") {
+ return true;
+ }
+ var re = new RegExp("^"+ts.replace(/([\[\]\?\(\)\\\\$\^\*\.|])/g,"\\$1").replace(/\+/g,"[^/]+").replace(/\/#$/,"(\/.*)?")+"$");
+ return re.test(t);
+}
+
+module.exports = {
+ get: function(broker,port,clientid,username,password,will) {
+ var id = "["+(username||"")+":"+(password||"")+"]["+(clientid||"")+"]@"+broker+":"+port;
+ if (!connections[id]) {
+ connections[id] = function() {
+ var uid = (1+Math.random()*4294967295).toString(16);
+ var client = mqtt.createClient(port,broker);
+ client.uid = uid;
+ client.setMaxListeners(0);
+ var options = {keepalive:15};
+ options.clientId = clientid || 'mqtt_' + (1+Math.random()*4294967295).toString(16);
+ options.username = username;
+ options.password = password;
+ options.will = will;
+ var queue = [];
+ var subscriptions = [];
+ var connecting = false;
+ var obj = {
+ _instances: 0,
+ publish: function(msg) {
+ if (client.isConnected()) {
+ client.publish(msg.topic,msg.payload,msg.qos,msg.retain);
+ } else {
+ if (!connecting) {
+ connecting = true;
+ client.connect(options);
+ }
+ queue.push(msg);
+ }
+ },
+ subscribe: function(topic,qos,callback) {
+ subscriptions.push({topic:topic,qos:qos,callback:callback});
+ client.on('message',function(mtopic,mpayload,mqos,mretain) {
+ if (matchTopic(topic,mtopic)) {
+ callback(mtopic,mpayload,mqos,mretain);
+ }
+ });
+ if (client.isConnected()) {
+ client.subscribe(topic,qos);
+ }
+ },
+ on: function(a,b){
+ client.on(a,b);
+ },
+ once: function(a,b){
+ client.once(a,b);
+ },
+ connect: function() {
+ if (client && !client.isConnected() && !connecting) {
+ connecting = true;
+ client.connect(options);
+ }
+ },
+ disconnect: function() {
+ this._instances -= 1;
+ if (this._instances == 0) {
+ client.disconnect();
+ client = null;
+ delete connections[id];
+ }
+ }
+ };
+ client.on('connect',function() {
+ if (client) {
+ util.log('[mqtt] ['+uid+'] connected to broker tcp://'+broker+':'+port);
+ connecting = false;
+ for (var s in subscriptions) {
+ var topic = subscriptions[s].topic;
+ var qos = subscriptions[s].qos;
+ var callback = subscriptions[s].callback;
+ client.subscribe(topic,qos);
+ }
+ //console.log("connected - publishing",queue.length,"messages");
+ while(queue.length) {
+ var msg = queue.shift();
+ //console.log(msg);
+ client.publish(msg.topic,msg.payload,msg.qos,msg.retain);
+ }
+ }
+ });
+ client.on('connectionlost', function(err) {
+ util.log('[mqtt] ['+uid+'] connection lost to broker tcp://'+broker+':'+port);
+ connecting = false;
+ setTimeout(function() {
+ obj.connect();
+ }, settings.mqttReconnectTime||5000);
+ });
+ client.on('disconnect', function() {
+ connecting = false;
+ util.log('[mqtt] ['+uid+'] disconnected from broker tcp://'+broker+':'+port);
+ });
+
+ return obj
+ }();
+ }
+ connections[id]._instances += 1;
+ return connections[id];
+ }
+};