diff options
Diffstat (limited to 'lib/cloudify.js')
-rw-r--r-- | lib/cloudify.js | 594 |
1 files changed, 370 insertions, 224 deletions
diff --git a/lib/cloudify.js b/lib/cloudify.js index 150f1c4..303134a 100644 --- a/lib/cloudify.js +++ b/lib/cloudify.js @@ -1,16 +1,16 @@ /* -Copyright(c) 2017 AT&T Intellectual Property. All rights reserved. +Copyright(c) 2017 AT&T Intellectual Property. All rights reserved. -Licensed under the Apache License, Version 2.0 (the "License"); +Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -CONDITIONS OF ANY KIND, either express or implied. +CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ @@ -18,199 +18,228 @@ See the License for the specific language governing permissions and limitations "use strict"; -const admzip = require('adm-zip'); +const CLOUDIFY = "cloudify-manager"; +const FINISHED = [ "terminated", "cancelled", "failed" ]; +const RETRY_INTERVAL = 5000; // Every 5 seconds +const MAX_TRIES = 720; // Up to 1 hour +const doRequest = require('./promise_request').doRequest; const repeat = require('./repeat'); -const req = require('./promise_request'); -const doRequest = req.doRequest; +const admzip = require('adm-zip'); +const createError = require('./dispatcher-error').createDispatcherError; var cfyAPI = null; var cfyAuth = null; var logger = null; +// class to queue up the execute operations on deployments +var ExeQueue = function ExeQueue(){ + this.deployments = {}; +}; +ExeQueue.prototype.isDeploymentBusy = function(deployment_id) {return !!this.deployments[deployment_id];}; +ExeQueue.prototype.removeDeployment = function(deployment_id) { + if (!!this.deployments[deployment_id]) { + delete this.deployments[deployment_id]; + } +}; +ExeQueue.prototype.queueUpExecution = function(mainReq, deployment_id, workflow_id, parameters) { + this.deployments[deployment_id] = this.deployments[deployment_id] || {"deployment_id":deployment_id, "exe_queue": []}; + this.deployments[deployment_id].exe_queue.push({"mainReq": mainReq, "workflow_id": workflow_id, "parameters": parameters}); +}; +ExeQueue.prototype.setExecutionId = function(deployment_id, execution_id) { + var depl = this.deployments[deployment_id]; + if (!depl) {return;} + depl.execution_id = execution_id; +}; +ExeQueue.prototype.nextExecution = function(deployment_id) { + var depl = this.deployments[deployment_id]; + if (!depl) {return;} + if (depl.execution_id) { + delete depl.execution_id; + depl.exe_queue.shift(); + if (!depl.exe_queue.length) { + delete this.deployments[deployment_id]; + return; + } + } + return depl.exe_queue[0]; +}; +var exeQueue = new ExeQueue(); // Delay function--returns a promise that's resolved after 'dtime' // milliseconds.` var delay = function(dtime) { - return new Promise(function(resolve, reject) { - setTimeout(resolve, dtime); - }); + return new Promise(function(resolve, reject) { + setTimeout(resolve, dtime); + }); }; // Get current status of a workflow execution -// Function for getting execution info -const getExecutionStatus = function(executionId) { - var reqOptions = { - method : "GET", - uri : cfyAPI + "/executions/" + executionId - }; - if (cfyAuth) { - reqOptions.auth = cfyAuth; - } - return doRequest(reqOptions); +const getExecutionStatus = function(execution_id, mainReq) { + var reqOptions = { + method : "GET", + uri : cfyAPI + "/executions/" + execution_id + }; + addAuthToOptions(reqOptions); + return doRequest(reqOptions, null, CLOUDIFY, mainReq); }; -// Poll for the result of a workflow execution -var getWorkflowResult = function(execution_id) { - var finished = [ "terminated", "cancelled", "failed" ]; - var retryInterval = 15000; // Every 15 seconds - var maxTries = 240; // Up to an hour - - logger.debug(null, "Getting workflow result for execution id: " + execution_id); - - // Function for testing if workflow is finished - // Expects the result of getExecStatus - var checkStatus = function(res) { - logger.debug(null, "Checking result: " + JSON.stringify(res) + " ==> " + (res.json && res.json.status && finished.indexOf(res.json.status) < 0)); - return res.json && res.json.status && finished.indexOf(res.json.status) < 0; - }; - - // Create execution status checker function - var getExecStatus = function() { return getExecutionStatus(execution_id);}; - - return repeat.repeatWhile(getExecStatus, checkStatus, maxTries, retryInterval) - .then( - - /* Handle fulfilled promise from repeatWhile */ - function(res) { - - logger.debug(null, 'workflow result: ' + JSON.stringify(res)); - - /* Successful completion */ - if (res.json && res.json.status && res.json.status === 'terminated') { - return res; - } - - /* If we get here, we don't have a success and we're going to throw something */ - - var error = {}; - - /* We expect a JSON object with a status */ - if (res.json && res.json.status) { - - /* Failure -- we need to return something that looks like the CM API failures */ - if (res.json.status === 'failed') { - error.body = 'workflow failed: ' + execution_id + ' -- ' + (res.json.error ? JSON.stringify(res.json.error) : 'no error information'); - } - - /* Cancellation -- don't really expect this */ - else if (res.json.status === 'canceled' || res.json.status === 'cancelled') { - error.body = 'workflow canceled: ' + execution_id; - } - - /* Don't expect anything else -- but if we get it, it's not a success! */ - else { - error.body = 'workflow--unexpected status ' + res.json.status + ' for ' + execution_id; - } - } - +// Poll for the result of a workflow execution until it's done +var getWorkflowResult = function(execution_id, mainReq) { + logger.debug(mainReq.dcaeReqId, "Getting workflow result for execution id: " + execution_id); + + // Function for testing if workflow is finished + // Expects the result of getExecStatus + var checkStatus = function(res) { + logger.debug(mainReq.dcaeReqId, "Checking result: " + JSON.stringify(res) + " ==> " + (res.json && res.json.status && FINISHED.indexOf(res.json.status) < 0)); + return res.json && res.json.status && FINISHED.indexOf(res.json.status) < 0; + }; + + // Create execution status checker function + var getExecStatus = function() {return getExecutionStatus(execution_id, mainReq);}; + + return repeat.repeatWhile(getExecStatus, checkStatus, MAX_TRIES, RETRY_INTERVAL) + .then( + + /* Handle fulfilled promise from repeatWhile */ + function(res) { + + logger.debug(mainReq.dcaeReqId, 'workflow result: ' + JSON.stringify(res)); + + /* Successful completion */ + if (res.json && res.json.status && res.json.status === 'terminated') { + return res; + } + + /* If we get here, we don't have a success and we're going to throw something */ + + var error = {}; + + /* We expect a JSON object with a status */ + if (res.json && res.json.status) { + + /* Failure -- we need to return something that looks like the CM API failures */ + if (res.json.status === 'failed') { + error.body = 'workflow failed: ' + execution_id + ' -- ' + (res.json.error ? JSON.stringify(res.json.error) : 'no error information'); + } + + /* Cancellation -- don't really expect this */ + else if (res.json.status === 'canceled' || res.json.status === 'cancelled') { + error.body = 'workflow canceled: ' + execution_id; + } + + /* Don't expect anything else -- but if we get it, it's not a success! */ + else { + error.body = 'workflow--unexpected status ' + res.json.status + ' for ' + execution_id; + } + } + /* The body of the response from the API call to get execution status is not what we expect at all */ - else { - error.body = 'workflow--unexpected result body getting execution status from CM for ' + execution_id; - } - - throw error; - }, - - /* Handle rejection of promise from repeatWhile--don't use a catch because it would catch the error thrown above */ - function(err) { - /* repeatWhile could fail and we get here because: - * -- repeatWhile explicitly rejects the promise because it has exhausted the retries - * -- repeatWhile propagates a system error (e.g., network problem) trying to access the API - * -- repeatWhile propagates a rejected promise due to a bad HTTP response status - * These should all get normalized in deploy.js--so we just rethrow the error. - */ - - throw err; - - }); + else { + error.body = 'workflow--unexpected result body getting execution status from CM for ' + execution_id; + } + + throw error; + }, + + /* Handle rejection of promise from repeatWhile--don't use a catch because it would catch the error thrown above */ + function(err) { + /* repeatWhile could fail and we get here because: + * -- repeatWhile explicitly rejects the promise because it has exhausted the retries + * -- repeatWhile propagates a system error (e.g., network problem) trying to access the API + * -- repeatWhile propagates a rejected promise due to a bad HTTP response status + * These should all get normalized in deploy.js--so we just rethrow the error. + */ + + throw err; + + }); +}; + +// bare start of a workflow execution against a deployment +const startWorkflowExecution = function(mainReq, deployment_id, workflow_id, parameters) { + // Set up the HTTP POST request + var reqOptions = { + method : "POST", + uri : cfyAPI + "/executions", + headers : { + "Content-Type" : "application/json", + "Accept" : "*/*" + } + }; + addAuthToOptions(reqOptions); + var body = { + "deployment_id" : deployment_id, + "workflow_id" : workflow_id + }; + if (parameters) {body.parameters = parameters;} + + // Make the POST request + return doRequest(reqOptions, JSON.stringify(body), CLOUDIFY, mainReq); }; //Initiate a workflow execution against a deployment -const initiateWorkflowExecution = function(dpid, workflow) { - // Set up the HTTP POST request - var reqOptions = { - method : "POST", - uri : cfyAPI + "/executions", - headers : { - "Content-Type" : "application/json", - "Accept" : "*/*" - } - }; - if (cfyAuth) { - reqOptions.auth = cfyAuth; - } - var body = { - deployment_id : dpid, - workflow_id : workflow - }; - - // Make the POST request - return doRequest(reqOptions, JSON.stringify(body)) - .then(function(result) { - logger.debug(null, "Result from POSTing workflow execution start: " + JSON.stringify(result)); - if (result.json && result.json.id) { - return {deploymentId: dpid, workflowType: workflow, executionId: result.json.id}; - } - else { - logger.debug(null,"Did not get expected JSON body from POST to start workflow"); - var err = new Error("POST to start workflow got success response but no body"); - err.status = err.code = 502; - } - }); +const initiateWorkflowExecution = function(deployment_id, workflow_id, parameters) { + return startWorkflowExecution(null, deployment_id, workflow_id, parameters) + .then(function(result) { + logger.debug(null, "Result from POSTing workflow execution start: " + JSON.stringify(result)); + if (result.json && result.json.id) { + return {deploymentId: deployment_id, workflowType: workflow_id, executionId: result.json.id}; + } + logger.debug(null,"Did not get expected JSON body from POST to start workflow"); + var err = new Error("POST to start workflow got success response but no body"); + err.status = err.code = 502; + throw err; + }); }; // Uploads a blueprint via the Cloudify API exports.uploadBlueprint = function(bpid, blueprint) { - - // Cloudify API wants a gzipped tar of a directory, not the blueprint text - var zip = new admzip(); - zip.addFile('work/', new Buffer(0)); - zip.addFile('work/blueprint.yaml', new Buffer(blueprint, 'utf8')); - var src = (zip.toBuffer()); - - // Set up the HTTP PUT request - var reqOptions = { - method : "PUT", - uri : cfyAPI + "/blueprints/" + bpid, - headers : { - "Content-Type" : "application/octet-stream", - "Accept" : "*/*" - } - }; - - if (cfyAuth) { - reqOptions.auth = cfyAuth; - } - // Initiate PUT request and return the promise for a result - return doRequest(reqOptions, src); + + // Cloudify API wants a gzipped tar of a directory, not the blueprint text + var zip = new admzip(); + zip.addFile('work/', new Buffer(0)); + zip.addFile('work/blueprint.yaml', new Buffer(blueprint, 'utf8')); + var src = (zip.toBuffer()); + + // Set up the HTTP PUT request + var reqOptions = { + method : "PUT", + uri : cfyAPI + "/blueprints/" + bpid, + headers : { + "Content-Type" : "application/octet-stream", + "Accept" : "*/*" + } + }; + addAuthToOptions(reqOptions); + + // Initiate PUT request and return the promise for a result + return doRequest(reqOptions, src, CLOUDIFY); }; // Creates a deployment from a blueprint exports.createDeployment = function(dpid, bpid, inputs) { - // Set up the HTTP PUT request - var reqOptions = { - method : "PUT", - uri : cfyAPI + "/deployments/" + dpid, - headers : { - "Content-Type" : "application/json", - "Accept" : "*/*" - } - }; - - if (cfyAuth) { - reqOptions.auth = cfyAuth; + // Set up the HTTP PUT request + var reqOptions = { + method : "PUT", + uri : cfyAPI + "/deployments/" + dpid, + headers : { + "Content-Type" : "application/json", + "Accept" : "*/*" + } + }; + addAuthToOptions(reqOptions); + + var body = { + blueprint_id : bpid + }; + if (inputs) { + body.inputs = inputs; } - var body = { - blueprint_id : bpid - }; - if (inputs) { - body.inputs = inputs; - } - - // Make the PUT request to create the deployment - return doRequest(reqOptions, JSON.stringify(body)); + + // Make the PUT request to create the deployment + return doRequest(reqOptions, JSON.stringify(body), CLOUDIFY); }; // Initiate a workflow execution against a deployment @@ -222,91 +251,208 @@ exports.getWorkflowExecutionStatus = getExecutionStatus; // Return a promise for the final result of a workflow execution exports.getWorkflowResult = getWorkflowResult; -// Executes a workflow against a deployment and returns a promise for final result -exports.executeWorkflow = function(dpid, workflow) { - - // Initiate the workflow - return initiateWorkflowExecution(dpid, workflow) - - // Wait for the result - .then (function(result) { - logger.debug(null, "Result from initiating workflow: " + JSON.stringify(result)); - return getWorkflowResult(result.executionId); - }); +// Executes a workflow against a deployment and returns a promise for final result +exports.executeWorkflow = function(deployment_id, workflow_id, parameters) { + return initiateWorkflowExecution(deployment_id, workflow_id, parameters) + + // Wait for the result + .then (function(result) { + logger.debug(null, "Result from initiating workflow: " + JSON.stringify(result)); + return getWorkflowResult(result.executionId); + }); }; -// Wait for workflow to complete and get result -exports.getWorkflowResult = getWorkflowResult; // Retrieves outputs for a deployment exports.getOutputs = function(dpid) { - var reqOptions = { - method : "GET", - uri : cfyAPI + "/deployments/" + dpid + "/outputs", - headers : { - "Accept" : "*/*" - } - }; - if (cfyAuth) { - reqOptions.auth = cfyAuth; - } - - return doRequest(reqOptions); + var reqOptions = { + method : "GET", + uri : cfyAPI + "/deployments/" + dpid + "/outputs", + headers : { + "Accept" : "*/*" + } + }; + addAuthToOptions(reqOptions); + + return doRequest(reqOptions, null, CLOUDIFY); }; // Get the output descriptions for a deployment exports.getOutputDescriptions = function(dpid) { - var reqOptions = { - method : "GET", - uri : cfyAPI + "/deployments/" + dpid + "?include=outputs", - headers : { - "Accept" : "*/*" - } - }; - if (cfyAuth) { - reqOptions.auth = cfyAuth; - } - - return doRequest(reqOptions); + var reqOptions = { + method : "GET", + uri : cfyAPI + "/deployments/" + dpid + "?include=outputs", + headers : { + "Accept" : "*/*" + } + }; + addAuthToOptions(reqOptions); + + return doRequest(reqOptions, null, CLOUDIFY); }; // Deletes a deployment exports.deleteDeployment = function(dpid) { - var reqOptions = { - method : "DELETE", - uri : cfyAPI + "/deployments/" + dpid - }; - if (cfyAuth) { - reqOptions.auth = cfyAuth; - } + var reqOptions = { + method : "DELETE", + uri : cfyAPI + "/deployments/" + dpid + }; + addAuthToOptions(reqOptions); - return doRequest(reqOptions); + return doRequest(reqOptions, null, CLOUDIFY); }; // Deletes a blueprint exports.deleteBlueprint = function(bpid) { - var reqOptions = { - method : "DELETE", - uri : cfyAPI + "/blueprints/" + bpid - }; - if (cfyAuth) { - reqOptions.auth = cfyAuth; - } + var reqOptions = { + method : "DELETE", + uri : cfyAPI + "/blueprints/" + bpid + }; + addAuthToOptions(reqOptions); - return doRequest(reqOptions); + return doRequest(reqOptions, null, CLOUDIFY); }; // Allow client to set the Cloudify API root address exports.setAPIAddress = function(addr) { - cfyAPI = addr; + cfyAPI = cfyAPI || addr; }; -// Allow client to set Cloudify credentials +// Allow client to set Cloudify credentials exports.setCredentials = function(user, password) { - cfyAuth = user + ':' + password; + cfyAuth = cfyAuth || (user + ':' + password); }; +function addAuthToOptions(reqOptions) { + if (!!cfyAuth && cfyAuth !== "undefined:undefined") { + reqOptions.auth = cfyAuth; + } +} + // Set a logger exports.setLogger = function(log) { - logger = log; + logger = logger || log; +}; + +exports.getNodeInstances = function (mainReq, on_next_node_instances, offset) { + offset = offset || 0; + var reqOptions = { + method : "GET", + uri : cfyAPI + "/node-instances?_include=id,deployment_id,runtime_properties&_offset=" + offset + }; + addAuthToOptions(reqOptions); + + logger.debug(mainReq.dcaeReqId, "getNodeInstances: " + JSON.stringify(reqOptions)); + return doRequest(reqOptions, null, CLOUDIFY, mainReq) + .then(function(cloudify_response) { + logger.debug(mainReq.dcaeReqId, "getNodeInstances response: " + JSON.stringify(cloudify_response)); + var response = {}; + cloudify_response = cloudify_response && cloudify_response.json; + if (!cloudify_response || !Array.isArray(cloudify_response.items)) { + response.status = 500; + response.message = 'unexpected response from cloudify ' + JSON.stringify(cloudify_response); + return response; + } + if (!cloudify_response.items.length) { + response.status = 200; + response.message = 'got no more node_instances'; + return response; + } + logger.debug(mainReq.dcaeReqId, 'getNodeInstances got node_instances ' + cloudify_response.items.length); + if (typeof on_next_node_instances === 'function') { + on_next_node_instances(cloudify_response.items); + } + if (!cloudify_response.metadata || !cloudify_response.metadata.pagination) { + response.status = 500; + response.message = 'unexpected response from cloudify ' + JSON.stringify(cloudify_response); + return response; + } + offset += cloudify_response.items.length; + if (offset >= cloudify_response.metadata.pagination.total) { + response.status = 200; + response.message = 'got all node_instances ' + offset + "/" + cloudify_response.metadata.pagination.total; + return response; + } + return exports.getNodeInstances(mainReq, on_next_node_instances, offset); + }) + .catch(function(error) { + return { + "status" : error.status || 500, + "message": "getNodeInstances cloudify error: " + JSON.stringify(error) + }; + }); +}; + +const runQueuedExecution = function(mainReq, deployment_id, workflow_id, parameters, waitedCount) { + mainReq = mainReq || {}; + var execution_id; + var exe_deployment_str = " deployment_id " + deployment_id + " to " + workflow_id + + " with params(" + JSON.stringify(parameters || {}) + ")"; + startWorkflowExecution(mainReq, deployment_id, workflow_id, parameters) + .then(function(result) { + logger.debug(mainReq.dcaeReqId, "result of start the execution for" + exe_deployment_str + ": " + JSON.stringify(result)); + execution_id = result.json && result.json.id; + if (!execution_id) { + throw createError("failed to start execution - no execution_id for" + exe_deployment_str, + 553, "api", 553, CLOUDIFY); + } + exeQueue.setExecutionId(deployment_id, execution_id); + return getWorkflowResult(execution_id, mainReq); + }) + .then(function(result) { + logger.debug(mainReq.dcaeReqId, 'successfully finished execution: ' + execution_id + " for" + exe_deployment_str); + var nextExecution = exeQueue.nextExecution(deployment_id); + if (nextExecution) { + logger.debug(nextExecution.mainReq.dcaeReqId, "next execution for deployment_id " + deployment_id + + " to " + nextExecution.workflow_id + + " with params(" + JSON.stringify(nextExecution.parameters || {}) + ")"); + runQueuedExecution(nextExecution.mainReq, deployment_id, nextExecution.workflow_id, nextExecution.parameters); + } + }) + .catch(function(result) { + if (result.status === 400 && result.json && result.json.error_code === "existing_running_execution_error") { + waitedCount = waitedCount || 0; + if (waitedCount >= MAX_TRIES) { + logger.error(createError("gave up on waiting for" + exe_deployment_str, 553, "api", 553, CLOUDIFY), mainReq); + exeQueue.removeDeployment(deployment_id); + return; + } + ++waitedCount; + logger.warn(createError("runQueuedExecution sleeping " + waitedCount + + " on " + exe_deployment_str, 553, "api", 553, CLOUDIFY), mainReq); + setTimeout(function() {runQueuedExecution(mainReq, deployment_id, workflow_id, parameters, waitedCount);}, RETRY_INTERVAL); + return; + } + exeQueue.removeDeployment(deployment_id); + if (result.status === 404 && result.json && result.json.error_code === "not_found_error") { + logger.error(createError("deployment not found for" + exe_deployment_str + + " cloudify response: " + JSON.stringify(result), 553, "api", 553, CLOUDIFY), mainReq); + return; + } + if (result instanceof Error) { + logger.error(result, mainReq); + return; + } + logger.error(createError("execute operation error " + (result.message || result.body || JSON.stringify(result)) + + " on " + exe_deployment_str, 553, "api", 553, CLOUDIFY), mainReq); + }); +}; + +exports.executeOperation = function (mainReq, deployment_id, operation, operation_kwargs, node_instance_ids) { + const workflow_id = "execute_operation"; + var parameters = { + 'operation': operation, + 'operation_kwargs': operation_kwargs, + 'node_instance_ids': node_instance_ids, + 'allow_kwargs_override': true + }; + + if (exeQueue.isDeploymentBusy(deployment_id)) { + exeQueue.queueUpExecution(mainReq, deployment_id, workflow_id, parameters); + logger.debug(mainReq.dcaeReqId, "deployment busy - queue up execution for deployment_id " + deployment_id + + " to " + workflow_id + " with params(" + JSON.stringify(parameters || {}) + ")"); + return; + } + exeQueue.queueUpExecution(mainReq, deployment_id, workflow_id, parameters); + runQueuedExecution(mainReq, deployment_id, workflow_id, parameters); }; |