From 4ad951ee4c3ed41ca58a240e8b9193416c304b20 Mon Sep 17 00:00:00 2001 From: Oleg Mitsura Date: Wed, 8 Jul 2020 03:07:07 -0400 Subject: cmd-exec server-side timeout. Issue-ID: CCSDK-2535 Signed-off-by: Oleg Mitsura Change-Id: I897678a5a8a23503a878f2d3eb836ba4597a6e6e --- .../executor/ComponentRemotePythonExecutor.kt | 2 +- .../src/main/python/command_executor_handler.py | 49 ++++++++++------------ ms/command-executor/src/main/python/utils.py | 33 ++++++++++++++- 3 files changed, 54 insertions(+), 30 deletions(-) diff --git a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt index 7f32fa95d..e48016745 100644 --- a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt +++ b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt @@ -201,7 +201,7 @@ open class ComponentRemotePythonExecutor( remoteIdentifier = RemoteIdentifier(blueprintName = blueprintName, blueprintVersion = blueprintVersion), command = scriptCommand, properties = properties, - timeOut = implementation.timeout.toLong()) + timeOut = executionTimeout.toLong()) val remoteExecutionOutputDeferred = GlobalScope.async { remoteScriptExecutionService.executeCommand(remoteExecutionInput) diff --git a/ms/command-executor/src/main/python/command_executor_handler.py b/ms/command-executor/src/main/python/command_executor_handler.py index 0533b41f3..eec385783 100644 --- a/ms/command-executor/src/main/python/command_executor_handler.py +++ b/ms/command-executor/src/main/python/command_executor_handler.py @@ -14,9 +14,9 @@ # limitations under the License. # from builtins import Exception, open, dict -from subprocess import CalledProcessError, PIPE +from subprocess import CalledProcessError, PIPE, TimeoutExpired from google.protobuf.json_format import MessageToJson - +import tempfile import logging import os import re @@ -25,8 +25,6 @@ import virtualenv import venv import utils import proto.CommandExecutor_pb2 as CommandExecutor_pb2 -import email.parser -import json REQUIREMENTS_TXT = "requirements.txt" @@ -37,6 +35,7 @@ class CommandExecutorHandler(): self.request = request self.logger = logging.getLogger(self.__class__.__name__) self.blueprint_id = utils.get_blueprint_id(request) + self.execution_timeout = utils.get_blueprint_timeout(request) self.venv_home = '/opt/app/onap/blueprints/deploy/' + self.blueprint_id self.installed = self.venv_home + '/.installed' @@ -83,7 +82,9 @@ class CommandExecutorHandler(): return utils.build_ret_data(True, results_log=results_log) def execute_command(self, request): + # STDOUT/STDERR output of the process results_log = [] + # encoded payload returned by the process result = {} # workaround for when packages are not specified, we may not want to go through the install step # can just call create_venv from here. @@ -108,8 +109,6 @@ class CommandExecutorHandler(): cmd = cmd + "; " + request.command + " -e 'ansible_python_interpreter=" + self.venv_home + "/bin/python'" else: cmd = cmd + "; " + request.command + " " + re.escape(MessageToJson(request.properties)) - payload_section = [] - is_payload_section = False ### extract the original header request into sys-env variables ### RequestID @@ -118,29 +117,20 @@ class CommandExecutorHandler(): subrequest_id = request.correlationId request_id_map = {'CDS_REQUEST_ID':request_id, 'CDS_CORRELATION_ID':subrequest_id} updated_env = { **os.environ, **request_id_map } + self.logger.info("Running blueprint {} with timeout: {}".format(self.blueprint_id, self.execution_timeout)) - with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, - shell=True, bufsize=1, universal_newlines=True, env=updated_env) as newProcess: - while True: - output = newProcess.stdout.readline() - if output == '' and newProcess.poll() is not None: - break - if output.startswith('BEGIN_EXTRA_PAYLOAD'): - is_payload_section = True - output = newProcess.stdout.readline() - if output.startswith('END_EXTRA_PAYLOAD'): - is_payload_section = False - output = '' - payload = '\n'.join(payload_section) - msg = email.parser.Parser().parsestr(payload) - for part in msg.get_payload(): - result = json.loads(part.get_payload()) - if output and not is_payload_section: - self.logger.info(output.strip()) - results_log.append(output.strip()) - else: - payload_section.append(output.strip()) - rc = newProcess.poll() + with tempfile.TemporaryFile(mode="w+") as tmp: + try: + completed_subprocess = subprocess.run(cmd, stdout=tmp, stderr=subprocess.STDOUT, shell=True, + env=updated_env, timeout=self.execution_timeout) + except TimeoutExpired: + timeout_err_msg = "Running command {} failed due to timeout of {} seconds.".format(self.blueprint_id, self.execution_timeout) + self.logger.error(timeout_err_msg) + utils.parse_cmd_exec_output(tmp, self.logger, result, results_log) + return utils.build_ret_data(False, results_log=results_log, error=timeout_err_msg) + + utils.parse_cmd_exec_output(tmp, self.logger, result, results_log) + rc = completed_subprocess.returncode except Exception as e: err_msg = "{} - Failed to execute command. Error: {}".format(self.blueprint_id, e) result.update(utils.build_ret_data(False, results_log=results_log, error=err_msg)) @@ -178,6 +168,7 @@ class CommandExecutorHandler(): if REQUIREMENTS_TXT == package: command = ["pip", "install", "-r", self.venv_home + "/Environments/" + REQUIREMENTS_TXT] elif package == 'UTILITY': + # TODO: fix python version that is hardcoded here, may fail if python image is upgraded command = ["cp", "-r", "./cds_utils", self.venv_home + "/lib/python3.6/site-packages/"] else: command = ["pip", "install", package] @@ -263,3 +254,5 @@ class CommandExecutorHandler(): except Exception as err: self.logger.info( "{} - Failed to deactivate Python Virtual Environment. Error: {}".format(self.blueprint_id, err)) + + diff --git a/ms/command-executor/src/main/python/utils.py b/ms/command-executor/src/main/python/utils.py index 180cd8c12..54c5ceafa 100644 --- a/ms/command-executor/src/main/python/utils.py +++ b/ms/command-executor/src/main/python/utils.py @@ -17,6 +17,7 @@ from google.protobuf.timestamp_pb2 import Timestamp import proto.CommandExecutor_pb2 as CommandExecutor_pb2 import json +import email.parser CDS_IS_SUCCESSFUL_KEY = "cds_is_successful" ERR_MSG_KEY = "err_msg" @@ -29,6 +30,9 @@ def get_blueprint_id(request): blueprint_version = request.identifiers.blueprintVersion return blueprint_name + '/' + blueprint_version +def get_blueprint_timeout(request): + return request.timeOut + # Create a response for grpc. Fills in the timestamp as well as removes cds_is_successful element def build_grpc_response(request_id, response): if response[CDS_IS_SUCCESSFUL_KEY]: @@ -71,4 +75,31 @@ def truncate_execution_output(execution_output): removed_item = execution_output.response.pop() sum_truncated_chars += len(removed_item) execution_output.response.append("[...] TRUNCATED CHARS : {}".format(sum_truncated_chars)) - return execution_output \ No newline at end of file + return execution_output + + +# Read temp file 'outputfile' into results_log and split out the returned payload into payload_result +def parse_cmd_exec_output(outputfile, logger, payload_result, results_log): + payload_section = [] + is_payload_section = False + outputfile.seek(0) + while True: + output = outputfile.readline() + if output == '': + break + if output.startswith('BEGIN_EXTRA_PAYLOAD'): + is_payload_section = True + output = outputfile.readline() + if output.startswith('END_EXTRA_PAYLOAD'): + is_payload_section = False + output = '' + payload = '\n'.join(payload_section) + msg = email.parser.Parser().parsestr(payload) + for part in msg.get_payload(): + payload_result.update(json.loads(part.get_payload())) + if output and not is_payload_section: + logger.info(output.strip()) + results_log.append(output.strip()) + else: + payload_section.append(output.strip()) + -- cgit 1.2.3-korg