diff options
5 files changed, 54 insertions, 67 deletions
diff --git a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt index 50f0b1499..7f32fa95d 100644 --- a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt +++ b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt @@ -182,12 +182,12 @@ open class ComponentRemotePythonExecutor( val componentLevelWarningMsg = if (timeout < envPrepTimeout) "Note: component-level timeout ($timeout) is shorter than env-prepare timeout ($envPrepTimeout). " else "" val grpcErrMsg = "Command failed during env. preparation... timeout($envPrepTimeout) requestId ($processId). $componentLevelWarningMsg grpcError: ${grpcEx.status}" setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, grpcErrMsg.asJsonPrimitive()) - setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, error = grpcErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled) + setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, message = grpcErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled) log.error(grpcErrMsg, grpcEx) } catch (e: Exception) { val timeoutErrMsg = "Command executor failed during env. preparation.. catch-all case timeout($envPrepTimeout) requestId ($processId). exception msg: ${e.message}" setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, e.message.asJsonPrimitive()) - setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, error = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled) + setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, message = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled) log.error(timeoutErrMsg, e) } // if Env preparation was successful, then proceed with command execution in this Env @@ -236,7 +236,7 @@ open class ComponentRemotePythonExecutor( setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_EXEC_CMD, logs = "".asJsonPrimitive(), - error = timeoutErrMsg.asJsonPrimitive(), + message = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled ) log.error(timeoutErrMsg, timeoutEx) @@ -245,13 +245,13 @@ open class ComponentRemotePythonExecutor( setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_EXEC_CMD, logs = "".asJsonPrimitive(), - error = timeoutErrMsg.asJsonPrimitive(), + message = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled ) log.error(timeoutErrMsg, grpcEx) } catch (e: Exception) { val timeoutErrMsg = "Command executor failed during process catch-all case requestId ($processId) timeout($envPrepTimeout) exception msg: ${e.message}" - setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, error = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled) + setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, message = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled) log.error(timeoutErrMsg, e) } } @@ -280,42 +280,42 @@ open class ComponentRemotePythonExecutor( private fun setNodeOutputProperties( status: JsonNode = StatusType.FAILURE.name.asJsonPrimitive(), step: String, + logs: JsonNode, message: JsonNode, - artifacts: JsonNode, logging: Boolean = true ) { setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status) - setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts) - setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message) + setAttribute(ATTRIBUTE_RESPONSE_DATA, message) + setAttribute(ATTRIBUTE_EXEC_CMD_LOG, logs) if (logging) { - log.info("Executor status : $step : $status") - log.info("Executor artifacts: $step : $artifacts") - log.info("Executor message : $step : $message") + log.info("Executor status : $step : $status") + log.info("Executor message: $step : $message") + log.info("Executor logs : $step : $logs") } } /** - * Utility function to set the output properties and errors of the executor node, in cas of errors + * Utility function to set the output properties and errors of the executor node, in case of errors */ private fun setNodeOutputErrors( status: String, step: String, logs: JsonNode = "N/A".asJsonPrimitive(), - error: JsonNode, + message: JsonNode, logging: Boolean = true ) { setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.asJsonPrimitive()) setAttribute(ATTRIBUTE_EXEC_CMD_LOG, logs) - setAttribute(ATTRIBUTE_RESPONSE_DATA, "N/A".asJsonPrimitive()) + setAttribute(ATTRIBUTE_RESPONSE_DATA, message) if (logging) { - log.info("Executor status : $step : $status") - log.info("Executor message : $step : $error") - log.info("Executor logs : $step : $logs") + log.info("Executor status : $step : $status") + log.info("Executor message: $step : $message") + log.info("Executor logs : $step : $logs") } - addError(status, step, error.toString()) + addError(status, step, logs.toString()) } } diff --git a/ms/command-executor/src/main/docker/Dockerfile b/ms/command-executor/src/main/docker/Dockerfile index 610e10cc2..1e5d4cbb8 100644 --- a/ms/command-executor/src/main/docker/Dockerfile +++ b/ms/command-executor/src/main/docker/Dockerfile @@ -3,7 +3,7 @@ FROM python:3.6-slim ENV GRPC_PYTHON_VERSION 1.20.0 RUN python -m pip install --upgrade pip RUN pip install grpcio==${GRPC_PYTHON_VERSION} grpcio-tools==${GRPC_PYTHON_VERSION} -RUN pip install virtualenv==16.7.9 pympler==0.8 +RUN pip install virtualenv==16.7.9 RUN groupadd -r -g 1000 onap && useradd -r -u 1000 -g onap onap diff --git a/ms/command-executor/src/main/python/command_executor_handler.py b/ms/command-executor/src/main/python/command_executor_handler.py index 7c9ef84c1..0533b41f3 100644 --- a/ms/command-executor/src/main/python/command_executor_handler.py +++ b/ms/command-executor/src/main/python/command_executor_handler.py @@ -143,21 +143,15 @@ class CommandExecutorHandler(): rc = newProcess.poll() except Exception as e: err_msg = "{} - Failed to execute command. Error: {}".format(self.blueprint_id, e) - return utils.build_ret_data(False, results=result, results_log=results_log, error=err_msg) + result.update(utils.build_ret_data(False, results_log=results_log, error=err_msg)) + return result # deactivate_venv(blueprint_id) #Since return code is only used to check if it's zero (success), we can just return success flag instead. self.logger.debug("python return_code : {}".format(rc)) - if rc == 0: - return utils.build_ret_data(True, results=result, results_log=results_log) - else: - err_msg = "" - if len(results_log) > 0: - # get exception message - err_msg = "{} - {}".format(self.blueprint_id, results_log[-1:][0]) - else: - err_msg = "{} - Process exited with return code {}".format(self.blueprint_id, rc) - return utils.build_ret_data(False, results=result, results_log=results_log, error=err_msg) + is_execution_successful = rc == 0 + result.update(utils.build_ret_data(is_execution_successful, results_log=results_log)) + return result def install_packages(self, request, type, f, results): success = self.install_python_packages('UTILITY', results) @@ -233,11 +227,11 @@ class CommandExecutorHandler(): venv.create(self.venv_home, with_pip=True, system_site_packages=True) virtualenv.writefile(os.path.join(bin_dir, "activate_this.py"), virtualenv.ACTIVATE_THIS) self.logger.info("{} - Creation of Python Virtual Environment finished.".format(self.blueprint_id)) - return utils.build_ret_data(True, "") + return utils.build_ret_data(True) except Exception as err: err_msg = "{} - Failed to provision Python Virtual Environment. Error: {}".format(self.blueprint_id, err) self.logger.info(err_msg) - return utils.build_ret_data(False, err_msg) + return utils.build_ret_data(False, error=err_msg) # return map cds_is_successful and err_msg. Status is True on success. err_msg may existence doesn't necessarily indicate fatal condition. # the 'status' should be set to False to indicate error. @@ -255,11 +249,11 @@ class CommandExecutorHandler(): exec (activate_this_script.read(), {'__file__': path}) exec (fixpathenvvar) self.logger.info("Running with PATH : {}".format(os.environ['PATH'])) - return utils.build_ret_data(True, "") + return utils.build_ret_data(True) except Exception as err: err_msg ="{} - Failed to activate Python Virtual Environment. Error: {}".format(self.blueprint_id, err) self.logger.info( err_msg) - return utils.build_ret_data(False, err_msg) + return utils.build_ret_data(False, error=err_msg) def deactivate_venv(self): self.logger.info("{} - Deactivate Python Virtual Environment".format(self.blueprint_id)) diff --git a/ms/command-executor/src/main/python/command_executor_server.py b/ms/command-executor/src/main/python/command_executor_server.py index 207097605..aa666ee24 100644 --- a/ms/command-executor/src/main/python/command_executor_server.py +++ b/ms/command-executor/src/main/python/command_executor_server.py @@ -47,18 +47,14 @@ class CommandExecutorServer(CommandExecutor_pb2_grpc.CommandExecutorServiceServi if os.environ.get('CE_DEBUG','false') == "true": self.logger.info(request) - log_results = [] - payload_result = {} handler = CommandExecutorHandler(request) exec_cmd_response = handler.execute_command(request) if exec_cmd_response[utils.CDS_IS_SUCCESSFUL_KEY]: self.logger.info("{} - Execution finished successfully.".format(blueprint_id)) - self.logger.info("{} - Log Results {}: ".format(blueprint_id, exec_cmd_response[utils.RESULTS_LOG_KEY])) - self.logger.info("{} - Results : {}".format(blueprint_id, exec_cmd_response[utils.RESULTS_KEY])) else: - self.logger.info("{} - Failed to executeCommand. {}".format(blueprint_id, exec_cmd_response[utils.ERR_MSG_KEY])) + self.logger.info("{} - Failed to executeCommand. {}".format(blueprint_id, exec_cmd_response[utils.RESULTS_LOG_KEY])) ret = utils.build_grpc_response(request.requestId, exec_cmd_response) - self.logger.info("Response returned : {}".format(exec_cmd_response)) + self.logger.info("Payload returned : {}".format(exec_cmd_response)) return ret
\ No newline at end of file diff --git a/ms/command-executor/src/main/python/utils.py b/ms/command-executor/src/main/python/utils.py index b98241629..180cd8c12 100644 --- a/ms/command-executor/src/main/python/utils.py +++ b/ms/command-executor/src/main/python/utils.py @@ -17,13 +17,12 @@ from google.protobuf.timestamp_pb2 import Timestamp import proto.CommandExecutor_pb2 as CommandExecutor_pb2 import json -from pympler import asizeof CDS_IS_SUCCESSFUL_KEY = "cds_is_successful" ERR_MSG_KEY = "err_msg" RESULTS_KEY = "results" RESULTS_LOG_KEY = "results_log" -TRUNC_MSG_LEN = 3 * 1024 * 1024 +RESPONSE_MAX_SIZE = 4 * 1024 * 1024 # 4Mb def get_blueprint_id(request): blueprint_name = request.identifiers.blueprintName @@ -34,44 +33,42 @@ def get_blueprint_id(request): def build_grpc_response(request_id, response): if response[CDS_IS_SUCCESSFUL_KEY]: status = CommandExecutor_pb2.SUCCESS - payload = json.dumps(response[RESULTS_KEY]) else: status = CommandExecutor_pb2.FAILURE - # truncate error message if too heavy - if asizeof.asizeof(response[ERR_MSG_KEY]) > TRUNC_MSG_LEN: - response[ERR_MSG_KEY] = "{} [...]. Check command executor logs for more information.".format(response[ERR_MSG_KEY][:TRUNC_MSG_LEN]) - payload = json.dumps(response[ERR_MSG_KEY]) - # truncate cmd-exec logs if too heavy - response[RESULTS_LOG_KEY] = truncate_cmd_exec_logs(response[RESULTS_LOG_KEY]) + response.pop(CDS_IS_SUCCESSFUL_KEY) + logs = response.pop(RESULTS_LOG_KEY) + + # Payload should only contains response data returned from the executed script and/or the error message + payload = json.dumps(response) timestamp = Timestamp() timestamp.GetCurrentTime() - return CommandExecutor_pb2.ExecutionOutput(requestId=request_id, - response=response[RESULTS_LOG_KEY], + execution_output = CommandExecutor_pb2.ExecutionOutput(requestId=request_id, + response=logs, status=status, payload=payload, timestamp=timestamp) -# build a ret data structure -def build_ret_data(cds_is_successful, results={}, results_log=[], error=None): + return truncate_execution_output(execution_output) + +# build a ret data structure used to populate the ExecutionOutput +def build_ret_data(cds_is_successful, results_log=[], error=None): ret_data = { - CDS_IS_SUCCESSFUL_KEY: cds_is_successful, - RESULTS_KEY: results, - RESULTS_LOG_KEY: results_log - } + CDS_IS_SUCCESSFUL_KEY: cds_is_successful, + RESULTS_LOG_KEY: results_log + } if error: ret_data[ERR_MSG_KEY] = error return ret_data -def truncate_cmd_exec_logs(logs): - truncated_logs = [] - truncated_logs_memsize = 0 - for log in logs: - truncated_logs_memsize += asizeof.asizeof(log) - if truncated_logs_memsize > TRUNC_MSG_LEN: - truncated_logs.append("Execution logs exceeds the maximum size allowed. Check command executor logs to view the execute-command-logs.") - break - truncated_logs.append(log) - return truncated_logs
\ No newline at end of file +# Truncate execution logs to make sure gRPC response doesn't exceed the gRPC buffer capacity +def truncate_execution_output(execution_output): + sum_truncated_chars = 0 + if execution_output.ByteSize() > RESPONSE_MAX_SIZE: + while execution_output.ByteSize() > RESPONSE_MAX_SIZE: + removed_item = execution_output.response.pop() + sum_truncated_chars += len(removed_item) + execution_output.response.append("[...] TRUNCATED CHARS : {}".format(sum_truncated_chars)) + return execution_output
\ No newline at end of file |