aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJulien Fontaine <julien.fontaine@bell.ca>2020-06-29 19:54:27 -0400
committerJulien Fontaine <julien.fontaine@bell.ca>2020-07-02 15:36:20 +0000
commit01e3ff777b995767311be29ad51ebba53cb054c2 (patch)
tree6906e5c275096ea6834a02fd04fc5c3b9102e563
parentca17370464f688d3bd5b63de12d9163ef8e31e08 (diff)
Command Executor : Invalid response_data when executed script fails
* Modified command exec returned value in case of failure during execution. It now prints the response_data defined by the user * Modified truncation method of the gRPC returned object to use ByteSize() to get the exact sizxe consumed within the buffer Issue-ID: CCSDK-2501 Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca> Change-Id: Ie1db8db265623b5137ab3946ff4e3abda1c54a78
-rw-r--r--ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt36
-rw-r--r--ms/command-executor/src/main/docker/Dockerfile2
-rw-r--r--ms/command-executor/src/main/python/command_executor_handler.py24
-rw-r--r--ms/command-executor/src/main/python/command_executor_server.py8
-rw-r--r--ms/command-executor/src/main/python/utils.py51
5 files changed, 54 insertions, 67 deletions
diff --git a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt
index 50f0b1499..7f32fa95d 100644
--- a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt
+++ b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt
@@ -182,12 +182,12 @@ open class ComponentRemotePythonExecutor(
val componentLevelWarningMsg = if (timeout < envPrepTimeout) "Note: component-level timeout ($timeout) is shorter than env-prepare timeout ($envPrepTimeout). " else ""
val grpcErrMsg = "Command failed during env. preparation... timeout($envPrepTimeout) requestId ($processId). $componentLevelWarningMsg grpcError: ${grpcEx.status}"
setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, grpcErrMsg.asJsonPrimitive())
- setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, error = grpcErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
+ setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, message = grpcErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
log.error(grpcErrMsg, grpcEx)
} catch (e: Exception) {
val timeoutErrMsg = "Command executor failed during env. preparation.. catch-all case timeout($envPrepTimeout) requestId ($processId). exception msg: ${e.message}"
setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, e.message.asJsonPrimitive())
- setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, error = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
+ setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, message = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
log.error(timeoutErrMsg, e)
}
// if Env preparation was successful, then proceed with command execution in this Env
@@ -236,7 +236,7 @@ open class ComponentRemotePythonExecutor(
setNodeOutputErrors(status = StatusType.FAILURE.name,
step = STEP_EXEC_CMD,
logs = "".asJsonPrimitive(),
- error = timeoutErrMsg.asJsonPrimitive(),
+ message = timeoutErrMsg.asJsonPrimitive(),
logging = isLogResponseEnabled
)
log.error(timeoutErrMsg, timeoutEx)
@@ -245,13 +245,13 @@ open class ComponentRemotePythonExecutor(
setNodeOutputErrors(status = StatusType.FAILURE.name,
step = STEP_EXEC_CMD,
logs = "".asJsonPrimitive(),
- error = timeoutErrMsg.asJsonPrimitive(),
+ message = timeoutErrMsg.asJsonPrimitive(),
logging = isLogResponseEnabled
)
log.error(timeoutErrMsg, grpcEx)
} catch (e: Exception) {
val timeoutErrMsg = "Command executor failed during process catch-all case requestId ($processId) timeout($envPrepTimeout) exception msg: ${e.message}"
- setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, error = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
+ setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, message = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
log.error(timeoutErrMsg, e)
}
}
@@ -280,42 +280,42 @@ open class ComponentRemotePythonExecutor(
private fun setNodeOutputProperties(
status: JsonNode = StatusType.FAILURE.name.asJsonPrimitive(),
step: String,
+ logs: JsonNode,
message: JsonNode,
- artifacts: JsonNode,
logging: Boolean = true
) {
setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status)
- setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts)
- setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message)
+ setAttribute(ATTRIBUTE_RESPONSE_DATA, message)
+ setAttribute(ATTRIBUTE_EXEC_CMD_LOG, logs)
if (logging) {
- log.info("Executor status : $step : $status")
- log.info("Executor artifacts: $step : $artifacts")
- log.info("Executor message : $step : $message")
+ log.info("Executor status : $step : $status")
+ log.info("Executor message: $step : $message")
+ log.info("Executor logs : $step : $logs")
}
}
/**
- * Utility function to set the output properties and errors of the executor node, in cas of errors
+ * Utility function to set the output properties and errors of the executor node, in case of errors
*/
private fun setNodeOutputErrors(
status: String,
step: String,
logs: JsonNode = "N/A".asJsonPrimitive(),
- error: JsonNode,
+ message: JsonNode,
logging: Boolean = true
) {
setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.asJsonPrimitive())
setAttribute(ATTRIBUTE_EXEC_CMD_LOG, logs)
- setAttribute(ATTRIBUTE_RESPONSE_DATA, "N/A".asJsonPrimitive())
+ setAttribute(ATTRIBUTE_RESPONSE_DATA, message)
if (logging) {
- log.info("Executor status : $step : $status")
- log.info("Executor message : $step : $error")
- log.info("Executor logs : $step : $logs")
+ log.info("Executor status : $step : $status")
+ log.info("Executor message: $step : $message")
+ log.info("Executor logs : $step : $logs")
}
- addError(status, step, error.toString())
+ addError(status, step, logs.toString())
}
}
diff --git a/ms/command-executor/src/main/docker/Dockerfile b/ms/command-executor/src/main/docker/Dockerfile
index 610e10cc2..1e5d4cbb8 100644
--- a/ms/command-executor/src/main/docker/Dockerfile
+++ b/ms/command-executor/src/main/docker/Dockerfile
@@ -3,7 +3,7 @@ FROM python:3.6-slim
ENV GRPC_PYTHON_VERSION 1.20.0
RUN python -m pip install --upgrade pip
RUN pip install grpcio==${GRPC_PYTHON_VERSION} grpcio-tools==${GRPC_PYTHON_VERSION}
-RUN pip install virtualenv==16.7.9 pympler==0.8
+RUN pip install virtualenv==16.7.9
RUN groupadd -r -g 1000 onap && useradd -r -u 1000 -g onap onap
diff --git a/ms/command-executor/src/main/python/command_executor_handler.py b/ms/command-executor/src/main/python/command_executor_handler.py
index 7c9ef84c1..0533b41f3 100644
--- a/ms/command-executor/src/main/python/command_executor_handler.py
+++ b/ms/command-executor/src/main/python/command_executor_handler.py
@@ -143,21 +143,15 @@ class CommandExecutorHandler():
rc = newProcess.poll()
except Exception as e:
err_msg = "{} - Failed to execute command. Error: {}".format(self.blueprint_id, e)
- return utils.build_ret_data(False, results=result, results_log=results_log, error=err_msg)
+ result.update(utils.build_ret_data(False, results_log=results_log, error=err_msg))
+ return result
# deactivate_venv(blueprint_id)
#Since return code is only used to check if it's zero (success), we can just return success flag instead.
self.logger.debug("python return_code : {}".format(rc))
- if rc == 0:
- return utils.build_ret_data(True, results=result, results_log=results_log)
- else:
- err_msg = ""
- if len(results_log) > 0:
- # get exception message
- err_msg = "{} - {}".format(self.blueprint_id, results_log[-1:][0])
- else:
- err_msg = "{} - Process exited with return code {}".format(self.blueprint_id, rc)
- return utils.build_ret_data(False, results=result, results_log=results_log, error=err_msg)
+ is_execution_successful = rc == 0
+ result.update(utils.build_ret_data(is_execution_successful, results_log=results_log))
+ return result
def install_packages(self, request, type, f, results):
success = self.install_python_packages('UTILITY', results)
@@ -233,11 +227,11 @@ class CommandExecutorHandler():
venv.create(self.venv_home, with_pip=True, system_site_packages=True)
virtualenv.writefile(os.path.join(bin_dir, "activate_this.py"), virtualenv.ACTIVATE_THIS)
self.logger.info("{} - Creation of Python Virtual Environment finished.".format(self.blueprint_id))
- return utils.build_ret_data(True, "")
+ return utils.build_ret_data(True)
except Exception as err:
err_msg = "{} - Failed to provision Python Virtual Environment. Error: {}".format(self.blueprint_id, err)
self.logger.info(err_msg)
- return utils.build_ret_data(False, err_msg)
+ return utils.build_ret_data(False, error=err_msg)
# return map cds_is_successful and err_msg. Status is True on success. err_msg may existence doesn't necessarily indicate fatal condition.
# the 'status' should be set to False to indicate error.
@@ -255,11 +249,11 @@ class CommandExecutorHandler():
exec (activate_this_script.read(), {'__file__': path})
exec (fixpathenvvar)
self.logger.info("Running with PATH : {}".format(os.environ['PATH']))
- return utils.build_ret_data(True, "")
+ return utils.build_ret_data(True)
except Exception as err:
err_msg ="{} - Failed to activate Python Virtual Environment. Error: {}".format(self.blueprint_id, err)
self.logger.info( err_msg)
- return utils.build_ret_data(False, err_msg)
+ return utils.build_ret_data(False, error=err_msg)
def deactivate_venv(self):
self.logger.info("{} - Deactivate Python Virtual Environment".format(self.blueprint_id))
diff --git a/ms/command-executor/src/main/python/command_executor_server.py b/ms/command-executor/src/main/python/command_executor_server.py
index 207097605..aa666ee24 100644
--- a/ms/command-executor/src/main/python/command_executor_server.py
+++ b/ms/command-executor/src/main/python/command_executor_server.py
@@ -47,18 +47,14 @@ class CommandExecutorServer(CommandExecutor_pb2_grpc.CommandExecutorServiceServi
if os.environ.get('CE_DEBUG','false') == "true":
self.logger.info(request)
- log_results = []
- payload_result = {}
handler = CommandExecutorHandler(request)
exec_cmd_response = handler.execute_command(request)
if exec_cmd_response[utils.CDS_IS_SUCCESSFUL_KEY]:
self.logger.info("{} - Execution finished successfully.".format(blueprint_id))
- self.logger.info("{} - Log Results {}: ".format(blueprint_id, exec_cmd_response[utils.RESULTS_LOG_KEY]))
- self.logger.info("{} - Results : {}".format(blueprint_id, exec_cmd_response[utils.RESULTS_KEY]))
else:
- self.logger.info("{} - Failed to executeCommand. {}".format(blueprint_id, exec_cmd_response[utils.ERR_MSG_KEY]))
+ self.logger.info("{} - Failed to executeCommand. {}".format(blueprint_id, exec_cmd_response[utils.RESULTS_LOG_KEY]))
ret = utils.build_grpc_response(request.requestId, exec_cmd_response)
- self.logger.info("Response returned : {}".format(exec_cmd_response))
+ self.logger.info("Payload returned : {}".format(exec_cmd_response))
return ret \ No newline at end of file
diff --git a/ms/command-executor/src/main/python/utils.py b/ms/command-executor/src/main/python/utils.py
index b98241629..180cd8c12 100644
--- a/ms/command-executor/src/main/python/utils.py
+++ b/ms/command-executor/src/main/python/utils.py
@@ -17,13 +17,12 @@ from google.protobuf.timestamp_pb2 import Timestamp
import proto.CommandExecutor_pb2 as CommandExecutor_pb2
import json
-from pympler import asizeof
CDS_IS_SUCCESSFUL_KEY = "cds_is_successful"
ERR_MSG_KEY = "err_msg"
RESULTS_KEY = "results"
RESULTS_LOG_KEY = "results_log"
-TRUNC_MSG_LEN = 3 * 1024 * 1024
+RESPONSE_MAX_SIZE = 4 * 1024 * 1024 # 4Mb
def get_blueprint_id(request):
blueprint_name = request.identifiers.blueprintName
@@ -34,44 +33,42 @@ def get_blueprint_id(request):
def build_grpc_response(request_id, response):
if response[CDS_IS_SUCCESSFUL_KEY]:
status = CommandExecutor_pb2.SUCCESS
- payload = json.dumps(response[RESULTS_KEY])
else:
status = CommandExecutor_pb2.FAILURE
- # truncate error message if too heavy
- if asizeof.asizeof(response[ERR_MSG_KEY]) > TRUNC_MSG_LEN:
- response[ERR_MSG_KEY] = "{} [...]. Check command executor logs for more information.".format(response[ERR_MSG_KEY][:TRUNC_MSG_LEN])
- payload = json.dumps(response[ERR_MSG_KEY])
- # truncate cmd-exec logs if too heavy
- response[RESULTS_LOG_KEY] = truncate_cmd_exec_logs(response[RESULTS_LOG_KEY])
+ response.pop(CDS_IS_SUCCESSFUL_KEY)
+ logs = response.pop(RESULTS_LOG_KEY)
+
+ # Payload should only contains response data returned from the executed script and/or the error message
+ payload = json.dumps(response)
timestamp = Timestamp()
timestamp.GetCurrentTime()
- return CommandExecutor_pb2.ExecutionOutput(requestId=request_id,
- response=response[RESULTS_LOG_KEY],
+ execution_output = CommandExecutor_pb2.ExecutionOutput(requestId=request_id,
+ response=logs,
status=status,
payload=payload,
timestamp=timestamp)
-# build a ret data structure
-def build_ret_data(cds_is_successful, results={}, results_log=[], error=None):
+ return truncate_execution_output(execution_output)
+
+# build a ret data structure used to populate the ExecutionOutput
+def build_ret_data(cds_is_successful, results_log=[], error=None):
ret_data = {
- CDS_IS_SUCCESSFUL_KEY: cds_is_successful,
- RESULTS_KEY: results,
- RESULTS_LOG_KEY: results_log
- }
+ CDS_IS_SUCCESSFUL_KEY: cds_is_successful,
+ RESULTS_LOG_KEY: results_log
+ }
if error:
ret_data[ERR_MSG_KEY] = error
return ret_data
-def truncate_cmd_exec_logs(logs):
- truncated_logs = []
- truncated_logs_memsize = 0
- for log in logs:
- truncated_logs_memsize += asizeof.asizeof(log)
- if truncated_logs_memsize > TRUNC_MSG_LEN:
- truncated_logs.append("Execution logs exceeds the maximum size allowed. Check command executor logs to view the execute-command-logs.")
- break
- truncated_logs.append(log)
- return truncated_logs \ No newline at end of file
+# Truncate execution logs to make sure gRPC response doesn't exceed the gRPC buffer capacity
+def truncate_execution_output(execution_output):
+ sum_truncated_chars = 0
+ if execution_output.ByteSize() > RESPONSE_MAX_SIZE:
+ while execution_output.ByteSize() > RESPONSE_MAX_SIZE:
+ removed_item = execution_output.response.pop()
+ sum_truncated_chars += len(removed_item)
+ execution_output.response.append("[...] TRUNCATED CHARS : {}".format(sum_truncated_chars))
+ return execution_output \ No newline at end of file