diff options
Diffstat (limited to 'ms/blueprintsprocessor/functions/python-executor/src')
-rw-r--r-- | ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt | 117 |
1 files changed, 50 insertions, 67 deletions
diff --git a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt index e48016745..59c448407 100644 --- a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt +++ b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt @@ -75,6 +75,7 @@ open class ComponentRemotePythonExecutor( const val ATTRIBUTE_RESPONSE_DATA = "response-data" const val DEFAULT_ENV_PREPARE_TIMEOUT_IN_SEC = 120 const val DEFAULT_EXECUTE_TIMEOUT_IN_SEC = 180 + const val TIMEOUT_DELTA = 100L } override suspend fun processNB(executionRequest: ExecutionServiceInput) { @@ -155,40 +156,37 @@ open class ComponentRemotePythonExecutor( val prepareEnvOutput = remoteScriptExecutionService.prepareEnv(prepareEnvInput) log.info("$ATTRIBUTE_PREPARE_ENV_LOG - ${prepareEnvOutput.response}") val logs = JacksonUtils.jsonNodeFromObject(prepareEnvOutput.response) - val logsEnv = logs.toString().asJsonPrimitive() - setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, logsEnv) + setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, logs) + // there are no artifacts for env. prepare, but we reuse it for err_log... if (prepareEnvOutput.status != StatusType.SUCCESS) { - val errorMessage = prepareEnvOutput.payload - setNodeOutputErrors(prepareEnvOutput.status.name, - STEP_PREPARE_ENV, - logs, - errorMessage, - isLogResponseEnabled - ) + setNodeOutputErrors(STEP_PREPARE_ENV, "".asJsonPrimitive(), prepareEnvOutput.payload, isLogResponseEnabled) + addError(StatusType.FAILURE.name, STEP_PREPARE_ENV, logs.toString()) } else { - setNodeOutputProperties(prepareEnvOutput.status.name.asJsonPrimitive(), - STEP_PREPARE_ENV, - logsEnv, - "".asJsonPrimitive(), - isLogResponseEnabled - ) + setNodeOutputProperties(prepareEnvOutput.status, STEP_PREPARE_ENV, logs, prepareEnvOutput.payload, isLogResponseEnabled) } } else { // set env preparation log to empty... setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, "".asJsonPrimitive()) } + // in cases where the exception is caught in BP side due to timeout, we do not have `err_msg` returned by cmd-exec (inside `payload`), + // hence `artifact` field will be empty } catch (grpcEx: io.grpc.StatusRuntimeException) { val componentLevelWarningMsg = if (timeout < envPrepTimeout) "Note: component-level timeout ($timeout) is shorter than env-prepare timeout ($envPrepTimeout). " else "" - val grpcErrMsg = "Command failed during env. preparation... timeout($envPrepTimeout) requestId ($processId). $componentLevelWarningMsg grpcError: ${grpcEx.status}" + val grpcErrMsg = "Command failed during env. preparation... timeout($envPrepTimeout) requestId ($processId).$componentLevelWarningMsg grpcError: (${grpcEx.cause?.message})" + // no execution log in case of timeout (as cmd-exec side hasn't finished to transfer output) + // set prepare-env-log to the error msg, and cmd-exec-log to empty setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, grpcErrMsg.asJsonPrimitive()) - setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, message = grpcErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled) + setNodeOutputErrors(STEP_PREPARE_ENV, "".asJsonPrimitive(), "".asJsonPrimitive(), isLogResponseEnabled) + addError(StatusType.FAILURE.name, STEP_PREPARE_ENV, grpcErrMsg) log.error(grpcErrMsg, grpcEx) } catch (e: Exception) { - val timeoutErrMsg = "Command executor failed during env. preparation.. catch-all case timeout($envPrepTimeout) requestId ($processId). exception msg: ${e.message}" - setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, e.message.asJsonPrimitive()) - setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, message = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled) - log.error(timeoutErrMsg, e) + val catchallErrMsg = "Command executor failed during env. preparation.. catch-all case. timeout($envPrepTimeout) requestId ($processId). exception msg: ${e.message}" + // no environment prepare log from executor in case of timeout (as cmd-exec side hasn't finished to transfer output), set it to error msg. Execution logs is empty. + setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, catchallErrMsg.asJsonPrimitive()) + setNodeOutputErrors(STEP_PREPARE_ENV, "".asJsonPrimitive(), "".asJsonPrimitive(), isLogResponseEnabled) + addError(StatusType.FAILURE.name, STEP_PREPARE_ENV, catchallErrMsg) + log.error(catchallErrMsg, e) } // if Env preparation was successful, then proceed with command execution in this Env if (bluePrintRuntimeService.getBluePrintError().errors.isEmpty()) { @@ -207,7 +205,7 @@ open class ComponentRemotePythonExecutor( remoteScriptExecutionService.executeCommand(remoteExecutionInput) } - val remoteExecutionOutput = withTimeout(implementation.timeout * 1000L) { + val remoteExecutionOutput = withTimeout(executionTimeout * 1000L + TIMEOUT_DELTA) { remoteExecutionOutputDeferred.await() } @@ -215,44 +213,31 @@ open class ComponentRemotePythonExecutor( "Error: Request-id $processId did not return a result from remote command execution." } val logs = JacksonUtils.jsonNodeFromObject(remoteExecutionOutput.response) + val returnedPayload = remoteExecutionOutput.payload + // In case of execution, `payload` (dictionary from Python execution) is preserved in `remoteExecutionOutput.payload`; + // It would contain `err_msg` key. It is valid to return it. if (remoteExecutionOutput.status != StatusType.SUCCESS) { - setNodeOutputErrors(remoteExecutionOutput.status.name, - STEP_EXEC_CMD, - logs, - remoteExecutionOutput.payload, - isLogResponseEnabled - ) + setNodeOutputErrors(STEP_EXEC_CMD, logs, returnedPayload, isLogResponseEnabled) + addError(StatusType.FAILURE.name, STEP_EXEC_CMD, logs.toString()) } else { - setNodeOutputProperties(remoteExecutionOutput.status.name.asJsonPrimitive(), - STEP_EXEC_CMD, - logs, - remoteExecutionOutput.payload, - isLogResponseEnabled - ) - } + setNodeOutputProperties(remoteExecutionOutput.status, STEP_EXEC_CMD, logs, returnedPayload, isLogResponseEnabled) + } // In timeout exception cases, we don't have payload, hence `payload` is empty value. } catch (timeoutEx: TimeoutCancellationException) { val componentLevelWarningMsg = if (timeout < executionTimeout) "Note: component-level timeout ($timeout) is shorter than execution timeout ($executionTimeout). " else "" val timeoutErrMsg = "Command executor execution timeout. DetailedMessage: (${timeoutEx.message}) requestId ($processId). $componentLevelWarningMsg" - setNodeOutputErrors(status = StatusType.FAILURE.name, - step = STEP_EXEC_CMD, - logs = "".asJsonPrimitive(), - message = timeoutErrMsg.asJsonPrimitive(), - logging = isLogResponseEnabled - ) + setNodeOutputErrors(STEP_EXEC_CMD, timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled) + addError(StatusType.FAILURE.name, STEP_EXEC_CMD, timeoutErrMsg) log.error(timeoutErrMsg, timeoutEx) } catch (grpcEx: io.grpc.StatusRuntimeException) { val timeoutErrMsg = "Command executor timed out executing after $executionTimeout seconds requestId ($processId) grpcErr: ${grpcEx.status}" - setNodeOutputErrors(status = StatusType.FAILURE.name, - step = STEP_EXEC_CMD, - logs = "".asJsonPrimitive(), - message = timeoutErrMsg.asJsonPrimitive(), - logging = isLogResponseEnabled - ) + setNodeOutputErrors(STEP_EXEC_CMD, timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled) + addError(StatusType.FAILURE.name, STEP_EXEC_CMD, timeoutErrMsg) log.error(timeoutErrMsg, grpcEx) } catch (e: Exception) { - val timeoutErrMsg = "Command executor failed during process catch-all case requestId ($processId) timeout($envPrepTimeout) exception msg: ${e.message}" - setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, message = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled) - log.error(timeoutErrMsg, e) + val catchAllErrMsg = "Command executor failed during process catch-all case requestId ($processId) timeout($envPrepTimeout) exception msg: ${e.message}" + setNodeOutputErrors(STEP_PREPARE_ENV, catchAllErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled) + addError(StatusType.FAILURE.name, STEP_EXEC_CMD, catchAllErrMsg) + log.error(catchAllErrMsg, e) } } log.debug("Trying to close GRPC channel. request ($processId)") @@ -278,21 +263,21 @@ open class ComponentRemotePythonExecutor( * Utility function to set the output properties of the executor node */ private fun setNodeOutputProperties( - status: JsonNode = StatusType.FAILURE.name.asJsonPrimitive(), + status: StatusType, step: String, - logs: JsonNode, - message: JsonNode, + executionLogs: JsonNode, + artifacts: JsonNode, logging: Boolean = true ) { - setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status) - setAttribute(ATTRIBUTE_RESPONSE_DATA, message) - setAttribute(ATTRIBUTE_EXEC_CMD_LOG, logs) + setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.name.asJsonPrimitive()) + setAttribute(ATTRIBUTE_EXEC_CMD_LOG, executionLogs) + setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts) if (logging) { log.info("Executor status : $step : $status") - log.info("Executor message: $step : $message") - log.info("Executor logs : $step : $logs") + log.info("Executor logs : $step : $executionLogs") + log.info("Executor artifacts: $step : $artifacts") } } @@ -300,22 +285,20 @@ open class ComponentRemotePythonExecutor( * Utility function to set the output properties and errors of the executor node, in case of errors */ private fun setNodeOutputErrors( - status: String, step: String, - logs: JsonNode = "N/A".asJsonPrimitive(), - message: JsonNode, + executionLogs: JsonNode = "N/A".asJsonPrimitive(), + artifacts: JsonNode = "N/A".asJsonPrimitive(), logging: Boolean = true ) { + val status = StatusType.FAILURE.name setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.asJsonPrimitive()) - setAttribute(ATTRIBUTE_EXEC_CMD_LOG, logs) - setAttribute(ATTRIBUTE_RESPONSE_DATA, message) + setAttribute(ATTRIBUTE_EXEC_CMD_LOG, executionLogs) + setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts) if (logging) { log.info("Executor status : $step : $status") - log.info("Executor message: $step : $message") - log.info("Executor logs : $step : $logs") + log.info("Executor logs : $step : $executionLogs") + log.info("Executor artifacts: $step : $artifacts") } - - addError(status, step, logs.toString()) } } |