summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt33
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt14
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/RemoteScriptExecutionService.kt73
3 files changed, 74 insertions, 46 deletions
diff --git a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt
index 6b1f186c9..3250cd3a2 100644
--- a/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt
+++ b/ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt
@@ -17,6 +17,10 @@
package org.onap.ccsdk.cds.blueprintsprocessor.functions.python.executor
import com.fasterxml.jackson.databind.JsonNode
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.TimeoutCancellationException
+import kotlinx.coroutines.async
+import kotlinx.coroutines.withTimeout
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.*
import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction
import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.ExecutionServiceConstant
@@ -128,17 +132,36 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic
requestId = processId,
remoteIdentifier = RemoteIdentifier(blueprintName = blueprintName, blueprintVersion = blueprintVersion),
command = scriptCommand,
- properties = properties)
- val remoteExecutionOutput = remoteScriptExecutionService.executeCommand(remoteExecutionInput)
+ properties = properties,
+ timeOut = timeout.toLong())
+
+
+ val remoteExecutionOutputDeferred = GlobalScope.async {
+ remoteScriptExecutionService.executeCommand(remoteExecutionInput)
+ }
+
+ val remoteExecutionOutput = withTimeout(timeout * 1000L) {
+ remoteExecutionOutputDeferred.await()
+ }
+
+ checkNotNull(remoteExecutionOutput) {
+ "Error: Request-id $processId did not return a restul from remote command execution."
+ }
val logs = JacksonUtils.jsonNodeFromObject(remoteExecutionOutput.response)
if (remoteExecutionOutput.status != StatusType.SUCCESS) {
- setNodeOutputErrors(remoteExecutionOutput.status.name,logs, remoteExecutionOutput.payload)
+ setNodeOutputErrors(remoteExecutionOutput.status.name, logs, remoteExecutionOutput.payload)
} else {
setNodeOutputProperties(remoteExecutionOutput.status.name.asJsonPrimitive(), logs,
- remoteExecutionOutput.payload)
+ remoteExecutionOutput.payload)
}
+ } catch (timeoutEx: TimeoutCancellationException) {
+ setNodeOutputErrors(status = "Command executor timed out after $timeout seconds", message = "".asJsonPrimitive())
+ log.error("Command executor timed out after $timeout seconds", timeoutEx)
+ } catch (grpcEx: io.grpc.StatusRuntimeException) {
+ setNodeOutputErrors(status = "Command executor timed out in GRPC call", message = "${grpcEx.status}".asJsonPrimitive())
+ log.error("Command executor time out during GRPC call", grpcEx)
} catch (e: Exception) {
log.error("Failed to process on remote executor", e)
} finally {
@@ -176,7 +199,7 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic
/**
* Utility function to set the output properties and errors of the executor node, in cas of errors
*/
- private fun setNodeOutputErrors(status: String, message: JsonNode, artifacts: JsonNode = "".asJsonPrimitive() ) {
+ private fun setNodeOutputErrors(status: String, message: JsonNode, artifacts: JsonNode = "".asJsonPrimitive()) {
setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.asJsonPrimitive())
setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message)
setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts)
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt
index 5163a93ac..7a0f167fa 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt
@@ -85,12 +85,13 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic
check(operationName.isNotEmpty()) { "couldn't get Operation name for step($stepName)" }
val operationResolvedProperties = bluePrintRuntimeService
- .resolveNodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
+ .resolveNodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
this.operationInputs.putAll(operationResolvedProperties)
val timeout = this.operationInputs.getOptionalAsInt(BluePrintConstants.PROPERTY_CURRENT_TIMEOUT)
timeout?.let { this.timeout = timeout }
+ log.debug("DEBUG::: AbstractComponentFunction prepareRequestNB.timeout ($timeout)")
return executionRequest
}
@@ -99,11 +100,11 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic
log.info("Preparing Response...")
executionServiceOutput.commonHeader = executionServiceInput.commonHeader
executionServiceOutput.actionIdentifiers = executionServiceInput.actionIdentifiers
- var status = Status()
+ val status = Status()
try {
// Resolve the Output Expression
val stepOutputs = bluePrintRuntimeService
- .resolveNodeTemplateInterfaceOperationOutputs(nodeTemplateName, interfaceName, operationName)
+ .resolveNodeTemplateInterfaceOperationOutputs(nodeTemplateName, interfaceName, operationName)
val stepOutputData = StepData().apply {
name = stepName
@@ -123,7 +124,8 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic
override suspend fun applyNB(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
try {
prepareRequestNB(executionServiceInput)
- withTimeout((timeout * 1000).toLong()) {
+ withTimeout(timeout * 1000L) {
+ log.debug("DEBUG::: AbstractComponentFunction.withTimeout section $timeout seconds")
processNB(executionServiceInput)
}
} catch (runtimeException: RuntimeException) {
@@ -135,7 +137,7 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic
fun getOperationInput(key: String): JsonNode {
return operationInputs[key]
- ?: throw BluePrintProcessorException("couldn't get the operation input($key) value.")
+ ?: throw BluePrintProcessorException("couldn't get the operation input($key) value.")
}
fun getOptionalOperationInput(key: String): JsonNode? {
@@ -189,4 +191,4 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic
return file.readNBLines()
}
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/RemoteScriptExecutionService.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/RemoteScriptExecutionService.kt
index d6146e111..5801af95b 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/RemoteScriptExecutionService.kt
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/RemoteScriptExecutionService.kt
@@ -32,6 +32,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.context.annotation.Scope
import org.springframework.stereotype.Service
+import java.util.concurrent.TimeUnit
interface RemoteScriptExecutionService {
suspend fun init(selector: Any)
@@ -42,7 +43,7 @@ interface RemoteScriptExecutionService {
@Service(ExecutionServiceConstant.SERVICE_GRPC_REMOTE_SCRIPT_EXECUTION)
@ConditionalOnProperty(prefix = "blueprintprocessor.remoteScriptCommand", name = arrayOf("enabled"),
- havingValue = "true", matchIfMissing = false)
+ havingValue = "true", matchIfMissing = false)
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
class GrpcRemoteScriptExecutionService(private val bluePrintGrpcLibPropertyService: BluePrintGrpcLibPropertyService)
: RemoteScriptExecutionService {
@@ -54,12 +55,12 @@ class GrpcRemoteScriptExecutionService(private val bluePrintGrpcLibPropertyServi
override suspend fun init(selector: Any) {
// Get the GRPC Client Service based on selector
- val grpcClientService: BluePrintGrpcClientService
- if (selector is JsonNode) {
- grpcClientService = bluePrintGrpcLibPropertyService.blueprintGrpcClientService(selector)
+ val grpcClientService: BluePrintGrpcClientService = if (selector is JsonNode) {
+ bluePrintGrpcLibPropertyService.blueprintGrpcClientService(selector)
} else {
- grpcClientService = bluePrintGrpcLibPropertyService.blueprintGrpcClientService(selector.toString())
+ bluePrintGrpcLibPropertyService.blueprintGrpcClientService(selector.toString())
}
+
// Get the GRPC Channel
channel = grpcClientService.channel()
// Create Non Blocking Stub
@@ -70,9 +71,10 @@ class GrpcRemoteScriptExecutionService(private val bluePrintGrpcLibPropertyServi
}
}
- override suspend fun prepareEnv(prepareEnvInput: PrepareRemoteEnvInput)
- : RemoteScriptExecutionOutput {
- val grpResponse = commandExecutorServiceGrpc.prepareEnv(prepareEnvInput.asGrpcData())
+ override suspend fun prepareEnv(prepareEnvInput: PrepareRemoteEnvInput): RemoteScriptExecutionOutput {
+ val grpResponse = commandExecutorServiceGrpc
+ .withDeadlineAfter(prepareEnvInput.timeOut * 1000, TimeUnit.MILLISECONDS)
+ .prepareEnv(prepareEnvInput.asGrpcData())
checkNotNull(grpResponse.status) {
"failed to get GRPC prepare env response status for requestId(${prepareEnvInput.requestId})"
@@ -85,18 +87,19 @@ class GrpcRemoteScriptExecutionService(private val bluePrintGrpcLibPropertyServi
}
override suspend fun executeCommand(remoteExecutionInput: RemoteScriptExecutionInput)
- : RemoteScriptExecutionOutput {
-
- val grpResponse = commandExecutorServiceGrpc.executeCommand(remoteExecutionInput.asGrpcData())
+ : RemoteScriptExecutionOutput {
+ val grpResponse =
+ commandExecutorServiceGrpc
+ .withDeadlineAfter(remoteExecutionInput.timeOut * 1000, TimeUnit.MILLISECONDS)
+ .executeCommand(remoteExecutionInput.asGrpcData())
checkNotNull(grpResponse.status) {
"failed to get GRPC response status for requestId(${remoteExecutionInput.requestId})"
}
- val remoteScriptExecutionOutput = grpResponse.asJavaData()
log.debug("Received response from command server for requestId(${remoteExecutionInput.requestId})")
+ return grpResponse.asJavaData()
- return remoteScriptExecutionOutput
}
override suspend fun close() {
@@ -116,33 +119,33 @@ class GrpcRemoteScriptExecutionService(private val bluePrintGrpcLibPropertyServi
}
return PrepareEnvInput.newBuilder()
- .setIdentifiers(this.remoteIdentifier!!.asGrpcData())
- .setRequestId(this.requestId)
- .setCorrelationId(correlationId)
- .setTimeOut(this.timeOut.toInt())
- .addAllPackages(packageList)
- .setProperties(this.properties.asGrpcData())
- .build()
+ .setIdentifiers(this.remoteIdentifier!!.asGrpcData())
+ .setRequestId(this.requestId)
+ .setCorrelationId(correlationId)
+ .setTimeOut(this.timeOut.toInt())
+ .addAllPackages(packageList)
+ .setProperties(this.properties.asGrpcData())
+ .build()
}
fun RemoteScriptExecutionInput.asGrpcData(): ExecutionInput {
val correlationId = this.correlationId ?: this.requestId
return ExecutionInput.newBuilder()
- .setRequestId(this.requestId)
- .setCorrelationId(correlationId)
- .setIdentifiers(this.remoteIdentifier!!.asGrpcData())
- .setCommand(this.command)
- .setTimeOut(this.timeOut.toInt())
- .setProperties(this.properties.asGrpcData())
- .setTimestamp(Timestamp.getDefaultInstance())
- .build()
+ .setRequestId(this.requestId)
+ .setCorrelationId(correlationId)
+ .setIdentifiers(this.remoteIdentifier!!.asGrpcData())
+ .setCommand(this.command)
+ .setTimeOut(this.timeOut.toInt())
+ .setProperties(this.properties.asGrpcData())
+ .setTimestamp(Timestamp.getDefaultInstance())
+ .build()
}
fun RemoteIdentifier.asGrpcData(): Identifiers? {
return Identifiers.newBuilder()
- .setBlueprintName(this.blueprintName)
- .setBlueprintVersion(this.blueprintVersion)
- .build()
+ .setBlueprintName(this.blueprintName)
+ .setBlueprintVersion(this.blueprintVersion)
+ .build()
}
fun Map<String, JsonNode>.asGrpcData(): Struct {
@@ -153,10 +156,10 @@ class GrpcRemoteScriptExecutionService(private val bluePrintGrpcLibPropertyServi
fun ExecutionOutput.asJavaData(): RemoteScriptExecutionOutput {
return RemoteScriptExecutionOutput(
- requestId = this.requestId,
- response = this.responseList,
- status = StatusType.valueOf(this.status.name),
- payload = payload.jsonAsJsonType()
+ requestId = this.requestId,
+ response = this.responseList,
+ status = StatusType.valueOf(this.status.name),
+ payload = payload.jsonAsJsonType()
)
}