diff options
author | Dan Timoney <dtimoney@att.com> | 2019-08-22 22:02:40 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2019-08-22 22:02:40 +0000 |
commit | 9f260f36b66d0db54da89a8f9307d50fd0d4f320 (patch) | |
tree | 7d0ef6cd07ffc8612b3b9db907d649b8bd58f443 | |
parent | b496e93839fba89a8ea2ab026954a6f6359bc4c2 (diff) | |
parent | 43957fac4ead89a43ef97bf749b1bddc1adf0d6c (diff) |
Merge "Component node timeout implementation"
10 files changed, 235 insertions, 86 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/ApiDataExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/ApiDataExtensions.kt new file mode 100644 index 000000000..47b55b018 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/ApiDataExtensions.kt @@ -0,0 +1,28 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core + +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.controllerblueprints.core.asType +import kotlin.reflect.KClass + + +fun <T : Any> ExecutionServiceInput.payloadAsType(clazzType: KClass<T>): T { + val actionName = this.actionIdentifiers.actionName + val requestJsonNode = this.payload.get("$actionName-request") + return requestJsonNode.asType(clazzType.java) +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt index 408bb58ed..8759338b7 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt @@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.services.execution import com.fasterxml.jackson.databind.JsonNode +import kotlinx.coroutines.withTimeout import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status @@ -47,6 +48,7 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic lateinit var interfaceName: String lateinit var operationName: String lateinit var nodeTemplateName: String + var timeout: Int = 180 var operationInputs: MutableMap<String, JsonNode> = hashMapOf() override fun getName(): String { @@ -87,6 +89,9 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic this.operationInputs.putAll(operationResolvedProperties) + val timeout = this.operationInputs.getOptionalAsInt(BluePrintConstants.PROPERTY_CURRENT_TIMEOUT) + timeout?.let { this.timeout = timeout } + return executionRequest } @@ -118,7 +123,9 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic override suspend fun applyNB(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { try { prepareRequestNB(executionServiceInput) - processNB(executionServiceInput) + withTimeout((timeout * 1000).toLong()) { + processNB(executionServiceInput) + } } catch (runtimeException: RuntimeException) { log.error("failed in ${getName()} : ${runtimeException.message}", runtimeException) recoverNB(runtimeException, executionServiceInput) diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt index 2a14be216..6bee17f4b 100644 --- a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt +++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt @@ -44,10 +44,8 @@ class ImperativeWorkflowExecutionService( val graph = bluePrintContext.workflowByName(workflowName).asGraph() - val deferredOutput = CompletableDeferred<ExecutionServiceOutput>() - imperativeBluePrintWorkflowService.executeWorkflow(graph, bluePrintRuntimeService, - executionServiceInput, deferredOutput) - return deferredOutput.await() + return imperativeBluePrintWorkflowService.executeWorkflow(graph, bluePrintRuntimeService, + executionServiceInput) } } @@ -60,35 +58,41 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS lateinit var bluePrintRuntimeService: BluePrintRuntimeService<*> lateinit var executionServiceInput: ExecutionServiceInput lateinit var workflowName: String - lateinit var deferredExecutionServiceOutput: CompletableDeferred<ExecutionServiceOutput> override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, - input: ExecutionServiceInput, - output: CompletableDeferred<ExecutionServiceOutput>) { + input: ExecutionServiceInput): ExecutionServiceOutput { this.graph = graph this.bluePrintRuntimeService = bluePrintRuntimeService this.executionServiceInput = input this.workflowName = this.executionServiceInput.actionIdentifiers.actionName - this.deferredExecutionServiceOutput = output this.workflowId = bluePrintRuntimeService.id() + val output = CompletableDeferred<ExecutionServiceOutput>() val startMessage = WorkflowExecuteMessage(input, output) - workflowActor().send(startMessage) + val workflowActor = workflowActor() + if (!workflowActor.isClosedForSend) { + workflowActor.send(startMessage) + } else { + throw BluePrintProcessorException("workflow($workflowActor) actor is closed") + } + return output.await() } override suspend fun initializeWorkflow(input: ExecutionServiceInput): EdgeLabel { return EdgeLabel.SUCCESS } - override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): ExecutionServiceOutput { - val wfStatus = if (exception != null) { - val status = Status() - status.message = BluePrintConstants.STATUS_FAILURE - status.errorMessage = exception.message - status - } else { - val status = Status() - status.message = BluePrintConstants.STATUS_SUCCESS - status + override suspend fun prepareWorkflowOutput(): ExecutionServiceOutput { + val wfStatus = Status().apply { + if (exceptions.isNotEmpty()) { + exceptions.forEach { + val errorMessage = it.message ?: "" + bluePrintRuntimeService.getBluePrintError().addError(errorMessage) + log.error("workflow($workflowId) exception :", it) + } + message = BluePrintConstants.STATUS_FAILURE + } else { + message = BluePrintConstants.STATUS_SUCCESS + } } return ExecutionServiceOutput().apply { commonHeader = executionServiceInput.commonHeader diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt index 89732e300..b64177aab 100644 --- a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt +++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt @@ -22,7 +22,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutp import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.StepData import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants -import org.onap.ccsdk.cds.controllerblueprints.core.putJsonElement +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintRuntimeService import org.slf4j.LoggerFactory @@ -37,15 +37,22 @@ open class NodeTemplateExecutionService { executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { // Get the Blueprint Context val blueprintContext = bluePrintRuntimeService.bluePrintContext() + + val nodeTemplate = blueprintContext.nodeTemplateByName(nodeTemplateName) // Get the Component Name, NodeTemplate type is mapped to Component Name - val componentName = blueprintContext.nodeTemplateByName(nodeTemplateName).type + val componentName = nodeTemplate.type val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName) val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName) + val nodeTemplateImplementation = blueprintContext + .nodeTemplateOperationImplementation(nodeTemplateName, interfaceName, operationName) + + val timeout: Int = nodeTemplateImplementation?.timeout ?: 180 + log.info("executing node template($nodeTemplateName) component($componentName) " + - "interface($interfaceName) operation($operationName)") + "interface($interfaceName) operation($operationName) with timeout($timeout) sec.") // Get the Component Instance val plugin = BluePrintDependencyService.instance<AbstractComponentFunction>(componentName) @@ -62,9 +69,10 @@ open class NodeTemplateExecutionService { // Populate Step Meta Data val stepInputs: MutableMap<String, JsonNode> = hashMapOf() - stepInputs.putJsonElement(BluePrintConstants.PROPERTY_CURRENT_NODE_TEMPLATE, nodeTemplateName) - stepInputs.putJsonElement(BluePrintConstants.PROPERTY_CURRENT_INTERFACE, interfaceName) - stepInputs.putJsonElement(BluePrintConstants.PROPERTY_CURRENT_OPERATION, operationName) + stepInputs[BluePrintConstants.PROPERTY_CURRENT_NODE_TEMPLATE] = nodeTemplateName.asJsonPrimitive() + stepInputs[BluePrintConstants.PROPERTY_CURRENT_INTERFACE] = interfaceName.asJsonPrimitive() + stepInputs[BluePrintConstants.PROPERTY_CURRENT_OPERATION] = operationName.asJsonPrimitive() + stepInputs[BluePrintConstants.PROPERTY_CURRENT_TIMEOUT] = timeout.asJsonPrimitive() val stepInputData = StepData().apply { name = nodeTemplateName properties = stepInputs diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt index 064c196ed..ba5815bb6 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt @@ -180,6 +180,7 @@ object BluePrintConstants { const val PROPERTY_CURRENT_NODE_TEMPLATE = "current-node-template" const val PROPERTY_CURRENT_INTERFACE = "current-interface" const val PROPERTY_CURRENT_OPERATION = "current-operation" + const val PROPERTY_CURRENT_TIMEOUT = "current-timeout" const val PROPERTY_CURRENT_IMPLEMENTATION = "current-implementation" const val PROPERTY_EXECUTION_REQUEST = "execution-request" diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt index 93ba15e99..08bc6c3fd 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt @@ -242,6 +242,22 @@ fun Map<String, JsonNode>.getAsDouble(key: String): Double { return this[key]?.asDouble() ?: throw BluePrintException("couldn't find value for key($key)") } +fun Map<String, JsonNode>.getOptionalAsString(key: String): String? { + return if (this.containsKey(key)) this[key]!!.asText() else null +} + +fun Map<String, JsonNode>.getOptionalAsBoolean(key: String): Boolean? { + return if (this.containsKey(key)) this[key]!!.asBoolean() else null +} + +fun Map<String, JsonNode>.getOptionalAsInt(key: String): Int? { + return if (this.containsKey(key)) this[key]!!.asInt() else null +} + +fun Map<String, JsonNode>.getOptionalAsDouble(key: String): Double? { + return if (this.containsKey(key)) this[key]!!.asDouble() else null +} + // Checks inline fun checkEquals(value1: String?, value2: String?, lazyMessage: () -> Any): Boolean { diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/data/BluePrintGraph.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/data/BluePrintGraph.kt index 9e1b7498e..fc796c9ed 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/data/BluePrintGraph.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/data/BluePrintGraph.kt @@ -33,7 +33,8 @@ enum class NodeStatus(val id: String) { READY("ready"), EXECUTING("executing"), EXECUTED("executed"), - SKIPPED("skipped") + SKIPPED("skipped"), + TERMINATED("terminated") } class Graph { diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt index 066516fcc..b368c01aa 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt @@ -216,6 +216,11 @@ class BluePrintContext(val serviceTemplate: ServiceTemplate) { ?: throw BluePrintException("could't get NodeTemplate($nodeTemplateName)'s first InterfaceAssignment's first OperationAssignment name") } + fun nodeTemplateOperationImplementation(nodeTemplateName: String, interfaceName: String, operationName: String) + : Implementation? { + return nodeTemplateInterfaceOperation(nodeTemplateName, interfaceName, operationName).implementation + } + fun nodeTemplateInterfaceOperationInputs(nodeTemplateName: String, interfaceName: String, operationName: String): MutableMap<String, JsonNode>? { return nodeTemplateInterfaceOperation(nodeTemplateName, interfaceName, operationName).inputs } diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt index 905150213..5cec3c947 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt @@ -30,13 +30,12 @@ import kotlin.coroutines.CoroutineContext interface BluePrintWorkFlowService<In, Out> { /** Executes imperative workflow graph [graph] for the bluePrintRuntimeService [bluePrintRuntimeService] - * and workflow input [input], response will be retrieve from output [output]*/ - suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, - input: In, output: CompletableDeferred<Out>) + * and workflow input [input]*/ + suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, input: In): Out suspend fun initializeWorkflow(input: In): EdgeLabel - suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): Out + suspend fun prepareWorkflowOutput(): Out /** Prepare the message for the Node */ suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out> @@ -91,6 +90,8 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP lateinit var workflowId: String + var exceptions: MutableList<Exception> = arrayListOf() + final override val coroutineContext: CoroutineContext get() = job + CoroutineName("Wf") @@ -100,7 +101,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP throw CancellationException("Workflow($workflowId) cancelled as requested") } - fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { + suspend fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { /** Process the workflow execution message */ suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) { @@ -119,13 +120,11 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP // Wait for workflow completion or Error nodeActor.invokeOnClose { exception -> launch { - log.info("End Node Completed, processing completion message") - val bluePrintProcessorException: BluePrintProcessorException? = - if (exception != null) BluePrintProcessorException(exception) else null - - val workflowOutput = prepareWorkflowOutput(bluePrintProcessorException) + if (exception != null) exceptions.add(BluePrintProcessorException(exception)) + log.info("workflow($workflowId) nodes completed with (${exceptions.size})exceptions") + val workflowOutput = prepareWorkflowOutput() workflowExecuteMessage.output.complete(workflowOutput) - channel.close(exception) + channel.close() } } } @@ -135,7 +134,11 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP when (message) { is WorkflowExecuteMessage<In, Out> -> { launch { - executeMessageActor(message) + try { + executeMessageActor(message) + } catch (e: Exception) { + exceptions.add(e) + } } } is WorkflowRestartMessage<In, Out> -> { @@ -153,7 +156,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP } - private fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { + private suspend fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { /** Send message to process from one state to other state */ fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch { @@ -164,7 +167,6 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP fun processNextNodes(node: Graph.Node, nodeState: EdgeLabel) { // Process only Next Success Node val stateEdges = graph.outgoingEdges(node.id, arrayListOf(nodeState)) - log.debug("Next Edges :$stateEdges") if (stateEdges.isNotEmpty()) { stateEdges.forEach { stateEdge -> // Prepare next node ready message and Send NodeReadyMessage @@ -213,7 +215,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP } triggerToExecuteOrSkip(newMessage) } else { - log.info("node(${node.id}) waiting for not completed edges($notCompletedEdges)") + log.info("node(${node.id}) is waiting for incoming edges($notCompletedEdges)") } } else { triggerToExecuteOrSkip(message) @@ -233,15 +235,19 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP } // Update Node Completed node.status = NodeStatus.EXECUTED - log.info("Execute Node($node) -> Executed State($nodeState)") + log.info("Execute node(${node.id}) -> executed state($nodeState)") + // Check if the Node status edge is there, If not close processing + val edgePresent = graph.outgoingEdges(node.id, nodeState).isNotEmpty() // If End Node, Send End Message if (graph.isEndNode(node)) { // Close the current channel channel.close() + } else if (!edgePresent) { + throw BluePrintProcessorException("node(${node.id}) outgoing edge($nodeState) is missing.") } else { val skippingEdges = graph.outgoingEdgesNotInLabels(node.id, arrayListOf(nodeState)) - log.debug("Skipping node($node) outgoing Edges($skippingEdges)") + log.debug("Skipping node($node)'s outgoing edges($skippingEdges)") // Process Skip Edges skippingEdges.forEach { skippingEdge -> // Prepare next node ready message and Send NodeReadyMessage @@ -266,7 +272,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$") // Call the Extension Function val nodeState = skipNode(node, message.nodeInput, message.nodeOutput) - log.info("Skip Node($node) -> Executed State($nodeState)") + log.info("Skip node(${node.id}) -> executed state($nodeState)") // Mark the Current node as Skipped node.status = NodeStatus.SKIPPED // Look for next possible skip nodes @@ -283,7 +289,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP fun cancelNodeWorker(messageWorkflow: WorkflowCancelMessage<In, Out>) = launch { channel.close() - throw CancellationException("Workflow($workflowId) actor cancelled as requested ...") + throw CancellationException("Workflow($workflowId) actor cancelled as requested.") } /** Process each actor message received based on type **/ @@ -294,7 +300,8 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP try { readyNodeWorker(nodeMessage) } catch (e: Exception) { - channel.close(e) + exceptions.add(e) + channel.close() } } is NodeExecuteMessage<In, Out> -> { @@ -302,7 +309,9 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP try { executeNodeWorker(nodeMessage) } catch (e: Exception) { - channel.close(e) + nodeMessage.node.status = NodeStatus.TERMINATED + exceptions.add(e) + channel.close() } } } @@ -311,7 +320,9 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP try { skipNodeWorker(nodeMessage) } catch (e: Exception) { - channel.close(e) + nodeMessage.node.status = NodeStatus.TERMINATED + exceptions.add(e) + channel.close() } } } @@ -320,20 +331,12 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP try { restartNodeWorker(nodeMessage) } catch (e: Exception) { - channel.close(e) + exceptions.add(e) + channel.close() } } } } } } - - override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, - input: In, output: CompletableDeferred<Out>) { - log.info("Executing Graph : $graph") - this.graph = graph - this.workflowId = bluePrintRuntimeService.id() - val startMessage = WorkflowExecuteMessage(input, output) - workflowActor().send(startMessage) - } }
\ No newline at end of file diff --git a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt index b8d8cea3e..4d97f8bc3 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt @@ -18,13 +18,13 @@ package org.onap.ccsdk.cds.controllerblueprints.core.service import io.mockk.every import io.mockk.mockk -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.* import org.junit.Test import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph +import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.toGraph import kotlin.test.assertNotNull @@ -36,10 +36,66 @@ class BluePrintWorkflowServiceTest { .toGraph() val simpleWorkflow = TestBluePrintWorkFlowService() simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null) - val deferredOutput = CompletableDeferred<String>() val input = "123456" - simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput) - val response = deferredOutput.await() + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) + assertNotNull(response, "failed to get response") + } + } + + @Test + fun testMultipleFlows() { + runBlocking { + coroutineScope { + val wfs = listOf("12345", "12346").map { + async { + val graph = "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService() + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D"), null) + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(it), it) + assertNotNull(response, "failed to get response") + } + } + wfs.awaitAll() + } + } + } + + @Test + fun testMissingEdgeForBFailureState() { + runBlocking { + val graph = "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService() + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "C", "D", "E"), arrayListOf("B")) + val input = "123456" + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) + assertNotNull(response, "failed to get response") + } + } + + @Test + fun testBExceptionFlow() { + runBlocking { + val graph = "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService() + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "C", "D", "E"), null) + val input = "123456" + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) + assertNotNull(response, "failed to get response") + } + } + + @Test + fun testTimeoutExceptionFlow() { + runBlocking { + val graph = "[START>A/SUCCESS, A>TO/SUCCESS, TO>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService() + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "TO", "C", "D", "E"), null) + val input = "123456" + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) assertNotNull(response, "failed to get response") } } @@ -51,10 +107,8 @@ class BluePrintWorkflowServiceTest { .toGraph() val simpleWorkflow = TestBluePrintWorkFlowService() simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null) - val deferredOutput = CompletableDeferred<String>() val input = "123456" - simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput) - val response = deferredOutput.await() + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) assertNotNull(response, "failed to get response") } } @@ -68,10 +122,8 @@ class BluePrintWorkflowServiceTest { val failurePathWorkflow = TestBluePrintWorkFlowService() failurePathWorkflow.simulatedState = prepareSimulation(arrayListOf("B", "C", "D", "E"), arrayListOf("A")) - val failurePathWorkflowDeferredOutput = CompletableDeferred<String>() val failurePathWorkflowInput = "123456" - failurePathWorkflow.executeWorkflow(failurePatGraph, mockBluePrintRuntimeService(), failurePathWorkflowInput, failurePathWorkflowDeferredOutput) - val failurePathResponse = failurePathWorkflowDeferredOutput.await() + val failurePathResponse = failurePathWorkflow.executeWorkflow(failurePatGraph, mockBluePrintRuntimeService(), failurePathWorkflowInput) assertNotNull(failurePathResponse, "failed to get response") } } @@ -83,10 +135,8 @@ class BluePrintWorkflowServiceTest { .toGraph() val simpleWorkflow = TestBluePrintWorkFlowService() simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null) - val deferredOutput = CompletableDeferred<String>() val input = "123456" - simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput) - val response = deferredOutput.await() + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) assertNotNull(response, "failed to get response") } } @@ -98,17 +148,19 @@ class BluePrintWorkflowServiceTest { .toGraph() val simpleWorkflow = TestBluePrintWorkFlowService() simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D"), null) - val deferredOutput = CompletableDeferred<String>() val input = "123456" - simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput) - val response = deferredOutput.await() + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) assertNotNull(response, "failed to get response") } } private fun mockBluePrintRuntimeService(): BluePrintRuntimeService<*> { + return mockBluePrintRuntimeService("123456") + } + + private fun mockBluePrintRuntimeService(id: String): BluePrintRuntimeService<*> { val bluePrintRuntimeService = mockk<BluePrintRuntimeService<*>>() - every { bluePrintRuntimeService.id() } returns "123456" + every { bluePrintRuntimeService.id() } returns id return bluePrintRuntimeService } @@ -126,6 +178,7 @@ class BluePrintWorkflowServiceTest { class TestBluePrintWorkFlowService : AbstractBluePrintWorkFlowService<String, String>() { + val log = logger(TestBluePrintWorkFlowService::class) lateinit var simulatedState: MutableMap<String, EdgeLabel> @@ -133,6 +186,21 @@ class TestBluePrintWorkFlowService return EdgeLabel.SUCCESS } + override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, input: String): String { + log.info("Executing Graph : $graph") + this.graph = graph + this.workflowId = bluePrintRuntimeService.id() + val output = CompletableDeferred<String>() + val startMessage = WorkflowExecuteMessage(input, output) + val workflowActor = workflowActor() + if (!workflowActor.isClosedForSend) { + workflowActor().send(startMessage) + } else { + throw BluePrintProcessorException("workflow actor is closed for send $workflowActor") + } + return startMessage.output.await() + } + override suspend fun prepareNodeExecutionMessage(node: Graph.Node) : NodeExecuteMessage<String, String> { return NodeExecuteMessage(node, "$node Input", "") @@ -140,23 +208,26 @@ class TestBluePrintWorkFlowService override suspend fun executeNode(node: Graph.Node, nodeInput: String, nodeOutput: String): EdgeLabel { -// val random = (1..10).random() * 1000 -// println("will reply in $random ms") +// val random = (1..10).random() * 100 +// log.info("workflow($workflowId) node(${node.id}) will reply in $random ms") // kotlinx.coroutines.delay(random.toLong()) - val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") - return status +// //Simulation for timeout + if (node.id == "TO") { + withTimeout(1) { + kotlinx.coroutines.delay(2) + } + } + return simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") } override suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<String, String> { val nodeOutput = "" - val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", nodeOutput) - return nodeSkipMessage + return NodeSkipMessage(node, "$node Skip Input", nodeOutput) } override suspend fun skipNode(node: Graph.Node, nodeInput: String, nodeOutput: String): EdgeLabel { - val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") - return status + return simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") } override suspend fun cancelNode(node: Graph.Node, nodeInput: String, @@ -169,7 +240,12 @@ class TestBluePrintWorkFlowService TODO("not implemented") //To change body of created functions use File | Settings | File Templates. } - override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): String { + override suspend fun prepareWorkflowOutput(): String { + if (exceptions.isNotEmpty()) { + exceptions.forEach { + log.error("workflow($workflowId) exceptions :", it) + } + } return "Final Response" } }
\ No newline at end of file |