From 43957fac4ead89a43ef97bf749b1bddc1adf0d6c Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Fri, 16 Aug 2019 10:14:25 -0400 Subject: Component node timeout implementation Change-Id: I99fc9efba76595693d95772e409f0f982aeae9b1 Issue-ID: CCSDK-1619 Signed-off-by: Brinda Santh --- .../execution/AbstractComponentFunction.kt | 9 ++++- .../workflow/ImperativeWorkflowExecutionService.kt | 42 ++++++++++++---------- .../workflow/NodeTemplateExecutionService.kt | 20 +++++++---- 3 files changed, 45 insertions(+), 26 deletions(-) (limited to 'ms/blueprintsprocessor/modules/services') diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt index 408bb58ed..8759338b7 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt @@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.services.execution import com.fasterxml.jackson.databind.JsonNode +import kotlinx.coroutines.withTimeout import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status @@ -47,6 +48,7 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode = hashMapOf() override fun getName(): String { @@ -87,6 +89,9 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode() - imperativeBluePrintWorkflowService.executeWorkflow(graph, bluePrintRuntimeService, - executionServiceInput, deferredOutput) - return deferredOutput.await() + return imperativeBluePrintWorkflowService.executeWorkflow(graph, bluePrintRuntimeService, + executionServiceInput) } } @@ -60,35 +58,41 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS lateinit var bluePrintRuntimeService: BluePrintRuntimeService<*> lateinit var executionServiceInput: ExecutionServiceInput lateinit var workflowName: String - lateinit var deferredExecutionServiceOutput: CompletableDeferred override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, - input: ExecutionServiceInput, - output: CompletableDeferred) { + input: ExecutionServiceInput): ExecutionServiceOutput { this.graph = graph this.bluePrintRuntimeService = bluePrintRuntimeService this.executionServiceInput = input this.workflowName = this.executionServiceInput.actionIdentifiers.actionName - this.deferredExecutionServiceOutput = output this.workflowId = bluePrintRuntimeService.id() + val output = CompletableDeferred() val startMessage = WorkflowExecuteMessage(input, output) - workflowActor().send(startMessage) + val workflowActor = workflowActor() + if (!workflowActor.isClosedForSend) { + workflowActor.send(startMessage) + } else { + throw BluePrintProcessorException("workflow($workflowActor) actor is closed") + } + return output.await() } override suspend fun initializeWorkflow(input: ExecutionServiceInput): EdgeLabel { return EdgeLabel.SUCCESS } - override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): ExecutionServiceOutput { - val wfStatus = if (exception != null) { - val status = Status() - status.message = BluePrintConstants.STATUS_FAILURE - status.errorMessage = exception.message - status - } else { - val status = Status() - status.message = BluePrintConstants.STATUS_SUCCESS - status + override suspend fun prepareWorkflowOutput(): ExecutionServiceOutput { + val wfStatus = Status().apply { + if (exceptions.isNotEmpty()) { + exceptions.forEach { + val errorMessage = it.message ?: "" + bluePrintRuntimeService.getBluePrintError().addError(errorMessage) + log.error("workflow($workflowId) exception :", it) + } + message = BluePrintConstants.STATUS_FAILURE + } else { + message = BluePrintConstants.STATUS_SUCCESS + } } return ExecutionServiceOutput().apply { commonHeader = executionServiceInput.commonHeader diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt index 89732e300..b64177aab 100644 --- a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt +++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt @@ -22,7 +22,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutp import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.StepData import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants -import org.onap.ccsdk.cds.controllerblueprints.core.putJsonElement +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintRuntimeService import org.slf4j.LoggerFactory @@ -37,15 +37,22 @@ open class NodeTemplateExecutionService { executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { // Get the Blueprint Context val blueprintContext = bluePrintRuntimeService.bluePrintContext() + + val nodeTemplate = blueprintContext.nodeTemplateByName(nodeTemplateName) // Get the Component Name, NodeTemplate type is mapped to Component Name - val componentName = blueprintContext.nodeTemplateByName(nodeTemplateName).type + val componentName = nodeTemplate.type val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName) val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName) + val nodeTemplateImplementation = blueprintContext + .nodeTemplateOperationImplementation(nodeTemplateName, interfaceName, operationName) + + val timeout: Int = nodeTemplateImplementation?.timeout ?: 180 + log.info("executing node template($nodeTemplateName) component($componentName) " + - "interface($interfaceName) operation($operationName)") + "interface($interfaceName) operation($operationName) with timeout($timeout) sec.") // Get the Component Instance val plugin = BluePrintDependencyService.instance(componentName) @@ -62,9 +69,10 @@ open class NodeTemplateExecutionService { // Populate Step Meta Data val stepInputs: MutableMap = hashMapOf() - stepInputs.putJsonElement(BluePrintConstants.PROPERTY_CURRENT_NODE_TEMPLATE, nodeTemplateName) - stepInputs.putJsonElement(BluePrintConstants.PROPERTY_CURRENT_INTERFACE, interfaceName) - stepInputs.putJsonElement(BluePrintConstants.PROPERTY_CURRENT_OPERATION, operationName) + stepInputs[BluePrintConstants.PROPERTY_CURRENT_NODE_TEMPLATE] = nodeTemplateName.asJsonPrimitive() + stepInputs[BluePrintConstants.PROPERTY_CURRENT_INTERFACE] = interfaceName.asJsonPrimitive() + stepInputs[BluePrintConstants.PROPERTY_CURRENT_OPERATION] = operationName.asJsonPrimitive() + stepInputs[BluePrintConstants.PROPERTY_CURRENT_TIMEOUT] = timeout.asJsonPrimitive() val stepInputData = StepData().apply { name = nodeTemplateName properties = stepInputs -- cgit 1.2.3-korg