aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt')
-rw-r--r--ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt353
1 files changed, 353 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt
new file mode 100644
index 000000000..f3e4e59aa
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt
@@ -0,0 +1,353 @@
+/*
+ * Copyright © 2019 IBM.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.controllerblueprints.core.service
+
+import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.CoroutineName
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.channels.actor
+import kotlinx.coroutines.channels.consumeEach
+import kotlinx.coroutines.launch
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
+import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeStatus
+import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
+import org.onap.ccsdk.cds.controllerblueprints.core.data.NodeStatus
+import org.onap.ccsdk.cds.controllerblueprints.core.incomingEdges
+import org.onap.ccsdk.cds.controllerblueprints.core.isEndNode
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.outgoingEdges
+import org.onap.ccsdk.cds.controllerblueprints.core.outgoingEdgesNotInLabels
+import org.onap.ccsdk.cds.controllerblueprints.core.startNodes
+import kotlin.coroutines.CoroutineContext
+
+interface BluePrintWorkFlowService<In, Out> {
+
+ /** Executes imperative workflow graph [graph] for the bluePrintRuntimeService [bluePrintRuntimeService]
+ * and workflow input [input]*/
+ suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, input: In): Out
+
+ suspend fun initializeWorkflow(input: In): EdgeLabel
+
+ suspend fun prepareWorkflowOutput(): Out
+
+ /** Prepare the message for the Node */
+ suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out>
+
+ suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<In, Out>
+
+ suspend fun executeNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
+
+ suspend fun skipNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
+
+ suspend fun cancelNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
+
+ suspend fun restartNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
+}
+
+/** Workflow Message Types */
+sealed class WorkflowMessage<In, Out>
+
+class WorkflowExecuteMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>()
+
+class WorkflowCancelMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>()
+
+class WorkflowRestartMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>()
+
+/** Node Message Types */
+sealed class NodeMessage<In, Out>
+
+class NodeReadyMessage<In, Out>(val fromEdge: Graph.Edge, val edgeAction: EdgeAction) : NodeMessage<In, Out>()
+
+class NodeExecuteMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
+
+class NodeRestartMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
+
+class NodeSkipMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
+
+class NodeCancelMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
+
+enum class EdgeAction(val id: String) {
+ EXECUTE("execute"),
+ SKIP("skip")
+}
+
+/** Abstract workflow service implementation */
+abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BluePrintWorkFlowService<In, Out> {
+
+ lateinit var graph: Graph
+
+ private val log = logger(AbstractBluePrintWorkFlowService::class)
+
+ private val job = Job()
+
+ lateinit var workflowId: String
+
+ var exceptions: MutableList<Exception> = arrayListOf()
+
+ override val coroutineContext: CoroutineContext
+ get() = job + CoroutineName("Wf")
+
+ fun cancel() {
+ log.info("Received workflow($workflowId) cancel request")
+ job.cancel()
+ throw CancellationException("Workflow($workflowId) cancelled as requested")
+ }
+
+ suspend fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
+ /** Process the workflow execution message */
+ suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) {
+
+ val nodeActor = nodeActor()
+ // Prepare Workflow and Populate the Initial store
+ initializeWorkflow(workflowExecuteMessage.input)
+
+ val startNode = graph.startNodes().first()
+ // Prepare first node message and Send NodeExecuteMessage
+ // Start node doesn't wait for any nodes, so we can pass Execute message directly
+ val nodeExecuteMessage = prepareNodeExecutionMessage(startNode)
+ /** Send message from workflow actor to node actor */
+ launch {
+ nodeActor.send(nodeExecuteMessage)
+ }
+ // Wait for workflow completion or Error
+ nodeActor.invokeOnClose { exception ->
+ launch {
+ if (exception != null) exceptions.add(BluePrintProcessorException(exception))
+ log.info("workflow($workflowId) nodes completed with (${exceptions.size})exceptions")
+ val workflowOutput = prepareWorkflowOutput()
+ workflowExecuteMessage.output.complete(workflowOutput)
+ channel.close()
+ }
+ }
+ }
+
+ /** Process each actor message received based on type */
+ consumeEach { message ->
+ when (message) {
+ is WorkflowExecuteMessage<In, Out> -> {
+ launch {
+ try {
+ executeMessageActor(message)
+ } catch (e: Exception) {
+ exceptions.add(e)
+ }
+ }
+ }
+ is WorkflowRestartMessage<In, Out> -> {
+ launch {
+ TODO("")
+ }
+ }
+ is WorkflowCancelMessage<In, Out> -> {
+ launch {
+ TODO("")
+ }
+ }
+ }
+ }
+ }
+
+ private suspend fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
+
+ /** Send message to process from one state to other state */
+ fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
+ channel.send(nodeMessage)
+ }
+
+ /** Process the cascade node processing, based on the previous state of the node */
+ fun processNextNodes(node: Graph.Node, nodeState: EdgeLabel) {
+ // Process only Next Success Node
+ val stateEdges = graph.outgoingEdges(node.id, arrayListOf(nodeState))
+ if (stateEdges.isNotEmpty()) {
+ stateEdges.forEach { stateEdge ->
+ // Prepare next node ready message and Send NodeReadyMessage
+ val nodeReadyMessage = NodeReadyMessage<In, Out>(stateEdge, EdgeAction.EXECUTE)
+ sendNodeMessage(nodeReadyMessage)
+ }
+ }
+ }
+
+ suspend fun triggerToExecuteOrSkip(message: NodeReadyMessage<In, Out>) {
+ val edge = message.fromEdge
+ val node = edge.target
+ // Check if current edge action is Skip or Execute
+ when (message.edgeAction) {
+ EdgeAction.SKIP -> {
+ val skipMessage = prepareNodeSkipMessage(node)
+ sendNodeMessage(skipMessage)
+ }
+ EdgeAction.EXECUTE -> {
+ val nodeExecuteMessage = prepareNodeExecutionMessage(node)
+ sendNodeMessage(nodeExecuteMessage)
+ }
+ }
+ }
+
+ suspend fun readyNodeWorker(message: NodeReadyMessage<In, Out>) {
+ val edge = message.fromEdge
+ val node = edge.target
+ log.debug("@@@@@ Ready workflow($workflowId), node($node) from edge($edge) for action(${message.edgeAction}) @@@@@")
+ // Update the current incoming edge status to executed or skipped
+ when (message.edgeAction) {
+ EdgeAction.SKIP -> message.fromEdge.status = EdgeStatus.SKIPPED
+ EdgeAction.EXECUTE -> message.fromEdge.status = EdgeStatus.EXECUTED
+ }
+ val incomingEdges = graph.incomingEdges(node.id)
+ if (incomingEdges.size > 1) {
+ // Check all incoming edges executed or skipped
+ val notCompletedEdges = incomingEdges.filter { it.status == EdgeStatus.NOT_STARTED }
+ if (notCompletedEdges.isEmpty()) {
+ // Possibility of skip edge action performed at last, but other edges have execute action.
+ val executePresent = incomingEdges.filter { it.status == EdgeStatus.EXECUTED }
+ val newMessage = if (executePresent.isNotEmpty()) {
+ NodeReadyMessage(message.fromEdge, EdgeAction.EXECUTE)
+ } else {
+ message
+ }
+ triggerToExecuteOrSkip(newMessage)
+ } else {
+ log.info("node(${node.id}) is waiting for incoming edges($notCompletedEdges)")
+ }
+ } else {
+ triggerToExecuteOrSkip(message)
+ }
+ }
+
+ suspend fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) {
+ val node = message.node
+ node.status = NodeStatus.EXECUTING
+ val nodeState = if (node.id == BluePrintConstants.GRAPH_START_NODE_NAME ||
+ node.id == BluePrintConstants.GRAPH_END_NODE_NAME
+ ) {
+ EdgeLabel.SUCCESS
+ } else {
+ log.debug("##### Processing workflow($workflowId) node($node) #####")
+ // Call the Extension function and get the next Edge state.
+ executeNode(node, message.nodeInput, message.nodeOutput)
+ }
+ // Update Node Completed
+ node.status = NodeStatus.EXECUTED
+ log.info("Execute node(${node.id}) -> executed state($nodeState)")
+ // Check if the Node status edge is there, If not close processing
+ val edgePresent = graph.outgoingEdges(node.id, nodeState).isNotEmpty()
+
+ // If End Node, Send End Message
+ if (graph.isEndNode(node)) {
+ // Close the current channel
+ channel.close()
+ } else if (!edgePresent) {
+ throw BluePrintProcessorException("node(${node.id}) outgoing edge($nodeState) is missing.")
+ } else {
+ val skippingEdges = graph.outgoingEdgesNotInLabels(node.id, arrayListOf(nodeState))
+ log.debug("Skipping node($node)'s outgoing edges($skippingEdges)")
+ // Process Skip Edges
+ skippingEdges.forEach { skippingEdge ->
+ // Prepare next node ready message and Send NodeReadyMessage
+ val nodeReadyMessage = NodeReadyMessage<In, Out>(skippingEdge, EdgeAction.SKIP)
+ sendNodeMessage(nodeReadyMessage)
+ }
+ // Process Success Node
+ processNextNodes(node, nodeState)
+ }
+ }
+
+ suspend fun skipNodeWorker(message: NodeSkipMessage<In, Out>) {
+ val node = message.node
+ val incomingEdges = graph.incomingEdges(node.id)
+ // Check All Incoming Nodes Skipped
+ val nonSkippedEdges = incomingEdges.filter {
+ it.status == EdgeStatus.NOT_STARTED
+ }
+ log.debug("Node($node) incoming edges ($incomingEdges), not skipped incoming edges ($nonSkippedEdges)")
+
+ if (nonSkippedEdges.isEmpty()) {
+ log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$")
+ // Call the Extension Function
+ val nodeState = skipNode(node, message.nodeInput, message.nodeOutput)
+ log.info("Skip node(${node.id}) -> executed state($nodeState)")
+ // Mark the Current node as Skipped
+ node.status = NodeStatus.SKIPPED
+ // Look for next possible skip nodes
+ graph.outgoingEdges(node.id).forEach { outgoingEdge ->
+ val nodeReadyMessage = NodeReadyMessage<In, Out>(outgoingEdge, EdgeAction.SKIP)
+ sendNodeMessage(nodeReadyMessage)
+ }
+ }
+ }
+
+ fun restartNodeWorker(message: NodeRestartMessage<In, Out>) = launch {
+ TODO()
+ }
+
+ fun cancelNodeWorker(messageWorkflow: WorkflowCancelMessage<In, Out>) = launch {
+ channel.close()
+ throw CancellationException("Workflow($workflowId) actor cancelled as requested.")
+ }
+
+ /** Process each actor message received based on type **/
+ consumeEach { nodeMessage ->
+ when (nodeMessage) {
+ is NodeReadyMessage<In, Out> -> {
+ // Blocking call
+ try {
+ readyNodeWorker(nodeMessage)
+ } catch (e: Exception) {
+ exceptions.add(e)
+ channel.close()
+ }
+ }
+ is NodeExecuteMessage<In, Out> -> {
+ launch {
+ try {
+ executeNodeWorker(nodeMessage)
+ } catch (e: Exception) {
+ nodeMessage.node.status = NodeStatus.TERMINATED
+ exceptions.add(e)
+ channel.close()
+ }
+ }
+ }
+ is NodeSkipMessage<In, Out> -> {
+ launch {
+ try {
+ skipNodeWorker(nodeMessage)
+ } catch (e: Exception) {
+ nodeMessage.node.status = NodeStatus.TERMINATED
+ exceptions.add(e)
+ channel.close()
+ }
+ }
+ }
+ is NodeRestartMessage<In, Out> -> {
+ launch {
+ try {
+ restartNodeWorker(nodeMessage)
+ } catch (e: Exception) {
+ exceptions.add(e)
+ channel.close()
+ }
+ }
+ }
+ }
+ }
+ }
+}