From 888f2f78a580a3deccb66bef50f2375a04b39eb6 Mon Sep 17 00:00:00 2001 From: Julien Fontaine Date: Mon, 20 Apr 2020 11:53:41 -0400 Subject: Truncate message published on Kafka / Spike: Define solution for logs separation Refactoring of cmd-exec component - Improve display of error messages within the response - Fix gRPC buffer limitation (4Mb) by truncating error messages and cmd-exec logs if too heavy (>3Mb) Truncation of BP responses (<4Kb) before sending them in kafka audit topics. - Truncation if needed of error messages for every response - Truncation of cmd-exec logs in cmd-exec responses (Spike) Add a flag in the application.properties to enable/disable the display of cmd-exec responses on the BP side (Fix) Correction of BP processing with kafka regression (Fix) Changed default SSL Endpoint Algo Issue-ID: CCSDK-2326 Change-Id: If4d0e661117d1dd156cf19c95774824e754d870a Signed-off-by: Julien Fontaine --- ms/command-executor/src/main/docker/Dockerfile | 2 +- .../src/main/python/command_executor_handler.py | 72 +++++++++++----------- .../src/main/python/command_executor_server.py | 31 +++++----- ms/command-executor/src/main/python/utils.py | 52 ++++++++++++---- 4 files changed, 92 insertions(+), 65 deletions(-) (limited to 'ms/command-executor') diff --git a/ms/command-executor/src/main/docker/Dockerfile b/ms/command-executor/src/main/docker/Dockerfile index c38126066..7a20469b7 100644 --- a/ms/command-executor/src/main/docker/Dockerfile +++ b/ms/command-executor/src/main/docker/Dockerfile @@ -3,7 +3,7 @@ FROM python:3.6-slim ENV GRPC_PYTHON_VERSION 1.20.0 RUN python -m pip install --upgrade pip RUN pip install grpcio==${GRPC_PYTHON_VERSION} grpcio-tools==${GRPC_PYTHON_VERSION} -RUN pip install virtualenv==16.7.9 +RUN pip install virtualenv==16.7.9 pympler==0.8 RUN groupadd -r onap && useradd -r -g onap onap diff --git a/ms/command-executor/src/main/python/command_executor_handler.py b/ms/command-executor/src/main/python/command_executor_handler.py index 1e6f03b81..0c476b23e 100644 --- a/ms/command-executor/src/main/python/command_executor_handler.py +++ b/ms/command-executor/src/main/python/command_executor_handler.py @@ -43,43 +43,48 @@ class CommandExecutorHandler(): def is_installed(self): return os.path.exists(self.installed) - def prepare_env(self, request, results): + def prepare_env(self, request): + results_log = [] if not self.is_installed(): create_venv_status = self.create_venv() - if not create_venv_status["cds_is_successful"]: - err_msg = "ERROR: failed to prepare environment for request {} due to error in creating virtual Python env. Original error {}".format(self.blueprint_id, create_venv_status["err_msg"]) + if not create_venv_status[utils.CDS_IS_SUCCESSFUL_KEY]: + err_msg = "ERROR: failed to prepare environment for request {} due to error in creating virtual Python env. Original error {}".format(self.blueprint_id, create_venv_status[utils.ERR_MSG_KEY]) self.logger.error(err_msg) - return utils.build_ret_data(False, err_msg) + return utils.build_ret_data(False, error=err_msg) activate_venv_status = self.activate_venv() - if not activate_venv_status["cds_is_successful"]: - err_msg = "ERROR: failed to prepare environment for request {} due Python venv_activation. Original error {}".format(self.blueprint_id, activate_venv_status["err_msg"]) + if not activate_venv_status[utils.CDS_IS_SUCCESSFUL_KEY]: + err_msg = "ERROR: failed to prepare environment for request {} due Python venv_activation. Original error {}".format(self.blueprint_id, activate_venv_status[utils.ERR_MSG_KEY]) self.logger.error(err_msg) - return utils.build_ret_data(False, err_msg) + return utils.build_ret_data(False, error=err_msg) try: with open(self.installed, "w+") as f: - if not self.install_packages(request, CommandExecutor_pb2.pip, f, results): - return utils.build_ret_data(False, "ERROR: failed to prepare environment for request {} during pip package install.".format(self.blueprint_id)) + if not self.install_packages(request, CommandExecutor_pb2.pip, f, results_log): + err_msg = "ERROR: failed to prepare environment for request {} during pip package install.".format(self.blueprint_id) + return utils.build_ret_data(False, results_log=results_log, error=err_msg) f.write("\r\n") # TODO: is \r needed? - results.append("\n") - if not self.install_packages(request, CommandExecutor_pb2.ansible_galaxy, f, results): - return utils.build_ret_data(False, "ERROR: failed to prepare environment for request {} during Ansible install.".format(self.blueprint_id)) + results_log.append("\n") + if not self.install_packages(request, CommandExecutor_pb2.ansible_galaxy, f, results_log): + err_msg = "ERROR: failed to prepare environment for request {} during Ansible install.".format(self.blueprint_id) + return utils.build_ret_data(False, results_log=results_log, error=err_msg) except Exception as ex: err_msg = "ERROR: failed to prepare environment for request {} during installing packages. Exception: {}".format(self.blueprint_id, ex) self.logger.error(err_msg) - return utils.build_ret_data(False, err_msg) + return utils.build_ret_data(False, error=err_msg) else: try: with open(self.installed, "r") as f: - results.append(f.read()) + results_log.append(f.read()) except Exception as ex: - return utils.build_ret_data(False, "ERROR: failed to prepare environment during reading 'installed' file {}. Exception: {}".format(self.installed, ex)) + err_msg="ERROR: failed to prepare environment during reading 'installed' file {}. Exception: {}".format(self.installed, ex) + return utils.build_ret_data(False, error=err_msg) # deactivate_venv(blueprint_id) - return utils.build_ret_data(True, "") + return utils.build_ret_data(True, results_log=results_log) - def execute_command(self, request, results): - payload_result = {} + def execute_command(self, request): + results_log = [] + result = {} # workaround for when packages are not specified, we may not want to go through the install step # can just call create_venv from here. if not self.is_installed(): @@ -87,16 +92,14 @@ class CommandExecutorHandler(): try: if not self.is_installed(): create_venv_status = self.create_venv - if not create_venv_status["cds_is_successful"]: - err_msg = "{} - Failed to execute command during venv creation. Original error: {}".format(self.blueprint_id, create_venv_status["err_msg"]) - results.append(err_msg) - return utils.build_ret_data(False, err_msg) + if not create_venv_status[utils.CDS_IS_SUCCESSFUL_KEY]: + err_msg = "{} - Failed to execute command during venv creation. Original error: {}".format(self.blueprint_id, create_venv_status[utils.ERR_MSG_KEY]) + return utils.build_ret_data(False, error=err_msg) activate_response = self.activate_venv() - if not activate_response["cds_is_successful"]: - orig_error = activate_response["err_msg"] + if not activate_response[utils.CDS_IS_SUCCESSFUL_KEY]: + orig_error = activate_response[utils.ERR_MSG_KEY] err_msg = "{} - Failed to execute command during environment activation. Original error: {}".format(self.blueprint_id, orig_error) - results.append(err_msg) #TODO: get rid of results and just rely on the return data struct. - return utils.build_ret_data(False, err_msg) + return utils.build_ret_data(False, error=err_msg) cmd = "cd " + self.venv_home @@ -131,26 +134,25 @@ class CommandExecutorHandler(): payload = '\n'.join(payload_section) msg = email.parser.Parser().parsestr(payload) for part in msg.get_payload(): - payload_result = json.loads(part.get_payload()) + result = json.loads(part.get_payload()) if output and not is_payload_section: self.logger.info(output.strip()) - results.append(output.strip()) + results_log.append(output.strip()) else: payload_section.append(output.strip()) rc = newProcess.poll() except Exception as e: err_msg = "{} - Failed to execute command. Error: {}".format(self.blueprint_id, e) - self.logger.info(err_msg) - results.append(e) - payload_result.update(utils.build_ret_data(False, err_msg)) - return payload_result + return utils.build_ret_data(False, results=result, results_log=results_log, error=err_msg) # deactivate_venv(blueprint_id) #Since return code is only used to check if it's zero (success), we can just return success flag instead. self.logger.debug("python return_code : {}".format(rc)) - is_execution_successful = rc == 0 - payload_result.update(utils.build_ret_data(is_execution_successful, "")) - return payload_result + if rc == 0: + return utils.build_ret_data(True, results=result, results_log=results_log) + else: + err_msg = "{} - Something wrong happened during command execution. See execute command logs for more information.".format(self.blueprint_id) + return utils.build_ret_data(False, results=result, results_log=results_log, error=err_msg) def install_packages(self, request, type, f, results): success = self.install_python_packages('UTILITY', results) diff --git a/ms/command-executor/src/main/python/command_executor_server.py b/ms/command-executor/src/main/python/command_executor_server.py index 3435e2272..207097605 100644 --- a/ms/command-executor/src/main/python/command_executor_server.py +++ b/ms/command-executor/src/main/python/command_executor_server.py @@ -22,9 +22,6 @@ import proto.CommandExecutor_pb2_grpc as CommandExecutor_pb2_grpc from command_executor_handler import CommandExecutorHandler import utils -_ONE_DAY_IN_SECONDS = 60 * 60 * 24 - - class CommandExecutorServer(CommandExecutor_pb2_grpc.CommandExecutorServiceServicer): def __init__(self): @@ -35,14 +32,14 @@ class CommandExecutorServer(CommandExecutor_pb2_grpc.CommandExecutorServiceServi self.logger.info("{} - Received prepareEnv request".format(blueprint_id)) self.logger.info(request) - results = [] handler = CommandExecutorHandler(request) - prepare_env_response = handler.prepare_env(request, results) - if not prepare_env_response["cds_is_successful"]: - self.logger.info("{} - Failed to prepare python environment. {}".format(blueprint_id, results)) - return utils.build_grpc_response(request, results, {}, False) - self.logger.info("{} - Package installation logs {}".format(blueprint_id, results)) - return utils.build_grpc_response(request, results, {}, True) + prepare_env_response = handler.prepare_env(request) + if prepare_env_response[utils.CDS_IS_SUCCESSFUL_KEY]: + self.logger.info("{} - Package installation logs {}".format(blueprint_id, prepare_env_response[utils.RESULTS_LOG_KEY])) + else: + self.logger.info("{} - Failed to prepare python environment. {}".format(blueprint_id, prepare_env_response[utils.ERR_MSG_KEY])) + self.logger.info("Prepare Env Response returned : %s" % prepare_env_response) + return utils.build_grpc_response(request.requestId, prepare_env_response) def executeCommand(self, request, context): blueprint_id = utils.get_blueprint_id(request) @@ -53,13 +50,15 @@ class CommandExecutorServer(CommandExecutor_pb2_grpc.CommandExecutorServiceServi log_results = [] payload_result = {} handler = CommandExecutorHandler(request) - payload_result = handler.execute_command(request, log_results) - if not payload_result["cds_is_successful"]: - self.logger.info("{} - Failed to executeCommand. {}".format(blueprint_id, log_results)) - else: + exec_cmd_response = handler.execute_command(request) + if exec_cmd_response[utils.CDS_IS_SUCCESSFUL_KEY]: self.logger.info("{} - Execution finished successfully.".format(blueprint_id)) + self.logger.info("{} - Log Results {}: ".format(blueprint_id, exec_cmd_response[utils.RESULTS_LOG_KEY])) + self.logger.info("{} - Results : {}".format(blueprint_id, exec_cmd_response[utils.RESULTS_KEY])) + else: + self.logger.info("{} - Failed to executeCommand. {}".format(blueprint_id, exec_cmd_response[utils.ERR_MSG_KEY])) - ret = utils.build_grpc_response(request, log_results, payload_result, payload_result["cds_is_successful"]) - self.logger.info("Payload returned %s" % payload_result) + ret = utils.build_grpc_response(request.requestId, exec_cmd_response) + self.logger.info("Response returned : {}".format(exec_cmd_response)) return ret \ No newline at end of file diff --git a/ms/command-executor/src/main/python/utils.py b/ms/command-executor/src/main/python/utils.py index 574be51db..b98241629 100644 --- a/ms/command-executor/src/main/python/utils.py +++ b/ms/command-executor/src/main/python/utils.py @@ -17,7 +17,13 @@ from google.protobuf.timestamp_pb2 import Timestamp import proto.CommandExecutor_pb2 as CommandExecutor_pb2 import json +from pympler import asizeof +CDS_IS_SUCCESSFUL_KEY = "cds_is_successful" +ERR_MSG_KEY = "err_msg" +RESULTS_KEY = "results" +RESULTS_LOG_KEY = "results_log" +TRUNC_MSG_LEN = 3 * 1024 * 1024 def get_blueprint_id(request): blueprint_name = request.identifiers.blueprintName @@ -25,27 +31,47 @@ def get_blueprint_id(request): return blueprint_name + '/' + blueprint_version # Create a response for grpc. Fills in the timestamp as well as removes cds_is_successful element -def build_grpc_response(request, log_results, payload_return, is_success=False): - if is_success: +def build_grpc_response(request_id, response): + if response[CDS_IS_SUCCESSFUL_KEY]: status = CommandExecutor_pb2.SUCCESS + payload = json.dumps(response[RESULTS_KEY]) else: status = CommandExecutor_pb2.FAILURE + # truncate error message if too heavy + if asizeof.asizeof(response[ERR_MSG_KEY]) > TRUNC_MSG_LEN: + response[ERR_MSG_KEY] = "{} [...]. Check command executor logs for more information.".format(response[ERR_MSG_KEY][:TRUNC_MSG_LEN]) + payload = json.dumps(response[ERR_MSG_KEY]) + + # truncate cmd-exec logs if too heavy + response[RESULTS_LOG_KEY] = truncate_cmd_exec_logs(response[RESULTS_LOG_KEY]) timestamp = Timestamp() timestamp.GetCurrentTime() - if "cds_is_successful" in payload_return: - payload_return.pop('cds_is_successful') - payload_str = json.dumps(payload_return) - return CommandExecutor_pb2.ExecutionOutput(requestId=request.requestId, - response=log_results, + return CommandExecutor_pb2.ExecutionOutput(requestId=request_id, + response=response[RESULTS_LOG_KEY], status=status, - payload=payload_str, + payload=payload, timestamp=timestamp) -# build a return data structure which may contain an error message -def build_ret_data(cds_is_successful, err_msg): - ret_data = {"cds_is_successful": cds_is_successful } - if err_msg != "": - ret_data["err_msg"] = err_msg +# build a ret data structure +def build_ret_data(cds_is_successful, results={}, results_log=[], error=None): + ret_data = { + CDS_IS_SUCCESSFUL_KEY: cds_is_successful, + RESULTS_KEY: results, + RESULTS_LOG_KEY: results_log + } + if error: + ret_data[ERR_MSG_KEY] = error return ret_data + +def truncate_cmd_exec_logs(logs): + truncated_logs = [] + truncated_logs_memsize = 0 + for log in logs: + truncated_logs_memsize += asizeof.asizeof(log) + if truncated_logs_memsize > TRUNC_MSG_LEN: + truncated_logs.append("Execution logs exceeds the maximum size allowed. Check command executor logs to view the execute-command-logs.") + break + truncated_logs.append(log) + return truncated_logs \ No newline at end of file -- cgit 1.2.3-korg