aboutsummaryrefslogtreecommitdiffstats
path: root/lib/cloudify.js
diff options
context:
space:
mode:
authorAlex Shatov <alexs@att.com>2017-09-15 13:52:31 -0400
committerAlex Shatov <alexs@att.com>2017-09-15 13:52:31 -0400
commitb511ff360603599c43e066722d3902a55fac3f6d (patch)
tree7caa5c8ceb51568b444f0bd4ed9158fe92e8355e /lib/cloudify.js
parent3486bd1a77a8a3f198de171fda34c102406a26ea (diff)
4.2.0 - added the policy-update handling
* API version 4.1.0 - added /policy, /swagger-ui * new: policy update with queuing of execute-operations * more data on info - branch+commit-datetime * expecting consul ip to be in /etc/hosts * added swagger-ui and added policy to *API.yaml * common logging - more audits and metrics records - directly in promise_request Change-Id: I7d32f34100a16b5b7293ed5afe67f5c8c3098495 Issue-Id: DCAEGEN2-62 Signed-off-by: Alex Shatov <alexs@att.com>
Diffstat (limited to 'lib/cloudify.js')
-rw-r--r--lib/cloudify.js594
1 files changed, 370 insertions, 224 deletions
diff --git a/lib/cloudify.js b/lib/cloudify.js
index 150f1c4..303134a 100644
--- a/lib/cloudify.js
+++ b/lib/cloudify.js
@@ -1,16 +1,16 @@
/*
-Copyright(c) 2017 AT&T Intellectual Property. All rights reserved.
+Copyright(c) 2017 AT&T Intellectual Property. All rights reserved.
-Licensed under the Apache License, Version 2.0 (the "License");
+Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
+Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
-CONDITIONS OF ANY KIND, either express or implied.
+CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
*/
@@ -18,199 +18,228 @@ See the License for the specific language governing permissions and limitations
"use strict";
-const admzip = require('adm-zip');
+const CLOUDIFY = "cloudify-manager";
+const FINISHED = [ "terminated", "cancelled", "failed" ];
+const RETRY_INTERVAL = 5000; // Every 5 seconds
+const MAX_TRIES = 720; // Up to 1 hour
+const doRequest = require('./promise_request').doRequest;
const repeat = require('./repeat');
-const req = require('./promise_request');
-const doRequest = req.doRequest;
+const admzip = require('adm-zip');
+const createError = require('./dispatcher-error').createDispatcherError;
var cfyAPI = null;
var cfyAuth = null;
var logger = null;
+// class to queue up the execute operations on deployments
+var ExeQueue = function ExeQueue(){
+ this.deployments = {};
+};
+ExeQueue.prototype.isDeploymentBusy = function(deployment_id) {return !!this.deployments[deployment_id];};
+ExeQueue.prototype.removeDeployment = function(deployment_id) {
+ if (!!this.deployments[deployment_id]) {
+ delete this.deployments[deployment_id];
+ }
+};
+ExeQueue.prototype.queueUpExecution = function(mainReq, deployment_id, workflow_id, parameters) {
+ this.deployments[deployment_id] = this.deployments[deployment_id] || {"deployment_id":deployment_id, "exe_queue": []};
+ this.deployments[deployment_id].exe_queue.push({"mainReq": mainReq, "workflow_id": workflow_id, "parameters": parameters});
+};
+ExeQueue.prototype.setExecutionId = function(deployment_id, execution_id) {
+ var depl = this.deployments[deployment_id];
+ if (!depl) {return;}
+ depl.execution_id = execution_id;
+};
+ExeQueue.prototype.nextExecution = function(deployment_id) {
+ var depl = this.deployments[deployment_id];
+ if (!depl) {return;}
+ if (depl.execution_id) {
+ delete depl.execution_id;
+ depl.exe_queue.shift();
+ if (!depl.exe_queue.length) {
+ delete this.deployments[deployment_id];
+ return;
+ }
+ }
+ return depl.exe_queue[0];
+};
+var exeQueue = new ExeQueue();
// Delay function--returns a promise that's resolved after 'dtime'
// milliseconds.`
var delay = function(dtime) {
- return new Promise(function(resolve, reject) {
- setTimeout(resolve, dtime);
- });
+ return new Promise(function(resolve, reject) {
+ setTimeout(resolve, dtime);
+ });
};
// Get current status of a workflow execution
-// Function for getting execution info
-const getExecutionStatus = function(executionId) {
- var reqOptions = {
- method : "GET",
- uri : cfyAPI + "/executions/" + executionId
- };
- if (cfyAuth) {
- reqOptions.auth = cfyAuth;
- }
- return doRequest(reqOptions);
+const getExecutionStatus = function(execution_id, mainReq) {
+ var reqOptions = {
+ method : "GET",
+ uri : cfyAPI + "/executions/" + execution_id
+ };
+ addAuthToOptions(reqOptions);
+ return doRequest(reqOptions, null, CLOUDIFY, mainReq);
};
-// Poll for the result of a workflow execution
-var getWorkflowResult = function(execution_id) {
- var finished = [ "terminated", "cancelled", "failed" ];
- var retryInterval = 15000; // Every 15 seconds
- var maxTries = 240; // Up to an hour
-
- logger.debug(null, "Getting workflow result for execution id: " + execution_id);
-
- // Function for testing if workflow is finished
- // Expects the result of getExecStatus
- var checkStatus = function(res) {
- logger.debug(null, "Checking result: " + JSON.stringify(res) + " ==> " + (res.json && res.json.status && finished.indexOf(res.json.status) < 0));
- return res.json && res.json.status && finished.indexOf(res.json.status) < 0;
- };
-
- // Create execution status checker function
- var getExecStatus = function() { return getExecutionStatus(execution_id);};
-
- return repeat.repeatWhile(getExecStatus, checkStatus, maxTries, retryInterval)
- .then(
-
- /* Handle fulfilled promise from repeatWhile */
- function(res) {
-
- logger.debug(null, 'workflow result: ' + JSON.stringify(res));
-
- /* Successful completion */
- if (res.json && res.json.status && res.json.status === 'terminated') {
- return res;
- }
-
- /* If we get here, we don't have a success and we're going to throw something */
-
- var error = {};
-
- /* We expect a JSON object with a status */
- if (res.json && res.json.status) {
-
- /* Failure -- we need to return something that looks like the CM API failures */
- if (res.json.status === 'failed') {
- error.body = 'workflow failed: ' + execution_id + ' -- ' + (res.json.error ? JSON.stringify(res.json.error) : 'no error information');
- }
-
- /* Cancellation -- don't really expect this */
- else if (res.json.status === 'canceled' || res.json.status === 'cancelled') {
- error.body = 'workflow canceled: ' + execution_id;
- }
-
- /* Don't expect anything else -- but if we get it, it's not a success! */
- else {
- error.body = 'workflow--unexpected status ' + res.json.status + ' for ' + execution_id;
- }
- }
-
+// Poll for the result of a workflow execution until it's done
+var getWorkflowResult = function(execution_id, mainReq) {
+ logger.debug(mainReq.dcaeReqId, "Getting workflow result for execution id: " + execution_id);
+
+ // Function for testing if workflow is finished
+ // Expects the result of getExecStatus
+ var checkStatus = function(res) {
+ logger.debug(mainReq.dcaeReqId, "Checking result: " + JSON.stringify(res) + " ==> " + (res.json && res.json.status && FINISHED.indexOf(res.json.status) < 0));
+ return res.json && res.json.status && FINISHED.indexOf(res.json.status) < 0;
+ };
+
+ // Create execution status checker function
+ var getExecStatus = function() {return getExecutionStatus(execution_id, mainReq);};
+
+ return repeat.repeatWhile(getExecStatus, checkStatus, MAX_TRIES, RETRY_INTERVAL)
+ .then(
+
+ /* Handle fulfilled promise from repeatWhile */
+ function(res) {
+
+ logger.debug(mainReq.dcaeReqId, 'workflow result: ' + JSON.stringify(res));
+
+ /* Successful completion */
+ if (res.json && res.json.status && res.json.status === 'terminated') {
+ return res;
+ }
+
+ /* If we get here, we don't have a success and we're going to throw something */
+
+ var error = {};
+
+ /* We expect a JSON object with a status */
+ if (res.json && res.json.status) {
+
+ /* Failure -- we need to return something that looks like the CM API failures */
+ if (res.json.status === 'failed') {
+ error.body = 'workflow failed: ' + execution_id + ' -- ' + (res.json.error ? JSON.stringify(res.json.error) : 'no error information');
+ }
+
+ /* Cancellation -- don't really expect this */
+ else if (res.json.status === 'canceled' || res.json.status === 'cancelled') {
+ error.body = 'workflow canceled: ' + execution_id;
+ }
+
+ /* Don't expect anything else -- but if we get it, it's not a success! */
+ else {
+ error.body = 'workflow--unexpected status ' + res.json.status + ' for ' + execution_id;
+ }
+ }
+
/* The body of the response from the API call to get execution status is not what we expect at all */
- else {
- error.body = 'workflow--unexpected result body getting execution status from CM for ' + execution_id;
- }
-
- throw error;
- },
-
- /* Handle rejection of promise from repeatWhile--don't use a catch because it would catch the error thrown above */
- function(err) {
- /* repeatWhile could fail and we get here because:
- * -- repeatWhile explicitly rejects the promise because it has exhausted the retries
- * -- repeatWhile propagates a system error (e.g., network problem) trying to access the API
- * -- repeatWhile propagates a rejected promise due to a bad HTTP response status
- * These should all get normalized in deploy.js--so we just rethrow the error.
- */
-
- throw err;
-
- });
+ else {
+ error.body = 'workflow--unexpected result body getting execution status from CM for ' + execution_id;
+ }
+
+ throw error;
+ },
+
+ /* Handle rejection of promise from repeatWhile--don't use a catch because it would catch the error thrown above */
+ function(err) {
+ /* repeatWhile could fail and we get here because:
+ * -- repeatWhile explicitly rejects the promise because it has exhausted the retries
+ * -- repeatWhile propagates a system error (e.g., network problem) trying to access the API
+ * -- repeatWhile propagates a rejected promise due to a bad HTTP response status
+ * These should all get normalized in deploy.js--so we just rethrow the error.
+ */
+
+ throw err;
+
+ });
+};
+
+// bare start of a workflow execution against a deployment
+const startWorkflowExecution = function(mainReq, deployment_id, workflow_id, parameters) {
+ // Set up the HTTP POST request
+ var reqOptions = {
+ method : "POST",
+ uri : cfyAPI + "/executions",
+ headers : {
+ "Content-Type" : "application/json",
+ "Accept" : "*/*"
+ }
+ };
+ addAuthToOptions(reqOptions);
+ var body = {
+ "deployment_id" : deployment_id,
+ "workflow_id" : workflow_id
+ };
+ if (parameters) {body.parameters = parameters;}
+
+ // Make the POST request
+ return doRequest(reqOptions, JSON.stringify(body), CLOUDIFY, mainReq);
};
//Initiate a workflow execution against a deployment
-const initiateWorkflowExecution = function(dpid, workflow) {
- // Set up the HTTP POST request
- var reqOptions = {
- method : "POST",
- uri : cfyAPI + "/executions",
- headers : {
- "Content-Type" : "application/json",
- "Accept" : "*/*"
- }
- };
- if (cfyAuth) {
- reqOptions.auth = cfyAuth;
- }
- var body = {
- deployment_id : dpid,
- workflow_id : workflow
- };
-
- // Make the POST request
- return doRequest(reqOptions, JSON.stringify(body))
- .then(function(result) {
- logger.debug(null, "Result from POSTing workflow execution start: " + JSON.stringify(result));
- if (result.json && result.json.id) {
- return {deploymentId: dpid, workflowType: workflow, executionId: result.json.id};
- }
- else {
- logger.debug(null,"Did not get expected JSON body from POST to start workflow");
- var err = new Error("POST to start workflow got success response but no body");
- err.status = err.code = 502;
- }
- });
+const initiateWorkflowExecution = function(deployment_id, workflow_id, parameters) {
+ return startWorkflowExecution(null, deployment_id, workflow_id, parameters)
+ .then(function(result) {
+ logger.debug(null, "Result from POSTing workflow execution start: " + JSON.stringify(result));
+ if (result.json && result.json.id) {
+ return {deploymentId: deployment_id, workflowType: workflow_id, executionId: result.json.id};
+ }
+ logger.debug(null,"Did not get expected JSON body from POST to start workflow");
+ var err = new Error("POST to start workflow got success response but no body");
+ err.status = err.code = 502;
+ throw err;
+ });
};
// Uploads a blueprint via the Cloudify API
exports.uploadBlueprint = function(bpid, blueprint) {
-
- // Cloudify API wants a gzipped tar of a directory, not the blueprint text
- var zip = new admzip();
- zip.addFile('work/', new Buffer(0));
- zip.addFile('work/blueprint.yaml', new Buffer(blueprint, 'utf8'));
- var src = (zip.toBuffer());
-
- // Set up the HTTP PUT request
- var reqOptions = {
- method : "PUT",
- uri : cfyAPI + "/blueprints/" + bpid,
- headers : {
- "Content-Type" : "application/octet-stream",
- "Accept" : "*/*"
- }
- };
-
- if (cfyAuth) {
- reqOptions.auth = cfyAuth;
- }
- // Initiate PUT request and return the promise for a result
- return doRequest(reqOptions, src);
+
+ // Cloudify API wants a gzipped tar of a directory, not the blueprint text
+ var zip = new admzip();
+ zip.addFile('work/', new Buffer(0));
+ zip.addFile('work/blueprint.yaml', new Buffer(blueprint, 'utf8'));
+ var src = (zip.toBuffer());
+
+ // Set up the HTTP PUT request
+ var reqOptions = {
+ method : "PUT",
+ uri : cfyAPI + "/blueprints/" + bpid,
+ headers : {
+ "Content-Type" : "application/octet-stream",
+ "Accept" : "*/*"
+ }
+ };
+ addAuthToOptions(reqOptions);
+
+ // Initiate PUT request and return the promise for a result
+ return doRequest(reqOptions, src, CLOUDIFY);
};
// Creates a deployment from a blueprint
exports.createDeployment = function(dpid, bpid, inputs) {
- // Set up the HTTP PUT request
- var reqOptions = {
- method : "PUT",
- uri : cfyAPI + "/deployments/" + dpid,
- headers : {
- "Content-Type" : "application/json",
- "Accept" : "*/*"
- }
- };
-
- if (cfyAuth) {
- reqOptions.auth = cfyAuth;
+ // Set up the HTTP PUT request
+ var reqOptions = {
+ method : "PUT",
+ uri : cfyAPI + "/deployments/" + dpid,
+ headers : {
+ "Content-Type" : "application/json",
+ "Accept" : "*/*"
+ }
+ };
+ addAuthToOptions(reqOptions);
+
+ var body = {
+ blueprint_id : bpid
+ };
+ if (inputs) {
+ body.inputs = inputs;
}
- var body = {
- blueprint_id : bpid
- };
- if (inputs) {
- body.inputs = inputs;
- }
-
- // Make the PUT request to create the deployment
- return doRequest(reqOptions, JSON.stringify(body));
+
+ // Make the PUT request to create the deployment
+ return doRequest(reqOptions, JSON.stringify(body), CLOUDIFY);
};
// Initiate a workflow execution against a deployment
@@ -222,91 +251,208 @@ exports.getWorkflowExecutionStatus = getExecutionStatus;
// Return a promise for the final result of a workflow execution
exports.getWorkflowResult = getWorkflowResult;
-// Executes a workflow against a deployment and returns a promise for final result
-exports.executeWorkflow = function(dpid, workflow) {
-
- // Initiate the workflow
- return initiateWorkflowExecution(dpid, workflow)
-
- // Wait for the result
- .then (function(result) {
- logger.debug(null, "Result from initiating workflow: " + JSON.stringify(result));
- return getWorkflowResult(result.executionId);
- });
+// Executes a workflow against a deployment and returns a promise for final result
+exports.executeWorkflow = function(deployment_id, workflow_id, parameters) {
+ return initiateWorkflowExecution(deployment_id, workflow_id, parameters)
+
+ // Wait for the result
+ .then (function(result) {
+ logger.debug(null, "Result from initiating workflow: " + JSON.stringify(result));
+ return getWorkflowResult(result.executionId);
+ });
};
-// Wait for workflow to complete and get result
-exports.getWorkflowResult = getWorkflowResult;
// Retrieves outputs for a deployment
exports.getOutputs = function(dpid) {
- var reqOptions = {
- method : "GET",
- uri : cfyAPI + "/deployments/" + dpid + "/outputs",
- headers : {
- "Accept" : "*/*"
- }
- };
- if (cfyAuth) {
- reqOptions.auth = cfyAuth;
- }
-
- return doRequest(reqOptions);
+ var reqOptions = {
+ method : "GET",
+ uri : cfyAPI + "/deployments/" + dpid + "/outputs",
+ headers : {
+ "Accept" : "*/*"
+ }
+ };
+ addAuthToOptions(reqOptions);
+
+ return doRequest(reqOptions, null, CLOUDIFY);
};
// Get the output descriptions for a deployment
exports.getOutputDescriptions = function(dpid) {
- var reqOptions = {
- method : "GET",
- uri : cfyAPI + "/deployments/" + dpid + "?include=outputs",
- headers : {
- "Accept" : "*/*"
- }
- };
- if (cfyAuth) {
- reqOptions.auth = cfyAuth;
- }
-
- return doRequest(reqOptions);
+ var reqOptions = {
+ method : "GET",
+ uri : cfyAPI + "/deployments/" + dpid + "?include=outputs",
+ headers : {
+ "Accept" : "*/*"
+ }
+ };
+ addAuthToOptions(reqOptions);
+
+ return doRequest(reqOptions, null, CLOUDIFY);
};
// Deletes a deployment
exports.deleteDeployment = function(dpid) {
- var reqOptions = {
- method : "DELETE",
- uri : cfyAPI + "/deployments/" + dpid
- };
- if (cfyAuth) {
- reqOptions.auth = cfyAuth;
- }
+ var reqOptions = {
+ method : "DELETE",
+ uri : cfyAPI + "/deployments/" + dpid
+ };
+ addAuthToOptions(reqOptions);
- return doRequest(reqOptions);
+ return doRequest(reqOptions, null, CLOUDIFY);
};
// Deletes a blueprint
exports.deleteBlueprint = function(bpid) {
- var reqOptions = {
- method : "DELETE",
- uri : cfyAPI + "/blueprints/" + bpid
- };
- if (cfyAuth) {
- reqOptions.auth = cfyAuth;
- }
+ var reqOptions = {
+ method : "DELETE",
+ uri : cfyAPI + "/blueprints/" + bpid
+ };
+ addAuthToOptions(reqOptions);
- return doRequest(reqOptions);
+ return doRequest(reqOptions, null, CLOUDIFY);
};
// Allow client to set the Cloudify API root address
exports.setAPIAddress = function(addr) {
- cfyAPI = addr;
+ cfyAPI = cfyAPI || addr;
};
-// Allow client to set Cloudify credentials
+// Allow client to set Cloudify credentials
exports.setCredentials = function(user, password) {
- cfyAuth = user + ':' + password;
+ cfyAuth = cfyAuth || (user + ':' + password);
};
+function addAuthToOptions(reqOptions) {
+ if (!!cfyAuth && cfyAuth !== "undefined:undefined") {
+ reqOptions.auth = cfyAuth;
+ }
+}
+
// Set a logger
exports.setLogger = function(log) {
- logger = log;
+ logger = logger || log;
+};
+
+exports.getNodeInstances = function (mainReq, on_next_node_instances, offset) {
+ offset = offset || 0;
+ var reqOptions = {
+ method : "GET",
+ uri : cfyAPI + "/node-instances?_include=id,deployment_id,runtime_properties&_offset=" + offset
+ };
+ addAuthToOptions(reqOptions);
+
+ logger.debug(mainReq.dcaeReqId, "getNodeInstances: " + JSON.stringify(reqOptions));
+ return doRequest(reqOptions, null, CLOUDIFY, mainReq)
+ .then(function(cloudify_response) {
+ logger.debug(mainReq.dcaeReqId, "getNodeInstances response: " + JSON.stringify(cloudify_response));
+ var response = {};
+ cloudify_response = cloudify_response && cloudify_response.json;
+ if (!cloudify_response || !Array.isArray(cloudify_response.items)) {
+ response.status = 500;
+ response.message = 'unexpected response from cloudify ' + JSON.stringify(cloudify_response);
+ return response;
+ }
+ if (!cloudify_response.items.length) {
+ response.status = 200;
+ response.message = 'got no more node_instances';
+ return response;
+ }
+ logger.debug(mainReq.dcaeReqId, 'getNodeInstances got node_instances ' + cloudify_response.items.length);
+ if (typeof on_next_node_instances === 'function') {
+ on_next_node_instances(cloudify_response.items);
+ }
+ if (!cloudify_response.metadata || !cloudify_response.metadata.pagination) {
+ response.status = 500;
+ response.message = 'unexpected response from cloudify ' + JSON.stringify(cloudify_response);
+ return response;
+ }
+ offset += cloudify_response.items.length;
+ if (offset >= cloudify_response.metadata.pagination.total) {
+ response.status = 200;
+ response.message = 'got all node_instances ' + offset + "/" + cloudify_response.metadata.pagination.total;
+ return response;
+ }
+ return exports.getNodeInstances(mainReq, on_next_node_instances, offset);
+ })
+ .catch(function(error) {
+ return {
+ "status" : error.status || 500,
+ "message": "getNodeInstances cloudify error: " + JSON.stringify(error)
+ };
+ });
+};
+
+const runQueuedExecution = function(mainReq, deployment_id, workflow_id, parameters, waitedCount) {
+ mainReq = mainReq || {};
+ var execution_id;
+ var exe_deployment_str = " deployment_id " + deployment_id + " to " + workflow_id
+ + " with params(" + JSON.stringify(parameters || {}) + ")";
+ startWorkflowExecution(mainReq, deployment_id, workflow_id, parameters)
+ .then(function(result) {
+ logger.debug(mainReq.dcaeReqId, "result of start the execution for" + exe_deployment_str + ": " + JSON.stringify(result));
+ execution_id = result.json && result.json.id;
+ if (!execution_id) {
+ throw createError("failed to start execution - no execution_id for" + exe_deployment_str,
+ 553, "api", 553, CLOUDIFY);
+ }
+ exeQueue.setExecutionId(deployment_id, execution_id);
+ return getWorkflowResult(execution_id, mainReq);
+ })
+ .then(function(result) {
+ logger.debug(mainReq.dcaeReqId, 'successfully finished execution: ' + execution_id + " for" + exe_deployment_str);
+ var nextExecution = exeQueue.nextExecution(deployment_id);
+ if (nextExecution) {
+ logger.debug(nextExecution.mainReq.dcaeReqId, "next execution for deployment_id " + deployment_id
+ + " to " + nextExecution.workflow_id
+ + " with params(" + JSON.stringify(nextExecution.parameters || {}) + ")");
+ runQueuedExecution(nextExecution.mainReq, deployment_id, nextExecution.workflow_id, nextExecution.parameters);
+ }
+ })
+ .catch(function(result) {
+ if (result.status === 400 && result.json && result.json.error_code === "existing_running_execution_error") {
+ waitedCount = waitedCount || 0;
+ if (waitedCount >= MAX_TRIES) {
+ logger.error(createError("gave up on waiting for" + exe_deployment_str, 553, "api", 553, CLOUDIFY), mainReq);
+ exeQueue.removeDeployment(deployment_id);
+ return;
+ }
+ ++waitedCount;
+ logger.warn(createError("runQueuedExecution sleeping " + waitedCount
+ + " on " + exe_deployment_str, 553, "api", 553, CLOUDIFY), mainReq);
+ setTimeout(function() {runQueuedExecution(mainReq, deployment_id, workflow_id, parameters, waitedCount);}, RETRY_INTERVAL);
+ return;
+ }
+ exeQueue.removeDeployment(deployment_id);
+ if (result.status === 404 && result.json && result.json.error_code === "not_found_error") {
+ logger.error(createError("deployment not found for" + exe_deployment_str
+ + " cloudify response: " + JSON.stringify(result), 553, "api", 553, CLOUDIFY), mainReq);
+ return;
+ }
+ if (result instanceof Error) {
+ logger.error(result, mainReq);
+ return;
+ }
+ logger.error(createError("execute operation error " + (result.message || result.body || JSON.stringify(result))
+ + " on " + exe_deployment_str, 553, "api", 553, CLOUDIFY), mainReq);
+ });
+};
+
+exports.executeOperation = function (mainReq, deployment_id, operation, operation_kwargs, node_instance_ids) {
+ const workflow_id = "execute_operation";
+ var parameters = {
+ 'operation': operation,
+ 'operation_kwargs': operation_kwargs,
+ 'node_instance_ids': node_instance_ids,
+ 'allow_kwargs_override': true
+ };
+
+ if (exeQueue.isDeploymentBusy(deployment_id)) {
+ exeQueue.queueUpExecution(mainReq, deployment_id, workflow_id, parameters);
+ logger.debug(mainReq.dcaeReqId, "deployment busy - queue up execution for deployment_id " + deployment_id
+ + " to " + workflow_id + " with params(" + JSON.stringify(parameters || {}) + ")");
+ return;
+ }
+ exeQueue.queueUpExecution(mainReq, deployment_id, workflow_id, parameters);
+ runQueuedExecution(mainReq, deployment_id, workflow_id, parameters);
};