From d58654ce65f36b9d6ddc3f38c751da26d029ea41 Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Tue, 13 Aug 2019 20:51:40 -0400 Subject: Add imperative workflow service. Change-Id: Ic74bb7796244c466a0d8561eed27174fc1a14ebb Issue-ID: CCSDK-1617 Signed-off-by: Brinda Santh --- .../core/service/BluePrintWorkflowService.kt | 340 +++++++++++++++++++++ .../core/service/BluePrintWorkflowServiceTest.kt | 182 +++++++++++ 2 files changed, 522 insertions(+) create mode 100644 ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt create mode 100644 ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt new file mode 100644 index 000000000..019f31805 --- /dev/null +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt @@ -0,0 +1,340 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.controllerblueprints.core.service + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.actor +import kotlinx.coroutines.channels.consumeEach +import org.onap.ccsdk.cds.controllerblueprints.core.* +import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel +import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeStatus +import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph +import org.onap.ccsdk.cds.controllerblueprints.core.data.NodeStatus +import kotlin.coroutines.CoroutineContext + +interface BluePrintWorkFlowService { + + /** Executes imperative workflow for the bluePrintRuntimeService [bluePrintRuntimeService] and workflow + * input [input], response will be retrieve from output [output]*/ + suspend fun executeWorkflow(bluePrintRuntimeService: BluePrintRuntimeService<*>, + input: In, output: CompletableDeferred) + + suspend fun initializeWorkflow(input: In): EdgeLabel + + suspend fun prepareWorkflowOutput(): Out + + /** Prepare the message for the Node */ + suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage + + suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage + + suspend fun executeNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred, + deferredNodeStatus: CompletableDeferred) + + suspend fun skipNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred, + deferredNodeStatus: CompletableDeferred) + + suspend fun cancelNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred, + deferredNodeStatus: CompletableDeferred) + + suspend fun restartNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred, + deferredNodeStatus: CompletableDeferred) + +} + +/** Workflow Message Types */ +sealed class WorkflowMessage + +class WorkflowExecuteMessage(val input: In, val output: CompletableDeferred) : WorkflowMessage() + +class WorkflowCancelMessage(val input: In, val output: CompletableDeferred) : WorkflowMessage() + +class WorkflowRestartMessage(val input: In, val output: CompletableDeferred) : WorkflowMessage() + +/** Node Message Types */ +sealed class NodeMessage + +class NodeReadyMessage(val fromEdge: Graph.Edge, val edgeAction: EdgeAction) : NodeMessage() + +class NodeExecuteMessage(val node: Graph.Node, val nodeInput: In, + val nodeOutput: CompletableDeferred) : NodeMessage() + +class NodeRestartMessage(val node: Graph.Node, val nodeInput: In, + val nodeOutput: CompletableDeferred) : NodeMessage() + +class NodeSkipMessage(val node: Graph.Node, val nodeInput: In, + val nodeOutput: CompletableDeferred) : NodeMessage() + +class NodeCancelMessage(val node: Graph.Node, val nodeInput: In, + val nodeOutput: CompletableDeferred) : NodeMessage() + +enum class EdgeAction(val id: String) { + EXECUTE("execute"), + SKIP("skip") +} + +/** Abstract workflow service implementation */ +abstract class AbstractBluePrintWorkFlowService(private val graph: Graph) + : CoroutineScope, BluePrintWorkFlowService { + + private val log = logger(AbstractBluePrintWorkFlowService::class) + + private val job = Job() + + lateinit var workflowId: String + + final override val coroutineContext: CoroutineContext + get() = job + CoroutineName("Wf") + + val root = graph.startNodes() + + fun cancel() { + log.info("Received workflow($workflowId) cancel request") + job.cancel() + throw CancellationException("Workflow($workflowId) cancelled as requested ...") + } + + val workflowActor = actor>(coroutineContext, Channel.UNLIMITED) { + + /** Send message from workflow actor to node actor */ + fun sendNodeMessage(nodeMessage: NodeMessage) = launch { + nodeActor.send(nodeMessage) + } + + /** Process the workflow execution message */ + suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage) { + // Prepare Workflow and Populate the Initial store + initializeWorkflow(workflowExecuteMessage.input) + + val startNode = root.first() + // Prepare first node message and Send NodeExecuteMessage + // Start node doesn't wait for any nodes, so we can pass Execute message directly + val nodeExecuteMessage = prepareNodeExecutionMessage(startNode) + sendNodeMessage(nodeExecuteMessage) + log.debug("First node triggered successfully, waiting for response") + + // Wait for workflow completion or Error + nodeActor.invokeOnClose { exception -> + launch { + log.debug("End Node Completed, processing completion message") + val workflowOutput = prepareWorkflowOutput() + workflowExecuteMessage.output.complete(workflowOutput) + channel.close(exception) + } + } + } + + /** Process each actor message received based on type */ + consumeEach { message -> + when (message) { + is WorkflowExecuteMessage -> { + launch { + executeMessageActor(message) + } + } + is WorkflowRestartMessage -> { + launch { + TODO("") + } + } + is WorkflowCancelMessage -> { + launch { + TODO("") + } + } + } + } + } + + + private val nodeActor = actor>(coroutineContext, Channel.UNLIMITED) { + + /** Send message to process from one state to other state */ + fun sendNodeMessage(nodeMessage: NodeMessage) = launch { + channel.send(nodeMessage) + } + + /** Process the cascade node processing, based on the previous state of the node */ + fun processNextNodes(node: Graph.Node, nodeState: EdgeLabel) { + // Process only Next Success Node + val stateEdges = graph.outgoingEdges(node.id, arrayListOf(nodeState)) + log.debug("Next Edges :$stateEdges") + if (stateEdges.isNotEmpty()) { + stateEdges.forEach { stateEdge -> + // Prepare next node ready message and Send NodeReadyMessage + val nodeReadyMessage = NodeReadyMessage(stateEdge, EdgeAction.EXECUTE) + sendNodeMessage(nodeReadyMessage) + } + } + } + + suspend fun triggerToExecuteOrSkip(message: NodeReadyMessage) { + val edge = message.fromEdge + val node = edge.target + // Check if current edge action is Skip or Execute + when (message.edgeAction) { + EdgeAction.SKIP -> { + val skipMessage = prepareNodeSkipMessage(node) + sendNodeMessage(skipMessage) + } + EdgeAction.EXECUTE -> { + val nodeExecuteMessage = prepareNodeExecutionMessage(node) + sendNodeMessage(nodeExecuteMessage) + } + } + } + + suspend fun readyNodeWorker(message: NodeReadyMessage) { + val edge = message.fromEdge + val node = edge.target + log.debug("@@@@@ Ready workflow($workflowId), node($node) from edge($edge) for action(${message.edgeAction}) @@@@@") + // Update the current incoming edge status to executed or skipped + when (message.edgeAction) { + EdgeAction.SKIP -> message.fromEdge.status = EdgeStatus.SKIPPED + EdgeAction.EXECUTE -> message.fromEdge.status = EdgeStatus.EXECUTED + } + val incomingEdges = graph.incomingEdges(node.id) + if (incomingEdges.size > 1) { + // Check all incoming edges executed or skipped + val notCompletedEdges = incomingEdges.filter { it.status == EdgeStatus.NOT_STARTED } + if (notCompletedEdges.isEmpty()) { + // Possibility of skip edge action performed at last, but other edges have execute action. + val executePresent = incomingEdges.filter { it.status == EdgeStatus.EXECUTED } + val newMessage = if (executePresent.isNotEmpty()) { + NodeReadyMessage(message.fromEdge, EdgeAction.EXECUTE) + } else { + message + } + triggerToExecuteOrSkip(newMessage) + } else { + log.info("node(${node.id}) waiting for not completed edges($notCompletedEdges)") + } + } else { + triggerToExecuteOrSkip(message) + } + } + + fun executeNodeWorker(message: NodeExecuteMessage) = launch { + val node = message.node + node.status = NodeStatus.EXECUTING + val nodeState = if (node.id == BluePrintConstants.GRAPH_START_NODE_NAME + || node.id == BluePrintConstants.GRAPH_END_NODE_NAME) { + EdgeLabel.SUCCESS + } else { + log.debug("##### Processing workflow($workflowId) node($node) #####") + // Call the Extension function and get the next Edge state. + val deferredNodeState = CompletableDeferred() + executeNode(node, message.nodeInput, message.nodeOutput, deferredNodeState) + deferredNodeState.await() + } + // Update Node Completed + node.status = NodeStatus.EXECUTED + log.info("Execute Node($node) -> Executed State($nodeState)") + + // If End Node, Send End Message + if (graph.isEndNode(node)) { + // Close the current channel + channel.close() + } else { + val skippingEdges = graph.outgoingEdgesNotInLabels(node.id, arrayListOf(nodeState)) + log.debug("Skipping node($node) outgoing Edges($skippingEdges)") + // Process Skip Edges + skippingEdges.forEach { skippingEdge -> + // Prepare next node ready message and Send NodeReadyMessage + val nodeReadyMessage = NodeReadyMessage(skippingEdge, EdgeAction.SKIP) + sendNodeMessage(nodeReadyMessage) + } + // Process Success Node + processNextNodes(node, nodeState) + } + } + + fun skipNodeWorker(message: NodeSkipMessage) = launch { + val node = message.node + val incomingEdges = graph.incomingEdges(node.id) + // Check All Incoming Nodes Skipped + val nonSkippedEdges = incomingEdges.filter { + it.status == EdgeStatus.NOT_STARTED + } + log.debug("Node($node) incoming edges ($incomingEdges), not skipped incoming edges ($nonSkippedEdges)") + + if (nonSkippedEdges.isEmpty()) { + log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$") + // Call the Extension Function + val deferredNodeState = CompletableDeferred() + skipNode(node, message.nodeInput, message.nodeOutput, deferredNodeState) + val nodeState = deferredNodeState.await() + log.info("Skip Node($node) -> Executed State($nodeState)") + // Mark the Current node as Skipped + node.status = NodeStatus.SKIPPED + // Look for next possible skip nodes + graph.outgoingEdges(node.id).forEach { outgoingEdge -> + val nodeReadyMessage = NodeReadyMessage(outgoingEdge, EdgeAction.SKIP) + sendNodeMessage(nodeReadyMessage) + } + } + } + + fun restartNodeWorker(message: NodeRestartMessage) = launch { + TODO() + } + + fun cancelNodeWorker(messageWorkflow: WorkflowCancelMessage) = launch { + channel.close() + throw CancellationException("Workflow($workflowId) actor cancelled as requested ...") + } + + /** Process each actor message received based on type **/ + consumeEach { nodeMessage -> + when (nodeMessage) { + is NodeReadyMessage -> { + // Blocking call + readyNodeWorker(nodeMessage) + } + is NodeExecuteMessage -> { + launch { + executeNodeWorker(nodeMessage) + } + } + is NodeSkipMessage -> { + launch { + skipNodeWorker(nodeMessage) + } + } + is NodeRestartMessage -> { + launch { + restartNodeWorker(nodeMessage) + } + } + } + } + } + + + override suspend fun executeWorkflow(bluePrintRuntimeService: BluePrintRuntimeService<*>, input: In, output: CompletableDeferred) { + log.info("Executing Graph : $graph") + this.workflowId = bluePrintRuntimeService.id() + validateWorkflow() + val startMessage = WorkflowExecuteMessage(input, output) + workflowActor.send(startMessage) + } + + open fun validateWorkflow() { + //check(!graph.findCycles().isNotEmpty()) { "Graph is cyclic, Cycle is not supported" } + } +} \ No newline at end of file diff --git a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt new file mode 100644 index 000000000..7cb64922c --- /dev/null +++ b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt @@ -0,0 +1,182 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.controllerblueprints.core.service + +import io.mockk.every +import io.mockk.mockk +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.runBlocking +import org.junit.Test +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException +import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel +import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph +import org.onap.ccsdk.cds.controllerblueprints.core.toGraph +import kotlin.test.assertNotNull + +class BluePrintWorkflowServiceTest { + @Test + fun testSimpleFlow() { + runBlocking { + val graph = "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>E/SUCCESS, E>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService(graph) + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null) + val deferredOutput = CompletableDeferred() + val input = "123456" + simpleWorkflow.executeWorkflow(mockBluePrintRuntimeService(), input, deferredOutput) + val response = deferredOutput.await() + assertNotNull(response, "failed to get response") + } + } + + @Test + fun testConditionalFlow() { + runBlocking { + val graph = "[START>A/SUCCESS, A>B/SUCCESS, A>C/FAILURE, B>D/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService(graph) + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null) + val deferredOutput = CompletableDeferred() + val input = "123456" + simpleWorkflow.executeWorkflow(mockBluePrintRuntimeService(), input, deferredOutput) + val response = deferredOutput.await() + assertNotNull(response, "failed to get response") + } + } + + @Test + fun testBothConditionalFlow() { + runBlocking { + // Failure Flow + val failurePatGraph = "[START>A/SUCCESS, A>B/SUCCESS, A>C/FAILURE, B>D/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val failurePathWorkflow = TestBluePrintWorkFlowService(failurePatGraph) + failurePathWorkflow.simulatedState = prepareSimulation(arrayListOf("B", "C", "D", "E"), + arrayListOf("A")) + val failurePathWorkflowDeferredOutput = CompletableDeferred() + val failurePathWorkflowInput = "123456" + failurePathWorkflow.executeWorkflow(mockBluePrintRuntimeService(), failurePathWorkflowInput, failurePathWorkflowDeferredOutput) + val failurePathResponse = failurePathWorkflowDeferredOutput.await() + assertNotNull(failurePathResponse, "failed to get response") + } + } + + @Test + fun testMultipleSkipFlow() { + runBlocking { + val graph = "[START>A/SUCCESS, A>B/SUCCESS, A>C/FAILURE, C>D/SUCCESS, D>E/SUCCESS, B>E/SUCCESS, E>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService(graph) + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null) + val deferredOutput = CompletableDeferred() + val input = "123456" + simpleWorkflow.executeWorkflow(mockBluePrintRuntimeService(), input, deferredOutput) + val response = deferredOutput.await() + assertNotNull(response, "failed to get response") + } + } + + @Test + fun testParallelFlow() { + runBlocking { + val graph = "[START>A/SUCCESS, A>B/SUCCESS, A>C/SUCCESS, B>D/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService(graph) + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D"), null) + val deferredOutput = CompletableDeferred() + val input = "123456" + simpleWorkflow.executeWorkflow(mockBluePrintRuntimeService(), input, deferredOutput) + val response = deferredOutput.await() + assertNotNull(response, "failed to get response") + } + } + + private fun mockBluePrintRuntimeService(): BluePrintRuntimeService<*> { + val bluePrintRuntimeService = mockk>() + every { bluePrintRuntimeService.id() } returns "123456" + return bluePrintRuntimeService + } + + private fun prepareSimulation(successes: List?, failures: List?): MutableMap { + val simulatedState: MutableMap = hashMapOf() + successes?.forEach { + simulatedState[it] = EdgeLabel.SUCCESS + } + failures?.forEach { + simulatedState[it] = EdgeLabel.FAILURE + } + return simulatedState + } +} + +class TestBluePrintWorkFlowService(graph: Graph) + : AbstractBluePrintWorkFlowService(graph) { + + lateinit var simulatedState: MutableMap + + override suspend fun initializeWorkflow(input: String): EdgeLabel { + return EdgeLabel.SUCCESS + } + + override suspend fun prepareNodeExecutionMessage(node: Graph.Node) + : NodeExecuteMessage { + val deferredNodeOutput = CompletableDeferred() + val nodeExecuteMessage = NodeExecuteMessage(node, "$node Input", deferredNodeOutput) + return nodeExecuteMessage + } + + override suspend fun executeNode(node: Graph.Node, nodeInput: String, + deferredNodeOutput: CompletableDeferred, + deferredNodeStatus: CompletableDeferred) { +// val random = (1..10).random() * 1000 +// println("will reply in $random ms") +// kotlinx.coroutines.delay(random.toLong()) + val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") + deferredNodeStatus.complete(status) + deferredNodeOutput.complete("$node, Output: $nodeInput output") + } + + override suspend fun prepareNodeSkipMessage(node: Graph.Node) + : NodeSkipMessage { + val deferredNodeOutput = CompletableDeferred() + val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", deferredNodeOutput) + return nodeSkipMessage + } + + override suspend fun skipNode(node: Graph.Node, nodeInput: String, + deferredNodeOutput: CompletableDeferred, + deferredNodeStatus: CompletableDeferred) { + val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") + deferredNodeStatus.complete(status) + } + + override suspend fun cancelNode(node: Graph.Node, nodeInput: String, + deferredNodeOutput: CompletableDeferred, + deferredNodeStatus: CompletableDeferred) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override suspend fun restartNode(node: Graph.Node, nodeInput: String, + deferredNodeOutput: CompletableDeferred, + deferredNodeStatus: CompletableDeferred) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override suspend fun prepareWorkflowOutput(): String { + return "Final Response" + } +} \ No newline at end of file -- cgit 1.2.3-korg