diff options
Diffstat (limited to 'dgbuilder/red/nodes/flows.js')
-rw-r--r-- | dgbuilder/red/nodes/flows.js | 220 |
1 files changed, 220 insertions, 0 deletions
diff --git a/dgbuilder/red/nodes/flows.js b/dgbuilder/red/nodes/flows.js new file mode 100644 index 00000000..b0b5d514 --- /dev/null +++ b/dgbuilder/red/nodes/flows.js @@ -0,0 +1,220 @@ +/** + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +var util = require("util"); +var when = require("when"); + +var typeRegistry = require("./registry"); +var credentials = require("./credentials"); +var log = require("../log"); +var events = require("../events"); + +var storage = null; + +var nodes = {}; +var activeConfig = []; +var missingTypes = []; + +events.on('type-registered',function(type) { + if (missingTypes.length > 0) { + var i = missingTypes.indexOf(type); + if (i != -1) { + missingTypes.splice(i,1); + util.log("[red] Missing type registered: "+type); + if (missingTypes.length === 0) { + parseConfig(); + } + } + } +}); + +/** + * Parses the current activeConfig and creates the required node instances + */ +function parseConfig() { + var i; + var nt; + missingTypes = []; + + // Scan the configuration for any unknown node types + for (i=0;i<activeConfig.length;i++) { + var type = activeConfig[i].type; + // TODO: remove workspace in next release+1 + if (type != "workspace" && type != "tab") { + nt = typeRegistry.get(type); + if (!nt && missingTypes.indexOf(type) == -1) { + missingTypes.push(type); + } + } + } + // Abort if there are any missing types + if (missingTypes.length > 0) { + util.log("[red] Waiting for missing types to be registered:"); + for (i=0;i<missingTypes.length;i++) { + util.log("[red] - "+missingTypes[i]); + } + return; + } + + util.log("[red] Starting flows"); + events.emit("nodes-starting"); + + // Instantiate each node in the flow + for (i=0;i<activeConfig.length;i++) { + var nn = null; + // TODO: remove workspace in next release+1 + if (activeConfig[i].type != "workspace" && activeConfig[i].type != "tab") { + nt = typeRegistry.get(activeConfig[i].type); + if (nt) { + try { + nn = new nt(activeConfig[i]); + } + catch (err) { + util.log("[red] "+activeConfig[i].type+" : "+err); + } + } + // console.log(nn); + if (nn === null) { + util.log("[red] unknown type: "+activeConfig[i].type); + } + } + } + // Clean up any orphaned credentials + credentials.clean(flowNodes.get); + events.emit("nodes-started"); +} + +/** + * Stops the current activeConfig + */ +function stopFlows() { + if (activeConfig&&activeConfig.length > 0) { + util.log("[red] Stopping flows"); + } + return flowNodes.clear(); +} + +var flowNodes = module.exports = { + init: function(_storage) { + storage = _storage; + }, + + /** + * Load the current activeConfig from storage and start it running + * @return a promise for the loading of the config + */ + load: function() { + return storage.getFlows().then(function(flows) { + return credentials.load().then(function() { + activeConfig = flows; + if (activeConfig && activeConfig.length > 0) { + parseConfig(); + } + }); + }).otherwise(function(err) { + util.log("[red] Error loading flows : "+err); + }); + }, + + /** + * Add a node to the current active set + * @param n the node to add + */ + add: function(n) { + nodes[n.id] = n; + n.on("log",log.log); + }, + + /** + * Get a node + * @param i the node id + * @return the node + */ + get: function(i) { + return nodes[i]; + }, + + /** + * Stops all active nodes and clears the active set + * @return a promise for the stopping of all active nodes + */ + clear: function() { + return when.promise(function(resolve) { + events.emit("nodes-stopping"); + var promises = []; + for (var n in nodes) { + if (nodes.hasOwnProperty(n)) { + try { + var p = nodes[n].close(); + if (p) { + promises.push(p); + } + } catch(err) { + nodes[n].error(err); + } + } + } + when.settle(promises).then(function() { + events.emit("nodes-stopped"); + nodes = {}; + resolve(); + }); + }); + }, + + /** + * Provides an iterator over the active set of nodes + * @param cb a function to be called for each node in the active set + */ + each: function(cb) { + for (var n in nodes) { + if (nodes.hasOwnProperty(n)) { + cb(nodes[n]); + } + } + }, + + /** + * @return the active configuration + */ + getFlows: function() { + return activeConfig; + }, + + /** + * Sets the current active config. + * @param config the configuration to enable + * @return a promise for the starting of the new flow + */ + setFlows: function (config) { + // Extract any credential updates + for (var i=0; i<config.length; i++) { + var node = config[i]; + if (node.credentials) { + credentials.extract(node); + delete node.credentials; + } + } + return credentials.save() + .then(function() { return storage.saveFlows(config);}) + .then(function() { return stopFlows();}) + .then(function () { + activeConfig = config; + parseConfig(); + }); + }, + stopFlows: stopFlows +}; |