diff options
4 files changed, 137 insertions, 96 deletions
diff --git a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt index e707e5315..ecc3d2e65 100644 --- a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt +++ b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt @@ -195,7 +195,7 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic remoteIdentifier = RemoteIdentifier(blueprintName = blueprintName, blueprintVersion = blueprintVersion), command = scriptCommand, properties = properties, - timeOut = implementation.timeout.toLong()) + timeOut = executionTimeout.toLong()) val remoteExecutionOutputDeferred = GlobalScope.async { remoteScriptExecutionService.executeCommand(remoteExecutionInput) diff --git a/ms/command-executor/src/main/python/command_executor_handler.py b/ms/command-executor/src/main/python/command_executor_handler.py index 1e6f03b81..f2a3d95df 100644 --- a/ms/command-executor/src/main/python/command_executor_handler.py +++ b/ms/command-executor/src/main/python/command_executor_handler.py @@ -14,9 +14,9 @@ # limitations under the License. # from builtins import Exception, open, dict -from subprocess import CalledProcessError, PIPE +from subprocess import CalledProcessError, PIPE, TimeoutExpired from google.protobuf.json_format import MessageToJson - +import tempfile import logging import os import re @@ -25,8 +25,6 @@ import virtualenv import venv import utils import proto.CommandExecutor_pb2 as CommandExecutor_pb2 -import email.parser -import json REQUIREMENTS_TXT = "requirements.txt" @@ -37,49 +35,57 @@ class CommandExecutorHandler(): self.request = request self.logger = logging.getLogger(self.__class__.__name__) self.blueprint_id = utils.get_blueprint_id(request) + self.execution_timeout = utils.get_blueprint_timeout(request) self.venv_home = '/opt/app/onap/blueprints/deploy/' + self.blueprint_id self.installed = self.venv_home + '/.installed' def is_installed(self): return os.path.exists(self.installed) - def prepare_env(self, request, results): + def prepare_env(self, request): + results_log = [] if not self.is_installed(): create_venv_status = self.create_venv() - if not create_venv_status["cds_is_successful"]: - err_msg = "ERROR: failed to prepare environment for request {} due to error in creating virtual Python env. Original error {}".format(self.blueprint_id, create_venv_status["err_msg"]) + if not create_venv_status[utils.CDS_IS_SUCCESSFUL_KEY]: + err_msg = "ERROR: failed to prepare environment for request {} due to error in creating virtual Python env. Original error {}".format(self.blueprint_id, create_venv_status[utils.ERR_MSG_KEY]) self.logger.error(err_msg) - return utils.build_ret_data(False, err_msg) + return utils.build_ret_data(False, error=err_msg) activate_venv_status = self.activate_venv() - if not activate_venv_status["cds_is_successful"]: - err_msg = "ERROR: failed to prepare environment for request {} due Python venv_activation. Original error {}".format(self.blueprint_id, activate_venv_status["err_msg"]) + if not activate_venv_status[utils.CDS_IS_SUCCESSFUL_KEY]: + err_msg = "ERROR: failed to prepare environment for request {} due Python venv_activation. Original error {}".format(self.blueprint_id, activate_venv_status[utils.ERR_MSG_KEY]) self.logger.error(err_msg) - return utils.build_ret_data(False, err_msg) + return utils.build_ret_data(False, error=err_msg) try: with open(self.installed, "w+") as f: - if not self.install_packages(request, CommandExecutor_pb2.pip, f, results): - return utils.build_ret_data(False, "ERROR: failed to prepare environment for request {} during pip package install.".format(self.blueprint_id)) + if not self.install_packages(request, CommandExecutor_pb2.pip, f, results_log): + err_msg = "ERROR: failed to prepare environment for request {} during pip package install.".format(self.blueprint_id) + return utils.build_ret_data(False, results_log=results_log, error=err_msg) f.write("\r\n") # TODO: is \r needed? - results.append("\n") - if not self.install_packages(request, CommandExecutor_pb2.ansible_galaxy, f, results): - return utils.build_ret_data(False, "ERROR: failed to prepare environment for request {} during Ansible install.".format(self.blueprint_id)) + results_log.append("\n") + if not self.install_packages(request, CommandExecutor_pb2.ansible_galaxy, f, results_log): + err_msg = "ERROR: failed to prepare environment for request {} during Ansible install.".format(self.blueprint_id) + return utils.build_ret_data(False, results_log=results_log, error=err_msg) except Exception as ex: err_msg = "ERROR: failed to prepare environment for request {} during installing packages. Exception: {}".format(self.blueprint_id, ex) self.logger.error(err_msg) - return utils.build_ret_data(False, err_msg) + return utils.build_ret_data(False, error=err_msg) else: try: with open(self.installed, "r") as f: - results.append(f.read()) + results_log.append(f.read()) except Exception as ex: - return utils.build_ret_data(False, "ERROR: failed to prepare environment during reading 'installed' file {}. Exception: {}".format(self.installed, ex)) + err_msg="ERROR: failed to prepare environment during reading 'installed' file {}. Exception: {}".format(self.installed, ex) + return utils.build_ret_data(False, error=err_msg) # deactivate_venv(blueprint_id) - return utils.build_ret_data(True, "") + return utils.build_ret_data(True, results_log=results_log) - def execute_command(self, request, results): - payload_result = {} + def execute_command(self, request): + # STDOUT/STDERR output of the process + results_log = [] + # encoded payload returned by the process + result = {} # workaround for when packages are not specified, we may not want to go through the install step # can just call create_venv from here. if not self.is_installed(): @@ -87,16 +93,14 @@ class CommandExecutorHandler(): try: if not self.is_installed(): create_venv_status = self.create_venv - if not create_venv_status["cds_is_successful"]: - err_msg = "{} - Failed to execute command during venv creation. Original error: {}".format(self.blueprint_id, create_venv_status["err_msg"]) - results.append(err_msg) - return utils.build_ret_data(False, err_msg) + if not create_venv_status[utils.CDS_IS_SUCCESSFUL_KEY]: + err_msg = "{} - Failed to execute command during venv creation. Original error: {}".format(self.blueprint_id, create_venv_status[utils.ERR_MSG_KEY]) + return utils.build_ret_data(False, error=err_msg) activate_response = self.activate_venv() - if not activate_response["cds_is_successful"]: - orig_error = activate_response["err_msg"] + if not activate_response[utils.CDS_IS_SUCCESSFUL_KEY]: + orig_error = activate_response[utils.ERR_MSG_KEY] err_msg = "{} - Failed to execute command during environment activation. Original error: {}".format(self.blueprint_id, orig_error) - results.append(err_msg) #TODO: get rid of results and just rely on the return data struct. - return utils.build_ret_data(False, err_msg) + return utils.build_ret_data(False, error=err_msg) cmd = "cd " + self.venv_home @@ -105,8 +109,6 @@ class CommandExecutorHandler(): cmd = cmd + "; " + request.command + " -e 'ansible_python_interpreter=" + self.venv_home + "/bin/python'" else: cmd = cmd + "; " + request.command + " " + re.escape(MessageToJson(request.properties)) - payload_section = [] - is_payload_section = False ### extract the original header request into sys-env variables ### RequestID @@ -115,42 +117,31 @@ class CommandExecutorHandler(): subrequest_id = request.correlationId request_id_map = {'CDS_REQUEST_ID':request_id, 'CDS_CORRELATION_ID':subrequest_id} updated_env = { **os.environ, **request_id_map } + self.logger.info("Running blueprint {} with timeout: {}".format(self.blueprint_id, self.execution_timeout)) - with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, - shell=True, bufsize=1, universal_newlines=True, env=updated_env) as newProcess: - while True: - output = newProcess.stdout.readline() - if output == '' and newProcess.poll() is not None: - break - if output.startswith('BEGIN_EXTRA_PAYLOAD'): - is_payload_section = True - output = newProcess.stdout.readline() - if output.startswith('END_EXTRA_PAYLOAD'): - is_payload_section = False - output = '' - payload = '\n'.join(payload_section) - msg = email.parser.Parser().parsestr(payload) - for part in msg.get_payload(): - payload_result = json.loads(part.get_payload()) - if output and not is_payload_section: - self.logger.info(output.strip()) - results.append(output.strip()) - else: - payload_section.append(output.strip()) - rc = newProcess.poll() + with tempfile.TemporaryFile(mode="w+") as tmp: + try: + completed_subprocess = subprocess.run(cmd, stdout=tmp, stderr=subprocess.STDOUT, shell=True, + env=updated_env, timeout=self.execution_timeout) + except TimeoutExpired: + timeout_err_msg = "Running command {} failed due to timeout of {} seconds.".format(self.blueprint_id, self.execution_timeout) + self.logger.error(timeout_err_msg) + utils.parse_cmd_exec_output(tmp, self.logger, result, results_log) + return utils.build_ret_data(False, results_log=results_log, error=timeout_err_msg) + + utils.parse_cmd_exec_output(tmp, self.logger, result, results_log) + rc = completed_subprocess.returncode except Exception as e: err_msg = "{} - Failed to execute command. Error: {}".format(self.blueprint_id, e) - self.logger.info(err_msg) - results.append(e) - payload_result.update(utils.build_ret_data(False, err_msg)) - return payload_result + result.update(utils.build_ret_data(False, results_log=results_log, error=err_msg)) + return result # deactivate_venv(blueprint_id) #Since return code is only used to check if it's zero (success), we can just return success flag instead. self.logger.debug("python return_code : {}".format(rc)) is_execution_successful = rc == 0 - payload_result.update(utils.build_ret_data(is_execution_successful, "")) - return payload_result + result.update(utils.build_ret_data(is_execution_successful, results_log=results_log)) + return result def install_packages(self, request, type, f, results): success = self.install_python_packages('UTILITY', results) @@ -177,6 +168,7 @@ class CommandExecutorHandler(): if REQUIREMENTS_TXT == package: command = ["pip", "install", "-r", self.venv_home + "/Environments/" + REQUIREMENTS_TXT] elif package == 'UTILITY': + # TODO: fix python version that is hardcoded here, may fail if python image is upgraded command = ["cp", "-r", "./cds_utils", self.venv_home + "/lib/python3.6/site-packages/"] else: command = ["pip", "install", package] @@ -226,11 +218,11 @@ class CommandExecutorHandler(): venv.create(self.venv_home, with_pip=True, system_site_packages=True) virtualenv.writefile(os.path.join(bin_dir, "activate_this.py"), virtualenv.ACTIVATE_THIS) self.logger.info("{} - Creation of Python Virtual Environment finished.".format(self.blueprint_id)) - return utils.build_ret_data(True, "") + return utils.build_ret_data(True) except Exception as err: err_msg = "{} - Failed to provision Python Virtual Environment. Error: {}".format(self.blueprint_id, err) self.logger.info(err_msg) - return utils.build_ret_data(False, err_msg) + return utils.build_ret_data(False, error=err_msg) # return map cds_is_successful and err_msg. Status is True on success. err_msg may existence doesn't necessarily indicate fatal condition. # the 'status' should be set to False to indicate error. @@ -248,11 +240,11 @@ class CommandExecutorHandler(): exec (activate_this_script.read(), {'__file__': path}) exec (fixpathenvvar) self.logger.info("Running with PATH : {}".format(os.environ['PATH'])) - return utils.build_ret_data(True, "") + return utils.build_ret_data(True) except Exception as err: err_msg ="{} - Failed to activate Python Virtual Environment. Error: {}".format(self.blueprint_id, err) self.logger.info( err_msg) - return utils.build_ret_data(False, err_msg) + return utils.build_ret_data(False, error=err_msg) def deactivate_venv(self): self.logger.info("{} - Deactivate Python Virtual Environment".format(self.blueprint_id)) diff --git a/ms/command-executor/src/main/python/command_executor_server.py b/ms/command-executor/src/main/python/command_executor_server.py index 3435e2272..aa666ee24 100644 --- a/ms/command-executor/src/main/python/command_executor_server.py +++ b/ms/command-executor/src/main/python/command_executor_server.py @@ -22,9 +22,6 @@ import proto.CommandExecutor_pb2_grpc as CommandExecutor_pb2_grpc from command_executor_handler import CommandExecutorHandler import utils -_ONE_DAY_IN_SECONDS = 60 * 60 * 24 - - class CommandExecutorServer(CommandExecutor_pb2_grpc.CommandExecutorServiceServicer): def __init__(self): @@ -35,14 +32,14 @@ class CommandExecutorServer(CommandExecutor_pb2_grpc.CommandExecutorServiceServi self.logger.info("{} - Received prepareEnv request".format(blueprint_id)) self.logger.info(request) - results = [] handler = CommandExecutorHandler(request) - prepare_env_response = handler.prepare_env(request, results) - if not prepare_env_response["cds_is_successful"]: - self.logger.info("{} - Failed to prepare python environment. {}".format(blueprint_id, results)) - return utils.build_grpc_response(request, results, {}, False) - self.logger.info("{} - Package installation logs {}".format(blueprint_id, results)) - return utils.build_grpc_response(request, results, {}, True) + prepare_env_response = handler.prepare_env(request) + if prepare_env_response[utils.CDS_IS_SUCCESSFUL_KEY]: + self.logger.info("{} - Package installation logs {}".format(blueprint_id, prepare_env_response[utils.RESULTS_LOG_KEY])) + else: + self.logger.info("{} - Failed to prepare python environment. {}".format(blueprint_id, prepare_env_response[utils.ERR_MSG_KEY])) + self.logger.info("Prepare Env Response returned : %s" % prepare_env_response) + return utils.build_grpc_response(request.requestId, prepare_env_response) def executeCommand(self, request, context): blueprint_id = utils.get_blueprint_id(request) @@ -50,16 +47,14 @@ class CommandExecutorServer(CommandExecutor_pb2_grpc.CommandExecutorServiceServi if os.environ.get('CE_DEBUG','false') == "true": self.logger.info(request) - log_results = [] - payload_result = {} handler = CommandExecutorHandler(request) - payload_result = handler.execute_command(request, log_results) - if not payload_result["cds_is_successful"]: - self.logger.info("{} - Failed to executeCommand. {}".format(blueprint_id, log_results)) - else: + exec_cmd_response = handler.execute_command(request) + if exec_cmd_response[utils.CDS_IS_SUCCESSFUL_KEY]: self.logger.info("{} - Execution finished successfully.".format(blueprint_id)) + else: + self.logger.info("{} - Failed to executeCommand. {}".format(blueprint_id, exec_cmd_response[utils.RESULTS_LOG_KEY])) - ret = utils.build_grpc_response(request, log_results, payload_result, payload_result["cds_is_successful"]) - self.logger.info("Payload returned %s" % payload_result) + ret = utils.build_grpc_response(request.requestId, exec_cmd_response) + self.logger.info("Payload returned : {}".format(exec_cmd_response)) return ret
\ No newline at end of file diff --git a/ms/command-executor/src/main/python/utils.py b/ms/command-executor/src/main/python/utils.py index 574be51db..54c5ceafa 100644 --- a/ms/command-executor/src/main/python/utils.py +++ b/ms/command-executor/src/main/python/utils.py @@ -17,35 +17,89 @@ from google.protobuf.timestamp_pb2 import Timestamp import proto.CommandExecutor_pb2 as CommandExecutor_pb2 import json +import email.parser +CDS_IS_SUCCESSFUL_KEY = "cds_is_successful" +ERR_MSG_KEY = "err_msg" +RESULTS_KEY = "results" +RESULTS_LOG_KEY = "results_log" +RESPONSE_MAX_SIZE = 4 * 1024 * 1024 # 4Mb def get_blueprint_id(request): blueprint_name = request.identifiers.blueprintName blueprint_version = request.identifiers.blueprintVersion return blueprint_name + '/' + blueprint_version +def get_blueprint_timeout(request): + return request.timeOut + # Create a response for grpc. Fills in the timestamp as well as removes cds_is_successful element -def build_grpc_response(request, log_results, payload_return, is_success=False): - if is_success: +def build_grpc_response(request_id, response): + if response[CDS_IS_SUCCESSFUL_KEY]: status = CommandExecutor_pb2.SUCCESS else: status = CommandExecutor_pb2.FAILURE + response.pop(CDS_IS_SUCCESSFUL_KEY) + logs = response.pop(RESULTS_LOG_KEY) + + # Payload should only contains response data returned from the executed script and/or the error message + payload = json.dumps(response) + timestamp = Timestamp() timestamp.GetCurrentTime() - if "cds_is_successful" in payload_return: - payload_return.pop('cds_is_successful') - payload_str = json.dumps(payload_return) - return CommandExecutor_pb2.ExecutionOutput(requestId=request.requestId, - response=log_results, + execution_output = CommandExecutor_pb2.ExecutionOutput(requestId=request_id, + response=logs, status=status, - payload=payload_str, + payload=payload, timestamp=timestamp) -# build a return data structure which may contain an error message -def build_ret_data(cds_is_successful, err_msg): - ret_data = {"cds_is_successful": cds_is_successful } - if err_msg != "": - ret_data["err_msg"] = err_msg + return truncate_execution_output(execution_output) + +# build a ret data structure used to populate the ExecutionOutput +def build_ret_data(cds_is_successful, results_log=[], error=None): + ret_data = { + CDS_IS_SUCCESSFUL_KEY: cds_is_successful, + RESULTS_LOG_KEY: results_log + } + if error: + ret_data[ERR_MSG_KEY] = error return ret_data + +# Truncate execution logs to make sure gRPC response doesn't exceed the gRPC buffer capacity +def truncate_execution_output(execution_output): + sum_truncated_chars = 0 + if execution_output.ByteSize() > RESPONSE_MAX_SIZE: + while execution_output.ByteSize() > RESPONSE_MAX_SIZE: + removed_item = execution_output.response.pop() + sum_truncated_chars += len(removed_item) + execution_output.response.append("[...] TRUNCATED CHARS : {}".format(sum_truncated_chars)) + return execution_output + + +# Read temp file 'outputfile' into results_log and split out the returned payload into payload_result +def parse_cmd_exec_output(outputfile, logger, payload_result, results_log): + payload_section = [] + is_payload_section = False + outputfile.seek(0) + while True: + output = outputfile.readline() + if output == '': + break + if output.startswith('BEGIN_EXTRA_PAYLOAD'): + is_payload_section = True + output = outputfile.readline() + if output.startswith('END_EXTRA_PAYLOAD'): + is_payload_section = False + output = '' + payload = '\n'.join(payload_section) + msg = email.parser.Parser().parsestr(payload) + for part in msg.get_payload(): + payload_result.update(json.loads(part.get_payload())) + if output and not is_payload_section: + logger.info(output.strip()) + results_log.append(output.strip()) + else: + payload_section.append(output.strip()) + |