diff options
author | Dan Timoney <dtimoney@att.com> | 2019-08-22 22:02:40 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2019-08-22 22:02:40 +0000 |
commit | 9f260f36b66d0db54da89a8f9307d50fd0d4f320 (patch) | |
tree | 7d0ef6cd07ffc8612b3b9db907d649b8bd58f443 /ms/controllerblueprints/modules/blueprint-core | |
parent | b496e93839fba89a8ea2ab026954a6f6359bc4c2 (diff) | |
parent | 43957fac4ead89a43ef97bf749b1bddc1adf0d6c (diff) |
Merge "Component node timeout implementation"
Diffstat (limited to 'ms/controllerblueprints/modules/blueprint-core')
6 files changed, 162 insertions, 60 deletions
diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt index 064c196ed..ba5815bb6 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt @@ -180,6 +180,7 @@ object BluePrintConstants { const val PROPERTY_CURRENT_NODE_TEMPLATE = "current-node-template" const val PROPERTY_CURRENT_INTERFACE = "current-interface" const val PROPERTY_CURRENT_OPERATION = "current-operation" + const val PROPERTY_CURRENT_TIMEOUT = "current-timeout" const val PROPERTY_CURRENT_IMPLEMENTATION = "current-implementation" const val PROPERTY_EXECUTION_REQUEST = "execution-request" diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt index 93ba15e99..08bc6c3fd 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt @@ -242,6 +242,22 @@ fun Map<String, JsonNode>.getAsDouble(key: String): Double { return this[key]?.asDouble() ?: throw BluePrintException("couldn't find value for key($key)") } +fun Map<String, JsonNode>.getOptionalAsString(key: String): String? { + return if (this.containsKey(key)) this[key]!!.asText() else null +} + +fun Map<String, JsonNode>.getOptionalAsBoolean(key: String): Boolean? { + return if (this.containsKey(key)) this[key]!!.asBoolean() else null +} + +fun Map<String, JsonNode>.getOptionalAsInt(key: String): Int? { + return if (this.containsKey(key)) this[key]!!.asInt() else null +} + +fun Map<String, JsonNode>.getOptionalAsDouble(key: String): Double? { + return if (this.containsKey(key)) this[key]!!.asDouble() else null +} + // Checks inline fun checkEquals(value1: String?, value2: String?, lazyMessage: () -> Any): Boolean { diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/data/BluePrintGraph.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/data/BluePrintGraph.kt index 9e1b7498e..fc796c9ed 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/data/BluePrintGraph.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/data/BluePrintGraph.kt @@ -33,7 +33,8 @@ enum class NodeStatus(val id: String) { READY("ready"), EXECUTING("executing"), EXECUTED("executed"), - SKIPPED("skipped") + SKIPPED("skipped"), + TERMINATED("terminated") } class Graph { diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt index 066516fcc..b368c01aa 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt @@ -216,6 +216,11 @@ class BluePrintContext(val serviceTemplate: ServiceTemplate) { ?: throw BluePrintException("could't get NodeTemplate($nodeTemplateName)'s first InterfaceAssignment's first OperationAssignment name") } + fun nodeTemplateOperationImplementation(nodeTemplateName: String, interfaceName: String, operationName: String) + : Implementation? { + return nodeTemplateInterfaceOperation(nodeTemplateName, interfaceName, operationName).implementation + } + fun nodeTemplateInterfaceOperationInputs(nodeTemplateName: String, interfaceName: String, operationName: String): MutableMap<String, JsonNode>? { return nodeTemplateInterfaceOperation(nodeTemplateName, interfaceName, operationName).inputs } diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt index 905150213..5cec3c947 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt @@ -30,13 +30,12 @@ import kotlin.coroutines.CoroutineContext interface BluePrintWorkFlowService<In, Out> { /** Executes imperative workflow graph [graph] for the bluePrintRuntimeService [bluePrintRuntimeService] - * and workflow input [input], response will be retrieve from output [output]*/ - suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, - input: In, output: CompletableDeferred<Out>) + * and workflow input [input]*/ + suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, input: In): Out suspend fun initializeWorkflow(input: In): EdgeLabel - suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): Out + suspend fun prepareWorkflowOutput(): Out /** Prepare the message for the Node */ suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out> @@ -91,6 +90,8 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP lateinit var workflowId: String + var exceptions: MutableList<Exception> = arrayListOf() + final override val coroutineContext: CoroutineContext get() = job + CoroutineName("Wf") @@ -100,7 +101,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP throw CancellationException("Workflow($workflowId) cancelled as requested") } - fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { + suspend fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { /** Process the workflow execution message */ suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) { @@ -119,13 +120,11 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP // Wait for workflow completion or Error nodeActor.invokeOnClose { exception -> launch { - log.info("End Node Completed, processing completion message") - val bluePrintProcessorException: BluePrintProcessorException? = - if (exception != null) BluePrintProcessorException(exception) else null - - val workflowOutput = prepareWorkflowOutput(bluePrintProcessorException) + if (exception != null) exceptions.add(BluePrintProcessorException(exception)) + log.info("workflow($workflowId) nodes completed with (${exceptions.size})exceptions") + val workflowOutput = prepareWorkflowOutput() workflowExecuteMessage.output.complete(workflowOutput) - channel.close(exception) + channel.close() } } } @@ -135,7 +134,11 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP when (message) { is WorkflowExecuteMessage<In, Out> -> { launch { - executeMessageActor(message) + try { + executeMessageActor(message) + } catch (e: Exception) { + exceptions.add(e) + } } } is WorkflowRestartMessage<In, Out> -> { @@ -153,7 +156,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP } - private fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { + private suspend fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { /** Send message to process from one state to other state */ fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch { @@ -164,7 +167,6 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP fun processNextNodes(node: Graph.Node, nodeState: EdgeLabel) { // Process only Next Success Node val stateEdges = graph.outgoingEdges(node.id, arrayListOf(nodeState)) - log.debug("Next Edges :$stateEdges") if (stateEdges.isNotEmpty()) { stateEdges.forEach { stateEdge -> // Prepare next node ready message and Send NodeReadyMessage @@ -213,7 +215,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP } triggerToExecuteOrSkip(newMessage) } else { - log.info("node(${node.id}) waiting for not completed edges($notCompletedEdges)") + log.info("node(${node.id}) is waiting for incoming edges($notCompletedEdges)") } } else { triggerToExecuteOrSkip(message) @@ -233,15 +235,19 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP } // Update Node Completed node.status = NodeStatus.EXECUTED - log.info("Execute Node($node) -> Executed State($nodeState)") + log.info("Execute node(${node.id}) -> executed state($nodeState)") + // Check if the Node status edge is there, If not close processing + val edgePresent = graph.outgoingEdges(node.id, nodeState).isNotEmpty() // If End Node, Send End Message if (graph.isEndNode(node)) { // Close the current channel channel.close() + } else if (!edgePresent) { + throw BluePrintProcessorException("node(${node.id}) outgoing edge($nodeState) is missing.") } else { val skippingEdges = graph.outgoingEdgesNotInLabels(node.id, arrayListOf(nodeState)) - log.debug("Skipping node($node) outgoing Edges($skippingEdges)") + log.debug("Skipping node($node)'s outgoing edges($skippingEdges)") // Process Skip Edges skippingEdges.forEach { skippingEdge -> // Prepare next node ready message and Send NodeReadyMessage @@ -266,7 +272,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$") // Call the Extension Function val nodeState = skipNode(node, message.nodeInput, message.nodeOutput) - log.info("Skip Node($node) -> Executed State($nodeState)") + log.info("Skip node(${node.id}) -> executed state($nodeState)") // Mark the Current node as Skipped node.status = NodeStatus.SKIPPED // Look for next possible skip nodes @@ -283,7 +289,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP fun cancelNodeWorker(messageWorkflow: WorkflowCancelMessage<In, Out>) = launch { channel.close() - throw CancellationException("Workflow($workflowId) actor cancelled as requested ...") + throw CancellationException("Workflow($workflowId) actor cancelled as requested.") } /** Process each actor message received based on type **/ @@ -294,7 +300,8 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP try { readyNodeWorker(nodeMessage) } catch (e: Exception) { - channel.close(e) + exceptions.add(e) + channel.close() } } is NodeExecuteMessage<In, Out> -> { @@ -302,7 +309,9 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP try { executeNodeWorker(nodeMessage) } catch (e: Exception) { - channel.close(e) + nodeMessage.node.status = NodeStatus.TERMINATED + exceptions.add(e) + channel.close() } } } @@ -311,7 +320,9 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP try { skipNodeWorker(nodeMessage) } catch (e: Exception) { - channel.close(e) + nodeMessage.node.status = NodeStatus.TERMINATED + exceptions.add(e) + channel.close() } } } @@ -320,20 +331,12 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP try { restartNodeWorker(nodeMessage) } catch (e: Exception) { - channel.close(e) + exceptions.add(e) + channel.close() } } } } } } - - override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, - input: In, output: CompletableDeferred<Out>) { - log.info("Executing Graph : $graph") - this.graph = graph - this.workflowId = bluePrintRuntimeService.id() - val startMessage = WorkflowExecuteMessage(input, output) - workflowActor().send(startMessage) - } }
\ No newline at end of file diff --git a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt index b8d8cea3e..4d97f8bc3 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt @@ -18,13 +18,13 @@ package org.onap.ccsdk.cds.controllerblueprints.core.service import io.mockk.every import io.mockk.mockk -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.* import org.junit.Test import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph +import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.toGraph import kotlin.test.assertNotNull @@ -36,10 +36,66 @@ class BluePrintWorkflowServiceTest { .toGraph() val simpleWorkflow = TestBluePrintWorkFlowService() simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null) - val deferredOutput = CompletableDeferred<String>() val input = "123456" - simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput) - val response = deferredOutput.await() + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) + assertNotNull(response, "failed to get response") + } + } + + @Test + fun testMultipleFlows() { + runBlocking { + coroutineScope { + val wfs = listOf("12345", "12346").map { + async { + val graph = "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService() + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D"), null) + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(it), it) + assertNotNull(response, "failed to get response") + } + } + wfs.awaitAll() + } + } + } + + @Test + fun testMissingEdgeForBFailureState() { + runBlocking { + val graph = "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService() + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "C", "D", "E"), arrayListOf("B")) + val input = "123456" + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) + assertNotNull(response, "failed to get response") + } + } + + @Test + fun testBExceptionFlow() { + runBlocking { + val graph = "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService() + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "C", "D", "E"), null) + val input = "123456" + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) + assertNotNull(response, "failed to get response") + } + } + + @Test + fun testTimeoutExceptionFlow() { + runBlocking { + val graph = "[START>A/SUCCESS, A>TO/SUCCESS, TO>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService() + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "TO", "C", "D", "E"), null) + val input = "123456" + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) assertNotNull(response, "failed to get response") } } @@ -51,10 +107,8 @@ class BluePrintWorkflowServiceTest { .toGraph() val simpleWorkflow = TestBluePrintWorkFlowService() simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null) - val deferredOutput = CompletableDeferred<String>() val input = "123456" - simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput) - val response = deferredOutput.await() + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) assertNotNull(response, "failed to get response") } } @@ -68,10 +122,8 @@ class BluePrintWorkflowServiceTest { val failurePathWorkflow = TestBluePrintWorkFlowService() failurePathWorkflow.simulatedState = prepareSimulation(arrayListOf("B", "C", "D", "E"), arrayListOf("A")) - val failurePathWorkflowDeferredOutput = CompletableDeferred<String>() val failurePathWorkflowInput = "123456" - failurePathWorkflow.executeWorkflow(failurePatGraph, mockBluePrintRuntimeService(), failurePathWorkflowInput, failurePathWorkflowDeferredOutput) - val failurePathResponse = failurePathWorkflowDeferredOutput.await() + val failurePathResponse = failurePathWorkflow.executeWorkflow(failurePatGraph, mockBluePrintRuntimeService(), failurePathWorkflowInput) assertNotNull(failurePathResponse, "failed to get response") } } @@ -83,10 +135,8 @@ class BluePrintWorkflowServiceTest { .toGraph() val simpleWorkflow = TestBluePrintWorkFlowService() simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null) - val deferredOutput = CompletableDeferred<String>() val input = "123456" - simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput) - val response = deferredOutput.await() + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) assertNotNull(response, "failed to get response") } } @@ -98,17 +148,19 @@ class BluePrintWorkflowServiceTest { .toGraph() val simpleWorkflow = TestBluePrintWorkFlowService() simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D"), null) - val deferredOutput = CompletableDeferred<String>() val input = "123456" - simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput) - val response = deferredOutput.await() + val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input) assertNotNull(response, "failed to get response") } } private fun mockBluePrintRuntimeService(): BluePrintRuntimeService<*> { + return mockBluePrintRuntimeService("123456") + } + + private fun mockBluePrintRuntimeService(id: String): BluePrintRuntimeService<*> { val bluePrintRuntimeService = mockk<BluePrintRuntimeService<*>>() - every { bluePrintRuntimeService.id() } returns "123456" + every { bluePrintRuntimeService.id() } returns id return bluePrintRuntimeService } @@ -126,6 +178,7 @@ class BluePrintWorkflowServiceTest { class TestBluePrintWorkFlowService : AbstractBluePrintWorkFlowService<String, String>() { + val log = logger(TestBluePrintWorkFlowService::class) lateinit var simulatedState: MutableMap<String, EdgeLabel> @@ -133,6 +186,21 @@ class TestBluePrintWorkFlowService return EdgeLabel.SUCCESS } + override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, input: String): String { + log.info("Executing Graph : $graph") + this.graph = graph + this.workflowId = bluePrintRuntimeService.id() + val output = CompletableDeferred<String>() + val startMessage = WorkflowExecuteMessage(input, output) + val workflowActor = workflowActor() + if (!workflowActor.isClosedForSend) { + workflowActor().send(startMessage) + } else { + throw BluePrintProcessorException("workflow actor is closed for send $workflowActor") + } + return startMessage.output.await() + } + override suspend fun prepareNodeExecutionMessage(node: Graph.Node) : NodeExecuteMessage<String, String> { return NodeExecuteMessage(node, "$node Input", "") @@ -140,23 +208,26 @@ class TestBluePrintWorkFlowService override suspend fun executeNode(node: Graph.Node, nodeInput: String, nodeOutput: String): EdgeLabel { -// val random = (1..10).random() * 1000 -// println("will reply in $random ms") +// val random = (1..10).random() * 100 +// log.info("workflow($workflowId) node(${node.id}) will reply in $random ms") // kotlinx.coroutines.delay(random.toLong()) - val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") - return status +// //Simulation for timeout + if (node.id == "TO") { + withTimeout(1) { + kotlinx.coroutines.delay(2) + } + } + return simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") } override suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<String, String> { val nodeOutput = "" - val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", nodeOutput) - return nodeSkipMessage + return NodeSkipMessage(node, "$node Skip Input", nodeOutput) } override suspend fun skipNode(node: Graph.Node, nodeInput: String, nodeOutput: String): EdgeLabel { - val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") - return status + return simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") } override suspend fun cancelNode(node: Graph.Node, nodeInput: String, @@ -169,7 +240,12 @@ class TestBluePrintWorkFlowService TODO("not implemented") //To change body of created functions use File | Settings | File Templates. } - override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): String { + override suspend fun prepareWorkflowOutput(): String { + if (exceptions.isNotEmpty()) { + exceptions.forEach { + log.error("workflow($workflowId) exceptions :", it) + } + } return "Final Response" } }
\ No newline at end of file |