summaryrefslogtreecommitdiffstats
path: root/ms/controllerblueprints
diff options
context:
space:
mode:
authorBrinda Santh <brindasanth@in.ibm.com>2019-08-15 12:43:41 -0400
committerBrinda Santh Muthuramalingam <brindasanth@in.ibm.com>2019-08-16 14:28:55 +0000
commitad9b4a41a7be5ed8c579a2e96bbb4d2da629c036 (patch)
tree52d4dc8ef4cafb8f3a81a9901b5e7cc6d2bcd903 /ms/controllerblueprints
parent048aad79ece5b709e65155e2d0c8675b7c2c84a2 (diff)
Modify workflow execution service options.
Change-Id: I629b30f9ff2b8e84d6ae952946608d9bb3437d4c Issue-ID: CCSDK-1619 Signed-off-by: Brinda Santh <brindasanth@in.ibm.com>
Diffstat (limited to 'ms/controllerblueprints')
-rw-r--r--ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt2
-rw-r--r--ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt92
-rw-r--r--ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt31
3 files changed, 61 insertions, 64 deletions
diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt
index 06d3421c0..259efbf0b 100644
--- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt
+++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt
@@ -26,7 +26,7 @@ class ServiceTemplateBuilder(private val name: String,
private val author: String,
private val tags: String) {
private var serviceTemplate = ServiceTemplate()
- private lateinit var topologyTemplate: TopologyTemplate
+ private var topologyTemplate: TopologyTemplate? = null
private var metadata: MutableMap<String, String> = hashMapOf()
private var dslDefinitions: MutableMap<String, JsonNode>? = null
private var imports: MutableList<ImportDefinition> = mutableListOf()
diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt
index 91c2bcbbb..905150213 100644
--- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt
+++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt
@@ -36,24 +36,20 @@ interface BluePrintWorkFlowService<In, Out> {
suspend fun initializeWorkflow(input: In): EdgeLabel
- suspend fun prepareWorkflowOutput(): Out
+ suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): Out
/** Prepare the message for the Node */
suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out>
suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<In, Out>
- suspend fun executeNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
- deferredNodeStatus: CompletableDeferred<EdgeLabel>)
+ suspend fun executeNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
- suspend fun skipNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
- deferredNodeStatus: CompletableDeferred<EdgeLabel>)
+ suspend fun skipNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
- suspend fun cancelNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
- deferredNodeStatus: CompletableDeferred<EdgeLabel>)
+ suspend fun cancelNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
- suspend fun restartNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
- deferredNodeStatus: CompletableDeferred<EdgeLabel>)
+ suspend fun restartNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
}
@@ -71,17 +67,13 @@ sealed class NodeMessage<In, Out>
class NodeReadyMessage<In, Out>(val fromEdge: Graph.Edge, val edgeAction: EdgeAction) : NodeMessage<In, Out>()
-class NodeExecuteMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
- val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
+class NodeExecuteMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
-class NodeRestartMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
- val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
+class NodeRestartMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
-class NodeSkipMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
- val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
+class NodeSkipMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
-class NodeCancelMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
- val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
+class NodeCancelMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
enum class EdgeAction(val id: String) {
EXECUTE("execute"),
@@ -105,18 +97,14 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
fun cancel() {
log.info("Received workflow($workflowId) cancel request")
job.cancel()
- throw CancellationException("Workflow($workflowId) cancelled as requested ...")
+ throw CancellationException("Workflow($workflowId) cancelled as requested")
}
- val workflowActor = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
-
- /** Send message from workflow actor to node actor */
- fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
- nodeActor.send(nodeMessage)
- }
-
+ fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
/** Process the workflow execution message */
suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) {
+
+ val nodeActor = nodeActor()
// Prepare Workflow and Populate the Initial store
initializeWorkflow(workflowExecuteMessage.input)
@@ -124,14 +112,18 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
// Prepare first node message and Send NodeExecuteMessage
// Start node doesn't wait for any nodes, so we can pass Execute message directly
val nodeExecuteMessage = prepareNodeExecutionMessage(startNode)
- sendNodeMessage(nodeExecuteMessage)
- log.debug("First node triggered successfully, waiting for response")
-
+ /** Send message from workflow actor to node actor */
+ launch {
+ nodeActor.send(nodeExecuteMessage)
+ }
// Wait for workflow completion or Error
nodeActor.invokeOnClose { exception ->
launch {
- log.debug("End Node Completed, processing completion message")
- val workflowOutput = prepareWorkflowOutput()
+ log.info("End Node Completed, processing completion message")
+ val bluePrintProcessorException: BluePrintProcessorException? =
+ if (exception != null) BluePrintProcessorException(exception) else null
+
+ val workflowOutput = prepareWorkflowOutput(bluePrintProcessorException)
workflowExecuteMessage.output.complete(workflowOutput)
channel.close(exception)
}
@@ -161,7 +153,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
}
- private val nodeActor = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
+ private fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
/** Send message to process from one state to other state */
fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
@@ -228,7 +220,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
}
}
- fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) = launch {
+ suspend fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) {
val node = message.node
node.status = NodeStatus.EXECUTING
val nodeState = if (node.id == BluePrintConstants.GRAPH_START_NODE_NAME
@@ -237,9 +229,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
} else {
log.debug("##### Processing workflow($workflowId) node($node) #####")
// Call the Extension function and get the next Edge state.
- val deferredNodeState = CompletableDeferred<EdgeLabel>()
- executeNode(node, message.nodeInput, message.nodeOutput, deferredNodeState)
- deferredNodeState.await()
+ executeNode(node, message.nodeInput, message.nodeOutput)
}
// Update Node Completed
node.status = NodeStatus.EXECUTED
@@ -263,7 +253,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
}
}
- fun skipNodeWorker(message: NodeSkipMessage<In, Out>) = launch {
+ suspend fun skipNodeWorker(message: NodeSkipMessage<In, Out>) {
val node = message.node
val incomingEdges = graph.incomingEdges(node.id)
// Check All Incoming Nodes Skipped
@@ -275,9 +265,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
if (nonSkippedEdges.isEmpty()) {
log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$")
// Call the Extension Function
- val deferredNodeState = CompletableDeferred<EdgeLabel>()
- skipNode(node, message.nodeInput, message.nodeOutput, deferredNodeState)
- val nodeState = deferredNodeState.await()
+ val nodeState = skipNode(node, message.nodeInput, message.nodeOutput)
log.info("Skip Node($node) -> Executed State($nodeState)")
// Mark the Current node as Skipped
node.status = NodeStatus.SKIPPED
@@ -303,21 +291,37 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
when (nodeMessage) {
is NodeReadyMessage<In, Out> -> {
// Blocking call
- readyNodeWorker(nodeMessage)
+ try {
+ readyNodeWorker(nodeMessage)
+ } catch (e: Exception) {
+ channel.close(e)
+ }
}
is NodeExecuteMessage<In, Out> -> {
launch {
- executeNodeWorker(nodeMessage)
+ try {
+ executeNodeWorker(nodeMessage)
+ } catch (e: Exception) {
+ channel.close(e)
+ }
}
}
is NodeSkipMessage<In, Out> -> {
launch {
- skipNodeWorker(nodeMessage)
+ try {
+ skipNodeWorker(nodeMessage)
+ } catch (e: Exception) {
+ channel.close(e)
+ }
}
}
is NodeRestartMessage<In, Out> -> {
launch {
- restartNodeWorker(nodeMessage)
+ try {
+ restartNodeWorker(nodeMessage)
+ } catch (e: Exception) {
+ channel.close(e)
+ }
}
}
}
@@ -330,6 +334,6 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
this.graph = graph
this.workflowId = bluePrintRuntimeService.id()
val startMessage = WorkflowExecuteMessage(input, output)
- workflowActor.send(startMessage)
+ workflowActor().send(startMessage)
}
} \ No newline at end of file
diff --git a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt
index 62cb10851..b8d8cea3e 100644
--- a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt
+++ b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt
@@ -22,6 +22,7 @@ import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.runBlocking
import org.junit.Test
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
import org.onap.ccsdk.cds.controllerblueprints.core.toGraph
@@ -134,49 +135,41 @@ class TestBluePrintWorkFlowService
override suspend fun prepareNodeExecutionMessage(node: Graph.Node)
: NodeExecuteMessage<String, String> {
- val deferredNodeOutput = CompletableDeferred<String>()
- val nodeExecuteMessage = NodeExecuteMessage(node, "$node Input", deferredNodeOutput)
- return nodeExecuteMessage
+ return NodeExecuteMessage(node, "$node Input", "")
}
override suspend fun executeNode(node: Graph.Node, nodeInput: String,
- deferredNodeOutput: CompletableDeferred<String>,
- deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+ nodeOutput: String): EdgeLabel {
// val random = (1..10).random() * 1000
// println("will reply in $random ms")
// kotlinx.coroutines.delay(random.toLong())
val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)")
- deferredNodeStatus.complete(status)
- deferredNodeOutput.complete("$node, Output: $nodeInput output")
+ return status
}
- override suspend fun prepareNodeSkipMessage(node: Graph.Node)
- : NodeSkipMessage<String, String> {
- val deferredNodeOutput = CompletableDeferred<String>()
- val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", deferredNodeOutput)
+ override suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<String, String> {
+ val nodeOutput = ""
+ val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", nodeOutput)
return nodeSkipMessage
}
override suspend fun skipNode(node: Graph.Node, nodeInput: String,
- deferredNodeOutput: CompletableDeferred<String>,
- deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+ nodeOutput: String): EdgeLabel {
val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)")
- deferredNodeStatus.complete(status)
+ return status
}
override suspend fun cancelNode(node: Graph.Node, nodeInput: String,
- deferredNodeOutput: CompletableDeferred<String>,
- deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+ nodeOutput: String): EdgeLabel {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override suspend fun restartNode(node: Graph.Node, nodeInput: String,
- deferredNodeOutput: CompletableDeferred<String>,
- deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+ nodeOutput: String): EdgeLabel {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}
- override suspend fun prepareWorkflowOutput(): String {
+ override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): String {
return "Final Response"
}
} \ No newline at end of file