summaryrefslogtreecommitdiffstats
path: root/ms/command-executor/src/main/python
diff options
context:
space:
mode:
authorJulien Fontaine <julien.fontaine@bell.ca>2020-04-20 11:53:41 -0400
committerJulien Fontaine <julien.fontaine@bell.ca>2020-05-05 18:17:52 -0400
commit888f2f78a580a3deccb66bef50f2375a04b39eb6 (patch)
tree2b76202c7a7f9f73cb36a1bc7d38496d939de9e3 /ms/command-executor/src/main/python
parentc86d7262d03b299dc635b3ba68e4a7c2c0fd6a6e (diff)
Truncate message published on Kafka / Spike: Define solution for logs separation
Refactoring of cmd-exec component - Improve display of error messages within the response - Fix gRPC buffer limitation (4Mb) by truncating error messages and cmd-exec logs if too heavy (>3Mb) Truncation of BP responses (<4Kb) before sending them in kafka audit topics. - Truncation if needed of error messages for every response - Truncation of cmd-exec logs in cmd-exec responses (Spike) Add a flag in the application.properties to enable/disable the display of cmd-exec responses on the BP side (Fix) Correction of BP processing with kafka regression (Fix) Changed default SSL Endpoint Algo Issue-ID: CCSDK-2326 Change-Id: If4d0e661117d1dd156cf19c95774824e754d870a Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Diffstat (limited to 'ms/command-executor/src/main/python')
-rw-r--r--ms/command-executor/src/main/python/command_executor_handler.py72
-rw-r--r--ms/command-executor/src/main/python/command_executor_server.py31
-rw-r--r--ms/command-executor/src/main/python/utils.py52
3 files changed, 91 insertions, 64 deletions
diff --git a/ms/command-executor/src/main/python/command_executor_handler.py b/ms/command-executor/src/main/python/command_executor_handler.py
index 1e6f03b81..0c476b23e 100644
--- a/ms/command-executor/src/main/python/command_executor_handler.py
+++ b/ms/command-executor/src/main/python/command_executor_handler.py
@@ -43,43 +43,48 @@ class CommandExecutorHandler():
def is_installed(self):
return os.path.exists(self.installed)
- def prepare_env(self, request, results):
+ def prepare_env(self, request):
+ results_log = []
if not self.is_installed():
create_venv_status = self.create_venv()
- if not create_venv_status["cds_is_successful"]:
- err_msg = "ERROR: failed to prepare environment for request {} due to error in creating virtual Python env. Original error {}".format(self.blueprint_id, create_venv_status["err_msg"])
+ if not create_venv_status[utils.CDS_IS_SUCCESSFUL_KEY]:
+ err_msg = "ERROR: failed to prepare environment for request {} due to error in creating virtual Python env. Original error {}".format(self.blueprint_id, create_venv_status[utils.ERR_MSG_KEY])
self.logger.error(err_msg)
- return utils.build_ret_data(False, err_msg)
+ return utils.build_ret_data(False, error=err_msg)
activate_venv_status = self.activate_venv()
- if not activate_venv_status["cds_is_successful"]:
- err_msg = "ERROR: failed to prepare environment for request {} due Python venv_activation. Original error {}".format(self.blueprint_id, activate_venv_status["err_msg"])
+ if not activate_venv_status[utils.CDS_IS_SUCCESSFUL_KEY]:
+ err_msg = "ERROR: failed to prepare environment for request {} due Python venv_activation. Original error {}".format(self.blueprint_id, activate_venv_status[utils.ERR_MSG_KEY])
self.logger.error(err_msg)
- return utils.build_ret_data(False, err_msg)
+ return utils.build_ret_data(False, error=err_msg)
try:
with open(self.installed, "w+") as f:
- if not self.install_packages(request, CommandExecutor_pb2.pip, f, results):
- return utils.build_ret_data(False, "ERROR: failed to prepare environment for request {} during pip package install.".format(self.blueprint_id))
+ if not self.install_packages(request, CommandExecutor_pb2.pip, f, results_log):
+ err_msg = "ERROR: failed to prepare environment for request {} during pip package install.".format(self.blueprint_id)
+ return utils.build_ret_data(False, results_log=results_log, error=err_msg)
f.write("\r\n") # TODO: is \r needed?
- results.append("\n")
- if not self.install_packages(request, CommandExecutor_pb2.ansible_galaxy, f, results):
- return utils.build_ret_data(False, "ERROR: failed to prepare environment for request {} during Ansible install.".format(self.blueprint_id))
+ results_log.append("\n")
+ if not self.install_packages(request, CommandExecutor_pb2.ansible_galaxy, f, results_log):
+ err_msg = "ERROR: failed to prepare environment for request {} during Ansible install.".format(self.blueprint_id)
+ return utils.build_ret_data(False, results_log=results_log, error=err_msg)
except Exception as ex:
err_msg = "ERROR: failed to prepare environment for request {} during installing packages. Exception: {}".format(self.blueprint_id, ex)
self.logger.error(err_msg)
- return utils.build_ret_data(False, err_msg)
+ return utils.build_ret_data(False, error=err_msg)
else:
try:
with open(self.installed, "r") as f:
- results.append(f.read())
+ results_log.append(f.read())
except Exception as ex:
- return utils.build_ret_data(False, "ERROR: failed to prepare environment during reading 'installed' file {}. Exception: {}".format(self.installed, ex))
+ err_msg="ERROR: failed to prepare environment during reading 'installed' file {}. Exception: {}".format(self.installed, ex)
+ return utils.build_ret_data(False, error=err_msg)
# deactivate_venv(blueprint_id)
- return utils.build_ret_data(True, "")
+ return utils.build_ret_data(True, results_log=results_log)
- def execute_command(self, request, results):
- payload_result = {}
+ def execute_command(self, request):
+ results_log = []
+ result = {}
# workaround for when packages are not specified, we may not want to go through the install step
# can just call create_venv from here.
if not self.is_installed():
@@ -87,16 +92,14 @@ class CommandExecutorHandler():
try:
if not self.is_installed():
create_venv_status = self.create_venv
- if not create_venv_status["cds_is_successful"]:
- err_msg = "{} - Failed to execute command during venv creation. Original error: {}".format(self.blueprint_id, create_venv_status["err_msg"])
- results.append(err_msg)
- return utils.build_ret_data(False, err_msg)
+ if not create_venv_status[utils.CDS_IS_SUCCESSFUL_KEY]:
+ err_msg = "{} - Failed to execute command during venv creation. Original error: {}".format(self.blueprint_id, create_venv_status[utils.ERR_MSG_KEY])
+ return utils.build_ret_data(False, error=err_msg)
activate_response = self.activate_venv()
- if not activate_response["cds_is_successful"]:
- orig_error = activate_response["err_msg"]
+ if not activate_response[utils.CDS_IS_SUCCESSFUL_KEY]:
+ orig_error = activate_response[utils.ERR_MSG_KEY]
err_msg = "{} - Failed to execute command during environment activation. Original error: {}".format(self.blueprint_id, orig_error)
- results.append(err_msg) #TODO: get rid of results and just rely on the return data struct.
- return utils.build_ret_data(False, err_msg)
+ return utils.build_ret_data(False, error=err_msg)
cmd = "cd " + self.venv_home
@@ -131,26 +134,25 @@ class CommandExecutorHandler():
payload = '\n'.join(payload_section)
msg = email.parser.Parser().parsestr(payload)
for part in msg.get_payload():
- payload_result = json.loads(part.get_payload())
+ result = json.loads(part.get_payload())
if output and not is_payload_section:
self.logger.info(output.strip())
- results.append(output.strip())
+ results_log.append(output.strip())
else:
payload_section.append(output.strip())
rc = newProcess.poll()
except Exception as e:
err_msg = "{} - Failed to execute command. Error: {}".format(self.blueprint_id, e)
- self.logger.info(err_msg)
- results.append(e)
- payload_result.update(utils.build_ret_data(False, err_msg))
- return payload_result
+ return utils.build_ret_data(False, results=result, results_log=results_log, error=err_msg)
# deactivate_venv(blueprint_id)
#Since return code is only used to check if it's zero (success), we can just return success flag instead.
self.logger.debug("python return_code : {}".format(rc))
- is_execution_successful = rc == 0
- payload_result.update(utils.build_ret_data(is_execution_successful, ""))
- return payload_result
+ if rc == 0:
+ return utils.build_ret_data(True, results=result, results_log=results_log)
+ else:
+ err_msg = "{} - Something wrong happened during command execution. See execute command logs for more information.".format(self.blueprint_id)
+ return utils.build_ret_data(False, results=result, results_log=results_log, error=err_msg)
def install_packages(self, request, type, f, results):
success = self.install_python_packages('UTILITY', results)
diff --git a/ms/command-executor/src/main/python/command_executor_server.py b/ms/command-executor/src/main/python/command_executor_server.py
index 3435e2272..207097605 100644
--- a/ms/command-executor/src/main/python/command_executor_server.py
+++ b/ms/command-executor/src/main/python/command_executor_server.py
@@ -22,9 +22,6 @@ import proto.CommandExecutor_pb2_grpc as CommandExecutor_pb2_grpc
from command_executor_handler import CommandExecutorHandler
import utils
-_ONE_DAY_IN_SECONDS = 60 * 60 * 24
-
-
class CommandExecutorServer(CommandExecutor_pb2_grpc.CommandExecutorServiceServicer):
def __init__(self):
@@ -35,14 +32,14 @@ class CommandExecutorServer(CommandExecutor_pb2_grpc.CommandExecutorServiceServi
self.logger.info("{} - Received prepareEnv request".format(blueprint_id))
self.logger.info(request)
- results = []
handler = CommandExecutorHandler(request)
- prepare_env_response = handler.prepare_env(request, results)
- if not prepare_env_response["cds_is_successful"]:
- self.logger.info("{} - Failed to prepare python environment. {}".format(blueprint_id, results))
- return utils.build_grpc_response(request, results, {}, False)
- self.logger.info("{} - Package installation logs {}".format(blueprint_id, results))
- return utils.build_grpc_response(request, results, {}, True)
+ prepare_env_response = handler.prepare_env(request)
+ if prepare_env_response[utils.CDS_IS_SUCCESSFUL_KEY]:
+ self.logger.info("{} - Package installation logs {}".format(blueprint_id, prepare_env_response[utils.RESULTS_LOG_KEY]))
+ else:
+ self.logger.info("{} - Failed to prepare python environment. {}".format(blueprint_id, prepare_env_response[utils.ERR_MSG_KEY]))
+ self.logger.info("Prepare Env Response returned : %s" % prepare_env_response)
+ return utils.build_grpc_response(request.requestId, prepare_env_response)
def executeCommand(self, request, context):
blueprint_id = utils.get_blueprint_id(request)
@@ -53,13 +50,15 @@ class CommandExecutorServer(CommandExecutor_pb2_grpc.CommandExecutorServiceServi
log_results = []
payload_result = {}
handler = CommandExecutorHandler(request)
- payload_result = handler.execute_command(request, log_results)
- if not payload_result["cds_is_successful"]:
- self.logger.info("{} - Failed to executeCommand. {}".format(blueprint_id, log_results))
- else:
+ exec_cmd_response = handler.execute_command(request)
+ if exec_cmd_response[utils.CDS_IS_SUCCESSFUL_KEY]:
self.logger.info("{} - Execution finished successfully.".format(blueprint_id))
+ self.logger.info("{} - Log Results {}: ".format(blueprint_id, exec_cmd_response[utils.RESULTS_LOG_KEY]))
+ self.logger.info("{} - Results : {}".format(blueprint_id, exec_cmd_response[utils.RESULTS_KEY]))
+ else:
+ self.logger.info("{} - Failed to executeCommand. {}".format(blueprint_id, exec_cmd_response[utils.ERR_MSG_KEY]))
- ret = utils.build_grpc_response(request, log_results, payload_result, payload_result["cds_is_successful"])
- self.logger.info("Payload returned %s" % payload_result)
+ ret = utils.build_grpc_response(request.requestId, exec_cmd_response)
+ self.logger.info("Response returned : {}".format(exec_cmd_response))
return ret \ No newline at end of file
diff --git a/ms/command-executor/src/main/python/utils.py b/ms/command-executor/src/main/python/utils.py
index 574be51db..b98241629 100644
--- a/ms/command-executor/src/main/python/utils.py
+++ b/ms/command-executor/src/main/python/utils.py
@@ -17,7 +17,13 @@ from google.protobuf.timestamp_pb2 import Timestamp
import proto.CommandExecutor_pb2 as CommandExecutor_pb2
import json
+from pympler import asizeof
+CDS_IS_SUCCESSFUL_KEY = "cds_is_successful"
+ERR_MSG_KEY = "err_msg"
+RESULTS_KEY = "results"
+RESULTS_LOG_KEY = "results_log"
+TRUNC_MSG_LEN = 3 * 1024 * 1024
def get_blueprint_id(request):
blueprint_name = request.identifiers.blueprintName
@@ -25,27 +31,47 @@ def get_blueprint_id(request):
return blueprint_name + '/' + blueprint_version
# Create a response for grpc. Fills in the timestamp as well as removes cds_is_successful element
-def build_grpc_response(request, log_results, payload_return, is_success=False):
- if is_success:
+def build_grpc_response(request_id, response):
+ if response[CDS_IS_SUCCESSFUL_KEY]:
status = CommandExecutor_pb2.SUCCESS
+ payload = json.dumps(response[RESULTS_KEY])
else:
status = CommandExecutor_pb2.FAILURE
+ # truncate error message if too heavy
+ if asizeof.asizeof(response[ERR_MSG_KEY]) > TRUNC_MSG_LEN:
+ response[ERR_MSG_KEY] = "{} [...]. Check command executor logs for more information.".format(response[ERR_MSG_KEY][:TRUNC_MSG_LEN])
+ payload = json.dumps(response[ERR_MSG_KEY])
+
+ # truncate cmd-exec logs if too heavy
+ response[RESULTS_LOG_KEY] = truncate_cmd_exec_logs(response[RESULTS_LOG_KEY])
timestamp = Timestamp()
timestamp.GetCurrentTime()
- if "cds_is_successful" in payload_return:
- payload_return.pop('cds_is_successful')
- payload_str = json.dumps(payload_return)
- return CommandExecutor_pb2.ExecutionOutput(requestId=request.requestId,
- response=log_results,
+ return CommandExecutor_pb2.ExecutionOutput(requestId=request_id,
+ response=response[RESULTS_LOG_KEY],
status=status,
- payload=payload_str,
+ payload=payload,
timestamp=timestamp)
-# build a return data structure which may contain an error message
-def build_ret_data(cds_is_successful, err_msg):
- ret_data = {"cds_is_successful": cds_is_successful }
- if err_msg != "":
- ret_data["err_msg"] = err_msg
+# build a ret data structure
+def build_ret_data(cds_is_successful, results={}, results_log=[], error=None):
+ ret_data = {
+ CDS_IS_SUCCESSFUL_KEY: cds_is_successful,
+ RESULTS_KEY: results,
+ RESULTS_LOG_KEY: results_log
+ }
+ if error:
+ ret_data[ERR_MSG_KEY] = error
return ret_data
+
+def truncate_cmd_exec_logs(logs):
+ truncated_logs = []
+ truncated_logs_memsize = 0
+ for log in logs:
+ truncated_logs_memsize += asizeof.asizeof(log)
+ if truncated_logs_memsize > TRUNC_MSG_LEN:
+ truncated_logs.append("Execution logs exceeds the maximum size allowed. Check command executor logs to view the execute-command-logs.")
+ break
+ truncated_logs.append(log)
+ return truncated_logs \ No newline at end of file