aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/cloudify.js594
-rw-r--r--lib/config.js91
-rw-r--r--lib/consul.js88
-rw-r--r--lib/dcae-deployments.js2
-rw-r--r--lib/inventory.js314
-rw-r--r--lib/logging.js53
-rw-r--r--lib/policy.js181
-rw-r--r--lib/promise_request.js199
-rw-r--r--lib/repeat.js60
-rw-r--r--lib/swagger-ui.js31
10 files changed, 994 insertions, 619 deletions
diff --git a/lib/cloudify.js b/lib/cloudify.js
index 150f1c4..303134a 100644
--- a/lib/cloudify.js
+++ b/lib/cloudify.js
@@ -1,16 +1,16 @@
/*
-Copyright(c) 2017 AT&T Intellectual Property. All rights reserved.
+Copyright(c) 2017 AT&T Intellectual Property. All rights reserved.
-Licensed under the Apache License, Version 2.0 (the "License");
+Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
+Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
-CONDITIONS OF ANY KIND, either express or implied.
+CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
*/
@@ -18,199 +18,228 @@ See the License for the specific language governing permissions and limitations
"use strict";
-const admzip = require('adm-zip');
+const CLOUDIFY = "cloudify-manager";
+const FINISHED = [ "terminated", "cancelled", "failed" ];
+const RETRY_INTERVAL = 5000; // Every 5 seconds
+const MAX_TRIES = 720; // Up to 1 hour
+const doRequest = require('./promise_request').doRequest;
const repeat = require('./repeat');
-const req = require('./promise_request');
-const doRequest = req.doRequest;
+const admzip = require('adm-zip');
+const createError = require('./dispatcher-error').createDispatcherError;
var cfyAPI = null;
var cfyAuth = null;
var logger = null;
+// class to queue up the execute operations on deployments
+var ExeQueue = function ExeQueue(){
+ this.deployments = {};
+};
+ExeQueue.prototype.isDeploymentBusy = function(deployment_id) {return !!this.deployments[deployment_id];};
+ExeQueue.prototype.removeDeployment = function(deployment_id) {
+ if (!!this.deployments[deployment_id]) {
+ delete this.deployments[deployment_id];
+ }
+};
+ExeQueue.prototype.queueUpExecution = function(mainReq, deployment_id, workflow_id, parameters) {
+ this.deployments[deployment_id] = this.deployments[deployment_id] || {"deployment_id":deployment_id, "exe_queue": []};
+ this.deployments[deployment_id].exe_queue.push({"mainReq": mainReq, "workflow_id": workflow_id, "parameters": parameters});
+};
+ExeQueue.prototype.setExecutionId = function(deployment_id, execution_id) {
+ var depl = this.deployments[deployment_id];
+ if (!depl) {return;}
+ depl.execution_id = execution_id;
+};
+ExeQueue.prototype.nextExecution = function(deployment_id) {
+ var depl = this.deployments[deployment_id];
+ if (!depl) {return;}
+ if (depl.execution_id) {
+ delete depl.execution_id;
+ depl.exe_queue.shift();
+ if (!depl.exe_queue.length) {
+ delete this.deployments[deployment_id];
+ return;
+ }
+ }
+ return depl.exe_queue[0];
+};
+var exeQueue = new ExeQueue();
// Delay function--returns a promise that's resolved after 'dtime'
// milliseconds.`
var delay = function(dtime) {
- return new Promise(function(resolve, reject) {
- setTimeout(resolve, dtime);
- });
+ return new Promise(function(resolve, reject) {
+ setTimeout(resolve, dtime);
+ });
};
// Get current status of a workflow execution
-// Function for getting execution info
-const getExecutionStatus = function(executionId) {
- var reqOptions = {
- method : "GET",
- uri : cfyAPI + "/executions/" + executionId
- };
- if (cfyAuth) {
- reqOptions.auth = cfyAuth;
- }
- return doRequest(reqOptions);
+const getExecutionStatus = function(execution_id, mainReq) {
+ var reqOptions = {
+ method : "GET",
+ uri : cfyAPI + "/executions/" + execution_id
+ };
+ addAuthToOptions(reqOptions);
+ return doRequest(reqOptions, null, CLOUDIFY, mainReq);
};
-// Poll for the result of a workflow execution
-var getWorkflowResult = function(execution_id) {
- var finished = [ "terminated", "cancelled", "failed" ];
- var retryInterval = 15000; // Every 15 seconds
- var maxTries = 240; // Up to an hour
-
- logger.debug(null, "Getting workflow result for execution id: " + execution_id);
-
- // Function for testing if workflow is finished
- // Expects the result of getExecStatus
- var checkStatus = function(res) {
- logger.debug(null, "Checking result: " + JSON.stringify(res) + " ==> " + (res.json && res.json.status && finished.indexOf(res.json.status) < 0));
- return res.json && res.json.status && finished.indexOf(res.json.status) < 0;
- };
-
- // Create execution status checker function
- var getExecStatus = function() { return getExecutionStatus(execution_id);};
-
- return repeat.repeatWhile(getExecStatus, checkStatus, maxTries, retryInterval)
- .then(
-
- /* Handle fulfilled promise from repeatWhile */
- function(res) {
-
- logger.debug(null, 'workflow result: ' + JSON.stringify(res));
-
- /* Successful completion */
- if (res.json && res.json.status && res.json.status === 'terminated') {
- return res;
- }
-
- /* If we get here, we don't have a success and we're going to throw something */
-
- var error = {};
-
- /* We expect a JSON object with a status */
- if (res.json && res.json.status) {
-
- /* Failure -- we need to return something that looks like the CM API failures */
- if (res.json.status === 'failed') {
- error.body = 'workflow failed: ' + execution_id + ' -- ' + (res.json.error ? JSON.stringify(res.json.error) : 'no error information');
- }
-
- /* Cancellation -- don't really expect this */
- else if (res.json.status === 'canceled' || res.json.status === 'cancelled') {
- error.body = 'workflow canceled: ' + execution_id;
- }
-
- /* Don't expect anything else -- but if we get it, it's not a success! */
- else {
- error.body = 'workflow--unexpected status ' + res.json.status + ' for ' + execution_id;
- }
- }
-
+// Poll for the result of a workflow execution until it's done
+var getWorkflowResult = function(execution_id, mainReq) {
+ logger.debug(mainReq.dcaeReqId, "Getting workflow result for execution id: " + execution_id);
+
+ // Function for testing if workflow is finished
+ // Expects the result of getExecStatus
+ var checkStatus = function(res) {
+ logger.debug(mainReq.dcaeReqId, "Checking result: " + JSON.stringify(res) + " ==> " + (res.json && res.json.status && FINISHED.indexOf(res.json.status) < 0));
+ return res.json && res.json.status && FINISHED.indexOf(res.json.status) < 0;
+ };
+
+ // Create execution status checker function
+ var getExecStatus = function() {return getExecutionStatus(execution_id, mainReq);};
+
+ return repeat.repeatWhile(getExecStatus, checkStatus, MAX_TRIES, RETRY_INTERVAL)
+ .then(
+
+ /* Handle fulfilled promise from repeatWhile */
+ function(res) {
+
+ logger.debug(mainReq.dcaeReqId, 'workflow result: ' + JSON.stringify(res));
+
+ /* Successful completion */
+ if (res.json && res.json.status && res.json.status === 'terminated') {
+ return res;
+ }
+
+ /* If we get here, we don't have a success and we're going to throw something */
+
+ var error = {};
+
+ /* We expect a JSON object with a status */
+ if (res.json && res.json.status) {
+
+ /* Failure -- we need to return something that looks like the CM API failures */
+ if (res.json.status === 'failed') {
+ error.body = 'workflow failed: ' + execution_id + ' -- ' + (res.json.error ? JSON.stringify(res.json.error) : 'no error information');
+ }
+
+ /* Cancellation -- don't really expect this */
+ else if (res.json.status === 'canceled' || res.json.status === 'cancelled') {
+ error.body = 'workflow canceled: ' + execution_id;
+ }
+
+ /* Don't expect anything else -- but if we get it, it's not a success! */
+ else {
+ error.body = 'workflow--unexpected status ' + res.json.status + ' for ' + execution_id;
+ }
+ }
+
/* The body of the response from the API call to get execution status is not what we expect at all */
- else {
- error.body = 'workflow--unexpected result body getting execution status from CM for ' + execution_id;
- }
-
- throw error;
- },
-
- /* Handle rejection of promise from repeatWhile--don't use a catch because it would catch the error thrown above */
- function(err) {
- /* repeatWhile could fail and we get here because:
- * -- repeatWhile explicitly rejects the promise because it has exhausted the retries
- * -- repeatWhile propagates a system error (e.g., network problem) trying to access the API
- * -- repeatWhile propagates a rejected promise due to a bad HTTP response status
- * These should all get normalized in deploy.js--so we just rethrow the error.
- */
-
- throw err;
-
- });
+ else {
+ error.body = 'workflow--unexpected result body getting execution status from CM for ' + execution_id;
+ }
+
+ throw error;
+ },
+
+ /* Handle rejection of promise from repeatWhile--don't use a catch because it would catch the error thrown above */
+ function(err) {
+ /* repeatWhile could fail and we get here because:
+ * -- repeatWhile explicitly rejects the promise because it has exhausted the retries
+ * -- repeatWhile propagates a system error (e.g., network problem) trying to access the API
+ * -- repeatWhile propagates a rejected promise due to a bad HTTP response status
+ * These should all get normalized in deploy.js--so we just rethrow the error.
+ */
+
+ throw err;
+
+ });
+};
+
+// bare start of a workflow execution against a deployment
+const startWorkflowExecution = function(mainReq, deployment_id, workflow_id, parameters) {
+ // Set up the HTTP POST request
+ var reqOptions = {
+ method : "POST",
+ uri : cfyAPI + "/executions",
+ headers : {
+ "Content-Type" : "application/json",
+ "Accept" : "*/*"
+ }
+ };
+ addAuthToOptions(reqOptions);
+ var body = {
+ "deployment_id" : deployment_id,
+ "workflow_id" : workflow_id
+ };
+ if (parameters) {body.parameters = parameters;}
+
+ // Make the POST request
+ return doRequest(reqOptions, JSON.stringify(body), CLOUDIFY, mainReq);
};
//Initiate a workflow execution against a deployment
-const initiateWorkflowExecution = function(dpid, workflow) {
- // Set up the HTTP POST request
- var reqOptions = {
- method : "POST",
- uri : cfyAPI + "/executions",
- headers : {
- "Content-Type" : "application/json",
- "Accept" : "*/*"
- }
- };
- if (cfyAuth) {
- reqOptions.auth = cfyAuth;
- }
- var body = {
- deployment_id : dpid,
- workflow_id : workflow
- };
-
- // Make the POST request
- return doRequest(reqOptions, JSON.stringify(body))
- .then(function(result) {
- logger.debug(null, "Result from POSTing workflow execution start: " + JSON.stringify(result));
- if (result.json && result.json.id) {
- return {deploymentId: dpid, workflowType: workflow, executionId: result.json.id};
- }
- else {
- logger.debug(null,"Did not get expected JSON body from POST to start workflow");
- var err = new Error("POST to start workflow got success response but no body");
- err.status = err.code = 502;
- }
- });
+const initiateWorkflowExecution = function(deployment_id, workflow_id, parameters) {
+ return startWorkflowExecution(null, deployment_id, workflow_id, parameters)
+ .then(function(result) {
+ logger.debug(null, "Result from POSTing workflow execution start: " + JSON.stringify(result));
+ if (result.json && result.json.id) {
+ return {deploymentId: deployment_id, workflowType: workflow_id, executionId: result.json.id};
+ }
+ logger.debug(null,"Did not get expected JSON body from POST to start workflow");
+ var err = new Error("POST to start workflow got success response but no body");
+ err.status = err.code = 502;
+ throw err;
+ });
};
// Uploads a blueprint via the Cloudify API
exports.uploadBlueprint = function(bpid, blueprint) {
-
- // Cloudify API wants a gzipped tar of a directory, not the blueprint text
- var zip = new admzip();
- zip.addFile('work/', new Buffer(0));
- zip.addFile('work/blueprint.yaml', new Buffer(blueprint, 'utf8'));
- var src = (zip.toBuffer());
-
- // Set up the HTTP PUT request
- var reqOptions = {
- method : "PUT",
- uri : cfyAPI + "/blueprints/" + bpid,
- headers : {
- "Content-Type" : "application/octet-stream",
- "Accept" : "*/*"
- }
- };
-
- if (cfyAuth) {
- reqOptions.auth = cfyAuth;
- }
- // Initiate PUT request and return the promise for a result
- return doRequest(reqOptions, src);
+
+ // Cloudify API wants a gzipped tar of a directory, not the blueprint text
+ var zip = new admzip();
+ zip.addFile('work/', new Buffer(0));
+ zip.addFile('work/blueprint.yaml', new Buffer(blueprint, 'utf8'));
+ var src = (zip.toBuffer());
+
+ // Set up the HTTP PUT request
+ var reqOptions = {
+ method : "PUT",
+ uri : cfyAPI + "/blueprints/" + bpid,
+ headers : {
+ "Content-Type" : "application/octet-stream",
+ "Accept" : "*/*"
+ }
+ };
+ addAuthToOptions(reqOptions);
+
+ // Initiate PUT request and return the promise for a result
+ return doRequest(reqOptions, src, CLOUDIFY);
};
// Creates a deployment from a blueprint
exports.createDeployment = function(dpid, bpid, inputs) {
- // Set up the HTTP PUT request
- var reqOptions = {
- method : "PUT",
- uri : cfyAPI + "/deployments/" + dpid,
- headers : {
- "Content-Type" : "application/json",
- "Accept" : "*/*"
- }
- };
-
- if (cfyAuth) {
- reqOptions.auth = cfyAuth;
+ // Set up the HTTP PUT request
+ var reqOptions = {
+ method : "PUT",
+ uri : cfyAPI + "/deployments/" + dpid,
+ headers : {
+ "Content-Type" : "application/json",
+ "Accept" : "*/*"
+ }
+ };
+ addAuthToOptions(reqOptions);
+
+ var body = {
+ blueprint_id : bpid
+ };
+ if (inputs) {
+ body.inputs = inputs;
}
- var body = {
- blueprint_id : bpid
- };
- if (inputs) {
- body.inputs = inputs;
- }
-
- // Make the PUT request to create the deployment
- return doRequest(reqOptions, JSON.stringify(body));
+
+ // Make the PUT request to create the deployment
+ return doRequest(reqOptions, JSON.stringify(body), CLOUDIFY);
};
// Initiate a workflow execution against a deployment
@@ -222,91 +251,208 @@ exports.getWorkflowExecutionStatus = getExecutionStatus;
// Return a promise for the final result of a workflow execution
exports.getWorkflowResult = getWorkflowResult;
-// Executes a workflow against a deployment and returns a promise for final result
-exports.executeWorkflow = function(dpid, workflow) {
-
- // Initiate the workflow
- return initiateWorkflowExecution(dpid, workflow)
-
- // Wait for the result
- .then (function(result) {
- logger.debug(null, "Result from initiating workflow: " + JSON.stringify(result));
- return getWorkflowResult(result.executionId);
- });
+// Executes a workflow against a deployment and returns a promise for final result
+exports.executeWorkflow = function(deployment_id, workflow_id, parameters) {
+ return initiateWorkflowExecution(deployment_id, workflow_id, parameters)
+
+ // Wait for the result
+ .then (function(result) {
+ logger.debug(null, "Result from initiating workflow: " + JSON.stringify(result));
+ return getWorkflowResult(result.executionId);
+ });
};
-// Wait for workflow to complete and get result
-exports.getWorkflowResult = getWorkflowResult;
// Retrieves outputs for a deployment
exports.getOutputs = function(dpid) {
- var reqOptions = {
- method : "GET",
- uri : cfyAPI + "/deployments/" + dpid + "/outputs",
- headers : {
- "Accept" : "*/*"
- }
- };
- if (cfyAuth) {
- reqOptions.auth = cfyAuth;
- }
-
- return doRequest(reqOptions);
+ var reqOptions = {
+ method : "GET",
+ uri : cfyAPI + "/deployments/" + dpid + "/outputs",
+ headers : {
+ "Accept" : "*/*"
+ }
+ };
+ addAuthToOptions(reqOptions);
+
+ return doRequest(reqOptions, null, CLOUDIFY);
};
// Get the output descriptions for a deployment
exports.getOutputDescriptions = function(dpid) {
- var reqOptions = {
- method : "GET",
- uri : cfyAPI + "/deployments/" + dpid + "?include=outputs",
- headers : {
- "Accept" : "*/*"
- }
- };
- if (cfyAuth) {
- reqOptions.auth = cfyAuth;
- }
-
- return doRequest(reqOptions);
+ var reqOptions = {
+ method : "GET",
+ uri : cfyAPI + "/deployments/" + dpid + "?include=outputs",
+ headers : {
+ "Accept" : "*/*"
+ }
+ };
+ addAuthToOptions(reqOptions);
+
+ return doRequest(reqOptions, null, CLOUDIFY);
};
// Deletes a deployment
exports.deleteDeployment = function(dpid) {
- var reqOptions = {
- method : "DELETE",
- uri : cfyAPI + "/deployments/" + dpid
- };
- if (cfyAuth) {
- reqOptions.auth = cfyAuth;
- }
+ var reqOptions = {
+ method : "DELETE",
+ uri : cfyAPI + "/deployments/" + dpid
+ };
+ addAuthToOptions(reqOptions);
- return doRequest(reqOptions);
+ return doRequest(reqOptions, null, CLOUDIFY);
};
// Deletes a blueprint
exports.deleteBlueprint = function(bpid) {
- var reqOptions = {
- method : "DELETE",
- uri : cfyAPI + "/blueprints/" + bpid
- };
- if (cfyAuth) {
- reqOptions.auth = cfyAuth;
- }
+ var reqOptions = {
+ method : "DELETE",
+ uri : cfyAPI + "/blueprints/" + bpid
+ };
+ addAuthToOptions(reqOptions);
- return doRequest(reqOptions);
+ return doRequest(reqOptions, null, CLOUDIFY);
};
// Allow client to set the Cloudify API root address
exports.setAPIAddress = function(addr) {
- cfyAPI = addr;
+ cfyAPI = cfyAPI || addr;
};
-// Allow client to set Cloudify credentials
+// Allow client to set Cloudify credentials
exports.setCredentials = function(user, password) {
- cfyAuth = user + ':' + password;
+ cfyAuth = cfyAuth || (user + ':' + password);
};
+function addAuthToOptions(reqOptions) {
+ if (!!cfyAuth && cfyAuth !== "undefined:undefined") {
+ reqOptions.auth = cfyAuth;
+ }
+}
+
// Set a logger
exports.setLogger = function(log) {
- logger = log;
+ logger = logger || log;
+};
+
+exports.getNodeInstances = function (mainReq, on_next_node_instances, offset) {
+ offset = offset || 0;
+ var reqOptions = {
+ method : "GET",
+ uri : cfyAPI + "/node-instances?_include=id,deployment_id,runtime_properties&_offset=" + offset
+ };
+ addAuthToOptions(reqOptions);
+
+ logger.debug(mainReq.dcaeReqId, "getNodeInstances: " + JSON.stringify(reqOptions));
+ return doRequest(reqOptions, null, CLOUDIFY, mainReq)
+ .then(function(cloudify_response) {
+ logger.debug(mainReq.dcaeReqId, "getNodeInstances response: " + JSON.stringify(cloudify_response));
+ var response = {};
+ cloudify_response = cloudify_response && cloudify_response.json;
+ if (!cloudify_response || !Array.isArray(cloudify_response.items)) {
+ response.status = 500;
+ response.message = 'unexpected response from cloudify ' + JSON.stringify(cloudify_response);
+ return response;
+ }
+ if (!cloudify_response.items.length) {
+ response.status = 200;
+ response.message = 'got no more node_instances';
+ return response;
+ }
+ logger.debug(mainReq.dcaeReqId, 'getNodeInstances got node_instances ' + cloudify_response.items.length);
+ if (typeof on_next_node_instances === 'function') {
+ on_next_node_instances(cloudify_response.items);
+ }
+ if (!cloudify_response.metadata || !cloudify_response.metadata.pagination) {
+ response.status = 500;
+ response.message = 'unexpected response from cloudify ' + JSON.stringify(cloudify_response);
+ return response;
+ }
+ offset += cloudify_response.items.length;
+ if (offset >= cloudify_response.metadata.pagination.total) {
+ response.status = 200;
+ response.message = 'got all node_instances ' + offset + "/" + cloudify_response.metadata.pagination.total;
+ return response;
+ }
+ return exports.getNodeInstances(mainReq, on_next_node_instances, offset);
+ })
+ .catch(function(error) {
+ return {
+ "status" : error.status || 500,
+ "message": "getNodeInstances cloudify error: " + JSON.stringify(error)
+ };
+ });
+};
+
+const runQueuedExecution = function(mainReq, deployment_id, workflow_id, parameters, waitedCount) {
+ mainReq = mainReq || {};
+ var execution_id;
+ var exe_deployment_str = " deployment_id " + deployment_id + " to " + workflow_id
+ + " with params(" + JSON.stringify(parameters || {}) + ")";
+ startWorkflowExecution(mainReq, deployment_id, workflow_id, parameters)
+ .then(function(result) {
+ logger.debug(mainReq.dcaeReqId, "result of start the execution for" + exe_deployment_str + ": " + JSON.stringify(result));
+ execution_id = result.json && result.json.id;
+ if (!execution_id) {
+ throw createError("failed to start execution - no execution_id for" + exe_deployment_str,
+ 553, "api", 553, CLOUDIFY);
+ }
+ exeQueue.setExecutionId(deployment_id, execution_id);
+ return getWorkflowResult(execution_id, mainReq);
+ })
+ .then(function(result) {
+ logger.debug(mainReq.dcaeReqId, 'successfully finished execution: ' + execution_id + " for" + exe_deployment_str);
+ var nextExecution = exeQueue.nextExecution(deployment_id);
+ if (nextExecution) {
+ logger.debug(nextExecution.mainReq.dcaeReqId, "next execution for deployment_id " + deployment_id
+ + " to " + nextExecution.workflow_id
+ + " with params(" + JSON.stringify(nextExecution.parameters || {}) + ")");
+ runQueuedExecution(nextExecution.mainReq, deployment_id, nextExecution.workflow_id, nextExecution.parameters);
+ }
+ })
+ .catch(function(result) {
+ if (result.status === 400 && result.json && result.json.error_code === "existing_running_execution_error") {
+ waitedCount = waitedCount || 0;
+ if (waitedCount >= MAX_TRIES) {
+ logger.error(createError("gave up on waiting for" + exe_deployment_str, 553, "api", 553, CLOUDIFY), mainReq);
+ exeQueue.removeDeployment(deployment_id);
+ return;
+ }
+ ++waitedCount;
+ logger.warn(createError("runQueuedExecution sleeping " + waitedCount
+ + " on " + exe_deployment_str, 553, "api", 553, CLOUDIFY), mainReq);
+ setTimeout(function() {runQueuedExecution(mainReq, deployment_id, workflow_id, parameters, waitedCount);}, RETRY_INTERVAL);
+ return;
+ }
+ exeQueue.removeDeployment(deployment_id);
+ if (result.status === 404 && result.json && result.json.error_code === "not_found_error") {
+ logger.error(createError("deployment not found for" + exe_deployment_str
+ + " cloudify response: " + JSON.stringify(result), 553, "api", 553, CLOUDIFY), mainReq);
+ return;
+ }
+ if (result instanceof Error) {
+ logger.error(result, mainReq);
+ return;
+ }
+ logger.error(createError("execute operation error " + (result.message || result.body || JSON.stringify(result))
+ + " on " + exe_deployment_str, 553, "api", 553, CLOUDIFY), mainReq);
+ });
+};
+
+exports.executeOperation = function (mainReq, deployment_id, operation, operation_kwargs, node_instance_ids) {
+ const workflow_id = "execute_operation";
+ var parameters = {
+ 'operation': operation,
+ 'operation_kwargs': operation_kwargs,
+ 'node_instance_ids': node_instance_ids,
+ 'allow_kwargs_override': true
+ };
+
+ if (exeQueue.isDeploymentBusy(deployment_id)) {
+ exeQueue.queueUpExecution(mainReq, deployment_id, workflow_id, parameters);
+ logger.debug(mainReq.dcaeReqId, "deployment busy - queue up execution for deployment_id " + deployment_id
+ + " to " + workflow_id + " with params(" + JSON.stringify(parameters || {}) + ")");
+ return;
+ }
+ exeQueue.queueUpExecution(mainReq, deployment_id, workflow_id, parameters);
+ runQueuedExecution(mainReq, deployment_id, workflow_id, parameters);
};
diff --git a/lib/config.js b/lib/config.js
index 4429247..e44e9b5 100644
--- a/lib/config.js
+++ b/lib/config.js
@@ -1,16 +1,16 @@
/*
-Copyright(c) 2017 AT&T Intellectual Property. All rights reserved.
+Copyright(c) 2017 AT&T Intellectual Property. All rights reserved.
-Licensed under the Apache License, Version 2.0 (the "License");
+Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
+Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
-CONDITIONS OF ANY KIND, either express or implied.
+CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
*/
@@ -18,11 +18,11 @@ See the License for the specific language governing permissions and limitations
* Dispatcher configuration
* Configuration may come from environment variables, a value in a Consul key-value store, or defaults,
* in that order of precedence.
- *
+ *
* The address of the Consul host is passed in an environment variable called CONSUL_HOST.
* If present, the configuration value in the key-value store is a UTF-8 serialization of a JSON object.
- *
- *
+ *
+ *
* --------------------------------------------------------------------------------------
* | JSON property | Environment variable | Required? | Default |
* --------------------------------------------------------------------------------------
@@ -51,7 +51,7 @@ See the License for the specific language governing permissions and limitations
* {"admin" : "admin123", "other" : "other123"}, then any incoming HTTP requests must use
* Basic authentication and supply "admin" as a user name with "admin123" as the password or
* supply "other" as the user name with "other123" as the password.
- *
+ *
* The dispatcher will attempt to run using TLS (i.e., as an HTTPS server) if a certificate
* file in pkcs12 format is stored at etc/cert/cert and a file containing the corresponding
* passphrase is stored at etc/cert/pass. These files can be made available to the container
@@ -65,6 +65,7 @@ const consul = require("./consul");
const SSL_CERT_FILE = "etc/cert/cert";
const SSL_PASS_FILE = "etc/cert/pass";
+const PACKAGE_JSON_FILE = "./package.json";
const CONFIG_KEY = "deployment_handler"; /* Configuration is stored under the name "deployment_handler" */
const CM_NAME = "cloudify_manager";
@@ -82,25 +83,23 @@ const DEFAULT_LOG_LEVEL = "INFO";
/* Check configuration for completeness */
const findMissingConfig = function(cfg) {
const requiredProps = ['logLevel', 'listenHost', 'listenPort', 'cloudify.url', 'inventory.url'];
- return requiredProps.filter(function(p){return !utils.hasProperty(cfg,p);});
+ return requiredProps.filter(function(p){return !utils.hasProperty(cfg,p);});
};
/* Fetch configuration */
-const getConfig = function (configStoreAddress) {
- const ch = consul({url: configStoreAddress});
- return ch.getKey(CONFIG_KEY)
+const getConfig = function() {
+ return consul.getKey(CONFIG_KEY)
.then(function(res) {
return res || {};
})
.catch(function(err) {
- throw err;
+ throw err;
});
};
/* Get a service host:port */
-const getService = function (configStoreAddress, serviceName) {
- const ch = consul({url: configStoreAddress});
- return ch.getService(serviceName)
+const getService = function (serviceName) {
+ return consul.getService(serviceName)
.then(function(res) {
if (res.length > 0) {
return res[0];
@@ -128,86 +127,92 @@ const getFileContents = function(path) {
/* Check for a TLS cert file and passphrase */
const getTLSCredentials = function() {
var ssl = {};
-
+
/* Get the passphrase */
return getFileContents(SSL_PASS_FILE)
.then(function(phrase) {
ssl.passphrase = phrase.toString('utf8').trim();
-
+
/* Get the cert */
return getFileContents(SSL_CERT_FILE);
})
-
+
.then(function(cert) {
- ssl.pfx = cert; /* Keep cert contents as a Buffer */
+ ssl.pfx = cert; /* Keep cert contents as a Buffer */
return ssl;
})
-
+
.catch(function(err) {
- return {};
+ return {};
});
}
-
-exports.configure = function(configStoreAddress) {
+exports.configure = function() {
var config = {};
-
- /* Construct a URL for Consul, assuming HTTP and the default Consul port */
- const configStoreURL = 'http://' + configStoreAddress + ':8500';
-
+
/* Get configuration from configuration store */
- return getConfig(configStoreURL)
+ return getFileContents(PACKAGE_JSON_FILE)
+ .then(function(package_json) {
+ package_json = JSON.parse((package_json || "{}").toString('utf8'));
+
+ config.name = package_json.name;
+ config.description = package_json.description;
+ config.version = package_json.version || "";
+ const ver = require('../version');
+ config.branch = ver.branch || "";
+ config.commit = ver.commit || "";
+ config.commit_datetime = ver.commit_datetime || "";
+
+ return getConfig();
+ })
.then (function(cfg) {
+ Object.assign(config, cfg);
- config = cfg;
-
/* Override values with environment variables and set defaults as needed */
config.listenPort = process.env.LISTEN_PORT || cfg.listenPort || DEFAULT_LISTEN_PORT;
config.listenHost = process.env.LISTEN_HOST || cfg.listenHost || DEFAULT_LISTEN_HOST;
config.logLevel = process.env.LOG_LEVEL || cfg.logLevel || DEFAULT_LOG_LEVEL;
-
+
config.cloudify = config.cloudify || {};
config.cloudify.protocol = process.env.CLOUDIFY_PROTOCOL || (cfg.cloudify && cfg.cloudify.protocol) || DEFAULT_CLOUDIFY_PROTOCOL;
if ((cfg.cloudify && cfg.cloudify.user) || process.env.CLOUDIFY_USER) {
config.cloudify.user = process.env.CLOUDIFY_USER || cfg.cloudify.user;
config.cloudify.password = process.env.CLOUDIFY_PASSWORD || cfg.cloudify.password || "";
}
-
+
config.inventory = config.inventory || {};
config.inventory.protocol = process.env.INVENTORY_PROTOCOL || (cfg.inventory && cfg.inventory.protocol) || DEFAULT_INVENTORY_PROTOCOL;
- if ((cfg.inventory && cfg.inventory.user)|| process.env.INVENTORY_USER) {
+ if ((cfg.inventory && cfg.inventory.user)|| process.env.INVENTORY_USER) {
config.inventory.user = process.env.INVENTORY_USER || cfg.inventory.user;
config.inventory.password = process.env.INVENTORY_PASSWORD || cfg.inventory.password || "";
}
/* Get service information for Cloudify Manager */
- return getService(configStoreURL, CM_NAME);
+ return getService(CM_NAME);
})
-
+
.then(function(cmService) {
-
config.cloudify.url = config.cloudify.protocol +"://" + cmService.address + ":" + cmService.port + CM_API_PATH;
-
+
/* Get service information for inventory */
- return getService(configStoreURL, INV_NAME);
+ return getService(INV_NAME);
})
-
+
.then(function(invService) {
config.inventory.url = config.inventory.protocol + "://" + invService.address + ":" + invService.port + INV_API_PATH;
-
+
/* Get TLS credentials, if they exist */
return getTLSCredentials();
})
.then(function(tls) {
config.ssl = tls;
-
+
/* Check for missing required configuration parameters */
const missing = findMissingConfig(config);
if (missing.length > 0) {
throw new Error ("Required configuration elements missing: " + missing.join(','));
config = null;
}
-
return config;
});
};
diff --git a/lib/consul.js b/lib/consul.js
index 5b76ffc..3a3257b 100644
--- a/lib/consul.js
+++ b/lib/consul.js
@@ -1,16 +1,16 @@
/*
-Copyright(c) 2017 AT&T Intellectual Property. All rights reserved.
+Copyright(c) 2017 AT&T Intellectual Property. All rights reserved.
-Licensed under the Apache License, Version 2.0 (the "License");
+Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
+Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
-CONDITIONS OF ANY KIND, either express or implied.
+CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
*/
@@ -18,49 +18,45 @@ See the License for the specific language governing permissions and limitations
const KEY = '/v1/kv/';
const SERVICE = '/v1/catalog/service/';
-const DEFAULT_URL = 'http://localhost:8500';
+const CONSUL = 'consul';
+const CONSUL_URL = 'http://' + (process.env.CONSUL_HOST || CONSUL) + ':8500';
const doRequest = require('./promise_request').doRequest;
-module.exports = function(options) {
- const url = options.url || DEFAULT_URL;
-
- return {
-
- /* Fetch (a promise for) the decoded value of a single key from Consul KV store.
- * If the value is a string representation of a JSON object, return as an object.
- * If there is no such key, resolve to null.
- */
- getKey: function(key) {
- return doRequest({method: 'GET', uri: url + KEY + key + '?raw'})
- .then(function(res) {
- return res.json || res.body;
- })
- .catch(function(err) {
- if (err.status === 404) {
- /* Key wasn't found */
- return null;
- }
- else {
- /* Some other error, rethrow it */
- throw err;
- }
- });
- },
-
- /* Retrieve (a promise for) address:port information for a named service from the Consul service catalog.
- * If the service has tag(s), return the first one. (Should be the full URL of the service if it exists.
- * Since a service can be registered at multiple nodes, the result is an array.
- * If the service is not found, returns a zero-length array.
- */
- getService: function(serviceId) {
- return doRequest({method: 'GET', uri: url + SERVICE + serviceId})
- .then(function(res){
- return res.json.map(function(r) {
- /* Address for external service is in r.Address with r.ServiceAddress empty */
- return {address: r.ServiceAddress || r.Address, port: r.ServicePort, url: r.ServiceTags ? r.ServiceTags[0] : ""};
- });
- });
- }
- }
+module.exports = {
+ /* Fetch (a promise for) the decoded value of a single key from Consul KV store.
+ * If the value is a string representation of a JSON object, return as an object.
+ * If there is no such key, resolve to null.
+ */
+ getKey: function(key) {
+ return doRequest({method: 'GET', uri: CONSUL_URL + KEY + key + '?raw'}, null, CONSUL)
+ .then(function(res) {
+ return res.json || res.body;
+ })
+ .catch(function(err) {
+ if (err.status === 404) {
+ /* Key wasn't found */
+ return null;
+ }
+ else {
+ /* Some other error, rethrow it */
+ throw err;
+ }
+ });
+ },
+
+ /* Retrieve (a promise for) address:port information for a named service from the Consul service catalog.
+ * If the service has tag(s), return the first one. (Should be the full URL of the service if it exists.
+ * Since a service can be registered at multiple nodes, the result is an array.
+ * If the service is not found, returns a zero-length array.
+ */
+ getService: function(serviceId) {
+ return doRequest({method: 'GET', uri: CONSUL_URL + SERVICE + serviceId}, null, CONSUL)
+ .then(function(res){
+ return res.json.map(function(r) {
+ /* Address for external service is in r.Address with r.ServiceAddress empty */
+ return {address: r.ServiceAddress || r.Address, port: r.ServicePort, url: r.ServiceTags ? r.ServiceTags[0] : ""};
+ });
+ });
+ }
}; \ No newline at end of file
diff --git a/lib/dcae-deployments.js b/lib/dcae-deployments.js
index bcec0e5..38dc3c4 100644
--- a/lib/dcae-deployments.js
+++ b/lib/dcae-deployments.js
@@ -93,7 +93,7 @@ app.get('/', function (req, res, next) {
/* Accept an incoming deployment request */
app.put('/:deploymentId', function(req, res, next) {
- log.debug(req.dcaeRequestId, "body: " + JSON.stringify(req.body));
+ log.debug(req.dcaeReqId, "body: " + JSON.stringify(req.body));
/* Make sure there's a serviceTypeId in the body */
if (!req.body['serviceTypeId']) {
diff --git a/lib/inventory.js b/lib/inventory.js
index 75a0e47..c2e13c9 100644
--- a/lib/inventory.js
+++ b/lib/inventory.js
@@ -1,169 +1,167 @@
/*
-Copyright(c) 2017 AT&T Intellectual Property. All rights reserved.
+Copyright(c) 2017 AT&T Intellectual Property. All rights reserved.
-Licensed under the Apache License, Version 2.0 (the "License");
+Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
+Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
-CONDITIONS OF ANY KIND, either express or implied.
+CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
*/
-
- /* Routines related to accessing DCAE inventory */
-
- "use strict";
-
- const req = require('./promise_request');
- const createError = require('./dispatcher-error').createDispatcherError;
-
- const INV_SERV_TYPES = '/dcae-service-types';
- const INV_SERVICES = '/dcae-services';
-
- /*
- * Common error handling for inventory API calls
- */
- const invError = function(err) {
- if (err.status && err.status === 404) {
- /* Map 404 to an empty list */
- return [];
- }
- else {
- var newErr;
- var message;
- if (err.status) {
- /* Got a response from inventory indicating an error */
- message = "Error response " + err.status + " from DCAE inventory: " + err.body;
- newErr = createError(message, 502, "api", 501, "dcae-inventory");
- }
- else {
- /* Problem connecting to inventory */
- message = "Error communicating with inventory: " + err.message;
- newErr = createError(message, 504, "system", 201, "dcae-inventory");
- }
- throw newErr;
- }
- };
-
- module.exports = function(options) {
-
- const url = options.url;
-
- return {
-
- /* Add a DCAE service to the inventory. Done after a deployment.*/
- addService: function(deploymentId, serviceType, vnfId, vnfType, vnfLocation, outputs) {
-
- /* Create the service description */
- var serviceDescription =
- {
- "vnfId" : vnfId,
- "vnfType" : vnfType,
- "vnfLocation" : vnfLocation,
- "typeId" : serviceType,
- "deploymentRef" : deploymentId
- };
-
- // TODO create 'components' array using 'outputs'--for now, a dummy
- serviceDescription.components = [
- {
- componentType: "dummy_component",
- componentId: "/components/dummy",
- componentSource: "DCAEController",
- shareable: 0
- }
- ];
-
- const reqOptions = {
- method : "PUT",
- uri : url + INV_SERVICES + "/" + deploymentId,
- json: serviceDescription
- };
-
- return req.doRequest(reqOptions);
- },
-
- /* Remove a DCAE service from the inventory. Done after an undeployment. */
- deleteService: function(serviceId) {
- return req.doRequest({method: "DELETE", uri: url + INV_SERVICES + "/" + serviceId});
- },
-
- /* Find running/deploying instances of services (with a given type name, if specified) */
- getServicesByType: function(query) {
- var options = {
- method: 'GET',
- uri: url + INV_SERVICES,
- qs: query || {}
- };
-
- return req.doRequest(options)
- .then (function (result) {
- var services = [];
- var content = JSON.parse(result.body);
- if(content.items) {
- /* Pick out the fields we want */
- services = content.items.map(function(i) { return { deploymentId: i.deploymentRef, serviceTypeId: i.typeId};});
- }
- return services;
- })
- .catch(invError);
- },
-
- /* Find a blueprint given the service type ID -- return blueprint and type ID */
- getBlueprintByType: function(serviceTypeId) {
- return req.doRequest({
- method: "GET",
- uri: url + INV_SERV_TYPES + '/' + serviceTypeId
- })
- .then (function(result) {
- var blueprintInfo = {};
- var content = JSON.parse(result.body);
- blueprintInfo.blueprint = content.blueprintTemplate;
- blueprintInfo.typeId = content.typeId;
-
- return blueprintInfo;
- })
- .catch(invError);
- },
-
- /*
- * Verify that the specified deployment ID does not already have
- * an entry in inventory. This is needed to enforce the rule that
- * creating a second instance of a deployment under the
- * same ID as an existing deployment is not permitted.
- * The function checks for a service in inventory using the
- * deployment ID as service name. If it doesn't exist, the function
- * resolves its promise. If it *does* exist, then it throws an error.
- */
- verifyUniqueDeploymentId: function(deploymentId) {
-
- return req.doRequest({
- method: "GET",
- uri: url + INV_SERVICES + "/" + deploymentId
- })
-
- /* Successful lookup -- the deployment exists, so throw an error */
- .then(function(res) {
- throw createError("Deployment " + deploymentId + " already exists", 409, "api", 501);
- },
-
- /* Error from the lookup -- either deployment ID doesn't exist or some other problem */
- function (err) {
-
- /* Inventory returns a 404 if it does not find the deployment ID */
- if (err.status === 404) {
- return true;
- }
-
- /* Some other error -- it really is an error and we can't continue */
- else {
- return invError(err);
- }
- });
- }
- };
- };
+
+/* Routines related to accessing DCAE inventory */
+
+"use strict";
+const INVENTORY = "inventory";
+
+const doRequest = require('./promise_request').doRequest;
+const createError = require('./dispatcher-error').createDispatcherError;
+
+const INV_SERV_TYPES = '/dcae-service-types';
+const INV_SERVICES = '/dcae-services';
+
+/*
+ * Common error handling for inventory API calls
+ */
+const invError = function(err) {
+ if (err.status && err.status === 404) {
+ /* Map 404 to an empty list */
+ return [];
+ }
+ else {
+ var newErr;
+ var message;
+ if (err.status) {
+ /* Got a response from inventory indicating an error */
+ message = "Error response " + err.status + " from DCAE inventory: " + err.body;
+ newErr = createError(message, 502, "api", 501, "dcae-inventory");
+ }
+ else {
+ /* Problem connecting to inventory */
+ message = "Error communicating with inventory: " + err.message;
+ newErr = createError(message, 504, "system", 201, "dcae-inventory");
+ }
+ throw newErr;
+ }
+};
+
+module.exports = function(options) {
+ const url = options.url;
+
+ return {
+ /* Add a DCAE service to the inventory. Done after a deployment.*/
+ addService: function(deploymentId, serviceType, vnfId, vnfType, vnfLocation, outputs) {
+
+ /* Create the service description */
+ var serviceDescription =
+ {
+ "vnfId" : vnfId,
+ "vnfType" : vnfType,
+ "vnfLocation" : vnfLocation,
+ "typeId" : serviceType,
+ "deploymentRef" : deploymentId
+ };
+
+ // TODO create 'components' array using 'outputs'--for now, a dummy
+ serviceDescription.components = [
+ {
+ componentType: "dummy_component",
+ componentId: "/components/dummy",
+ componentSource: "DCAEController",
+ shareable: 0
+ }
+ ];
+
+ const reqOptions = {
+ method : "PUT",
+ uri : url + INV_SERVICES + "/" + deploymentId,
+ json: serviceDescription
+ };
+
+ return doRequest(reqOptions, null, INVENTORY);
+ },
+
+ /* Remove a DCAE service from the inventory. Done after an undeployment. */
+ deleteService: function(serviceId) {
+ return doRequest({method: "DELETE", uri: url + INV_SERVICES + "/" + serviceId}, null, INVENTORY);
+ },
+
+ /* Find running/deploying instances of services (with a given type name, if specified) */
+ getServicesByType: function(query) {
+ var options = {
+ method: 'GET',
+ uri: url + INV_SERVICES,
+ qs: query || {}
+ };
+
+ return doRequest(options, null, INVENTORY)
+ .then (function (result) {
+ var services = [];
+ var content = JSON.parse(result.body);
+ if(content.items) {
+ /* Pick out the fields we want */
+ services = content.items.map(function(i) { return { deploymentId: i.deploymentRef, serviceTypeId: i.typeId};});
+ }
+ return services;
+ })
+ .catch(invError);
+ },
+
+ /* Find a blueprint given the service type ID -- return blueprint and type ID */
+ getBlueprintByType: function(serviceTypeId) {
+ return doRequest({
+ method: "GET",
+ uri: url + INV_SERV_TYPES + '/' + serviceTypeId
+ }, null, INVENTORY)
+ .then (function(result) {
+ var blueprintInfo = {};
+ var content = JSON.parse(result.body);
+ blueprintInfo.blueprint = content.blueprintTemplate;
+ blueprintInfo.typeId = content.typeId;
+
+ return blueprintInfo;
+ })
+ .catch(invError);
+ },
+
+ /*
+ * Verify that the specified deployment ID does not already have
+ * an entry in inventory. This is needed to enforce the rule that
+ * creating a second instance of a deployment under the
+ * same ID as an existing deployment is not permitted.
+ * The function checks for a service in inventory using the
+ * deployment ID as service name. If it doesn't exist, the function
+ * resolves its promise. If it *does* exist, then it throws an error.
+ */
+ verifyUniqueDeploymentId: function(deploymentId) {
+ return doRequest({
+ method: "GET",
+ uri: url + INV_SERVICES + "/" + deploymentId
+ }, null, INVENTORY)
+
+ /* Successful lookup -- the deployment exists, so throw an error */
+ .then(function(res) {
+ throw createError("Deployment " + deploymentId + " already exists", 409, "api", 501);
+ },
+
+ /* Error from the lookup -- either deployment ID doesn't exist or some other problem */
+ function (err) {
+
+ /* Inventory returns a 404 if it does not find the deployment ID */
+ if (err.status === 404) {
+ return true;
+ }
+
+ /* Some other error -- it really is an error and we can't continue */
+ else {
+ return invError(err);
+ }
+ });
+ }
+ };
+};
diff --git a/lib/logging.js b/lib/logging.js
index 5bfd48a..4d85898 100644
--- a/lib/logging.js
+++ b/lib/logging.js
@@ -1,16 +1,16 @@
/*
-Copyright(c) 2017 AT&T Intellectual Property. All rights reserved.
+Copyright(c) 2017 AT&T Intellectual Property. All rights reserved.
-Licensed under the Apache License, Version 2.0 (the "License");
+Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
+Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
-CONDITIONS OF ANY KIND, either express or implied.
+CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
*/
@@ -71,16 +71,17 @@ const ERROR_NFIELDS = 11;
/* Error code -> description mapping */
const descriptions = {
-
+
201: 'Inventory communication error',
202: 'Cloudify Manager communication error',
-
+
501: 'Inventory API error',
502: 'Cloudify Manager API error',
-
+
551: 'HTTP(S) Server initialization error',
552: 'Dispatcher start-up error',
-
+ 553: 'Execute workflow on deployment error',
+
999: 'Unknown error'
};
@@ -122,7 +123,7 @@ const DEBUG_REQID = 1;
const DEBUG_INFO = 2;
const DEBUG_EOR = 3;
const DEBUG_NFIELDS = 4;
-const DEBUG_MARKER = '^\n';
+const DEBUG_MARKER = '^';
/* Format audit record for an incoming API request */
@@ -141,7 +142,7 @@ const formatAuditRecord = function(req, status, extra) {
rec[AUDIT_ELAPSED] = end - req.startTime;
rec[AUDIT_SERVER] = req.hostname // From the Host header, again
rec[AUDIT_CLIENTIP] = req.connection.remoteAddress;
-
+
if (extra) {
rec[AUDIT_DETAILMSG]= extra.replace(/\n/g, " "); /* Collapse multi-line extra data to a single line */
}
@@ -162,7 +163,7 @@ const formatMetricsRecord = function(req, opInfo, extra) {
const end = new Date();
rec[METRICS_END] = end.toISOString();
rec[METRICS_BEGIN] = opInfo.startTime.toISOString();
-
+
/* If reporting on a suboperation invoked as a result of an incoming request, capture info about that request */
if (req) {
rec[METRICS_REQID] = req.dcaeReqId;
@@ -174,14 +175,14 @@ const formatMetricsRecord = function(req, opInfo, extra) {
else {
/* No incoming request */
rec[METRICS_REQID] = 'no incoming request';
- rec[METRICS_SRVNAME] = os.hostname();
+ rec[METRICS_SRVNAME] = os.hostname();
rec[METRICS_SVCNAME] = 'no incoming request';
}
-
+
rec[METRICS_TGTENTITY] = opInfo.targetEntity;
rec[METRICS_TGTSVC] = opInfo.targetService;
rec[METRICS_STATUSCODE] = opInfo.complete ? "COMPLETE" : "ERROR";
- rec[METRICS_RESPCODE] = opInfo.respCode;
+ rec[METRICS_RESPCODE] = opInfo.respCode;
rec[METRICS_CATLOGLEVEL] = "INFO"; // The audit records are informational, regardless of the outcome of the operation
rec[METRICS_ELAPSED] = end - opInfo.startTime;
@@ -196,25 +197,25 @@ const formatMetricsRecord = function(req, opInfo, extra) {
/* Format error log record */
const formatErrorRecord = function(category, code, detail, req, target) {
var rec = new Array(ERROR_NFIELDS);
-
+
/* Common fields */
rec[ERROR_TIMESTAMP] = (new Date()).toISOString();
rec[ERROR_CATEGORY] = category;
rec[ERROR_CODE] = code;
rec[ERROR_DESCRIPTION] = descriptions[code] || 'no description available';
-
+
/* Log error detail in a single line if provided */
if (detail) {
rec[ERROR_MESSAGE] = detail.replace(/\n/g, " ");
}
-
+
/* Fields available if the error happened during processing of an incoming API request */
if (req) {
rec[ERROR_REQID] = req.dcaeReqId;
rec[ERROR_SVCNAME] = req.method + ' ' + req.originalUrl; // Method and URL identify the operation being performed
- rec[ERROR_PARTNER] = req.connection.remoteAddress; // We don't have the partner's name, but we know the remote IP address
+ rec[ERROR_PARTNER] = req.connection.remoteAddress; // We don't have the partner's name, but we know the remote IP address
}
-
+
/* Include information about the target entity/service if available */
if (target) {
rec[ERROR_TGTENTITY] = target.entity || '';
@@ -226,35 +227,35 @@ const formatErrorRecord = function(category, code, detail, req, target) {
/* Format debug log record */
const formatDebugRecord = function(reqId, msg) {
var rec = new Array(DEBUG_NFIELDS);
-
+
rec[DEBUG_TIMESTAMP] = new Date().toISOString();
rec[DEBUG_REQID] = reqId || '';
rec[DEBUG_INFO] = msg;
rec[DEBUG_EOR] = DEBUG_MARKER;
-
+
return rec.join('|');
};
exports.getLogger = function() {
return {
-
+
audit: function(req, status, extra) {
auditLogger.info(formatAuditRecord(req, status, extra));
},
-
+
error: function(error, req) {
errorLogger.error(formatErrorRecord("ERROR", error.logCode, error.message, req, {entity: error.target}));
},
-
+
warn: function(error, req) {
errorLogger.error(formatErrorRecord("WARN", error.logCode, error.message, req, {entity: error.target}));
},
-
+
metrics: function(req, opInfo, extra) {
metricsLogger.info(formatMetricsRecord(req, opInfo, extra));
},
-
+
debug: function(reqId, msg) {
debugLogger.debug(formatDebugRecord(reqId, msg));
}
diff --git a/lib/policy.js b/lib/policy.js
new file mode 100644
index 0000000..620870c
--- /dev/null
+++ b/lib/policy.js
@@ -0,0 +1,181 @@
+/*
+Copyright(c) 2017 AT&T Intellectual Property. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and limitations under the License.
+*/
+
+/**
+ * handling policy updates
+ */
+
+"use strict";
+
+const POLICY_UPDATE_OPERATION = "dcae.interfaces.policy.policy_update";
+
+const config = process.mainModule.exports.config;
+const createError = require('./dispatcher-error').createDispatcherError;
+const logger = require('./logging').getLogger();
+
+var cloudify = require("./cloudify.js");
+
+// Set config for cloudify interface library
+cloudify.setAPIAddress(config.cloudify.url);
+cloudify.setCredentials(config.cloudify.user, config.cloudify.password);
+cloudify.setLogger(logger);
+
+/**
+ * receive the policy-updated message from the policy-handler
+ */
+function policyUpdate(req, res, next) {
+ var latest_policies = JSON.stringify((req.body && req.body.latest_policies) || {});
+ logger.debug(req.dcaeReqId, "policyUpdate " + req.originalUrl + " " + latest_policies);
+ /**
+ * reply to and free up the policy_handler
+ */
+ res.json({});
+
+ latest_policies = JSON.parse(latest_policies);
+ /**
+ * filter out the policies to what is deployed in components and needs updating (new policyVersion)
+ */
+ var policy_deployments = {};
+ var policy_ids = {};
+
+ cloudify.getNodeInstances(req, function(node_instances) {
+ node_instances.forEach(node_instance => {
+ if (!node_instance.runtime_properties || !node_instance.runtime_properties.policies) {
+ return;
+ }
+ var deployment = policy_deployments[node_instance.deployment_id] || {
+ "deployment_id": node_instance.deployment_id, "policies": {}, "component_ids": []
+ };
+
+ logger.debug(req.dcaeReqId, "have policy on node_instance: " + JSON.stringify(node_instance));
+ var have_policies = false;
+ Object.keys(node_instance.runtime_properties.policies).forEach(policy_id => {
+ var deployed_policy = node_instance.runtime_properties.policies[policy_id];
+ var latest_policy = latest_policies[policy_id];
+ if (!latest_policy || !latest_policy.policy_body
+ || isNaN(latest_policy.policy_body.policyVersion)
+ || latest_policy.policy_body.policyVersion
+ === (deployed_policy.policy_body && deployed_policy.policy_body.policyVersion)) {
+ return;
+ }
+ have_policies = true;
+ deployment.policies[policy_id] = latest_policy;
+ policy_ids[policy_id] = true;
+ });
+ if (have_policies) {
+ deployment.component_ids.push(node_instance.id);
+ policy_deployments[deployment.deployment_id] = deployment;
+ }
+ });
+
+ logger.debug(req.dcaeReqId, "collected policy_deployments to update " + JSON.stringify(policy_deployments));
+ })
+ .then(function(result) {
+ logger.debug(req.dcaeReqId, "finished loading policy_deployments" + JSON.stringify(result));
+ if (result.status !== 200) {
+ const error_msg = "failed to retrieve component policies from cloudify " + result.message;
+ logger.error(createError(error_msg, result.status, "api", 502, 'cloudify-manager'), req);
+ logger.audit(req, result.status, error_msg);
+ return;
+ }
+
+ var deployment_ids = Object.keys(policy_deployments);
+ var policy_id_count = Object.keys(policy_ids).length;
+ if (!deployment_ids.length) {
+ const msg = "no updated policies to apply to deployments";
+ logger.debug(req.dcaeReqId, msg);
+ logger.audit(req, result.status, msg);
+ return;
+ }
+ const msg = "going to apply updated policies[" + policy_id_count + "] to deployments " + deployment_ids.length;
+ logger.debug(req.dcaeReqId, msg + ": " + JSON.stringify(deployment_ids));
+ logger.audit(req, result.status, msg);
+ deployment_ids.forEach(deployment_id => {
+ var deployment = policy_deployments[deployment_id];
+ deployment.policies = Object.keys(deployment.policies).map(policy_id => {
+ return deployment.policies[policy_id];
+ });
+
+ logger.debug(req.dcaeReqId, "ready to execute-operation policy-update on deployment " + JSON.stringify(deployment));
+ cloudify.executeOperation(req, deployment.deployment_id, POLICY_UPDATE_OPERATION,
+ {'updated_policies': deployment.policies}, deployment.component_ids);
+ });
+ });
+}
+
+/**
+ * retrieve all component-policies from cloudify
+ */
+function getComponentPoliciesFromCloudify(req, res, next) {
+ logger.debug(req.dcaeReqId, "getComponentPoliciesFromCloudify " + req.originalUrl);
+ var response = {"requestId": req.dcaeReqId};
+ response.started = new Date();
+ response.component_policies = [];
+ response.component_ids = [];
+ response.node_instances = [];
+
+ cloudify.getNodeInstances(req, function(node_instances) {
+ Array.prototype.push.apply(response.node_instances, node_instances);
+ node_instances.forEach(node_instance => {
+ if (!node_instance.runtime_properties || !node_instance.runtime_properties.policies) {
+ return;
+ }
+
+ var policies_count = 0;
+ Object.keys(node_instance.runtime_properties.policies).forEach(policy_id => {
+ ++policies_count;
+ var policy = node_instance.runtime_properties.policies[policy_id];
+ policy.component_id = node_instance.id;
+ policy.deployment_id = node_instance.deployment_id;
+ response.component_policies.push(policy);
+ });
+ if (policies_count) {
+ response.component_ids.push({
+ "component_id" : node_instance.id,
+ "policies_count" : policies_count
+ });
+ }
+ });
+
+ logger.debug(req.dcaeReqId, "collected " + response.component_ids.length
+ + " component_ids: " + JSON.stringify(response.component_ids)
+ + " component_policies: " + JSON.stringify(response.component_policies));
+ })
+ .then(function(result) {
+ response.ended = new Date();
+ response.status = result.status;
+ response.message = result.message;
+ logger.debug(req.dcaeReqId, result.message);
+ if (result.status !== 200) {
+ logger.error(createError(result.message, result.status, "api", 502, 'cloudify-manager'), req);
+ }
+ res.status(result.status).json(response);
+ logger.audit(req, result.status, result.message);
+ });
+}
+
+// ========================================================
+
+const app = require('express')();
+app.set('x-powered-by', false);
+app.set('etag', false);
+app.use(require('./middleware').checkType('application/json'));
+app.use(require('body-parser').json({strict: true}));
+
+app.post('/', policyUpdate);
+app.get('/components', getComponentPoliciesFromCloudify);
+
+module.exports = app;
diff --git a/lib/promise_request.js b/lib/promise_request.js
index ca09eee..bda4d66 100644
--- a/lib/promise_request.js
+++ b/lib/promise_request.js
@@ -1,114 +1,131 @@
/*
-Copyright(c) 2017 AT&T Intellectual Property. All rights reserved.
+Copyright(c) 2017 AT&T Intellectual Property. All rights reserved.
-Licensed under the Apache License, Version 2.0 (the "License");
+Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
+Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
-CONDITIONS OF ANY KIND, either express or implied.
+CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
*/
-
+
/* Promise-based HTTP request client */
-
+
"use strict";
-
+
/*
- * Make an HTTP request using a string for the body
+ * Make an HTTP request using a string for the body
* of the request.
- * Return a promise for result in the form
+ * Return a promise for result in the form
* {status: <http status code>, body: <response body>}
*/
-
+
const http = require('http');
const https = require('https');
const url = require('url');
const querystring = require('querystring');
+const logger = require('./logging').getLogger();
+
+exports.doRequest = function(options, body, targetEntity, mainReq) {
+ var opInfo = {"startTime":new Date(), "targetEntity": targetEntity};
+
+ return new Promise(function(resolve, reject) {
+
+ var reqBody = null;
+ if (options.json) {
+ reqBody = JSON.stringify(options.json);
+ options.headers = options.headers || {};
+ options.headers['Content-Type'] = 'application/json';
+ }
+ else if (body) {
+ reqBody = body;
+ }
+
+ if (options.uri) {
+ var parsed = url.parse(options.uri);
+ options.protocol = parsed.protocol;
+ options.hostname = parsed.hostname;
+ options.port = parsed.port;
+ options.path = parsed.path;
+ if (options.qs) {
+ options.path += ('?' + querystring.stringify(options.qs));
+ }
+ opInfo.targetService = options.method + " " + options.uri;
+ }
+
+ try {
+ var req = (options.protocol === 'https:' ? https.request(options) : http.request(options));
+ }
+ catch (e) {
+ opInfo.respCode = 500;
+ opInfo.complete = false;
+ logger.metrics(mainReq, opInfo, e.message);
+
+ reject(e);
+ }
+
+ // Reject promise if there's an error
+ req.on('error', function(error) {
+ opInfo.respCode = error.status || 500;
+ opInfo.complete = false;
+ logger.metrics(mainReq, opInfo, error.message);
+
+ reject(error);
+ });
+
+ // Capture the response
+ req.on('response', function(resp) {
+
+ // Collect the body of the response
+ var rbody = '';
+ resp.on('data', function(d) {
+ rbody += d;
+ });
+
+ // resolve or reject when finished
+ resp.on('end', function() {
+
+ var result = {
+ status : resp.statusCode,
+ body : rbody
+ };
+
+ // Add a JSON version of the body if appropriate
+ if (rbody.length) {
+ try {
+ var jbody = JSON.parse(rbody);
+ result.json = jbody;
+ }
+ catch (pe) {
+ // Do nothing, no json property added to the result object
+ }
+ }
+
+ opInfo.respCode = resp.statusCode || 500;
+ if (resp.statusCode > 199 && resp.statusCode < 300) {
+ // HTTP status code indicates success - resolve the promise
+ opInfo.complete = true;
+ logger.metrics(mainReq, opInfo, result.body);
+
+ resolve(result);
+ } else {
+ // Reject the promise
+ opInfo.complete = false;
+ logger.metrics(mainReq, opInfo, result.body);
+
+ reject(result);
+ }
+ });
+ });
-exports.doRequest = function(options, body) {
-
- return new Promise(function(resolve, reject) {
-
- var reqBody = null;
- if (options.json) {
- reqBody = JSON.stringify(options.json);
- options.headers = options.headers || {};
- options.headers['Content-Type'] = 'application/json';
- }
- else if (body) {
- reqBody = body;
- }
-
- if (options.uri) {
- var parsed = url.parse(options.uri);
- options.protocol = parsed.protocol;
- options.hostname = parsed.hostname;
- options.port = parsed.port;
- options.path = parsed.path;
- if (options.qs) {
- options.path += ('?' + querystring.stringify(options.qs));
- }
- }
-
- try {
- var req = (options.protocol === 'https:' ? https.request(options) : http.request(options));
- }
- catch (e) {
- reject(e);
- }
-
- // Reject promise if there's an error
- req.on('error', function(error) {
- reject(error);
- });
-
- // Capture the response
- req.on('response', function(resp) {
-
- // Collect the body of the response
- var rbody = '';
- resp.on('data', function(d) {
- rbody += d;
- });
-
- // resolve or reject when finished
- resp.on('end', function() {
-
- var result = {
- status : resp.statusCode,
- body : rbody
- };
-
- // Add a JSON version of the body if appropriate
- if (rbody.length > 0) {
- try {
- var jbody = JSON.parse(rbody);
- result.json = jbody;
- }
- catch (pe) {
- // Do nothing, no json property added to the result object
- }
- }
-
- if (resp.statusCode > 199 && resp.statusCode < 300) {
- // HTTP status code indicates success - resolve the promise
- resolve(result);
- }
- else {
- // Reject the promise
- reject(result);
- }
- });
- });
-
- if (reqBody) {
- req.write(reqBody, 'utf8');
- }
- req.end();
- });
+ if (reqBody) {
+ req.write(reqBody, 'utf8');
+ }
+ req.end();
+ });
};
diff --git a/lib/repeat.js b/lib/repeat.js
index 2ea0e14..f4d9532 100644
--- a/lib/repeat.js
+++ b/lib/repeat.js
@@ -1,16 +1,16 @@
/*
-Copyright(c) 2017 AT&T Intellectual Property. All rights reserved.
+Copyright(c) 2017 AT&T Intellectual Property. All rights reserved.
-Licensed under the Apache License, Version 2.0 (the "License");
+Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
+Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
-CONDITIONS OF ANY KIND, either express or implied.
+CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
*/
@@ -25,30 +25,30 @@ See the License for the specific language governing permissions and limitations
*/
exports.repeatWhile = function(action, predicate, maxTries, interval) {
- return new Promise(function(resolve, reject) {
-
- var count = 0;
-
- function makeAttempt() {
- action()
- .then (function(res) {
- if (!predicate(res)) {
- // We're done
- resolve(res);
- }
- else {
- if (++count < maxTries) {
- // set up next attempt
- setTimeout(makeAttempt, interval);
- }
- else {
- // we've run out of retries or it's not retryable, so reject the promise
- reject({message: "maximum repetions reached: " + count });
- }
- }
- });
- }
-
- makeAttempt();
- });
+ return new Promise(function(resolve, reject) {
+
+ var count = 0;
+
+ function makeAttempt() {
+ action()
+ .then (function(res) {
+ if (!predicate(res)) {
+ // We're done
+ resolve(res);
+ }
+ else {
+ if (++count < maxTries) {
+ // set up next attempt
+ setTimeout(makeAttempt, interval);
+ }
+ else {
+ // we've run out of retries or it's not retryable, so reject the promise
+ reject({message: "maximum repetitions reached: " + count });
+ }
+ }
+ });
+ }
+
+ makeAttempt();
+ });
};
diff --git a/lib/swagger-ui.js b/lib/swagger-ui.js
new file mode 100644
index 0000000..8c50255
--- /dev/null
+++ b/lib/swagger-ui.js
@@ -0,0 +1,31 @@
+/*
+Copyright(c) 2017 AT&T Intellectual Property. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and limitations under the License.
+*/
+
+/**
+ * swagger-ui for deployment-handler API
+ */
+
+"use strict";
+
+// ========================================================
+
+const app = require('express')();
+const swaggerUi = require('swagger-ui-express');
+const YAML = require('yamljs');
+const swaggerDocument = YAML.load('./deployment-handler-API.yaml');
+app.use("/", swaggerUi.serve, swaggerUi.setup(swaggerDocument));
+
+module.exports = app;