diff options
3 files changed, 74 insertions, 46 deletions
diff --git a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt index 6b1f186c9..3250cd3a2 100644 --- a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt +++ b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt @@ -17,6 +17,10 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.python.executor import com.fasterxml.jackson.databind.JsonNode +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.TimeoutCancellationException +import kotlinx.coroutines.async +import kotlinx.coroutines.withTimeout import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.* import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.ExecutionServiceConstant @@ -128,17 +132,36 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic requestId = processId, remoteIdentifier = RemoteIdentifier(blueprintName = blueprintName, blueprintVersion = blueprintVersion), command = scriptCommand, - properties = properties) - val remoteExecutionOutput = remoteScriptExecutionService.executeCommand(remoteExecutionInput) + properties = properties, + timeOut = timeout.toLong()) + + + val remoteExecutionOutputDeferred = GlobalScope.async { + remoteScriptExecutionService.executeCommand(remoteExecutionInput) + } + + val remoteExecutionOutput = withTimeout(timeout * 1000L) { + remoteExecutionOutputDeferred.await() + } + + checkNotNull(remoteExecutionOutput) { + "Error: Request-id $processId did not return a restul from remote command execution." + } val logs = JacksonUtils.jsonNodeFromObject(remoteExecutionOutput.response) if (remoteExecutionOutput.status != StatusType.SUCCESS) { - setNodeOutputErrors(remoteExecutionOutput.status.name,logs, remoteExecutionOutput.payload) + setNodeOutputErrors(remoteExecutionOutput.status.name, logs, remoteExecutionOutput.payload) } else { setNodeOutputProperties(remoteExecutionOutput.status.name.asJsonPrimitive(), logs, - remoteExecutionOutput.payload) + remoteExecutionOutput.payload) } + } catch (timeoutEx: TimeoutCancellationException) { + setNodeOutputErrors(status = "Command executor timed out after $timeout seconds", message = "".asJsonPrimitive()) + log.error("Command executor timed out after $timeout seconds", timeoutEx) + } catch (grpcEx: io.grpc.StatusRuntimeException) { + setNodeOutputErrors(status = "Command executor timed out in GRPC call", message = "${grpcEx.status}".asJsonPrimitive()) + log.error("Command executor time out during GRPC call", grpcEx) } catch (e: Exception) { log.error("Failed to process on remote executor", e) } finally { @@ -176,7 +199,7 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic /** * Utility function to set the output properties and errors of the executor node, in cas of errors */ - private fun setNodeOutputErrors(status: String, message: JsonNode, artifacts: JsonNode = "".asJsonPrimitive() ) { + private fun setNodeOutputErrors(status: String, message: JsonNode, artifacts: JsonNode = "".asJsonPrimitive()) { setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.asJsonPrimitive()) setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message) setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts) diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt index 5163a93ac..7a0f167fa 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt @@ -85,12 +85,13 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic check(operationName.isNotEmpty()) { "couldn't get Operation name for step($stepName)" } val operationResolvedProperties = bluePrintRuntimeService - .resolveNodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName) + .resolveNodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName) this.operationInputs.putAll(operationResolvedProperties) val timeout = this.operationInputs.getOptionalAsInt(BluePrintConstants.PROPERTY_CURRENT_TIMEOUT) timeout?.let { this.timeout = timeout } + log.debug("DEBUG::: AbstractComponentFunction prepareRequestNB.timeout ($timeout)") return executionRequest } @@ -99,11 +100,11 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic log.info("Preparing Response...") executionServiceOutput.commonHeader = executionServiceInput.commonHeader executionServiceOutput.actionIdentifiers = executionServiceInput.actionIdentifiers - var status = Status() + val status = Status() try { // Resolve the Output Expression val stepOutputs = bluePrintRuntimeService - .resolveNodeTemplateInterfaceOperationOutputs(nodeTemplateName, interfaceName, operationName) + .resolveNodeTemplateInterfaceOperationOutputs(nodeTemplateName, interfaceName, operationName) val stepOutputData = StepData().apply { name = stepName @@ -123,7 +124,8 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic override suspend fun applyNB(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { try { prepareRequestNB(executionServiceInput) - withTimeout((timeout * 1000).toLong()) { + withTimeout(timeout * 1000L) { + log.debug("DEBUG::: AbstractComponentFunction.withTimeout section $timeout seconds") processNB(executionServiceInput) } } catch (runtimeException: RuntimeException) { @@ -135,7 +137,7 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic fun getOperationInput(key: String): JsonNode { return operationInputs[key] - ?: throw BluePrintProcessorException("couldn't get the operation input($key) value.") + ?: throw BluePrintProcessorException("couldn't get the operation input($key) value.") } fun getOptionalOperationInput(key: String): JsonNode? { @@ -189,4 +191,4 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic return file.readNBLines() } -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/RemoteScriptExecutionService.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/RemoteScriptExecutionService.kt index d6146e111..5801af95b 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/RemoteScriptExecutionService.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/RemoteScriptExecutionService.kt @@ -32,6 +32,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.context.annotation.Scope import org.springframework.stereotype.Service +import java.util.concurrent.TimeUnit interface RemoteScriptExecutionService { suspend fun init(selector: Any) @@ -42,7 +43,7 @@ interface RemoteScriptExecutionService { @Service(ExecutionServiceConstant.SERVICE_GRPC_REMOTE_SCRIPT_EXECUTION) @ConditionalOnProperty(prefix = "blueprintprocessor.remoteScriptCommand", name = arrayOf("enabled"), - havingValue = "true", matchIfMissing = false) + havingValue = "true", matchIfMissing = false) @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) class GrpcRemoteScriptExecutionService(private val bluePrintGrpcLibPropertyService: BluePrintGrpcLibPropertyService) : RemoteScriptExecutionService { @@ -54,12 +55,12 @@ class GrpcRemoteScriptExecutionService(private val bluePrintGrpcLibPropertyServi override suspend fun init(selector: Any) { // Get the GRPC Client Service based on selector - val grpcClientService: BluePrintGrpcClientService - if (selector is JsonNode) { - grpcClientService = bluePrintGrpcLibPropertyService.blueprintGrpcClientService(selector) + val grpcClientService: BluePrintGrpcClientService = if (selector is JsonNode) { + bluePrintGrpcLibPropertyService.blueprintGrpcClientService(selector) } else { - grpcClientService = bluePrintGrpcLibPropertyService.blueprintGrpcClientService(selector.toString()) + bluePrintGrpcLibPropertyService.blueprintGrpcClientService(selector.toString()) } + // Get the GRPC Channel channel = grpcClientService.channel() // Create Non Blocking Stub @@ -70,9 +71,10 @@ class GrpcRemoteScriptExecutionService(private val bluePrintGrpcLibPropertyServi } } - override suspend fun prepareEnv(prepareEnvInput: PrepareRemoteEnvInput) - : RemoteScriptExecutionOutput { - val grpResponse = commandExecutorServiceGrpc.prepareEnv(prepareEnvInput.asGrpcData()) + override suspend fun prepareEnv(prepareEnvInput: PrepareRemoteEnvInput): RemoteScriptExecutionOutput { + val grpResponse = commandExecutorServiceGrpc + .withDeadlineAfter(prepareEnvInput.timeOut * 1000, TimeUnit.MILLISECONDS) + .prepareEnv(prepareEnvInput.asGrpcData()) checkNotNull(grpResponse.status) { "failed to get GRPC prepare env response status for requestId(${prepareEnvInput.requestId})" @@ -85,18 +87,19 @@ class GrpcRemoteScriptExecutionService(private val bluePrintGrpcLibPropertyServi } override suspend fun executeCommand(remoteExecutionInput: RemoteScriptExecutionInput) - : RemoteScriptExecutionOutput { - - val grpResponse = commandExecutorServiceGrpc.executeCommand(remoteExecutionInput.asGrpcData()) + : RemoteScriptExecutionOutput { + val grpResponse = + commandExecutorServiceGrpc + .withDeadlineAfter(remoteExecutionInput.timeOut * 1000, TimeUnit.MILLISECONDS) + .executeCommand(remoteExecutionInput.asGrpcData()) checkNotNull(grpResponse.status) { "failed to get GRPC response status for requestId(${remoteExecutionInput.requestId})" } - val remoteScriptExecutionOutput = grpResponse.asJavaData() log.debug("Received response from command server for requestId(${remoteExecutionInput.requestId})") + return grpResponse.asJavaData() - return remoteScriptExecutionOutput } override suspend fun close() { @@ -116,33 +119,33 @@ class GrpcRemoteScriptExecutionService(private val bluePrintGrpcLibPropertyServi } return PrepareEnvInput.newBuilder() - .setIdentifiers(this.remoteIdentifier!!.asGrpcData()) - .setRequestId(this.requestId) - .setCorrelationId(correlationId) - .setTimeOut(this.timeOut.toInt()) - .addAllPackages(packageList) - .setProperties(this.properties.asGrpcData()) - .build() + .setIdentifiers(this.remoteIdentifier!!.asGrpcData()) + .setRequestId(this.requestId) + .setCorrelationId(correlationId) + .setTimeOut(this.timeOut.toInt()) + .addAllPackages(packageList) + .setProperties(this.properties.asGrpcData()) + .build() } fun RemoteScriptExecutionInput.asGrpcData(): ExecutionInput { val correlationId = this.correlationId ?: this.requestId return ExecutionInput.newBuilder() - .setRequestId(this.requestId) - .setCorrelationId(correlationId) - .setIdentifiers(this.remoteIdentifier!!.asGrpcData()) - .setCommand(this.command) - .setTimeOut(this.timeOut.toInt()) - .setProperties(this.properties.asGrpcData()) - .setTimestamp(Timestamp.getDefaultInstance()) - .build() + .setRequestId(this.requestId) + .setCorrelationId(correlationId) + .setIdentifiers(this.remoteIdentifier!!.asGrpcData()) + .setCommand(this.command) + .setTimeOut(this.timeOut.toInt()) + .setProperties(this.properties.asGrpcData()) + .setTimestamp(Timestamp.getDefaultInstance()) + .build() } fun RemoteIdentifier.asGrpcData(): Identifiers? { return Identifiers.newBuilder() - .setBlueprintName(this.blueprintName) - .setBlueprintVersion(this.blueprintVersion) - .build() + .setBlueprintName(this.blueprintName) + .setBlueprintVersion(this.blueprintVersion) + .build() } fun Map<String, JsonNode>.asGrpcData(): Struct { @@ -153,10 +156,10 @@ class GrpcRemoteScriptExecutionService(private val bluePrintGrpcLibPropertyServi fun ExecutionOutput.asJavaData(): RemoteScriptExecutionOutput { return RemoteScriptExecutionOutput( - requestId = this.requestId, - response = this.responseList, - status = StatusType.valueOf(this.status.name), - payload = payload.jsonAsJsonType() + requestId = this.requestId, + response = this.responseList, + status = StatusType.valueOf(this.status.name), + payload = payload.jsonAsJsonType() ) } |