From ad9b4a41a7be5ed8c579a2e96bbb4d2da629c036 Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Thu, 15 Aug 2019 12:43:41 -0400 Subject: Modify workflow execution service options. Change-Id: I629b30f9ff2b8e84d6ae952946608d9bb3437d4c Issue-ID: CCSDK-1619 Signed-off-by: Brinda Santh --- .../core/dsl/BluePrintServiceDSLBuilder.kt | 2 +- .../core/service/BluePrintWorkflowService.kt | 92 +++++++++++----------- .../core/service/BluePrintWorkflowServiceTest.kt | 31 +++----- 3 files changed, 61 insertions(+), 64 deletions(-) (limited to 'ms/controllerblueprints') diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt index 06d3421c0..259efbf0b 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt @@ -26,7 +26,7 @@ class ServiceTemplateBuilder(private val name: String, private val author: String, private val tags: String) { private var serviceTemplate = ServiceTemplate() - private lateinit var topologyTemplate: TopologyTemplate + private var topologyTemplate: TopologyTemplate? = null private var metadata: MutableMap = hashMapOf() private var dslDefinitions: MutableMap? = null private var imports: MutableList = mutableListOf() diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt index 91c2bcbbb..905150213 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt @@ -36,24 +36,20 @@ interface BluePrintWorkFlowService { suspend fun initializeWorkflow(input: In): EdgeLabel - suspend fun prepareWorkflowOutput(): Out + suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): Out /** Prepare the message for the Node */ suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage - suspend fun executeNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred, - deferredNodeStatus: CompletableDeferred) + suspend fun executeNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel - suspend fun skipNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred, - deferredNodeStatus: CompletableDeferred) + suspend fun skipNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel - suspend fun cancelNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred, - deferredNodeStatus: CompletableDeferred) + suspend fun cancelNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel - suspend fun restartNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred, - deferredNodeStatus: CompletableDeferred) + suspend fun restartNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel } @@ -71,17 +67,13 @@ sealed class NodeMessage class NodeReadyMessage(val fromEdge: Graph.Edge, val edgeAction: EdgeAction) : NodeMessage() -class NodeExecuteMessage(val node: Graph.Node, val nodeInput: In, - val nodeOutput: CompletableDeferred) : NodeMessage() +class NodeExecuteMessage(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage() -class NodeRestartMessage(val node: Graph.Node, val nodeInput: In, - val nodeOutput: CompletableDeferred) : NodeMessage() +class NodeRestartMessage(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage() -class NodeSkipMessage(val node: Graph.Node, val nodeInput: In, - val nodeOutput: CompletableDeferred) : NodeMessage() +class NodeSkipMessage(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage() -class NodeCancelMessage(val node: Graph.Node, val nodeInput: In, - val nodeOutput: CompletableDeferred) : NodeMessage() +class NodeCancelMessage(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage() enum class EdgeAction(val id: String) { EXECUTE("execute"), @@ -105,18 +97,14 @@ abstract class AbstractBluePrintWorkFlowService : CoroutineScope, BlueP fun cancel() { log.info("Received workflow($workflowId) cancel request") job.cancel() - throw CancellationException("Workflow($workflowId) cancelled as requested ...") + throw CancellationException("Workflow($workflowId) cancelled as requested") } - val workflowActor = actor>(coroutineContext, Channel.UNLIMITED) { - - /** Send message from workflow actor to node actor */ - fun sendNodeMessage(nodeMessage: NodeMessage) = launch { - nodeActor.send(nodeMessage) - } - + fun workflowActor() = actor>(coroutineContext, Channel.UNLIMITED) { /** Process the workflow execution message */ suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage) { + + val nodeActor = nodeActor() // Prepare Workflow and Populate the Initial store initializeWorkflow(workflowExecuteMessage.input) @@ -124,14 +112,18 @@ abstract class AbstractBluePrintWorkFlowService : CoroutineScope, BlueP // Prepare first node message and Send NodeExecuteMessage // Start node doesn't wait for any nodes, so we can pass Execute message directly val nodeExecuteMessage = prepareNodeExecutionMessage(startNode) - sendNodeMessage(nodeExecuteMessage) - log.debug("First node triggered successfully, waiting for response") - + /** Send message from workflow actor to node actor */ + launch { + nodeActor.send(nodeExecuteMessage) + } // Wait for workflow completion or Error nodeActor.invokeOnClose { exception -> launch { - log.debug("End Node Completed, processing completion message") - val workflowOutput = prepareWorkflowOutput() + log.info("End Node Completed, processing completion message") + val bluePrintProcessorException: BluePrintProcessorException? = + if (exception != null) BluePrintProcessorException(exception) else null + + val workflowOutput = prepareWorkflowOutput(bluePrintProcessorException) workflowExecuteMessage.output.complete(workflowOutput) channel.close(exception) } @@ -161,7 +153,7 @@ abstract class AbstractBluePrintWorkFlowService : CoroutineScope, BlueP } - private val nodeActor = actor>(coroutineContext, Channel.UNLIMITED) { + private fun nodeActor() = actor>(coroutineContext, Channel.UNLIMITED) { /** Send message to process from one state to other state */ fun sendNodeMessage(nodeMessage: NodeMessage) = launch { @@ -228,7 +220,7 @@ abstract class AbstractBluePrintWorkFlowService : CoroutineScope, BlueP } } - fun executeNodeWorker(message: NodeExecuteMessage) = launch { + suspend fun executeNodeWorker(message: NodeExecuteMessage) { val node = message.node node.status = NodeStatus.EXECUTING val nodeState = if (node.id == BluePrintConstants.GRAPH_START_NODE_NAME @@ -237,9 +229,7 @@ abstract class AbstractBluePrintWorkFlowService : CoroutineScope, BlueP } else { log.debug("##### Processing workflow($workflowId) node($node) #####") // Call the Extension function and get the next Edge state. - val deferredNodeState = CompletableDeferred() - executeNode(node, message.nodeInput, message.nodeOutput, deferredNodeState) - deferredNodeState.await() + executeNode(node, message.nodeInput, message.nodeOutput) } // Update Node Completed node.status = NodeStatus.EXECUTED @@ -263,7 +253,7 @@ abstract class AbstractBluePrintWorkFlowService : CoroutineScope, BlueP } } - fun skipNodeWorker(message: NodeSkipMessage) = launch { + suspend fun skipNodeWorker(message: NodeSkipMessage) { val node = message.node val incomingEdges = graph.incomingEdges(node.id) // Check All Incoming Nodes Skipped @@ -275,9 +265,7 @@ abstract class AbstractBluePrintWorkFlowService : CoroutineScope, BlueP if (nonSkippedEdges.isEmpty()) { log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$") // Call the Extension Function - val deferredNodeState = CompletableDeferred() - skipNode(node, message.nodeInput, message.nodeOutput, deferredNodeState) - val nodeState = deferredNodeState.await() + val nodeState = skipNode(node, message.nodeInput, message.nodeOutput) log.info("Skip Node($node) -> Executed State($nodeState)") // Mark the Current node as Skipped node.status = NodeStatus.SKIPPED @@ -303,21 +291,37 @@ abstract class AbstractBluePrintWorkFlowService : CoroutineScope, BlueP when (nodeMessage) { is NodeReadyMessage -> { // Blocking call - readyNodeWorker(nodeMessage) + try { + readyNodeWorker(nodeMessage) + } catch (e: Exception) { + channel.close(e) + } } is NodeExecuteMessage -> { launch { - executeNodeWorker(nodeMessage) + try { + executeNodeWorker(nodeMessage) + } catch (e: Exception) { + channel.close(e) + } } } is NodeSkipMessage -> { launch { - skipNodeWorker(nodeMessage) + try { + skipNodeWorker(nodeMessage) + } catch (e: Exception) { + channel.close(e) + } } } is NodeRestartMessage -> { launch { - restartNodeWorker(nodeMessage) + try { + restartNodeWorker(nodeMessage) + } catch (e: Exception) { + channel.close(e) + } } } } @@ -330,6 +334,6 @@ abstract class AbstractBluePrintWorkFlowService : CoroutineScope, BlueP this.graph = graph this.workflowId = bluePrintRuntimeService.id() val startMessage = WorkflowExecuteMessage(input, output) - workflowActor.send(startMessage) + workflowActor().send(startMessage) } } \ No newline at end of file diff --git a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt index 62cb10851..b8d8cea3e 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt @@ -22,6 +22,7 @@ import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.runBlocking import org.junit.Test import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph import org.onap.ccsdk.cds.controllerblueprints.core.toGraph @@ -134,49 +135,41 @@ class TestBluePrintWorkFlowService override suspend fun prepareNodeExecutionMessage(node: Graph.Node) : NodeExecuteMessage { - val deferredNodeOutput = CompletableDeferred() - val nodeExecuteMessage = NodeExecuteMessage(node, "$node Input", deferredNodeOutput) - return nodeExecuteMessage + return NodeExecuteMessage(node, "$node Input", "") } override suspend fun executeNode(node: Graph.Node, nodeInput: String, - deferredNodeOutput: CompletableDeferred, - deferredNodeStatus: CompletableDeferred) { + nodeOutput: String): EdgeLabel { // val random = (1..10).random() * 1000 // println("will reply in $random ms") // kotlinx.coroutines.delay(random.toLong()) val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") - deferredNodeStatus.complete(status) - deferredNodeOutput.complete("$node, Output: $nodeInput output") + return status } - override suspend fun prepareNodeSkipMessage(node: Graph.Node) - : NodeSkipMessage { - val deferredNodeOutput = CompletableDeferred() - val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", deferredNodeOutput) + override suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage { + val nodeOutput = "" + val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", nodeOutput) return nodeSkipMessage } override suspend fun skipNode(node: Graph.Node, nodeInput: String, - deferredNodeOutput: CompletableDeferred, - deferredNodeStatus: CompletableDeferred) { + nodeOutput: String): EdgeLabel { val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") - deferredNodeStatus.complete(status) + return status } override suspend fun cancelNode(node: Graph.Node, nodeInput: String, - deferredNodeOutput: CompletableDeferred, - deferredNodeStatus: CompletableDeferred) { + nodeOutput: String): EdgeLabel { TODO("not implemented") //To change body of created functions use File | Settings | File Templates. } override suspend fun restartNode(node: Graph.Node, nodeInput: String, - deferredNodeOutput: CompletableDeferred, - deferredNodeStatus: CompletableDeferred) { + nodeOutput: String): EdgeLabel { TODO("not implemented") //To change body of created functions use File | Settings | File Templates. } - override suspend fun prepareWorkflowOutput(): String { + override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): String { return "Final Response" } } \ No newline at end of file -- cgit 1.2.3-korg