diff options
Diffstat (limited to 'ms/controllerblueprints')
15 files changed, 1166 insertions, 255 deletions
diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt index 67a215ef5..064c196ed 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt @@ -161,6 +161,9 @@ object BluePrintConstants { const val TOSCA_SCRIPTS_KOTLIN_DIR: String = "$TOSCA_SCRIPTS_DIR/kotlin" const val TOSCA_SCRIPTS_JYTHON_DIR: String = "$TOSCA_SCRIPTS_DIR/python" + const val GRAPH_START_NODE_NAME = "START" + const val GRAPH_END_NODE_NAME = "END" + const val PROPERTY_ENV = "ENV" const val PROPERTY_APP = "APP" const val PROPERTY_BPP = "BPP" diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt index c77427b01..93ba15e99 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt @@ -24,7 +24,6 @@ import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils import org.onap.ccsdk.cds.controllerblueprints.core.utils.JsonParserUtils import org.slf4j.LoggerFactory import org.slf4j.helpers.MessageFormatter -import java.lang.Float import kotlin.reflect.KClass /** @@ -98,10 +97,10 @@ fun <T : Any?> T.asJsonPrimitive(): JsonNode { fun String.asJsonType(bpDataType: String): JsonNode { return when (bpDataType.toLowerCase()) { BluePrintConstants.DATA_TYPE_STRING -> this.asJsonPrimitive() - BluePrintConstants.DATA_TYPE_BOOLEAN -> java.lang.Boolean.valueOf(this).asJsonPrimitive() - BluePrintConstants.DATA_TYPE_INTEGER -> Integer.valueOf(this).asJsonPrimitive() - BluePrintConstants.DATA_TYPE_FLOAT -> Float.valueOf(this).asJsonPrimitive() - BluePrintConstants.DATA_TYPE_DOUBLE -> java.lang.Double.valueOf(this).asJsonPrimitive() + BluePrintConstants.DATA_TYPE_BOOLEAN -> this.toBoolean().asJsonPrimitive() + BluePrintConstants.DATA_TYPE_INTEGER -> this.toInt().asJsonPrimitive() + BluePrintConstants.DATA_TYPE_FLOAT -> this.toFloat().asJsonPrimitive() + BluePrintConstants.DATA_TYPE_DOUBLE -> this.toDouble().asJsonPrimitive() // For List, Map and Complex Types. else -> this.jsonAsJsonType() } diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/GraphExtensionFunctions.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/GraphExtensionFunctions.kt new file mode 100644 index 000000000..793bdc455 --- /dev/null +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/GraphExtensionFunctions.kt @@ -0,0 +1,116 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.controllerblueprints.core + +import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel +import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph +import org.onap.ccsdk.cds.controllerblueprints.core.data.Workflow +import org.onap.ccsdk.cds.controllerblueprints.core.utils.WorkflowGraphUtils +import java.util.regex.Pattern + +private val graphTokenSeparators = Pattern.compile("[->/]") + +/** Convert Blueprint workflow to graph data structure */ +fun Workflow.asGraph(): Graph { + return WorkflowGraphUtils.workFlowToGraph(this) +} + +fun String.toGraph(): Graph { + if (!startsWith('[') || !endsWith(']')) { + throw IllegalArgumentException("Expected string starting '[' and ending with ']' but it was '$") + } + val tokens = substring(1, length - 1).split(", ").map { it.split(graphTokenSeparators) } + val nodes = tokens.flatMap { it.take(2) }.toCollection(LinkedHashSet()) + val edges = tokens.filter { it.size == 3 }.map { Graph.TermForm.Term(it[0], it[1], EdgeLabel.valueOf(it[2])) } + return Graph.labeledDirectedTerms(Graph.TermForm(nodes, edges)) +} + +fun Graph.toAdjacencyList(): Graph.AdjacencyList<String, EdgeLabel> { + val entries = nodes.values.map { node -> + val links = node.edges.map { Graph.AdjacencyList.Link(it.target(node).id, it.label) } + Graph.AdjacencyList.Entry(node = node.id, links = links) + } + return Graph.AdjacencyList(entries) +} + +fun Graph.findAllPaths(from: String, to: String, path: List<String> = emptyList()): List<List<String>> { + if (from == to) return listOf(path + to) + return nodes[from]!!.neighbors() + .filter { !path.contains(it.id) } + .flatMap { findAllPaths(it.id, to, path + from) } +} + +fun Graph.findCycles(node: String): List<List<String>> { + fun findCycles(path: List<String>): List<List<String>> { + if (path.size > 3 && path.first() == path.last()) return listOf(path) + return nodes[path.last()]!!.neighbors() + .filterNot { path.tail().contains(it.id) } + .flatMap { findCycles(path + it.id) } + } + return findCycles(listOf(node)) +} + +fun Graph.startNodes() = this.nodes.values.filter { + val incomingEdges = incomingEdges(it.id) + incomingEdges.isEmpty() +} + +fun Graph.endNodes(): Set<Graph.Node> = this.nodes.values.filter { + outgoingEdges(it.id).isEmpty() +}.toSet() + +fun Graph.node(node: String) = this.nodes[node] + +fun Graph.edge(label: EdgeLabel) = + this.edges.filter { it.label == label } + +fun Graph.incomingEdges(node: String) = + this.edges.filter { it.target.id == node } + +fun Graph.incomingNodes(node: String) = + this.incomingEdges(node).map { it.source } + +fun Graph.outgoingEdges(node: String) = + this.edges.filter { it.source.id == node } + +fun Graph.outgoingNodes(node: String) = + this.outgoingEdges(node).map { it.target } + +fun Graph.outgoingEdges(node: String, label: EdgeLabel) = + this.edges.filter { it.source.id == node && it.label == label } + +fun Graph.outgoingNodes(node: String, label: EdgeLabel) = + this.outgoingEdges(node, label).map { it.target } + +fun Graph.outgoingNodesNotInEdgeLabels(node: String, labels: List<EdgeLabel>) = + this.outgoingEdgesNotInLabels(node, labels).map { it.target } + +fun Graph.outgoingEdges(node: String, labels: List<EdgeLabel>) = + this.edges.filter { it.source.id == node && labels.contains(it.label) } + +fun Graph.outgoingEdgesNotInLabels(node: String, labels: List<EdgeLabel>) = + this.edges.filter { it.source.id == node && !labels.contains(it.label) } + +fun Graph.outgoingNodes(node: String, labels: List<EdgeLabel>) = + this.outgoingEdges(node, labels).map { it.target } + +fun Graph.isEndNode(node: Graph.Node): Boolean { + return this.endNodes().contains(node) +} + +fun <T> List<T>.tail(): List<T> = drop(1) + diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/data/BluePrintGraph.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/data/BluePrintGraph.kt new file mode 100644 index 000000000..9e1b7498e --- /dev/null +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/data/BluePrintGraph.kt @@ -0,0 +1,174 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.controllerblueprints.core.data + +enum class EdgeLabel(val id: String) { + SUCCESS("success"), + FAILURE("failure"), + DEFAULT("*") +} + +enum class EdgeStatus(val id: String) { + NOT_STARTED("not_started"), + EXECUTED("executed"), + SKIPPED("skipped") +} + +enum class NodeStatus(val id: String) { + NOT_STARTED("not_started"), + READY("ready"), + EXECUTING("executing"), + EXECUTED("executed"), + SKIPPED("skipped") +} + +class Graph { + val nodes: MutableMap<String, Node> = hashMapOf() + val edges: MutableSet<Edge> = mutableSetOf() + + fun addNode(value: String): Node { + val node = Node(value) + nodes[value] = node + return node + } + + fun addEdge(source: String, destination: String, label: EdgeLabel) { + if (!nodes.containsKey(source)) { + addNode(source) + } + if (!nodes.containsKey(destination)) { + addNode(destination) + } + val edge = Edge(nodes[source]!!, nodes[destination]!!, label) + if (!edges.contains(edge)) { + edges.add(edge) + nodes[source]!!.edges.add(edge) + } + } + + override fun toString(): String { + val standaloneNodes = nodes.values.filter { node -> edges.all { it.source != node && it.target != node } } + val s = (edges.map { it.toString() } + standaloneNodes.map { it.toString() }).joinToString() + return "[$s]" + } + + fun print(): String { + val buffer = StringBuffer("Nodes :") + nodes.values.forEach { + buffer.append("\n\t$it") + } + buffer.append("\nEdges :") + edges.forEach { + buffer.append("\n\t$it") + } + return buffer.toString() + } + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other?.javaClass != javaClass) return false + other as Graph + return nodes == other.nodes && edges == other.edges + } + + override fun hashCode() = 31 * nodes.hashCode() + edges.hashCode() + + fun equivalentTo(other: Graph): Boolean { + return nodes == other.nodes && edges.all { edge -> other.edges.any { it.equivalentTo(edge) } } + } + + data class Node(val id: String, var status: NodeStatus = NodeStatus.NOT_STARTED) { + val edges: MutableList<Edge> = ArrayList() + + fun neighbors(): List<Node> = edges.map { edge -> edge.target(this) } + + fun neighbors(label: EdgeLabel): List<Node> = edges.filter { it.label == label } + .map { edge -> edge.target(this) } + + fun labelEdges(label: EdgeLabel): List<Edge> = edges.filter { it.label == label } + + override fun toString() = "$id, Status($status)" + } + + data class Edge( + val source: Node, + val target: Node, + val label: EdgeLabel, + var status: EdgeStatus = EdgeStatus.NOT_STARTED) { + + fun target(node: Node): Node = target + + fun equivalentTo(other: Edge) = + (source == other.source && target == other.target) + || (source == other.target && target == other.source) + + override fun toString() = + "${source.id}>${target.id}/$label($status)" + } + + data class TermForm(val nodes: Collection<String>, val edges: List<Term>) { + + data class Term(val source: String, val target: String, val label: EdgeLabel) { + override fun toString() = "Term($source, $target, $label)" + } + } + + data class AdjacencyList<String, out EdgeLabel>(val entries: List<Entry<String, EdgeLabel>>) { + constructor(vararg entries: Entry<String, EdgeLabel>) : this(entries.asList()) + + override fun toString() = "AdjacencyList(${entries.joinToString()})" + + data class Entry<out String, out EdgeLabel>(val node: String, val links: List<Link<String, EdgeLabel>> = emptyList<Nothing>()) { + constructor(node: String, vararg links: Link<String, EdgeLabel>) : this(node, links.asList()) + + override fun toString() = "Entry($node, links[${links.joinToString()}])" + } + + data class Link<out String, out EdgeLabel>(val node: String, val label: EdgeLabel) { + override fun toString() = if (label == null) "$node" else "$node/$label" + } + } + + companion object { + + fun labeledDirectedTerms(termForm: TermForm): Graph = + createFromTerms(termForm) { graph, n1, n2, value -> graph.addEdge(n1, n2, value) } + + fun labeledDirectedAdjacent(adjacencyList: AdjacencyList<String, EdgeLabel>): Graph = + fromAdjacencyList(adjacencyList) { graph, n1, n2, value -> + graph.addEdge(n1, n2, value) + } + + private fun createFromTerms(termForm: TermForm, + addFunction: (Graph, String, String, EdgeLabel) -> Unit): Graph { + val graph = Graph() + termForm.nodes.forEach { graph.addNode(it) } + termForm.edges.forEach { addFunction(graph, it.source, it.target, it.label) } + return graph + } + + private fun fromAdjacencyList(adjacencyList: AdjacencyList<String, EdgeLabel>, + addFunction: (Graph, String, String, EdgeLabel) -> Unit): Graph { + val graph = Graph() + adjacencyList.entries.forEach { graph.addNode(it.node) } + adjacencyList.entries.forEach { (node, links) -> + links.forEach { addFunction(graph, node, it.node, it.label) } + } + return graph + } + } +}
\ No newline at end of file diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt index 06d3421c0..259efbf0b 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt @@ -26,7 +26,7 @@ class ServiceTemplateBuilder(private val name: String, private val author: String, private val tags: String) { private var serviceTemplate = ServiceTemplate() - private lateinit var topologyTemplate: TopologyTemplate + private var topologyTemplate: TopologyTemplate? = null private var metadata: MutableMap<String, String> = hashMapOf() private var dslDefinitions: MutableMap<String, JsonNode>? = null private var imports: MutableList<ImportDefinition> = mutableListOf() diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt new file mode 100644 index 000000000..905150213 --- /dev/null +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt @@ -0,0 +1,339 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.controllerblueprints.core.service + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.actor +import kotlinx.coroutines.channels.consumeEach +import org.onap.ccsdk.cds.controllerblueprints.core.* +import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel +import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeStatus +import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph +import org.onap.ccsdk.cds.controllerblueprints.core.data.NodeStatus +import kotlin.coroutines.CoroutineContext + +interface BluePrintWorkFlowService<In, Out> { + + /** Executes imperative workflow graph [graph] for the bluePrintRuntimeService [bluePrintRuntimeService] + * and workflow input [input], response will be retrieve from output [output]*/ + suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, + input: In, output: CompletableDeferred<Out>) + + suspend fun initializeWorkflow(input: In): EdgeLabel + + suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): Out + + /** Prepare the message for the Node */ + suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out> + + suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<In, Out> + + suspend fun executeNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel + + suspend fun skipNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel + + suspend fun cancelNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel + + suspend fun restartNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel + +} + +/** Workflow Message Types */ +sealed class WorkflowMessage<In, Out> + +class WorkflowExecuteMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>() + +class WorkflowCancelMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>() + +class WorkflowRestartMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>() + +/** Node Message Types */ +sealed class NodeMessage<In, Out> + +class NodeReadyMessage<In, Out>(val fromEdge: Graph.Edge, val edgeAction: EdgeAction) : NodeMessage<In, Out>() + +class NodeExecuteMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>() + +class NodeRestartMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>() + +class NodeSkipMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>() + +class NodeCancelMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>() + +enum class EdgeAction(val id: String) { + EXECUTE("execute"), + SKIP("skip") +} + +/** Abstract workflow service implementation */ +abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BluePrintWorkFlowService<In, Out> { + + lateinit var graph: Graph + + private val log = logger(AbstractBluePrintWorkFlowService::class) + + private val job = Job() + + lateinit var workflowId: String + + final override val coroutineContext: CoroutineContext + get() = job + CoroutineName("Wf") + + fun cancel() { + log.info("Received workflow($workflowId) cancel request") + job.cancel() + throw CancellationException("Workflow($workflowId) cancelled as requested") + } + + fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { + /** Process the workflow execution message */ + suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) { + + val nodeActor = nodeActor() + // Prepare Workflow and Populate the Initial store + initializeWorkflow(workflowExecuteMessage.input) + + val startNode = graph.startNodes().first() + // Prepare first node message and Send NodeExecuteMessage + // Start node doesn't wait for any nodes, so we can pass Execute message directly + val nodeExecuteMessage = prepareNodeExecutionMessage(startNode) + /** Send message from workflow actor to node actor */ + launch { + nodeActor.send(nodeExecuteMessage) + } + // Wait for workflow completion or Error + nodeActor.invokeOnClose { exception -> + launch { + log.info("End Node Completed, processing completion message") + val bluePrintProcessorException: BluePrintProcessorException? = + if (exception != null) BluePrintProcessorException(exception) else null + + val workflowOutput = prepareWorkflowOutput(bluePrintProcessorException) + workflowExecuteMessage.output.complete(workflowOutput) + channel.close(exception) + } + } + } + + /** Process each actor message received based on type */ + consumeEach { message -> + when (message) { + is WorkflowExecuteMessage<In, Out> -> { + launch { + executeMessageActor(message) + } + } + is WorkflowRestartMessage<In, Out> -> { + launch { + TODO("") + } + } + is WorkflowCancelMessage<In, Out> -> { + launch { + TODO("") + } + } + } + } + } + + + private fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { + + /** Send message to process from one state to other state */ + fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch { + channel.send(nodeMessage) + } + + /** Process the cascade node processing, based on the previous state of the node */ + fun processNextNodes(node: Graph.Node, nodeState: EdgeLabel) { + // Process only Next Success Node + val stateEdges = graph.outgoingEdges(node.id, arrayListOf(nodeState)) + log.debug("Next Edges :$stateEdges") + if (stateEdges.isNotEmpty()) { + stateEdges.forEach { stateEdge -> + // Prepare next node ready message and Send NodeReadyMessage + val nodeReadyMessage = NodeReadyMessage<In, Out>(stateEdge, EdgeAction.EXECUTE) + sendNodeMessage(nodeReadyMessage) + } + } + } + + suspend fun triggerToExecuteOrSkip(message: NodeReadyMessage<In, Out>) { + val edge = message.fromEdge + val node = edge.target + // Check if current edge action is Skip or Execute + when (message.edgeAction) { + EdgeAction.SKIP -> { + val skipMessage = prepareNodeSkipMessage(node) + sendNodeMessage(skipMessage) + } + EdgeAction.EXECUTE -> { + val nodeExecuteMessage = prepareNodeExecutionMessage(node) + sendNodeMessage(nodeExecuteMessage) + } + } + } + + suspend fun readyNodeWorker(message: NodeReadyMessage<In, Out>) { + val edge = message.fromEdge + val node = edge.target + log.debug("@@@@@ Ready workflow($workflowId), node($node) from edge($edge) for action(${message.edgeAction}) @@@@@") + // Update the current incoming edge status to executed or skipped + when (message.edgeAction) { + EdgeAction.SKIP -> message.fromEdge.status = EdgeStatus.SKIPPED + EdgeAction.EXECUTE -> message.fromEdge.status = EdgeStatus.EXECUTED + } + val incomingEdges = graph.incomingEdges(node.id) + if (incomingEdges.size > 1) { + // Check all incoming edges executed or skipped + val notCompletedEdges = incomingEdges.filter { it.status == EdgeStatus.NOT_STARTED } + if (notCompletedEdges.isEmpty()) { + // Possibility of skip edge action performed at last, but other edges have execute action. + val executePresent = incomingEdges.filter { it.status == EdgeStatus.EXECUTED } + val newMessage = if (executePresent.isNotEmpty()) { + NodeReadyMessage(message.fromEdge, EdgeAction.EXECUTE) + } else { + message + } + triggerToExecuteOrSkip(newMessage) + } else { + log.info("node(${node.id}) waiting for not completed edges($notCompletedEdges)") + } + } else { + triggerToExecuteOrSkip(message) + } + } + + suspend fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) { + val node = message.node + node.status = NodeStatus.EXECUTING + val nodeState = if (node.id == BluePrintConstants.GRAPH_START_NODE_NAME + || node.id == BluePrintConstants.GRAPH_END_NODE_NAME) { + EdgeLabel.SUCCESS + } else { + log.debug("##### Processing workflow($workflowId) node($node) #####") + // Call the Extension function and get the next Edge state. + executeNode(node, message.nodeInput, message.nodeOutput) + } + // Update Node Completed + node.status = NodeStatus.EXECUTED + log.info("Execute Node($node) -> Executed State($nodeState)") + + // If End Node, Send End Message + if (graph.isEndNode(node)) { + // Close the current channel + channel.close() + } else { + val skippingEdges = graph.outgoingEdgesNotInLabels(node.id, arrayListOf(nodeState)) + log.debug("Skipping node($node) outgoing Edges($skippingEdges)") + // Process Skip Edges + skippingEdges.forEach { skippingEdge -> + // Prepare next node ready message and Send NodeReadyMessage + val nodeReadyMessage = NodeReadyMessage<In, Out>(skippingEdge, EdgeAction.SKIP) + sendNodeMessage(nodeReadyMessage) + } + // Process Success Node + processNextNodes(node, nodeState) + } + } + + suspend fun skipNodeWorker(message: NodeSkipMessage<In, Out>) { + val node = message.node + val incomingEdges = graph.incomingEdges(node.id) + // Check All Incoming Nodes Skipped + val nonSkippedEdges = incomingEdges.filter { + it.status == EdgeStatus.NOT_STARTED + } + log.debug("Node($node) incoming edges ($incomingEdges), not skipped incoming edges ($nonSkippedEdges)") + + if (nonSkippedEdges.isEmpty()) { + log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$") + // Call the Extension Function + val nodeState = skipNode(node, message.nodeInput, message.nodeOutput) + log.info("Skip Node($node) -> Executed State($nodeState)") + // Mark the Current node as Skipped + node.status = NodeStatus.SKIPPED + // Look for next possible skip nodes + graph.outgoingEdges(node.id).forEach { outgoingEdge -> + val nodeReadyMessage = NodeReadyMessage<In, Out>(outgoingEdge, EdgeAction.SKIP) + sendNodeMessage(nodeReadyMessage) + } + } + } + + fun restartNodeWorker(message: NodeRestartMessage<In, Out>) = launch { + TODO() + } + + fun cancelNodeWorker(messageWorkflow: WorkflowCancelMessage<In, Out>) = launch { + channel.close() + throw CancellationException("Workflow($workflowId) actor cancelled as requested ...") + } + + /** Process each actor message received based on type **/ + consumeEach { nodeMessage -> + when (nodeMessage) { + is NodeReadyMessage<In, Out> -> { + // Blocking call + try { + readyNodeWorker(nodeMessage) + } catch (e: Exception) { + channel.close(e) + } + } + is NodeExecuteMessage<In, Out> -> { + launch { + try { + executeNodeWorker(nodeMessage) + } catch (e: Exception) { + channel.close(e) + } + } + } + is NodeSkipMessage<In, Out> -> { + launch { + try { + skipNodeWorker(nodeMessage) + } catch (e: Exception) { + channel.close(e) + } + } + } + is NodeRestartMessage<In, Out> -> { + launch { + try { + restartNodeWorker(nodeMessage) + } catch (e: Exception) { + channel.close(e) + } + } + } + } + } + } + + override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, + input: In, output: CompletableDeferred<Out>) { + log.info("Executing Graph : $graph") + this.graph = graph + this.workflowId = bluePrintRuntimeService.id() + val startMessage = WorkflowExecuteMessage(input, output) + workflowActor().send(startMessage) + } +}
\ No newline at end of file diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/BluePrintMetadataUtils.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/BluePrintMetadataUtils.kt index 669ab3fef..55424ada8 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/BluePrintMetadataUtils.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/BluePrintMetadataUtils.kt @@ -61,7 +61,7 @@ class BluePrintMetadataUtils { // Verify if the environment directory exists if (envDir.exists() && envDir.isDirectory) { //Find all available environment files - envDir.listFiles() + envDir.listFiles()!! .filter { it.name.endsWith(".properties") } .forEach { val istream = it.inputStream() @@ -96,14 +96,20 @@ class BluePrintMetadataUtils { return toscaMetaData } - fun getBluePrintRuntime(id: String, blueprintBasePath: String): BluePrintRuntimeService<MutableMap<String, JsonNode>> { - + fun getBluePrintRuntime(id: String, blueprintBasePath: String) + : BluePrintRuntimeService<MutableMap<String, JsonNode>> { val bluePrintContext: BluePrintContext = getBluePrintContext(blueprintBasePath) + return getBluePrintRuntime(id, bluePrintContext) + } + fun getBluePrintRuntime(id: String, bluePrintContext: BluePrintContext) + : BluePrintRuntimeService<MutableMap<String, JsonNode>> { + checkNotEmpty(bluePrintContext.rootPath) { "blueprint context root path is missing." } + checkNotEmpty(bluePrintContext.entryDefinition) { "blueprint context entry definition is missing." } + val blueprintBasePath = bluePrintContext.rootPath val bluePrintRuntimeService = DefaultBluePrintRuntimeService(id, bluePrintContext) bluePrintRuntimeService.put(BluePrintConstants.PROPERTY_BLUEPRINT_BASE_PATH, blueprintBasePath.asJsonPrimitive()) bluePrintRuntimeService.put(BluePrintConstants.PROPERTY_BLUEPRINT_PROCESS_ID, id.asJsonPrimitive()) - return bluePrintRuntimeService } diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/JacksonUtils.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/JacksonUtils.kt index 768f8753f..73dff9379 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/JacksonUtils.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/JacksonUtils.kt @@ -236,51 +236,55 @@ class JacksonUtils { } } - fun populatePrimitiveValues(key: String, value: Any, primitiveType: String, objectNode: ObjectNode) { + fun populatePrimitiveValues(key: String, value: JsonNode, primitiveType: String, objectNode: ObjectNode) { when (primitiveType.toLowerCase()) { - BluePrintConstants.DATA_TYPE_STRING, BluePrintConstants.DATA_TYPE_BOOLEAN, BluePrintConstants.DATA_TYPE_INTEGER, BluePrintConstants.DATA_TYPE_FLOAT, BluePrintConstants.DATA_TYPE_DOUBLE, - BluePrintConstants.DATA_TYPE_TIMESTAMP -> - objectNode.set(key, value.asJsonPrimitive()) - else -> objectNode.set(key, value.asJsonType()) + BluePrintConstants.DATA_TYPE_TIMESTAMP, + BluePrintConstants.DATA_TYPE_STRING -> + objectNode.set(key, value) + else -> throw BluePrintException("populatePrimitiveValues expected only primitive values! Received: ($value)") } } - fun populatePrimitiveValues(value: Any, primitiveType: String, arrayNode: ArrayNode) { + fun populatePrimitiveValues(value: JsonNode, primitiveType: String, arrayNode: ArrayNode) { when (primitiveType.toLowerCase()) { - BluePrintConstants.DATA_TYPE_BOOLEAN -> arrayNode.add(value as Boolean) - BluePrintConstants.DATA_TYPE_INTEGER -> arrayNode.add(value as Int) - BluePrintConstants.DATA_TYPE_FLOAT -> arrayNode.add(value as Float) - BluePrintConstants.DATA_TYPE_DOUBLE -> arrayNode.add(value as Double) - BluePrintConstants.DATA_TYPE_TIMESTAMP -> arrayNode.add(value as String) - else -> arrayNode.add(value as String) + BluePrintConstants.DATA_TYPE_BOOLEAN, + BluePrintConstants.DATA_TYPE_INTEGER, + BluePrintConstants.DATA_TYPE_FLOAT, + BluePrintConstants.DATA_TYPE_DOUBLE, + BluePrintConstants.DATA_TYPE_TIMESTAMP, + BluePrintConstants.DATA_TYPE_STRING -> arrayNode.add(value) + else -> throw BluePrintException("populatePrimitiveValues expected only primitive values! Received: ($value)") } } fun populatePrimitiveDefaultValues(key: String, primitiveType: String, objectNode: ObjectNode) { - when (primitiveType.toLowerCase()) { - BluePrintConstants.DATA_TYPE_BOOLEAN -> objectNode.put(key, false) - BluePrintConstants.DATA_TYPE_INTEGER -> objectNode.put(key, 0) - BluePrintConstants.DATA_TYPE_FLOAT -> objectNode.put(key, 0.0) - BluePrintConstants.DATA_TYPE_DOUBLE -> objectNode.put(key, 0.0) - else -> objectNode.put(key, "") - } + val defaultValue = getDefaultValueOfPrimitiveAsJsonNode(primitiveType) ?: + throw BluePrintException("populatePrimitiveDefaultValues expected only primitive values! Received type ($primitiveType)") + objectNode.set(key, defaultValue) } fun populatePrimitiveDefaultValuesForArrayNode(primitiveType: String, arrayNode: ArrayNode) { - when (primitiveType.toLowerCase()) { - BluePrintConstants.DATA_TYPE_BOOLEAN -> arrayNode.add(false) - BluePrintConstants.DATA_TYPE_INTEGER -> arrayNode.add(0) - BluePrintConstants.DATA_TYPE_FLOAT -> arrayNode.add(0.0) - BluePrintConstants.DATA_TYPE_DOUBLE -> arrayNode.add(0.0) - else -> arrayNode.add("") + val defaultValue = getDefaultValueOfPrimitiveAsJsonNode(primitiveType) ?: + throw BluePrintException("populatePrimitiveDefaultValuesForArrayNode expected only primitive values! Received type ($primitiveType)") + arrayNode.add(defaultValue) + } + + private fun getDefaultValueOfPrimitiveAsJsonNode(primitiveType: String): JsonNode? { + return when (primitiveType.toLowerCase()) { + BluePrintConstants.DATA_TYPE_BOOLEAN -> BooleanNode.valueOf(false) + BluePrintConstants.DATA_TYPE_INTEGER -> IntNode.valueOf(0) + BluePrintConstants.DATA_TYPE_FLOAT -> FloatNode.valueOf(0.0f) + BluePrintConstants.DATA_TYPE_DOUBLE -> DoubleNode.valueOf(0.0) + BluePrintConstants.DATA_TYPE_STRING -> MissingNode.getInstance() + else -> null } } - fun populateJsonNodeValues(key: String, nodeValue: JsonNode?, type: String, objectNode: ObjectNode) { + fun populateJsonNodeValues(key: String, nodeValue: JsonNode, type: String, objectNode: ObjectNode) { if (nodeValue == null || nodeValue is NullNode) { objectNode.set(key, nodeValue) } else if (BluePrintTypes.validPrimitiveTypes().contains(type)) { @@ -292,12 +296,13 @@ class JacksonUtils { fun convertPrimitiveResourceValue(type: String, value: String): JsonNode { return when (type.toLowerCase()) { - BluePrintConstants.DATA_TYPE_BOOLEAN -> jsonNodeFromObject(java.lang.Boolean.valueOf(value)) - BluePrintConstants.DATA_TYPE_INTEGER -> jsonNodeFromObject(Integer.valueOf(value)) - BluePrintConstants.DATA_TYPE_FLOAT -> jsonNodeFromObject(java.lang.Float.valueOf(value)) - BluePrintConstants.DATA_TYPE_DOUBLE -> jsonNodeFromObject(java.lang.Double.valueOf(value)) + BluePrintConstants.DATA_TYPE_BOOLEAN -> jsonNodeFromObject(value.toBoolean()) + BluePrintConstants.DATA_TYPE_INTEGER -> jsonNodeFromObject(value.toInt()) + BluePrintConstants.DATA_TYPE_FLOAT -> jsonNodeFromObject(value.toFloat()) + BluePrintConstants.DATA_TYPE_DOUBLE -> jsonNodeFromObject(value.toDouble()) + BluePrintConstants.DATA_TYPE_STRING -> jsonNodeFromObject(value) else -> getJsonNode(value) } } } -}
\ No newline at end of file +} diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/WorkflowGraphUtils.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/WorkflowGraphUtils.kt new file mode 100644 index 000000000..ef765ab86 --- /dev/null +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/WorkflowGraphUtils.kt @@ -0,0 +1,46 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.controllerblueprints.core.utils + +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel +import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph +import org.onap.ccsdk.cds.controllerblueprints.core.data.Workflow +import org.onap.ccsdk.cds.controllerblueprints.core.endNodes +import org.onap.ccsdk.cds.controllerblueprints.core.startNodes + +object WorkflowGraphUtils { + + fun workFlowToGraph(workflow: Workflow): Graph { + val graph = Graph() + workflow.steps?.forEach { (stepName, step) -> + step.onSuccess?.forEach { successTarget -> + graph.addEdge(stepName, successTarget, EdgeLabel.SUCCESS) + } + step.onFailure?.forEach { failureTarget -> + graph.addEdge(stepName, failureTarget, EdgeLabel.FAILURE) + } + } + graph.startNodes().forEach { rootNode -> + graph.addEdge(BluePrintConstants.GRAPH_START_NODE_NAME, rootNode.id, EdgeLabel.SUCCESS) + } + graph.endNodes().forEach { endNode -> + graph.addEdge(endNode.id, BluePrintConstants.GRAPH_END_NODE_NAME, EdgeLabel.SUCCESS) + } + return graph + } +}
\ No newline at end of file diff --git a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/GraphExtensionFunctionsTest.kt b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/GraphExtensionFunctionsTest.kt new file mode 100644 index 000000000..86cb473ae --- /dev/null +++ b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/GraphExtensionFunctionsTest.kt @@ -0,0 +1,37 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.controllerblueprints.core + +import org.junit.Test +import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel +import kotlin.test.assertNotNull + +class GraphExtensionFunctionsTest { + + @Test + fun testGraph() { + val graph = "[p>q/SUCCESS, m>q/SUCCESS, k, p>m/FAILURE, o>p/SUCCESS]".toGraph() + assertNotNull(graph, "failed to create graph") + assertNotNull(graph.toAdjacencyList(), "failed to adjacency list from graph") + + val neighbors = graph.nodes["p"]!!.neighbors() + assertNotNull(neighbors, "failed to neighbors from graph for 'p' node") + + val nodePath = graph.nodes["p"]!!.neighbors(EdgeLabel.SUCCESS) + assertNotNull(nodePath, "failed to nodePath from graph for 'p' node 'SUCCESS' label") + } +} diff --git a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintRuntimeServiceTest.kt b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintRuntimeServiceTest.kt index 4c207fbe1..9103af3fa 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintRuntimeServiceTest.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintRuntimeServiceTest.kt @@ -17,16 +17,17 @@ package org.onap.ccsdk.cds.controllerblueprints.core.service -import org.slf4j.LoggerFactory import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.node.NullNode import org.junit.Test import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.data.PropertyDefinition +import org.onap.ccsdk.cds.controllerblueprints.core.normalizedPathName import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintRuntimeUtils import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils +import org.slf4j.LoggerFactory import kotlin.test.assertEquals import kotlin.test.assertNotNull @@ -36,7 +37,7 @@ import kotlin.test.assertNotNull * @author Brinda Santh */ class BluePrintRuntimeServiceTest { - private val log= LoggerFactory.getLogger(this::class.toString()) + private val log = LoggerFactory.getLogger(this::class.toString()) @Test fun `test Resolve NodeTemplate Properties`() { @@ -167,11 +168,15 @@ class BluePrintRuntimeServiceTest { } private fun getBluePrintRuntimeService(): BluePrintRuntimeService<MutableMap<String, JsonNode>> { - val blueprintBasePath: String = ("./../../../../components/model-catalog/blueprint-model/test-blueprint/baseconfiguration") + val blueprintBasePath = normalizedPathName("./../../../../components/model-catalog/blueprint-model/test-blueprint/baseconfiguration") val blueprintRuntime = BluePrintMetadataUtils.getBluePrintRuntime("1234", blueprintBasePath) + val checkProcessId = blueprintRuntime.get(BluePrintConstants.PROPERTY_BLUEPRINT_PROCESS_ID) val checkBasePath = blueprintRuntime.get(BluePrintConstants.PROPERTY_BLUEPRINT_BASE_PATH) - assertEquals(blueprintBasePath.asJsonPrimitive(), checkBasePath, "Failed to get base path after runtime creation") + assertEquals("1234".asJsonPrimitive(), + checkProcessId, "Failed to get process id after runtime creation") + assertEquals(blueprintBasePath.asJsonPrimitive(), + checkBasePath, "Failed to get base path after runtime creation") return blueprintRuntime } diff --git a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt new file mode 100644 index 000000000..b8d8cea3e --- /dev/null +++ b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt @@ -0,0 +1,175 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.controllerblueprints.core.service + +import io.mockk.every +import io.mockk.mockk +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.runBlocking +import org.junit.Test +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel +import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph +import org.onap.ccsdk.cds.controllerblueprints.core.toGraph +import kotlin.test.assertNotNull + +class BluePrintWorkflowServiceTest { + @Test + fun testSimpleFlow() { + runBlocking { + val graph = "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>E/SUCCESS, E>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService() + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null) + val deferredOutput = CompletableDeferred<String>() + val input = "123456" + simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput) + val response = deferredOutput.await() + assertNotNull(response, "failed to get response") + } + } + + @Test + fun testConditionalFlow() { + runBlocking { + val graph = "[START>A/SUCCESS, A>B/SUCCESS, A>C/FAILURE, B>D/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService() + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null) + val deferredOutput = CompletableDeferred<String>() + val input = "123456" + simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput) + val response = deferredOutput.await() + assertNotNull(response, "failed to get response") + } + } + + @Test + fun testBothConditionalFlow() { + runBlocking { + // Failure Flow + val failurePatGraph = "[START>A/SUCCESS, A>B/SUCCESS, A>C/FAILURE, B>D/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val failurePathWorkflow = TestBluePrintWorkFlowService() + failurePathWorkflow.simulatedState = prepareSimulation(arrayListOf("B", "C", "D", "E"), + arrayListOf("A")) + val failurePathWorkflowDeferredOutput = CompletableDeferred<String>() + val failurePathWorkflowInput = "123456" + failurePathWorkflow.executeWorkflow(failurePatGraph, mockBluePrintRuntimeService(), failurePathWorkflowInput, failurePathWorkflowDeferredOutput) + val failurePathResponse = failurePathWorkflowDeferredOutput.await() + assertNotNull(failurePathResponse, "failed to get response") + } + } + + @Test + fun testMultipleSkipFlow() { + runBlocking { + val graph = "[START>A/SUCCESS, A>B/SUCCESS, A>C/FAILURE, C>D/SUCCESS, D>E/SUCCESS, B>E/SUCCESS, E>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService() + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null) + val deferredOutput = CompletableDeferred<String>() + val input = "123456" + simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput) + val response = deferredOutput.await() + assertNotNull(response, "failed to get response") + } + } + + @Test + fun testParallelFlow() { + runBlocking { + val graph = "[START>A/SUCCESS, A>B/SUCCESS, A>C/SUCCESS, B>D/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]" + .toGraph() + val simpleWorkflow = TestBluePrintWorkFlowService() + simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D"), null) + val deferredOutput = CompletableDeferred<String>() + val input = "123456" + simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput) + val response = deferredOutput.await() + assertNotNull(response, "failed to get response") + } + } + + private fun mockBluePrintRuntimeService(): BluePrintRuntimeService<*> { + val bluePrintRuntimeService = mockk<BluePrintRuntimeService<*>>() + every { bluePrintRuntimeService.id() } returns "123456" + return bluePrintRuntimeService + } + + private fun prepareSimulation(successes: List<String>?, failures: List<String>?): MutableMap<String, EdgeLabel> { + val simulatedState: MutableMap<String, EdgeLabel> = hashMapOf() + successes?.forEach { + simulatedState[it] = EdgeLabel.SUCCESS + } + failures?.forEach { + simulatedState[it] = EdgeLabel.FAILURE + } + return simulatedState + } +} + +class TestBluePrintWorkFlowService + : AbstractBluePrintWorkFlowService<String, String>() { + + lateinit var simulatedState: MutableMap<String, EdgeLabel> + + override suspend fun initializeWorkflow(input: String): EdgeLabel { + return EdgeLabel.SUCCESS + } + + override suspend fun prepareNodeExecutionMessage(node: Graph.Node) + : NodeExecuteMessage<String, String> { + return NodeExecuteMessage(node, "$node Input", "") + } + + override suspend fun executeNode(node: Graph.Node, nodeInput: String, + nodeOutput: String): EdgeLabel { +// val random = (1..10).random() * 1000 +// println("will reply in $random ms") +// kotlinx.coroutines.delay(random.toLong()) + val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") + return status + } + + override suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<String, String> { + val nodeOutput = "" + val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", nodeOutput) + return nodeSkipMessage + } + + override suspend fun skipNode(node: Graph.Node, nodeInput: String, + nodeOutput: String): EdgeLabel { + val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") + return status + } + + override suspend fun cancelNode(node: Graph.Node, nodeInput: String, + nodeOutput: String): EdgeLabel { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override suspend fun restartNode(node: Graph.Node, nodeInput: String, + nodeOutput: String): EdgeLabel { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): String { + return "Final Response" + } +}
\ No newline at end of file diff --git a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/WorkflowGraphUtilsTest.kt b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/WorkflowGraphUtilsTest.kt new file mode 100644 index 000000000..fb0a1a63d --- /dev/null +++ b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/WorkflowGraphUtilsTest.kt @@ -0,0 +1,42 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.controllerblueprints.core.utils + +import org.onap.ccsdk.cds.controllerblueprints.core.dsl.workflow +import kotlin.test.Test +import kotlin.test.assertNotNull + +class WorkflowGraphUtilsTest { + + @Test + fun testWorkFlowToGraph() { + + val workflow = workflow("sample", "") { + step("A", "A", "") { + success("B") + } + step("B", "B", "") { + success("C") + failure("D") + } + step("C", "C", "") + step("D", "D", "") + } + val graph = WorkflowGraphUtils.workFlowToGraph(workflow) + assertNotNull(graph, "failed to create graph") + } +}
\ No newline at end of file diff --git a/ms/controllerblueprints/modules/service/src/main/java/org/onap/ccsdk/cds/controllerblueprints/service/AutoResourceMappingService.java b/ms/controllerblueprints/modules/service/src/main/java/org/onap/ccsdk/cds/controllerblueprints/service/AutoResourceMappingService.java deleted file mode 100644 index b9eff7624..000000000 --- a/ms/controllerblueprints/modules/service/src/main/java/org/onap/ccsdk/cds/controllerblueprints/service/AutoResourceMappingService.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Copyright © 2017-2018 AT&T Intellectual Property. - * Modifications Copyright © 2018 IBM. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.controllerblueprints.service; - -import com.google.common.base.Preconditions; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException; -import org.onap.ccsdk.cds.controllerblueprints.core.data.PropertyDefinition; -import org.onap.ccsdk.cds.controllerblueprints.resource.dict.ResourceAssignment; -import org.onap.ccsdk.cds.controllerblueprints.resource.dict.ResourceDefinition; -import org.onap.ccsdk.cds.controllerblueprints.resource.dict.utils.ResourceDictionaryUtils; -import org.onap.ccsdk.cds.controllerblueprints.service.domain.ResourceDictionary; -import org.onap.ccsdk.cds.controllerblueprints.service.model.AutoMapResponse; -import org.onap.ccsdk.cds.controllerblueprints.service.repository.ResourceDictionaryRepository; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Service; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * AutoResourceMappingService.java Purpose: Provide Automapping of Resource Assignments AutoResourceMappingService - * - * @author Brinda Santh - * @version 1.0 - */ - -@Service -@SuppressWarnings("unused") -public class AutoResourceMappingService { - - private static Logger log = LoggerFactory.getLogger(AutoResourceMappingService.class); - - private ResourceDictionaryRepository dataDictionaryRepository; - - /** - * This is a AutoResourceMappingService constructor - * - * @param dataDictionaryRepository dataDictionaryRepository - */ - public AutoResourceMappingService(ResourceDictionaryRepository dataDictionaryRepository) { - this.dataDictionaryRepository = dataDictionaryRepository; - } - - /** - * This is a autoMap service to map the template keys automatically to Dictionary fields. - * - * @param resourceAssignments resourceAssignments - * @return AutoMapResponse - */ - public AutoMapResponse autoMap(List<ResourceAssignment> resourceAssignments) throws BluePrintException { - AutoMapResponse autoMapResponse = new AutoMapResponse(); - try { - if (CollectionUtils.isNotEmpty(resourceAssignments)) { - - // Create the Dictionary definitions for the ResourceAssignment Names - Map<String, ResourceDictionary> dictionaryMap = getDictionaryDefinitions(resourceAssignments); - - for (ResourceAssignment resourceAssignment : resourceAssignments) { - if (resourceAssignment != null && StringUtils.isNotBlank(resourceAssignment.getName()) - && StringUtils.isBlank(resourceAssignment.getDictionaryName())) { - - populateDictionaryMapping(dictionaryMap, resourceAssignment); - - log.info("Mapped Resource : {}", resourceAssignment); - - } - } - } - List<ResourceDictionary> dictionaries = getDictionaryDefinitionsList(resourceAssignments); - List<ResourceAssignment> resourceAssignmentsFinal = getAllAutomapResourceAssignments(resourceAssignments); - autoMapResponse.setDataDictionaries(dictionaries); - autoMapResponse.setResourceAssignments(resourceAssignmentsFinal); - } catch (Exception e) { - log.error(String.format("Failed in auto process %s", e.getMessage())); - throw new BluePrintException(e.getMessage(), e); - } - return autoMapResponse; - } - - private void populateDictionaryMapping(Map<String, ResourceDictionary> dictionaryMap, ResourceAssignment resourceAssignment) { - ResourceDictionary dbDataDictionary = dictionaryMap.get(resourceAssignment.getName()); - if (dbDataDictionary != null && dbDataDictionary.getDefinition() != null) { - - ResourceDefinition dictionaryDefinition = dbDataDictionary.getDefinition(); - - if (dictionaryDefinition != null && StringUtils.isNotBlank(dictionaryDefinition.getName()) - && StringUtils.isBlank(resourceAssignment.getDictionaryName())) { - - resourceAssignment.setDictionaryName(dbDataDictionary.getName()); - ResourceDictionaryUtils.populateSourceMapping(resourceAssignment, dictionaryDefinition); - } - } - } - - private Map<String, ResourceDictionary> getDictionaryDefinitions(List<ResourceAssignment> resourceAssignments) { - Map<String, ResourceDictionary> dictionaryMap = new HashMap<>(); - List<String> names = new ArrayList<>(); - for (ResourceAssignment resourceAssignment : resourceAssignments) { - if (resourceAssignment != null && StringUtils.isNotBlank(resourceAssignment.getName())) { - names.add(resourceAssignment.getName()); - } - } - if (CollectionUtils.isNotEmpty(names)) { - - List<ResourceDictionary> dictionaries = dataDictionaryRepository.findByNameIn(names); - if (CollectionUtils.isNotEmpty(dictionaries)) { - for (ResourceDictionary dataDictionary : dictionaries) { - if (dataDictionary != null && StringUtils.isNotBlank(dataDictionary.getName())) { - dictionaryMap.put(dataDictionary.getName(), dataDictionary); - } - } - } - } - return dictionaryMap; - - } - - private List<ResourceDictionary> getDictionaryDefinitionsList(List<ResourceAssignment> resourceAssignments) { - List<ResourceDictionary> dictionaries = null; - List<String> names = new ArrayList<>(); - for (ResourceAssignment resourceAssignment : resourceAssignments) { - if (resourceAssignment != null && StringUtils.isNotBlank(resourceAssignment.getDictionaryName())) { - - if (!names.contains(resourceAssignment.getDictionaryName())) { - names.add(resourceAssignment.getDictionaryName()); - } - - if (resourceAssignment.getDependencies() != null && !resourceAssignment.getDependencies().isEmpty()) { - List<String> dependencyNames = resourceAssignment.getDependencies(); - for (String dependencyName : dependencyNames) { - if (StringUtils.isNotBlank(dependencyName) && !names.contains(dependencyName)) { - names.add(dependencyName); - } - } - } - } - } - if (CollectionUtils.isNotEmpty(names)) { - dictionaries = dataDictionaryRepository.findByNameIn(names); - } - return dictionaries; - - } - - private List<ResourceAssignment> getAllAutomapResourceAssignments(List<ResourceAssignment> resourceAssignments) { - List<ResourceDictionary> dictionaries = null; - List<String> names = new ArrayList<>(); - for (ResourceAssignment resourceAssignment : resourceAssignments) { - if (resourceAssignment != null && StringUtils.isNotBlank(resourceAssignment.getDictionaryName())) { - if (resourceAssignment.getDependencies() != null && !resourceAssignment.getDependencies().isEmpty()) { - List<String> dependencieNames = resourceAssignment.getDependencies(); - for (String dependencieName : dependencieNames) { - if (StringUtils.isNotBlank(dependencieName) && !names.contains(dependencieName) - && !checkAssignmentsExists(resourceAssignments, dependencieName)) { - names.add(dependencieName); - } - } - } - } - } - - if (!names.isEmpty()) { - dictionaries = dataDictionaryRepository.findByNameIn(names); - } - if (dictionaries != null) { - for (ResourceDictionary resourcedictionary : dictionaries) { - ResourceDefinition dictionaryDefinition = resourcedictionary.getDefinition(); - Preconditions.checkNotNull(dictionaryDefinition, "failed to get Resource Definition from dictionary definition"); - PropertyDefinition property = new PropertyDefinition(); - property.setRequired(true); - ResourceAssignment resourceAssignment = new ResourceAssignment(); - resourceAssignment.setName(resourcedictionary.getName()); - resourceAssignment.setDictionaryName(resourcedictionary - .getName()); - resourceAssignment.setVersion(0); - resourceAssignment.setProperty(property); - ResourceDictionaryUtils.populateSourceMapping(resourceAssignment, dictionaryDefinition); - resourceAssignments.add(resourceAssignment); - } - } - return resourceAssignments; - - } - - - private boolean checkAssignmentsExists(List<ResourceAssignment> resourceAssignmentsWithDepencies, String resourceName) { - return resourceAssignmentsWithDepencies.stream().anyMatch(names -> names.getName().equalsIgnoreCase(resourceName)); - } - -} diff --git a/ms/controllerblueprints/modules/service/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/service/AutoResourceMappingService.kt b/ms/controllerblueprints/modules/service/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/service/AutoResourceMappingService.kt new file mode 100644 index 000000000..3ab9fee58 --- /dev/null +++ b/ms/controllerblueprints/modules/service/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/service/AutoResourceMappingService.kt @@ -0,0 +1,174 @@ +/* + * Copyright © 2017-2018 AT&T Intellectual Property. + * Modifications Copyright © 2019 Huawei. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.controllerblueprints.service + +import com.google.common.base.Preconditions +import org.apache.commons.collections.CollectionUtils +import org.apache.commons.lang3.StringUtils +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException +import org.onap.ccsdk.cds.controllerblueprints.core.data.PropertyDefinition +import org.onap.ccsdk.cds.controllerblueprints.resource.dict.ResourceAssignment +import org.onap.ccsdk.cds.controllerblueprints.resource.dict.utils.ResourceDictionaryUtils +import org.onap.ccsdk.cds.controllerblueprints.service.domain.ResourceDictionary +import org.onap.ccsdk.cds.controllerblueprints.service.model.AutoMapResponse +import org.onap.ccsdk.cds.controllerblueprints.service.repository.ResourceDictionaryRepository +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Service +import java.util.* + +@Service +open class AutoResourceMappingService(private val dataDictionaryRepository: ResourceDictionaryRepository) { + + private val log = LoggerFactory.getLogger(AutoResourceMappingService::class.java) + + @Throws(BluePrintException::class) + fun autoMap(resourceAssignments: MutableList<ResourceAssignment>): + AutoMapResponse { + val autoMapResponse = AutoMapResponse() + try { + if (CollectionUtils.isNotEmpty(resourceAssignments)) { + // Create the Dictionary definitions for the ResourceAssignment Names + val dictionaryMap = getDictionaryDefinitions(resourceAssignments) + + for (resourceAssignment in resourceAssignments) { + if (StringUtils.isNotBlank(resourceAssignment.name) + && StringUtils.isBlank(resourceAssignment.dictionaryName)) { + populateDictionaryMapping(dictionaryMap, resourceAssignment) + log.info("Mapped Resource : {}", resourceAssignment) + } + } + } + val dictionaries = getDictionaryDefinitionsList(resourceAssignments) + val resourceAssignmentsFinal = getAllAutoMapResourceAssignments(resourceAssignments) + autoMapResponse.dataDictionaries = dictionaries + autoMapResponse.resourceAssignments = resourceAssignmentsFinal + } catch (e: Exception) { + log.error(String.format("Failed in auto process %s", e.message)) + throw BluePrintException(e, e.message!!) + } + + return autoMapResponse + } + + private fun populateDictionaryMapping(dictionaryMap: Map<String, ResourceDictionary>, resourceAssignment: ResourceAssignment) { + val dbDataDictionary = dictionaryMap[resourceAssignment.name] + if (dbDataDictionary != null && dbDataDictionary.definition != null) { + + val dictionaryDefinition = dbDataDictionary.definition + + if (dictionaryDefinition != null && StringUtils.isNotBlank(dictionaryDefinition.name) + && StringUtils.isBlank(resourceAssignment.dictionaryName)) { + + resourceAssignment.dictionaryName = dbDataDictionary.name + ResourceDictionaryUtils.populateSourceMapping(resourceAssignment, dictionaryDefinition) + } + } + } + + private fun getDictionaryDefinitions(resourceAssignments: List<ResourceAssignment>): Map<String, ResourceDictionary> { + val dictionaryMap = HashMap<String, ResourceDictionary>() + val names = ArrayList<String>() + for (resourceAssignment in resourceAssignments) { + if (StringUtils.isNotBlank(resourceAssignment.name)) { + names.add(resourceAssignment.name) + } + } + if (CollectionUtils.isNotEmpty(names)) { + + val dictionaries = dataDictionaryRepository.findByNameIn(names) + if (CollectionUtils.isNotEmpty(dictionaries)) { + for (dataDictionary in dictionaries) { + if (StringUtils.isNotBlank(dataDictionary.name)) { + dictionaryMap[dataDictionary.name] = dataDictionary + } + } + } + } + return dictionaryMap + + } + private fun getDictionaryDefinitionsList(resourceAssignments: List<ResourceAssignment>): List<ResourceDictionary>? { + var dictionaries: List<ResourceDictionary>? = null + val names = ArrayList<String>() + for (resourceAssignment in resourceAssignments) { + if (StringUtils.isNotBlank(resourceAssignment.dictionaryName)) { + + if (!names.contains(resourceAssignment.dictionaryName)) { + names.add(resourceAssignment.dictionaryName!!) + } + + if (resourceAssignment.dependencies != null && !resourceAssignment.dependencies!!.isEmpty()) { + val dependencyNames = resourceAssignment.dependencies + for (dependencyName in dependencyNames!!) { + if (StringUtils.isNotBlank(dependencyName) && !names.contains(dependencyName)) { + names.add(dependencyName) + } + } + } + } + } + if (CollectionUtils.isNotEmpty(names)) { + dictionaries = dataDictionaryRepository.findByNameIn(names) + } + return dictionaries + + } + + private fun getAllAutoMapResourceAssignments(resourceAssignments: MutableList<ResourceAssignment>): List<ResourceAssignment> { + var dictionaries: List<ResourceDictionary>? = null + val names = ArrayList<String>() + for (resourceAssignment in resourceAssignments) { + if (StringUtils.isNotBlank(resourceAssignment.dictionaryName)) { + if (resourceAssignment.dependencies != null && !resourceAssignment.dependencies!!.isEmpty()) { + val dependencyNames = resourceAssignment.dependencies + for (dependencyName in dependencyNames!!) { + if (StringUtils.isNotBlank(dependencyName) && !names.contains(dependencyName) + && !checkAssignmentsExists(resourceAssignments, dependencyName)) { + names.add(dependencyName) + } + } + } + } + } + + if (!names.isEmpty()) { + dictionaries = dataDictionaryRepository.findByNameIn(names) + } + if (dictionaries != null) { + for (rscDictionary in dictionaries) { + val dictionaryDefinition = rscDictionary.definition + Preconditions.checkNotNull(dictionaryDefinition, "failed to get Resource Definition from dictionary definition") + val property = PropertyDefinition() + property.required = true + val resourceAssignment = ResourceAssignment() + resourceAssignment.name = rscDictionary.name + resourceAssignment.dictionaryName = rscDictionary.name + resourceAssignment.version = 0 + resourceAssignment.property = property + ResourceDictionaryUtils.populateSourceMapping(resourceAssignment, dictionaryDefinition) + resourceAssignments.add(resourceAssignment) + } + } + return resourceAssignments + } + + + private fun checkAssignmentsExists(resourceAssignmentsWithDepencies: List<ResourceAssignment>, resourceName: String): Boolean { + return resourceAssignmentsWithDepencies.stream().anyMatch { names -> names.name.equals(resourceName, ignoreCase = true) } + } +}
\ No newline at end of file |