summaryrefslogtreecommitdiffstats
path: root/ms/controllerblueprints/modules
diff options
context:
space:
mode:
authorBrinda Santh <brindasanth@in.ibm.com>2019-08-13 20:51:40 -0400
committerBrinda Santh Muthuramalingam <brindasanth@in.ibm.com>2019-08-16 14:28:45 +0000
commitd58654ce65f36b9d6ddc3f38c751da26d029ea41 (patch)
tree61a4297a81e903274ab2ec1942b99dc641c4e1a6 /ms/controllerblueprints/modules
parentb01289f38b051b9b40eb12db12b46aec7c3472db (diff)
Add imperative workflow service.
Change-Id: Ic74bb7796244c466a0d8561eed27174fc1a14ebb Issue-ID: CCSDK-1617 Signed-off-by: Brinda Santh <brindasanth@in.ibm.com>
Diffstat (limited to 'ms/controllerblueprints/modules')
-rw-r--r--ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt340
-rw-r--r--ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt182
2 files changed, 522 insertions, 0 deletions
diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt
new file mode 100644
index 000000000..019f31805
--- /dev/null
+++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt
@@ -0,0 +1,340 @@
+/*
+ * Copyright © 2019 IBM.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.controllerblueprints.core.service
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.channels.actor
+import kotlinx.coroutines.channels.consumeEach
+import org.onap.ccsdk.cds.controllerblueprints.core.*
+import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
+import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeStatus
+import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
+import org.onap.ccsdk.cds.controllerblueprints.core.data.NodeStatus
+import kotlin.coroutines.CoroutineContext
+
+interface BluePrintWorkFlowService<In, Out> {
+
+ /** Executes imperative workflow for the bluePrintRuntimeService [bluePrintRuntimeService] and workflow
+ * input [input], response will be retrieve from output [output]*/
+ suspend fun executeWorkflow(bluePrintRuntimeService: BluePrintRuntimeService<*>,
+ input: In, output: CompletableDeferred<Out>)
+
+ suspend fun initializeWorkflow(input: In): EdgeLabel
+
+ suspend fun prepareWorkflowOutput(): Out
+
+ /** Prepare the message for the Node */
+ suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out>
+
+ suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<In, Out>
+
+ suspend fun executeNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
+ deferredNodeStatus: CompletableDeferred<EdgeLabel>)
+
+ suspend fun skipNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
+ deferredNodeStatus: CompletableDeferred<EdgeLabel>)
+
+ suspend fun cancelNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
+ deferredNodeStatus: CompletableDeferred<EdgeLabel>)
+
+ suspend fun restartNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
+ deferredNodeStatus: CompletableDeferred<EdgeLabel>)
+
+}
+
+/** Workflow Message Types */
+sealed class WorkflowMessage<In, Out>
+
+class WorkflowExecuteMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>()
+
+class WorkflowCancelMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>()
+
+class WorkflowRestartMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>()
+
+/** Node Message Types */
+sealed class NodeMessage<In, Out>
+
+class NodeReadyMessage<In, Out>(val fromEdge: Graph.Edge, val edgeAction: EdgeAction) : NodeMessage<In, Out>()
+
+class NodeExecuteMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
+ val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
+
+class NodeRestartMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
+ val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
+
+class NodeSkipMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
+ val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
+
+class NodeCancelMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
+ val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
+
+enum class EdgeAction(val id: String) {
+ EXECUTE("execute"),
+ SKIP("skip")
+}
+
+/** Abstract workflow service implementation */
+abstract class AbstractBluePrintWorkFlowService<In, Out>(private val graph: Graph)
+ : CoroutineScope, BluePrintWorkFlowService<In, Out> {
+
+ private val log = logger(AbstractBluePrintWorkFlowService::class)
+
+ private val job = Job()
+
+ lateinit var workflowId: String
+
+ final override val coroutineContext: CoroutineContext
+ get() = job + CoroutineName("Wf")
+
+ val root = graph.startNodes()
+
+ fun cancel() {
+ log.info("Received workflow($workflowId) cancel request")
+ job.cancel()
+ throw CancellationException("Workflow($workflowId) cancelled as requested ...")
+ }
+
+ val workflowActor = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
+
+ /** Send message from workflow actor to node actor */
+ fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
+ nodeActor.send(nodeMessage)
+ }
+
+ /** Process the workflow execution message */
+ suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) {
+ // Prepare Workflow and Populate the Initial store
+ initializeWorkflow(workflowExecuteMessage.input)
+
+ val startNode = root.first()
+ // Prepare first node message and Send NodeExecuteMessage
+ // Start node doesn't wait for any nodes, so we can pass Execute message directly
+ val nodeExecuteMessage = prepareNodeExecutionMessage(startNode)
+ sendNodeMessage(nodeExecuteMessage)
+ log.debug("First node triggered successfully, waiting for response")
+
+ // Wait for workflow completion or Error
+ nodeActor.invokeOnClose { exception ->
+ launch {
+ log.debug("End Node Completed, processing completion message")
+ val workflowOutput = prepareWorkflowOutput()
+ workflowExecuteMessage.output.complete(workflowOutput)
+ channel.close(exception)
+ }
+ }
+ }
+
+ /** Process each actor message received based on type */
+ consumeEach { message ->
+ when (message) {
+ is WorkflowExecuteMessage<In, Out> -> {
+ launch {
+ executeMessageActor(message)
+ }
+ }
+ is WorkflowRestartMessage<In, Out> -> {
+ launch {
+ TODO("")
+ }
+ }
+ is WorkflowCancelMessage<In, Out> -> {
+ launch {
+ TODO("")
+ }
+ }
+ }
+ }
+ }
+
+
+ private val nodeActor = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
+
+ /** Send message to process from one state to other state */
+ fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
+ channel.send(nodeMessage)
+ }
+
+ /** Process the cascade node processing, based on the previous state of the node */
+ fun processNextNodes(node: Graph.Node, nodeState: EdgeLabel) {
+ // Process only Next Success Node
+ val stateEdges = graph.outgoingEdges(node.id, arrayListOf(nodeState))
+ log.debug("Next Edges :$stateEdges")
+ if (stateEdges.isNotEmpty()) {
+ stateEdges.forEach { stateEdge ->
+ // Prepare next node ready message and Send NodeReadyMessage
+ val nodeReadyMessage = NodeReadyMessage<In, Out>(stateEdge, EdgeAction.EXECUTE)
+ sendNodeMessage(nodeReadyMessage)
+ }
+ }
+ }
+
+ suspend fun triggerToExecuteOrSkip(message: NodeReadyMessage<In, Out>) {
+ val edge = message.fromEdge
+ val node = edge.target
+ // Check if current edge action is Skip or Execute
+ when (message.edgeAction) {
+ EdgeAction.SKIP -> {
+ val skipMessage = prepareNodeSkipMessage(node)
+ sendNodeMessage(skipMessage)
+ }
+ EdgeAction.EXECUTE -> {
+ val nodeExecuteMessage = prepareNodeExecutionMessage(node)
+ sendNodeMessage(nodeExecuteMessage)
+ }
+ }
+ }
+
+ suspend fun readyNodeWorker(message: NodeReadyMessage<In, Out>) {
+ val edge = message.fromEdge
+ val node = edge.target
+ log.debug("@@@@@ Ready workflow($workflowId), node($node) from edge($edge) for action(${message.edgeAction}) @@@@@")
+ // Update the current incoming edge status to executed or skipped
+ when (message.edgeAction) {
+ EdgeAction.SKIP -> message.fromEdge.status = EdgeStatus.SKIPPED
+ EdgeAction.EXECUTE -> message.fromEdge.status = EdgeStatus.EXECUTED
+ }
+ val incomingEdges = graph.incomingEdges(node.id)
+ if (incomingEdges.size > 1) {
+ // Check all incoming edges executed or skipped
+ val notCompletedEdges = incomingEdges.filter { it.status == EdgeStatus.NOT_STARTED }
+ if (notCompletedEdges.isEmpty()) {
+ // Possibility of skip edge action performed at last, but other edges have execute action.
+ val executePresent = incomingEdges.filter { it.status == EdgeStatus.EXECUTED }
+ val newMessage = if (executePresent.isNotEmpty()) {
+ NodeReadyMessage(message.fromEdge, EdgeAction.EXECUTE)
+ } else {
+ message
+ }
+ triggerToExecuteOrSkip(newMessage)
+ } else {
+ log.info("node(${node.id}) waiting for not completed edges($notCompletedEdges)")
+ }
+ } else {
+ triggerToExecuteOrSkip(message)
+ }
+ }
+
+ fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) = launch {
+ val node = message.node
+ node.status = NodeStatus.EXECUTING
+ val nodeState = if (node.id == BluePrintConstants.GRAPH_START_NODE_NAME
+ || node.id == BluePrintConstants.GRAPH_END_NODE_NAME) {
+ EdgeLabel.SUCCESS
+ } else {
+ log.debug("##### Processing workflow($workflowId) node($node) #####")
+ // Call the Extension function and get the next Edge state.
+ val deferredNodeState = CompletableDeferred<EdgeLabel>()
+ executeNode(node, message.nodeInput, message.nodeOutput, deferredNodeState)
+ deferredNodeState.await()
+ }
+ // Update Node Completed
+ node.status = NodeStatus.EXECUTED
+ log.info("Execute Node($node) -> Executed State($nodeState)")
+
+ // If End Node, Send End Message
+ if (graph.isEndNode(node)) {
+ // Close the current channel
+ channel.close()
+ } else {
+ val skippingEdges = graph.outgoingEdgesNotInLabels(node.id, arrayListOf(nodeState))
+ log.debug("Skipping node($node) outgoing Edges($skippingEdges)")
+ // Process Skip Edges
+ skippingEdges.forEach { skippingEdge ->
+ // Prepare next node ready message and Send NodeReadyMessage
+ val nodeReadyMessage = NodeReadyMessage<In, Out>(skippingEdge, EdgeAction.SKIP)
+ sendNodeMessage(nodeReadyMessage)
+ }
+ // Process Success Node
+ processNextNodes(node, nodeState)
+ }
+ }
+
+ fun skipNodeWorker(message: NodeSkipMessage<In, Out>) = launch {
+ val node = message.node
+ val incomingEdges = graph.incomingEdges(node.id)
+ // Check All Incoming Nodes Skipped
+ val nonSkippedEdges = incomingEdges.filter {
+ it.status == EdgeStatus.NOT_STARTED
+ }
+ log.debug("Node($node) incoming edges ($incomingEdges), not skipped incoming edges ($nonSkippedEdges)")
+
+ if (nonSkippedEdges.isEmpty()) {
+ log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$")
+ // Call the Extension Function
+ val deferredNodeState = CompletableDeferred<EdgeLabel>()
+ skipNode(node, message.nodeInput, message.nodeOutput, deferredNodeState)
+ val nodeState = deferredNodeState.await()
+ log.info("Skip Node($node) -> Executed State($nodeState)")
+ // Mark the Current node as Skipped
+ node.status = NodeStatus.SKIPPED
+ // Look for next possible skip nodes
+ graph.outgoingEdges(node.id).forEach { outgoingEdge ->
+ val nodeReadyMessage = NodeReadyMessage<In, Out>(outgoingEdge, EdgeAction.SKIP)
+ sendNodeMessage(nodeReadyMessage)
+ }
+ }
+ }
+
+ fun restartNodeWorker(message: NodeRestartMessage<In, Out>) = launch {
+ TODO()
+ }
+
+ fun cancelNodeWorker(messageWorkflow: WorkflowCancelMessage<In, Out>) = launch {
+ channel.close()
+ throw CancellationException("Workflow($workflowId) actor cancelled as requested ...")
+ }
+
+ /** Process each actor message received based on type **/
+ consumeEach { nodeMessage ->
+ when (nodeMessage) {
+ is NodeReadyMessage<In, Out> -> {
+ // Blocking call
+ readyNodeWorker(nodeMessage)
+ }
+ is NodeExecuteMessage<In, Out> -> {
+ launch {
+ executeNodeWorker(nodeMessage)
+ }
+ }
+ is NodeSkipMessage<In, Out> -> {
+ launch {
+ skipNodeWorker(nodeMessage)
+ }
+ }
+ is NodeRestartMessage<In, Out> -> {
+ launch {
+ restartNodeWorker(nodeMessage)
+ }
+ }
+ }
+ }
+ }
+
+
+ override suspend fun executeWorkflow(bluePrintRuntimeService: BluePrintRuntimeService<*>, input: In, output: CompletableDeferred<Out>) {
+ log.info("Executing Graph : $graph")
+ this.workflowId = bluePrintRuntimeService.id()
+ validateWorkflow()
+ val startMessage = WorkflowExecuteMessage(input, output)
+ workflowActor.send(startMessage)
+ }
+
+ open fun validateWorkflow() {
+ //check(!graph.findCycles().isNotEmpty()) { "Graph is cyclic, Cycle is not supported" }
+ }
+} \ No newline at end of file
diff --git a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt
new file mode 100644
index 000000000..7cb64922c
--- /dev/null
+++ b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt
@@ -0,0 +1,182 @@
+/*
+ * Copyright © 2019 IBM.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.controllerblueprints.core.service
+
+import io.mockk.every
+import io.mockk.mockk
+import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.runBlocking
+import org.junit.Test
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
+import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
+import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
+import org.onap.ccsdk.cds.controllerblueprints.core.toGraph
+import kotlin.test.assertNotNull
+
+class BluePrintWorkflowServiceTest {
+ @Test
+ fun testSimpleFlow() {
+ runBlocking {
+ val graph = "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>E/SUCCESS, E>END/SUCCESS]"
+ .toGraph()
+ val simpleWorkflow = TestBluePrintWorkFlowService(graph)
+ simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null)
+ val deferredOutput = CompletableDeferred<String>()
+ val input = "123456"
+ simpleWorkflow.executeWorkflow(mockBluePrintRuntimeService(), input, deferredOutput)
+ val response = deferredOutput.await()
+ assertNotNull(response, "failed to get response")
+ }
+ }
+
+ @Test
+ fun testConditionalFlow() {
+ runBlocking {
+ val graph = "[START>A/SUCCESS, A>B/SUCCESS, A>C/FAILURE, B>D/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]"
+ .toGraph()
+ val simpleWorkflow = TestBluePrintWorkFlowService(graph)
+ simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null)
+ val deferredOutput = CompletableDeferred<String>()
+ val input = "123456"
+ simpleWorkflow.executeWorkflow(mockBluePrintRuntimeService(), input, deferredOutput)
+ val response = deferredOutput.await()
+ assertNotNull(response, "failed to get response")
+ }
+ }
+
+ @Test
+ fun testBothConditionalFlow() {
+ runBlocking {
+ // Failure Flow
+ val failurePatGraph = "[START>A/SUCCESS, A>B/SUCCESS, A>C/FAILURE, B>D/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]"
+ .toGraph()
+ val failurePathWorkflow = TestBluePrintWorkFlowService(failurePatGraph)
+ failurePathWorkflow.simulatedState = prepareSimulation(arrayListOf("B", "C", "D", "E"),
+ arrayListOf("A"))
+ val failurePathWorkflowDeferredOutput = CompletableDeferred<String>()
+ val failurePathWorkflowInput = "123456"
+ failurePathWorkflow.executeWorkflow(mockBluePrintRuntimeService(), failurePathWorkflowInput, failurePathWorkflowDeferredOutput)
+ val failurePathResponse = failurePathWorkflowDeferredOutput.await()
+ assertNotNull(failurePathResponse, "failed to get response")
+ }
+ }
+
+ @Test
+ fun testMultipleSkipFlow() {
+ runBlocking {
+ val graph = "[START>A/SUCCESS, A>B/SUCCESS, A>C/FAILURE, C>D/SUCCESS, D>E/SUCCESS, B>E/SUCCESS, E>END/SUCCESS]"
+ .toGraph()
+ val simpleWorkflow = TestBluePrintWorkFlowService(graph)
+ simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null)
+ val deferredOutput = CompletableDeferred<String>()
+ val input = "123456"
+ simpleWorkflow.executeWorkflow(mockBluePrintRuntimeService(), input, deferredOutput)
+ val response = deferredOutput.await()
+ assertNotNull(response, "failed to get response")
+ }
+ }
+
+ @Test
+ fun testParallelFlow() {
+ runBlocking {
+ val graph = "[START>A/SUCCESS, A>B/SUCCESS, A>C/SUCCESS, B>D/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]"
+ .toGraph()
+ val simpleWorkflow = TestBluePrintWorkFlowService(graph)
+ simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D"), null)
+ val deferredOutput = CompletableDeferred<String>()
+ val input = "123456"
+ simpleWorkflow.executeWorkflow(mockBluePrintRuntimeService(), input, deferredOutput)
+ val response = deferredOutput.await()
+ assertNotNull(response, "failed to get response")
+ }
+ }
+
+ private fun mockBluePrintRuntimeService(): BluePrintRuntimeService<*> {
+ val bluePrintRuntimeService = mockk<BluePrintRuntimeService<*>>()
+ every { bluePrintRuntimeService.id() } returns "123456"
+ return bluePrintRuntimeService
+ }
+
+ private fun prepareSimulation(successes: List<String>?, failures: List<String>?): MutableMap<String, EdgeLabel> {
+ val simulatedState: MutableMap<String, EdgeLabel> = hashMapOf()
+ successes?.forEach {
+ simulatedState[it] = EdgeLabel.SUCCESS
+ }
+ failures?.forEach {
+ simulatedState[it] = EdgeLabel.FAILURE
+ }
+ return simulatedState
+ }
+}
+
+class TestBluePrintWorkFlowService(graph: Graph)
+ : AbstractBluePrintWorkFlowService<String, String>(graph) {
+
+ lateinit var simulatedState: MutableMap<String, EdgeLabel>
+
+ override suspend fun initializeWorkflow(input: String): EdgeLabel {
+ return EdgeLabel.SUCCESS
+ }
+
+ override suspend fun prepareNodeExecutionMessage(node: Graph.Node)
+ : NodeExecuteMessage<String, String> {
+ val deferredNodeOutput = CompletableDeferred<String>()
+ val nodeExecuteMessage = NodeExecuteMessage(node, "$node Input", deferredNodeOutput)
+ return nodeExecuteMessage
+ }
+
+ override suspend fun executeNode(node: Graph.Node, nodeInput: String,
+ deferredNodeOutput: CompletableDeferred<String>,
+ deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+// val random = (1..10).random() * 1000
+// println("will reply in $random ms")
+// kotlinx.coroutines.delay(random.toLong())
+ val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)")
+ deferredNodeStatus.complete(status)
+ deferredNodeOutput.complete("$node, Output: $nodeInput output")
+ }
+
+ override suspend fun prepareNodeSkipMessage(node: Graph.Node)
+ : NodeSkipMessage<String, String> {
+ val deferredNodeOutput = CompletableDeferred<String>()
+ val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", deferredNodeOutput)
+ return nodeSkipMessage
+ }
+
+ override suspend fun skipNode(node: Graph.Node, nodeInput: String,
+ deferredNodeOutput: CompletableDeferred<String>,
+ deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+ val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)")
+ deferredNodeStatus.complete(status)
+ }
+
+ override suspend fun cancelNode(node: Graph.Node, nodeInput: String,
+ deferredNodeOutput: CompletableDeferred<String>,
+ deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+ TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
+ }
+
+ override suspend fun restartNode(node: Graph.Node, nodeInput: String,
+ deferredNodeOutput: CompletableDeferred<String>,
+ deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+ TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
+ }
+
+ override suspend fun prepareWorkflowOutput(): String {
+ return "Final Response"
+ }
+} \ No newline at end of file