diff options
author | Brinda Santh <brindasanth@in.ibm.com> | 2019-08-15 12:43:41 -0400 |
---|---|---|
committer | Brinda Santh Muthuramalingam <brindasanth@in.ibm.com> | 2019-08-16 14:28:55 +0000 |
commit | ad9b4a41a7be5ed8c579a2e96bbb4d2da629c036 (patch) | |
tree | 52d4dc8ef4cafb8f3a81a9901b5e7cc6d2bcd903 /ms/controllerblueprints | |
parent | 048aad79ece5b709e65155e2d0c8675b7c2c84a2 (diff) |
Modify workflow execution service options.
Change-Id: I629b30f9ff2b8e84d6ae952946608d9bb3437d4c
Issue-ID: CCSDK-1619
Signed-off-by: Brinda Santh <brindasanth@in.ibm.com>
Diffstat (limited to 'ms/controllerblueprints')
3 files changed, 61 insertions, 64 deletions
diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt index 06d3421c0..259efbf0b 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt @@ -26,7 +26,7 @@ class ServiceTemplateBuilder(private val name: String, private val author: String, private val tags: String) { private var serviceTemplate = ServiceTemplate() - private lateinit var topologyTemplate: TopologyTemplate + private var topologyTemplate: TopologyTemplate? = null private var metadata: MutableMap<String, String> = hashMapOf() private var dslDefinitions: MutableMap<String, JsonNode>? = null private var imports: MutableList<ImportDefinition> = mutableListOf() diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt index 91c2bcbbb..905150213 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt @@ -36,24 +36,20 @@ interface BluePrintWorkFlowService<In, Out> { suspend fun initializeWorkflow(input: In): EdgeLabel - suspend fun prepareWorkflowOutput(): Out + suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): Out /** Prepare the message for the Node */ suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out> suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<In, Out> - suspend fun executeNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) + suspend fun executeNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel - suspend fun skipNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) + suspend fun skipNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel - suspend fun cancelNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) + suspend fun cancelNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel - suspend fun restartNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) + suspend fun restartNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel } @@ -71,17 +67,13 @@ sealed class NodeMessage<In, Out> class NodeReadyMessage<In, Out>(val fromEdge: Graph.Edge, val edgeAction: EdgeAction) : NodeMessage<In, Out>() -class NodeExecuteMessage<In, Out>(val node: Graph.Node, val nodeInput: In, - val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>() +class NodeExecuteMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>() -class NodeRestartMessage<In, Out>(val node: Graph.Node, val nodeInput: In, - val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>() +class NodeRestartMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>() -class NodeSkipMessage<In, Out>(val node: Graph.Node, val nodeInput: In, - val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>() +class NodeSkipMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>() -class NodeCancelMessage<In, Out>(val node: Graph.Node, val nodeInput: In, - val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>() +class NodeCancelMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>() enum class EdgeAction(val id: String) { EXECUTE("execute"), @@ -105,18 +97,14 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP fun cancel() { log.info("Received workflow($workflowId) cancel request") job.cancel() - throw CancellationException("Workflow($workflowId) cancelled as requested ...") + throw CancellationException("Workflow($workflowId) cancelled as requested") } - val workflowActor = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { - - /** Send message from workflow actor to node actor */ - fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch { - nodeActor.send(nodeMessage) - } - + fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { /** Process the workflow execution message */ suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) { + + val nodeActor = nodeActor() // Prepare Workflow and Populate the Initial store initializeWorkflow(workflowExecuteMessage.input) @@ -124,14 +112,18 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP // Prepare first node message and Send NodeExecuteMessage // Start node doesn't wait for any nodes, so we can pass Execute message directly val nodeExecuteMessage = prepareNodeExecutionMessage(startNode) - sendNodeMessage(nodeExecuteMessage) - log.debug("First node triggered successfully, waiting for response") - + /** Send message from workflow actor to node actor */ + launch { + nodeActor.send(nodeExecuteMessage) + } // Wait for workflow completion or Error nodeActor.invokeOnClose { exception -> launch { - log.debug("End Node Completed, processing completion message") - val workflowOutput = prepareWorkflowOutput() + log.info("End Node Completed, processing completion message") + val bluePrintProcessorException: BluePrintProcessorException? = + if (exception != null) BluePrintProcessorException(exception) else null + + val workflowOutput = prepareWorkflowOutput(bluePrintProcessorException) workflowExecuteMessage.output.complete(workflowOutput) channel.close(exception) } @@ -161,7 +153,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP } - private val nodeActor = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { + private fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { /** Send message to process from one state to other state */ fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch { @@ -228,7 +220,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP } } - fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) = launch { + suspend fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) { val node = message.node node.status = NodeStatus.EXECUTING val nodeState = if (node.id == BluePrintConstants.GRAPH_START_NODE_NAME @@ -237,9 +229,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP } else { log.debug("##### Processing workflow($workflowId) node($node) #####") // Call the Extension function and get the next Edge state. - val deferredNodeState = CompletableDeferred<EdgeLabel>() - executeNode(node, message.nodeInput, message.nodeOutput, deferredNodeState) - deferredNodeState.await() + executeNode(node, message.nodeInput, message.nodeOutput) } // Update Node Completed node.status = NodeStatus.EXECUTED @@ -263,7 +253,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP } } - fun skipNodeWorker(message: NodeSkipMessage<In, Out>) = launch { + suspend fun skipNodeWorker(message: NodeSkipMessage<In, Out>) { val node = message.node val incomingEdges = graph.incomingEdges(node.id) // Check All Incoming Nodes Skipped @@ -275,9 +265,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP if (nonSkippedEdges.isEmpty()) { log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$") // Call the Extension Function - val deferredNodeState = CompletableDeferred<EdgeLabel>() - skipNode(node, message.nodeInput, message.nodeOutput, deferredNodeState) - val nodeState = deferredNodeState.await() + val nodeState = skipNode(node, message.nodeInput, message.nodeOutput) log.info("Skip Node($node) -> Executed State($nodeState)") // Mark the Current node as Skipped node.status = NodeStatus.SKIPPED @@ -303,21 +291,37 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP when (nodeMessage) { is NodeReadyMessage<In, Out> -> { // Blocking call - readyNodeWorker(nodeMessage) + try { + readyNodeWorker(nodeMessage) + } catch (e: Exception) { + channel.close(e) + } } is NodeExecuteMessage<In, Out> -> { launch { - executeNodeWorker(nodeMessage) + try { + executeNodeWorker(nodeMessage) + } catch (e: Exception) { + channel.close(e) + } } } is NodeSkipMessage<In, Out> -> { launch { - skipNodeWorker(nodeMessage) + try { + skipNodeWorker(nodeMessage) + } catch (e: Exception) { + channel.close(e) + } } } is NodeRestartMessage<In, Out> -> { launch { - restartNodeWorker(nodeMessage) + try { + restartNodeWorker(nodeMessage) + } catch (e: Exception) { + channel.close(e) + } } } } @@ -330,6 +334,6 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP this.graph = graph this.workflowId = bluePrintRuntimeService.id() val startMessage = WorkflowExecuteMessage(input, output) - workflowActor.send(startMessage) + workflowActor().send(startMessage) } }
\ No newline at end of file diff --git a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt index 62cb10851..b8d8cea3e 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt @@ -22,6 +22,7 @@ import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.runBlocking import org.junit.Test import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph import org.onap.ccsdk.cds.controllerblueprints.core.toGraph @@ -134,49 +135,41 @@ class TestBluePrintWorkFlowService override suspend fun prepareNodeExecutionMessage(node: Graph.Node) : NodeExecuteMessage<String, String> { - val deferredNodeOutput = CompletableDeferred<String>() - val nodeExecuteMessage = NodeExecuteMessage(node, "$node Input", deferredNodeOutput) - return nodeExecuteMessage + return NodeExecuteMessage(node, "$node Input", "") } override suspend fun executeNode(node: Graph.Node, nodeInput: String, - deferredNodeOutput: CompletableDeferred<String>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) { + nodeOutput: String): EdgeLabel { // val random = (1..10).random() * 1000 // println("will reply in $random ms") // kotlinx.coroutines.delay(random.toLong()) val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") - deferredNodeStatus.complete(status) - deferredNodeOutput.complete("$node, Output: $nodeInput output") + return status } - override suspend fun prepareNodeSkipMessage(node: Graph.Node) - : NodeSkipMessage<String, String> { - val deferredNodeOutput = CompletableDeferred<String>() - val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", deferredNodeOutput) + override suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<String, String> { + val nodeOutput = "" + val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", nodeOutput) return nodeSkipMessage } override suspend fun skipNode(node: Graph.Node, nodeInput: String, - deferredNodeOutput: CompletableDeferred<String>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) { + nodeOutput: String): EdgeLabel { val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") - deferredNodeStatus.complete(status) + return status } override suspend fun cancelNode(node: Graph.Node, nodeInput: String, - deferredNodeOutput: CompletableDeferred<String>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) { + nodeOutput: String): EdgeLabel { TODO("not implemented") //To change body of created functions use File | Settings | File Templates. } override suspend fun restartNode(node: Graph.Node, nodeInput: String, - deferredNodeOutput: CompletableDeferred<String>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) { + nodeOutput: String): EdgeLabel { TODO("not implemented") //To change body of created functions use File | Settings | File Templates. } - override suspend fun prepareWorkflowOutput(): String { + override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): String { return "Final Response" } }
\ No newline at end of file |