diff options
author | Dan Timoney <dtimoney@att.com> | 2019-08-22 22:02:40 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2019-08-22 22:02:40 +0000 |
commit | 9f260f36b66d0db54da89a8f9307d50fd0d4f320 (patch) | |
tree | 7d0ef6cd07ffc8612b3b9db907d649b8bd58f443 /ms/blueprintsprocessor | |
parent | b496e93839fba89a8ea2ab026954a6f6359bc4c2 (diff) | |
parent | 43957fac4ead89a43ef97bf749b1bddc1adf0d6c (diff) |
Merge "Component node timeout implementation"
Diffstat (limited to 'ms/blueprintsprocessor')
4 files changed, 73 insertions, 26 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/ApiDataExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/ApiDataExtensions.kt new file mode 100644 index 000000000..47b55b018 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/ApiDataExtensions.kt @@ -0,0 +1,28 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core + +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.controllerblueprints.core.asType +import kotlin.reflect.KClass + + +fun <T : Any> ExecutionServiceInput.payloadAsType(clazzType: KClass<T>): T { + val actionName = this.actionIdentifiers.actionName + val requestJsonNode = this.payload.get("$actionName-request") + return requestJsonNode.asType(clazzType.java) +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt index 408bb58ed..8759338b7 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt @@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.services.execution import com.fasterxml.jackson.databind.JsonNode +import kotlinx.coroutines.withTimeout import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status @@ -47,6 +48,7 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic lateinit var interfaceName: String lateinit var operationName: String lateinit var nodeTemplateName: String + var timeout: Int = 180 var operationInputs: MutableMap<String, JsonNode> = hashMapOf() override fun getName(): String { @@ -87,6 +89,9 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic this.operationInputs.putAll(operationResolvedProperties) + val timeout = this.operationInputs.getOptionalAsInt(BluePrintConstants.PROPERTY_CURRENT_TIMEOUT) + timeout?.let { this.timeout = timeout } + return executionRequest } @@ -118,7 +123,9 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic override suspend fun applyNB(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { try { prepareRequestNB(executionServiceInput) - processNB(executionServiceInput) + withTimeout((timeout * 1000).toLong()) { + processNB(executionServiceInput) + } } catch (runtimeException: RuntimeException) { log.error("failed in ${getName()} : ${runtimeException.message}", runtimeException) recoverNB(runtimeException, executionServiceInput) diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt index 2a14be216..6bee17f4b 100644 --- a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt +++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt @@ -44,10 +44,8 @@ class ImperativeWorkflowExecutionService( val graph = bluePrintContext.workflowByName(workflowName).asGraph() - val deferredOutput = CompletableDeferred<ExecutionServiceOutput>() - imperativeBluePrintWorkflowService.executeWorkflow(graph, bluePrintRuntimeService, - executionServiceInput, deferredOutput) - return deferredOutput.await() + return imperativeBluePrintWorkflowService.executeWorkflow(graph, bluePrintRuntimeService, + executionServiceInput) } } @@ -60,35 +58,41 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS lateinit var bluePrintRuntimeService: BluePrintRuntimeService<*> lateinit var executionServiceInput: ExecutionServiceInput lateinit var workflowName: String - lateinit var deferredExecutionServiceOutput: CompletableDeferred<ExecutionServiceOutput> override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, - input: ExecutionServiceInput, - output: CompletableDeferred<ExecutionServiceOutput>) { + input: ExecutionServiceInput): ExecutionServiceOutput { this.graph = graph this.bluePrintRuntimeService = bluePrintRuntimeService this.executionServiceInput = input this.workflowName = this.executionServiceInput.actionIdentifiers.actionName - this.deferredExecutionServiceOutput = output this.workflowId = bluePrintRuntimeService.id() + val output = CompletableDeferred<ExecutionServiceOutput>() val startMessage = WorkflowExecuteMessage(input, output) - workflowActor().send(startMessage) + val workflowActor = workflowActor() + if (!workflowActor.isClosedForSend) { + workflowActor.send(startMessage) + } else { + throw BluePrintProcessorException("workflow($workflowActor) actor is closed") + } + return output.await() } override suspend fun initializeWorkflow(input: ExecutionServiceInput): EdgeLabel { return EdgeLabel.SUCCESS } - override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): ExecutionServiceOutput { - val wfStatus = if (exception != null) { - val status = Status() - status.message = BluePrintConstants.STATUS_FAILURE - status.errorMessage = exception.message - status - } else { - val status = Status() - status.message = BluePrintConstants.STATUS_SUCCESS - status + override suspend fun prepareWorkflowOutput(): ExecutionServiceOutput { + val wfStatus = Status().apply { + if (exceptions.isNotEmpty()) { + exceptions.forEach { + val errorMessage = it.message ?: "" + bluePrintRuntimeService.getBluePrintError().addError(errorMessage) + log.error("workflow($workflowId) exception :", it) + } + message = BluePrintConstants.STATUS_FAILURE + } else { + message = BluePrintConstants.STATUS_SUCCESS + } } return ExecutionServiceOutput().apply { commonHeader = executionServiceInput.commonHeader diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt index 89732e300..b64177aab 100644 --- a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt +++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt @@ -22,7 +22,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutp import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.StepData import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants -import org.onap.ccsdk.cds.controllerblueprints.core.putJsonElement +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintRuntimeService import org.slf4j.LoggerFactory @@ -37,15 +37,22 @@ open class NodeTemplateExecutionService { executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { // Get the Blueprint Context val blueprintContext = bluePrintRuntimeService.bluePrintContext() + + val nodeTemplate = blueprintContext.nodeTemplateByName(nodeTemplateName) // Get the Component Name, NodeTemplate type is mapped to Component Name - val componentName = blueprintContext.nodeTemplateByName(nodeTemplateName).type + val componentName = nodeTemplate.type val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName) val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName) + val nodeTemplateImplementation = blueprintContext + .nodeTemplateOperationImplementation(nodeTemplateName, interfaceName, operationName) + + val timeout: Int = nodeTemplateImplementation?.timeout ?: 180 + log.info("executing node template($nodeTemplateName) component($componentName) " + - "interface($interfaceName) operation($operationName)") + "interface($interfaceName) operation($operationName) with timeout($timeout) sec.") // Get the Component Instance val plugin = BluePrintDependencyService.instance<AbstractComponentFunction>(componentName) @@ -62,9 +69,10 @@ open class NodeTemplateExecutionService { // Populate Step Meta Data val stepInputs: MutableMap<String, JsonNode> = hashMapOf() - stepInputs.putJsonElement(BluePrintConstants.PROPERTY_CURRENT_NODE_TEMPLATE, nodeTemplateName) - stepInputs.putJsonElement(BluePrintConstants.PROPERTY_CURRENT_INTERFACE, interfaceName) - stepInputs.putJsonElement(BluePrintConstants.PROPERTY_CURRENT_OPERATION, operationName) + stepInputs[BluePrintConstants.PROPERTY_CURRENT_NODE_TEMPLATE] = nodeTemplateName.asJsonPrimitive() + stepInputs[BluePrintConstants.PROPERTY_CURRENT_INTERFACE] = interfaceName.asJsonPrimitive() + stepInputs[BluePrintConstants.PROPERTY_CURRENT_OPERATION] = operationName.asJsonPrimitive() + stepInputs[BluePrintConstants.PROPERTY_CURRENT_TIMEOUT] = timeout.asJsonPrimitive() val stepInputData = StepData().apply { name = nodeTemplateName properties = stepInputs |