From 43957fac4ead89a43ef97bf749b1bddc1adf0d6c Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Fri, 16 Aug 2019 10:14:25 -0400 Subject: Component node timeout implementation Change-Id: I99fc9efba76595693d95772e409f0f982aeae9b1 Issue-ID: CCSDK-1619 Signed-off-by: Brinda Santh --- .../core/service/BluePrintWorkflowServiceTest.kt | 130 ++++++++++++++++----- 1 file changed, 103 insertions(+), 27 deletions(-) (limited to 'ms/controllerblueprints/modules/blueprint-core/src/test/kotlin') diff --git a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt index b8d8cea3e..4d97f8bc3 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt @@ -18,13 +18,13 @@ package org.onap.ccsdk.cds.controllerblueprints.core.service import io.mockk.every import io.mockk.mockk -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.* import org.junit.Test import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph +import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.toGraph import kotlin.test.assertNotNull @@ -36,10 +36,66 @@ class BluePrintWorkflowServiceTest { .toGraph() val simpleWorkflow = TestBluePrintWorkFlowService() simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null) - val deferredOutput = CompletableDeferred() val input = "123456" - simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput) - val response = deferredOutput.await() + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) + assertNotNull(response, "failed to get response") + } + } + + @Test + fun testMultipleFlows() { + runBlocking { + coroutineScope { + val wfs = listOf("12345", "12346").map { + async { + val graph = "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService() + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D"), null) + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(it), it) + assertNotNull(response, "failed to get response") + } + } + wfs.awaitAll() + } + } + } + + @Test + fun testMissingEdgeForBFailureState() { + runBlocking { + val graph = "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService() + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "C", "D", "E"), arrayListOf("B")) + val input = "123456" + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) + assertNotNull(response, "failed to get response") + } + } + + @Test + fun testBExceptionFlow() { + runBlocking { + val graph = "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService() + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "C", "D", "E"), null) + val input = "123456" + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) + assertNotNull(response, "failed to get response") + } + } + + @Test + fun testTimeoutExceptionFlow() { + runBlocking { + val graph = "[START>A/SUCCESS, A>TO/SUCCESS, TO>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService() + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "TO", "C", "D", "E"), null) + val input = "123456" + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) assertNotNull(response, "failed to get response") } } @@ -51,10 +107,8 @@ class BluePrintWorkflowServiceTest { .toGraph() val simpleWorkflow = TestBluePrintWorkFlowService() simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null) - val deferredOutput = CompletableDeferred() val input = "123456" - simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput) - val response = deferredOutput.await() + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) assertNotNull(response, "failed to get response") } } @@ -68,10 +122,8 @@ class BluePrintWorkflowServiceTest { val failurePathWorkflow = TestBluePrintWorkFlowService() failurePathWorkflow.simulatedState = prepareSimulation(arrayListOf("B", "C", "D", "E"), arrayListOf("A")) - val failurePathWorkflowDeferredOutput = CompletableDeferred() val failurePathWorkflowInput = "123456" - failurePathWorkflow.executeWorkflow(failurePatGraph, mockBluePrintRuntimeService(), failurePathWorkflowInput, failurePathWorkflowDeferredOutput) - val failurePathResponse = failurePathWorkflowDeferredOutput.await() + val failurePathResponse = failurePathWorkflow.executeWorkflow(failurePatGraph, mockBluePrintRuntimeService(), failurePathWorkflowInput) assertNotNull(failurePathResponse, "failed to get response") } } @@ -83,10 +135,8 @@ class BluePrintWorkflowServiceTest { .toGraph() val simpleWorkflow = TestBluePrintWorkFlowService() simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null) - val deferredOutput = CompletableDeferred() val input = "123456" - simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput) - val response = deferredOutput.await() + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) assertNotNull(response, "failed to get response") } } @@ -98,17 +148,19 @@ class BluePrintWorkflowServiceTest { .toGraph() val simpleWorkflow = TestBluePrintWorkFlowService() simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D"), null) - val deferredOutput = CompletableDeferred() val input = "123456" - simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput) - val response = deferredOutput.await() + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) assertNotNull(response, "failed to get response") } } private fun mockBluePrintRuntimeService(): BluePrintRuntimeService<*> { + return mockBluePrintRuntimeService("123456") + } + + private fun mockBluePrintRuntimeService(id: String): BluePrintRuntimeService<*> { val bluePrintRuntimeService = mockk>() - every { bluePrintRuntimeService.id() } returns "123456" + every { bluePrintRuntimeService.id() } returns id return bluePrintRuntimeService } @@ -126,6 +178,7 @@ class BluePrintWorkflowServiceTest { class TestBluePrintWorkFlowService : AbstractBluePrintWorkFlowService() { + val log = logger(TestBluePrintWorkFlowService::class) lateinit var simulatedState: MutableMap @@ -133,6 +186,21 @@ class TestBluePrintWorkFlowService return EdgeLabel.SUCCESS } + override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, input: String): String { + log.info("Executing Graph : $graph") + this.graph = graph + this.workflowId = bluePrintRuntimeService.id() + val output = CompletableDeferred() + val startMessage = WorkflowExecuteMessage(input, output) + val workflowActor = workflowActor() + if (!workflowActor.isClosedForSend) { + workflowActor().send(startMessage) + } else { + throw BluePrintProcessorException("workflow actor is closed for send $workflowActor") + } + return startMessage.output.await() + } + override suspend fun prepareNodeExecutionMessage(node: Graph.Node) : NodeExecuteMessage { return NodeExecuteMessage(node, "$node Input", "") @@ -140,23 +208,26 @@ class TestBluePrintWorkFlowService override suspend fun executeNode(node: Graph.Node, nodeInput: String, nodeOutput: String): EdgeLabel { -// val random = (1..10).random() * 1000 -// println("will reply in $random ms") +// val random = (1..10).random() * 100 +// log.info("workflow($workflowId) node(${node.id}) will reply in $random ms") // kotlinx.coroutines.delay(random.toLong()) - val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") - return status +// //Simulation for timeout + if (node.id == "TO") { + withTimeout(1) { + kotlinx.coroutines.delay(2) + } + } + return simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") } override suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage { val nodeOutput = "" - val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", nodeOutput) - return nodeSkipMessage + return NodeSkipMessage(node, "$node Skip Input", nodeOutput) } override suspend fun skipNode(node: Graph.Node, nodeInput: String, nodeOutput: String): EdgeLabel { - val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") - return status + return simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") } override suspend fun cancelNode(node: Graph.Node, nodeInput: String, @@ -169,7 +240,12 @@ class TestBluePrintWorkFlowService TODO("not implemented") //To change body of created functions use File | Settings | File Templates. } - override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): String { + override suspend fun prepareWorkflowOutput(): String { + if (exceptions.isNotEmpty()) { + exceptions.forEach { + log.error("workflow($workflowId) exceptions :", it) + } + } return "Final Response" } } \ No newline at end of file -- cgit 1.2.3-korg