diff options
Diffstat (limited to 'ms/command-executor/src/main/python/utils.py')
-rw-r--r-- | ms/command-executor/src/main/python/utils.py | 80 |
1 files changed, 67 insertions, 13 deletions
diff --git a/ms/command-executor/src/main/python/utils.py b/ms/command-executor/src/main/python/utils.py index 574be51db..54c5ceafa 100644 --- a/ms/command-executor/src/main/python/utils.py +++ b/ms/command-executor/src/main/python/utils.py @@ -17,35 +17,89 @@ from google.protobuf.timestamp_pb2 import Timestamp import proto.CommandExecutor_pb2 as CommandExecutor_pb2 import json +import email.parser +CDS_IS_SUCCESSFUL_KEY = "cds_is_successful" +ERR_MSG_KEY = "err_msg" +RESULTS_KEY = "results" +RESULTS_LOG_KEY = "results_log" +RESPONSE_MAX_SIZE = 4 * 1024 * 1024 # 4Mb def get_blueprint_id(request): blueprint_name = request.identifiers.blueprintName blueprint_version = request.identifiers.blueprintVersion return blueprint_name + '/' + blueprint_version +def get_blueprint_timeout(request): + return request.timeOut + # Create a response for grpc. Fills in the timestamp as well as removes cds_is_successful element -def build_grpc_response(request, log_results, payload_return, is_success=False): - if is_success: +def build_grpc_response(request_id, response): + if response[CDS_IS_SUCCESSFUL_KEY]: status = CommandExecutor_pb2.SUCCESS else: status = CommandExecutor_pb2.FAILURE + response.pop(CDS_IS_SUCCESSFUL_KEY) + logs = response.pop(RESULTS_LOG_KEY) + + # Payload should only contains response data returned from the executed script and/or the error message + payload = json.dumps(response) + timestamp = Timestamp() timestamp.GetCurrentTime() - if "cds_is_successful" in payload_return: - payload_return.pop('cds_is_successful') - payload_str = json.dumps(payload_return) - return CommandExecutor_pb2.ExecutionOutput(requestId=request.requestId, - response=log_results, + execution_output = CommandExecutor_pb2.ExecutionOutput(requestId=request_id, + response=logs, status=status, - payload=payload_str, + payload=payload, timestamp=timestamp) -# build a return data structure which may contain an error message -def build_ret_data(cds_is_successful, err_msg): - ret_data = {"cds_is_successful": cds_is_successful } - if err_msg != "": - ret_data["err_msg"] = err_msg + return truncate_execution_output(execution_output) + +# build a ret data structure used to populate the ExecutionOutput +def build_ret_data(cds_is_successful, results_log=[], error=None): + ret_data = { + CDS_IS_SUCCESSFUL_KEY: cds_is_successful, + RESULTS_LOG_KEY: results_log + } + if error: + ret_data[ERR_MSG_KEY] = error return ret_data + +# Truncate execution logs to make sure gRPC response doesn't exceed the gRPC buffer capacity +def truncate_execution_output(execution_output): + sum_truncated_chars = 0 + if execution_output.ByteSize() > RESPONSE_MAX_SIZE: + while execution_output.ByteSize() > RESPONSE_MAX_SIZE: + removed_item = execution_output.response.pop() + sum_truncated_chars += len(removed_item) + execution_output.response.append("[...] TRUNCATED CHARS : {}".format(sum_truncated_chars)) + return execution_output + + +# Read temp file 'outputfile' into results_log and split out the returned payload into payload_result +def parse_cmd_exec_output(outputfile, logger, payload_result, results_log): + payload_section = [] + is_payload_section = False + outputfile.seek(0) + while True: + output = outputfile.readline() + if output == '': + break + if output.startswith('BEGIN_EXTRA_PAYLOAD'): + is_payload_section = True + output = outputfile.readline() + if output.startswith('END_EXTRA_PAYLOAD'): + is_payload_section = False + output = '' + payload = '\n'.join(payload_section) + msg = email.parser.Parser().parsestr(payload) + for part in msg.get_payload(): + payload_result.update(json.loads(part.get_payload())) + if output and not is_payload_section: + logger.info(output.strip()) + results_log.append(output.strip()) + else: + payload_section.append(output.strip()) + |