summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBrinda Santh <brindasanth@in.ibm.com>2019-08-15 12:43:41 -0400
committerBrinda Santh Muthuramalingam <brindasanth@in.ibm.com>2019-08-16 14:28:55 +0000
commitad9b4a41a7be5ed8c579a2e96bbb4d2da629c036 (patch)
tree52d4dc8ef4cafb8f3a81a9901b5e7cc6d2bcd903
parent048aad79ece5b709e65155e2d0c8675b7c2c84a2 (diff)
Modify workflow execution service options.
Change-Id: I629b30f9ff2b8e84d6ae952946608d9bb3437d4c Issue-ID: CCSDK-1619 Signed-off-by: Brinda Santh <brindasanth@in.ibm.com>
-rw-r--r--components/model-catalog/blueprint-model/test-blueprint/baseconfiguration/Definitions/activation-blueprint.json37
-rw-r--r--ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImpl.kt48
-rw-r--r--ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt87
-rw-r--r--ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImplTest.kt49
-rw-r--r--ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionServiceTest.kt56
-rw-r--r--ms/blueprintsprocessor/modules/services/workflow-service/src/test/resources/execution-input/imperative-test-input.json4
-rw-r--r--ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt2
-rw-r--r--ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt92
-rw-r--r--ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt31
9 files changed, 243 insertions, 163 deletions
diff --git a/components/model-catalog/blueprint-model/test-blueprint/baseconfiguration/Definitions/activation-blueprint.json b/components/model-catalog/blueprint-model/test-blueprint/baseconfiguration/Definitions/activation-blueprint.json
index 112014b88..70c1bc3c6 100644
--- a/components/model-catalog/blueprint-model/test-blueprint/baseconfiguration/Definitions/activation-blueprint.json
+++ b/components/model-catalog/blueprint-model/test-blueprint/baseconfiguration/Definitions/activation-blueprint.json
@@ -462,6 +462,43 @@
]
}
}
+ },
+ "imperative-test-wf": {
+ "steps": {
+ "activate-step1": {
+ "description": "Activate CLI flow 1",
+ "target": "activate-cli",
+ "activities": [
+ {
+ "call_operation": "ComponentScriptExecutor.process"
+ }
+ ],
+ "on_success": [
+ "activate-step2"
+ ]
+ },
+ "activate-step2": {
+ "description": "Activate CLI flow 2",
+ "target": "activate-cli",
+ "activities": [
+ {
+ "call_operation": "ComponentScriptExecutor.process"
+ }
+ ],
+ "on_success": [
+ "activate-step3"
+ ]
+ },
+ "activate-step3": {
+ "description": "Activate CLI flow 3",
+ "target": "activate-cli",
+ "activities": [
+ {
+ "call_operation": "ComponentScriptExecutor.process"
+ }
+ ]
+ }
+ }
}
}
}
diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImpl.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImpl.kt
index fcf0558c7..cde919ce8 100644
--- a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImpl.kt
+++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImpl.kt
@@ -29,8 +29,9 @@ import org.springframework.stereotype.Service
@Service("bluePrintWorkflowExecutionService")
open class BluePrintWorkflowExecutionServiceImpl(
- private val componentWorkflowExecutionService: ComponentWorkflowExecutionService,
- private val dgWorkflowExecutionService: DGWorkflowExecutionService
+ private val componentWorkflowExecutionService: ComponentWorkflowExecutionService,
+ private val dgWorkflowExecutionService: DGWorkflowExecutionService,
+ private val imperativeWorkflowExecutionService: ImperativeWorkflowExecutionService
) : BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
private val log = LoggerFactory.getLogger(BluePrintWorkflowExecutionServiceImpl::class.java)!!
@@ -51,28 +52,37 @@ open class BluePrintWorkflowExecutionServiceImpl(
val input = executionServiceInput.payload.get("$workflowName-request")
bluePrintRuntimeService.assignWorkflowInputs(workflowName, input)
- // Get the DG Node Template
- val nodeTemplateName = bluePrintContext.workflowFirstStepNodeTemplate(workflowName)
+ val workflow = bluePrintContext.workflowByName(workflowName)
- val derivedFrom = bluePrintContext.nodeTemplateNodeType(nodeTemplateName).derivedFrom
+ val steps = workflow.steps ?: throw BluePrintProcessorException("could't get steps for workflow($workflowName)")
- log.info("Executing workflow($workflowName) NodeTemplate($nodeTemplateName), derived from($derivedFrom)")
-
- val executionServiceOutput: ExecutionServiceOutput = when {
- derivedFrom.startsWith(BluePrintConstants.MODEL_TYPE_NODE_COMPONENT, true) -> {
- componentWorkflowExecutionService
- .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, properties)
- }
- derivedFrom.startsWith(BluePrintConstants.MODEL_TYPE_NODE_WORKFLOW, true) -> {
- dgWorkflowExecutionService
+ /** If workflow has multiple steps, then it is imperative workflow */
+ val executionServiceOutput: ExecutionServiceOutput = if (steps.size > 1) {
+ imperativeWorkflowExecutionService
.executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, properties)
- }
- else -> {
- throw BluePrintProcessorException("couldn't execute workflow($workflowName) step mapped " +
- "to node template($nodeTemplateName) derived from($derivedFrom)")
+ } else {
+ // Get the DG Node Template
+ val nodeTemplateName = bluePrintContext.workflowFirstStepNodeTemplate(workflowName)
+
+ val derivedFrom = bluePrintContext.nodeTemplateNodeType(nodeTemplateName).derivedFrom
+
+ log.info("Executing workflow($workflowName) NodeTemplate($nodeTemplateName), derived from($derivedFrom)")
+ /** Return ExecutionServiceOutput based on DG node or Component Node */
+ when {
+ derivedFrom.startsWith(BluePrintConstants.MODEL_TYPE_NODE_COMPONENT, true) -> {
+ componentWorkflowExecutionService
+ .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, properties)
+ }
+ derivedFrom.startsWith(BluePrintConstants.MODEL_TYPE_NODE_WORKFLOW, true) -> {
+ dgWorkflowExecutionService
+ .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, properties)
+ }
+ else -> {
+ throw BluePrintProcessorException("couldn't execute workflow($workflowName) step mapped " +
+ "to node template($nodeTemplateName) derived from($derivedFrom)")
+ }
}
}
-
executionServiceOutput.commonHeader = executionServiceInput.commonHeader
executionServiceOutput.actionIdentifiers = executionServiceInput.actionIdentifiers
// Resolve Workflow Outputs
diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt
index e7e5fe68a..2a14be216 100644
--- a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt
+++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt
@@ -19,12 +19,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.services.workflow
import kotlinx.coroutines.CompletableDeferred
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
-import org.onap.ccsdk.cds.controllerblueprints.core.asGraph
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
+import org.onap.ccsdk.cds.controllerblueprints.core.*
import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.service.*
import org.springframework.beans.factory.config.ConfigurableBeanFactory
import org.springframework.context.annotation.Scope
@@ -32,7 +31,7 @@ import org.springframework.stereotype.Service
@Service("imperativeWorkflowExecutionService")
class ImperativeWorkflowExecutionService(
- private val bluePrintWorkFlowService: BluePrintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>)
+ private val imperativeBluePrintWorkflowService: BluePrintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>)
: BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
override suspend fun executeBluePrintWorkflow(bluePrintRuntimeService: BluePrintRuntimeService<*>,
@@ -46,7 +45,8 @@ class ImperativeWorkflowExecutionService(
val graph = bluePrintContext.workflowByName(workflowName).asGraph()
val deferredOutput = CompletableDeferred<ExecutionServiceOutput>()
- bluePrintWorkFlowService.executeWorkflow(graph, bluePrintRuntimeService, executionServiceInput, deferredOutput)
+ imperativeBluePrintWorkflowService.executeWorkflow(graph, bluePrintRuntimeService,
+ executionServiceInput, deferredOutput)
return deferredOutput.await()
}
}
@@ -59,6 +59,7 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS
lateinit var bluePrintRuntimeService: BluePrintRuntimeService<*>
lateinit var executionServiceInput: ExecutionServiceInput
+ lateinit var workflowName: String
lateinit var deferredExecutionServiceOutput: CompletableDeferred<ExecutionServiceOutput>
override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>,
@@ -67,77 +68,81 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS
this.graph = graph
this.bluePrintRuntimeService = bluePrintRuntimeService
this.executionServiceInput = input
+ this.workflowName = this.executionServiceInput.actionIdentifiers.actionName
this.deferredExecutionServiceOutput = output
this.workflowId = bluePrintRuntimeService.id()
val startMessage = WorkflowExecuteMessage(input, output)
- workflowActor.send(startMessage)
+ workflowActor().send(startMessage)
}
override suspend fun initializeWorkflow(input: ExecutionServiceInput): EdgeLabel {
return EdgeLabel.SUCCESS
}
- override suspend fun prepareWorkflowOutput(): ExecutionServiceOutput {
+ override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): ExecutionServiceOutput {
+ val wfStatus = if (exception != null) {
+ val status = Status()
+ status.message = BluePrintConstants.STATUS_FAILURE
+ status.errorMessage = exception.message
+ status
+ } else {
+ val status = Status()
+ status.message = BluePrintConstants.STATUS_SUCCESS
+ status
+ }
return ExecutionServiceOutput().apply {
commonHeader = executionServiceInput.commonHeader
actionIdentifiers = executionServiceInput.actionIdentifiers
+ status = wfStatus
}
}
override suspend fun prepareNodeExecutionMessage(node: Graph.Node)
: NodeExecuteMessage<ExecutionServiceInput, ExecutionServiceOutput> {
- val deferredOutput = CompletableDeferred<ExecutionServiceOutput>()
- return NodeExecuteMessage(node, executionServiceInput, deferredOutput)
+ val nodeOutput = ExecutionServiceOutput().apply {
+ commonHeader = executionServiceInput.commonHeader
+ actionIdentifiers = executionServiceInput.actionIdentifiers
+ }
+ return NodeExecuteMessage(node, executionServiceInput, nodeOutput)
}
override suspend fun prepareNodeSkipMessage(node: Graph.Node)
: NodeSkipMessage<ExecutionServiceInput, ExecutionServiceOutput> {
- val deferredOutput = CompletableDeferred<ExecutionServiceOutput>()
- return NodeSkipMessage(node, executionServiceInput, deferredOutput)
+ val nodeOutput = ExecutionServiceOutput().apply {
+ commonHeader = executionServiceInput.commonHeader
+ actionIdentifiers = executionServiceInput.actionIdentifiers
+ }
+ return NodeSkipMessage(node, executionServiceInput, nodeOutput)
}
override suspend fun executeNode(node: Graph.Node, nodeInput: ExecutionServiceInput,
- deferredNodeOutput: CompletableDeferred<ExecutionServiceOutput>,
- deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
- try {
- val nodeTemplateName = node.id
- /** execute node template */
- val executionServiceOutput = nodeTemplateExecutionService
- .executeNodeTemplate(bluePrintRuntimeService, nodeTemplateName, nodeInput)
- val edgeStatus = when (executionServiceOutput.status.message) {
- BluePrintConstants.STATUS_FAILURE -> EdgeLabel.FAILURE
- else -> EdgeLabel.SUCCESS
- }
- /** set deferred output and status */
- deferredNodeOutput.complete(executionServiceOutput)
- deferredNodeStatus.complete(edgeStatus)
- } catch (e: Exception) {
- log.error("failed in executeNode($node)", e)
- deferredNodeOutput.completeExceptionally(e)
- deferredNodeStatus.complete(EdgeLabel.FAILURE)
+ nodeOutput: ExecutionServiceOutput): EdgeLabel {
+ log.info("Executing workflow($workflowName[${this.workflowId}])'s step($${node.id})")
+ val step = bluePrintRuntimeService.bluePrintContext().workflowStepByName(this.workflowName, node.id)
+ checkNotEmpty(step.target) { "couldn't get step target for workflow(${this.workflowName})'s step(${node.id})" }
+ val nodeTemplateName = step.target!!
+ /** execute node template */
+ val executionServiceOutput = nodeTemplateExecutionService
+ .executeNodeTemplate(bluePrintRuntimeService, nodeTemplateName, nodeInput)
+
+ return when (executionServiceOutput.status.message) {
+ BluePrintConstants.STATUS_FAILURE -> EdgeLabel.FAILURE
+ else -> EdgeLabel.SUCCESS
}
}
override suspend fun skipNode(node: Graph.Node, nodeInput: ExecutionServiceInput,
- deferredNodeOutput: CompletableDeferred<ExecutionServiceOutput>,
- deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
- val executionServiceOutput = ExecutionServiceOutput().apply {
- commonHeader = nodeInput.commonHeader
- actionIdentifiers = nodeInput.actionIdentifiers
- }
- deferredNodeOutput.complete(executionServiceOutput)
- deferredNodeStatus.complete(EdgeLabel.SUCCESS)
+ nodeOutput: ExecutionServiceOutput): EdgeLabel {
+ return EdgeLabel.SUCCESS
}
override suspend fun cancelNode(node: Graph.Node, nodeInput: ExecutionServiceInput,
- deferredNodeOutput: CompletableDeferred<ExecutionServiceOutput>,
- deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+ nodeOutput: ExecutionServiceOutput): EdgeLabel {
TODO("not implemented")
}
override suspend fun restartNode(node: Graph.Node, nodeInput: ExecutionServiceInput,
- deferredNodeOutput: CompletableDeferred<ExecutionServiceOutput>,
- deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+ nodeOutput: ExecutionServiceOutput): EdgeLabel {
TODO("not implemented")
}
} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImplTest.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImplTest.kt
index 3c740725e..436de1b56 100644
--- a/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImplTest.kt
+++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImplTest.kt
@@ -16,12 +16,17 @@
package org.onap.ccsdk.cds.blueprintsprocessor.services.workflow
+import io.mockk.every
+import io.mockk.mockkObject
+import io.mockk.unmockkAll
import kotlinx.coroutines.runBlocking
+import org.junit.After
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
+import org.onap.ccsdk.cds.blueprintsprocessor.services.workflow.mock.MockComponentFunction
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService
@@ -29,7 +34,6 @@ import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyS
import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils
import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.context.ApplicationContext
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.junit4.SpringRunner
import kotlin.test.assertEquals
@@ -42,31 +46,52 @@ import kotlin.test.assertNotNull
class BluePrintWorkflowExecutionServiceImplTest {
@Autowired
- lateinit var applicationContext: ApplicationContext
-
- @Autowired
lateinit var bluePrintWorkflowExecutionService: BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>
@Before
fun init() {
- BluePrintDependencyService.inject(applicationContext)
+ mockkObject(BluePrintDependencyService)
+ every { BluePrintDependencyService.applicationContext.getBean(any()) } returns MockComponentFunction()
+ }
+
+ @After
+ fun afterTests() {
+ unmockkAll()
}
@Test
fun testBluePrintWorkflowExecutionService() {
runBlocking {
val bluePrintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime("1234",
- "./../../../../../components/model-catalog/blueprint-model/test-blueprint/baseconfiguration")
+ "./../../../../../components/model-catalog/blueprint-model/test-blueprint/baseconfiguration")
val executionServiceInput = JacksonUtils.readValueFromClassPathFile("execution-input/resource-assignment-input.json",
- ExecutionServiceInput::class.java)!!
+ ExecutionServiceInput::class.java)!!
val executionServiceOutput = bluePrintWorkflowExecutionService
- .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, hashMapOf())
+ .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, hashMapOf())
assertNotNull(executionServiceOutput, "failed to get response")
assertEquals(BluePrintConstants.STATUS_SUCCESS, executionServiceOutput.status.message,
- "failed to get successful response")
+ "failed to get successful response")
+ }
+ }
+
+ @Test
+ fun testImperativeBluePrintWorkflowExecutionService() {
+ runBlocking {
+ val bluePrintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime("1234",
+ "./../../../../../components/model-catalog/blueprint-model/test-blueprint/baseconfiguration")
+
+ val executionServiceInput = JacksonUtils.readValueFromClassPathFile("execution-input/imperative-test-input.json",
+ ExecutionServiceInput::class.java)!!
+
+ val executionServiceOutput = bluePrintWorkflowExecutionService
+ .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, hashMapOf())
+
+ assertNotNull(executionServiceOutput, "failed to get response")
+ assertEquals(BluePrintConstants.STATUS_SUCCESS, executionServiceOutput.status.message,
+ "failed to get successful response")
}
}
@@ -75,13 +100,13 @@ class BluePrintWorkflowExecutionServiceImplTest {
assertFailsWith(exceptionClass = BluePrintProcessorException::class) {
runBlocking {
val bluePrintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime("1234",
- "./../../../../../components/model-catalog/blueprint-model/test-blueprint/baseconfiguration")
+ "./../../../../../components/model-catalog/blueprint-model/test-blueprint/baseconfiguration")
//service input will have a mislabeled input params, we are expecting to get an error when that happens with a useful error message
val executionServiceInput = JacksonUtils.readValueFromClassPathFile("execution-input/resource-assignment-input-missing-resource_assignment_request.json",
- ExecutionServiceInput::class.java)!!
+ ExecutionServiceInput::class.java)!!
val executionServiceOutput = bluePrintWorkflowExecutionService
- .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, hashMapOf())
+ .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, hashMapOf())
}
}
}
diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionServiceTest.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionServiceTest.kt
index 301fc34c0..becd22857 100644
--- a/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionServiceTest.kt
@@ -27,7 +27,9 @@ import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.nodeTypeCompone
import org.onap.ccsdk.cds.blueprintsprocessor.services.workflow.mock.MockComponentFunction
import org.onap.ccsdk.cds.blueprintsprocessor.services.workflow.mock.mockNodeTemplateComponentScriptExecutor
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintTypes
+import org.onap.ccsdk.cds.controllerblueprints.core.data.ServiceTemplate
import org.onap.ccsdk.cds.controllerblueprints.core.dsl.serviceTemplate
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.normalizedPathName
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintContext
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
@@ -37,6 +39,7 @@ import kotlin.test.Test
import kotlin.test.assertNotNull
class ImperativeWorkflowExecutionServiceTest {
+ val log = logger(ImperativeWorkflowExecutionServiceTest::class)
@Before
fun init() {
@@ -49,37 +52,40 @@ class ImperativeWorkflowExecutionServiceTest {
unmockkAll()
}
- @Test
- fun testImperativeExecutionService() {
- runBlocking {
- val serviceTemplate = serviceTemplate("imperative-test", "1.0.0",
- "brindasanth@onap.com", "tosca") {
+ fun mockServiceTemplate(): ServiceTemplate {
+ return serviceTemplate("imperative-test", "1.0.0",
+ "brindasanth@onap.com", "tosca") {
- topologyTemplate {
- nodeTemplate(mockNodeTemplateComponentScriptExecutor("resolve-config",
- "cba.wt.imperative.test.ResolveConfig"))
- nodeTemplate(mockNodeTemplateComponentScriptExecutor("activate-config",
- "cba.wt.imperative.test.ActivateConfig"))
- nodeTemplate(mockNodeTemplateComponentScriptExecutor("activate-config-rollback",
- "cba.wt.imperative.test.ActivateConfigRollback"))
- nodeTemplate(mockNodeTemplateComponentScriptExecutor("activate-licence",
- "cba.wt.imperative.test.ActivateLicence"))
+ topologyTemplate {
+ nodeTemplate(mockNodeTemplateComponentScriptExecutor("resolve-config",
+ "cba.wt.imperative.test.ResolveConfig"))
+ nodeTemplate(mockNodeTemplateComponentScriptExecutor("activate-config",
+ "cba.wt.imperative.test.ActivateConfig"))
+ nodeTemplate(mockNodeTemplateComponentScriptExecutor("activate-config-rollback",
+ "cba.wt.imperative.test.ActivateConfigRollback"))
+ nodeTemplate(mockNodeTemplateComponentScriptExecutor("activate-licence",
+ "cba.wt.imperative.test.ActivateLicence"))
- workflow("test-wf", "Test Imperative flow") {
- step("resolve-config", "resolve-config", "") {
- success("activate-config")
- }
- step("activate-config", "activate-config", "") {
- success("activate-licence")
- failure("activate-config-rollback")
- }
- step("activate-config-rollback", "activate-config-rollback", "")
- step("activate-licence", "activate-licence", "")
+ workflow("imperative-test-wf", "Test Imperative flow") {
+ step("resolve-config", "resolve-config", "") {
+ success("activate-config")
}
+ step("activate-config", "activate-config", "") {
+ success("activate-licence")
+ failure("activate-config-rollback")
+ }
+ step("activate-config-rollback", "activate-config-rollback", "")
+ step("activate-licence", "activate-licence", "")
}
- nodeType(BluePrintTypes.nodeTypeComponentScriptExecutor())
}
+ nodeType(BluePrintTypes.nodeTypeComponentScriptExecutor())
+ }
+ }
+ @Test
+ fun testImperativeExecutionService() {
+ runBlocking {
+ val serviceTemplate = mockServiceTemplate()
val bluePrintContext = BluePrintContext(serviceTemplate)
bluePrintContext.rootPath = normalizedPathName(".")
bluePrintContext.entryDefinition = "cba.imperative.test.ImperativeTestDefinitions.kt"
diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/test/resources/execution-input/imperative-test-input.json b/ms/blueprintsprocessor/modules/services/workflow-service/src/test/resources/execution-input/imperative-test-input.json
index 188e84083..d3495c456 100644
--- a/ms/blueprintsprocessor/modules/services/workflow-service/src/test/resources/execution-input/imperative-test-input.json
+++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/test/resources/execution-input/imperative-test-input.json
@@ -7,11 +7,11 @@
"actionIdentifiers": {
"blueprintName": "imperative-test",
"blueprintVersion": "1.0.0",
- "actionName": "test-wf",
+ "actionName": "imperative-test-wf",
"mode": "sync"
},
"payload": {
- "test-wf-request": {
+ "imperative-test-wf-request": {
"hostname": "localhost"
}
}
diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt
index 06d3421c0..259efbf0b 100644
--- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt
+++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt
@@ -26,7 +26,7 @@ class ServiceTemplateBuilder(private val name: String,
private val author: String,
private val tags: String) {
private var serviceTemplate = ServiceTemplate()
- private lateinit var topologyTemplate: TopologyTemplate
+ private var topologyTemplate: TopologyTemplate? = null
private var metadata: MutableMap<String, String> = hashMapOf()
private var dslDefinitions: MutableMap<String, JsonNode>? = null
private var imports: MutableList<ImportDefinition> = mutableListOf()
diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt
index 91c2bcbbb..905150213 100644
--- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt
+++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt
@@ -36,24 +36,20 @@ interface BluePrintWorkFlowService<In, Out> {
suspend fun initializeWorkflow(input: In): EdgeLabel
- suspend fun prepareWorkflowOutput(): Out
+ suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): Out
/** Prepare the message for the Node */
suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out>
suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<In, Out>
- suspend fun executeNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
- deferredNodeStatus: CompletableDeferred<EdgeLabel>)
+ suspend fun executeNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
- suspend fun skipNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
- deferredNodeStatus: CompletableDeferred<EdgeLabel>)
+ suspend fun skipNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
- suspend fun cancelNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
- deferredNodeStatus: CompletableDeferred<EdgeLabel>)
+ suspend fun cancelNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
- suspend fun restartNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
- deferredNodeStatus: CompletableDeferred<EdgeLabel>)
+ suspend fun restartNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
}
@@ -71,17 +67,13 @@ sealed class NodeMessage<In, Out>
class NodeReadyMessage<In, Out>(val fromEdge: Graph.Edge, val edgeAction: EdgeAction) : NodeMessage<In, Out>()
-class NodeExecuteMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
- val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
+class NodeExecuteMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
-class NodeRestartMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
- val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
+class NodeRestartMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
-class NodeSkipMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
- val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
+class NodeSkipMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
-class NodeCancelMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
- val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
+class NodeCancelMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
enum class EdgeAction(val id: String) {
EXECUTE("execute"),
@@ -105,18 +97,14 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
fun cancel() {
log.info("Received workflow($workflowId) cancel request")
job.cancel()
- throw CancellationException("Workflow($workflowId) cancelled as requested ...")
+ throw CancellationException("Workflow($workflowId) cancelled as requested")
}
- val workflowActor = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
-
- /** Send message from workflow actor to node actor */
- fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
- nodeActor.send(nodeMessage)
- }
-
+ fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
/** Process the workflow execution message */
suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) {
+
+ val nodeActor = nodeActor()
// Prepare Workflow and Populate the Initial store
initializeWorkflow(workflowExecuteMessage.input)
@@ -124,14 +112,18 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
// Prepare first node message and Send NodeExecuteMessage
// Start node doesn't wait for any nodes, so we can pass Execute message directly
val nodeExecuteMessage = prepareNodeExecutionMessage(startNode)
- sendNodeMessage(nodeExecuteMessage)
- log.debug("First node triggered successfully, waiting for response")
-
+ /** Send message from workflow actor to node actor */
+ launch {
+ nodeActor.send(nodeExecuteMessage)
+ }
// Wait for workflow completion or Error
nodeActor.invokeOnClose { exception ->
launch {
- log.debug("End Node Completed, processing completion message")
- val workflowOutput = prepareWorkflowOutput()
+ log.info("End Node Completed, processing completion message")
+ val bluePrintProcessorException: BluePrintProcessorException? =
+ if (exception != null) BluePrintProcessorException(exception) else null
+
+ val workflowOutput = prepareWorkflowOutput(bluePrintProcessorException)
workflowExecuteMessage.output.complete(workflowOutput)
channel.close(exception)
}
@@ -161,7 +153,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
}
- private val nodeActor = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
+ private fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
/** Send message to process from one state to other state */
fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
@@ -228,7 +220,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
}
}
- fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) = launch {
+ suspend fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) {
val node = message.node
node.status = NodeStatus.EXECUTING
val nodeState = if (node.id == BluePrintConstants.GRAPH_START_NODE_NAME
@@ -237,9 +229,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
} else {
log.debug("##### Processing workflow($workflowId) node($node) #####")
// Call the Extension function and get the next Edge state.
- val deferredNodeState = CompletableDeferred<EdgeLabel>()
- executeNode(node, message.nodeInput, message.nodeOutput, deferredNodeState)
- deferredNodeState.await()
+ executeNode(node, message.nodeInput, message.nodeOutput)
}
// Update Node Completed
node.status = NodeStatus.EXECUTED
@@ -263,7 +253,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
}
}
- fun skipNodeWorker(message: NodeSkipMessage<In, Out>) = launch {
+ suspend fun skipNodeWorker(message: NodeSkipMessage<In, Out>) {
val node = message.node
val incomingEdges = graph.incomingEdges(node.id)
// Check All Incoming Nodes Skipped
@@ -275,9 +265,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
if (nonSkippedEdges.isEmpty()) {
log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$")
// Call the Extension Function
- val deferredNodeState = CompletableDeferred<EdgeLabel>()
- skipNode(node, message.nodeInput, message.nodeOutput, deferredNodeState)
- val nodeState = deferredNodeState.await()
+ val nodeState = skipNode(node, message.nodeInput, message.nodeOutput)
log.info("Skip Node($node) -> Executed State($nodeState)")
// Mark the Current node as Skipped
node.status = NodeStatus.SKIPPED
@@ -303,21 +291,37 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
when (nodeMessage) {
is NodeReadyMessage<In, Out> -> {
// Blocking call
- readyNodeWorker(nodeMessage)
+ try {
+ readyNodeWorker(nodeMessage)
+ } catch (e: Exception) {
+ channel.close(e)
+ }
}
is NodeExecuteMessage<In, Out> -> {
launch {
- executeNodeWorker(nodeMessage)
+ try {
+ executeNodeWorker(nodeMessage)
+ } catch (e: Exception) {
+ channel.close(e)
+ }
}
}
is NodeSkipMessage<In, Out> -> {
launch {
- skipNodeWorker(nodeMessage)
+ try {
+ skipNodeWorker(nodeMessage)
+ } catch (e: Exception) {
+ channel.close(e)
+ }
}
}
is NodeRestartMessage<In, Out> -> {
launch {
- restartNodeWorker(nodeMessage)
+ try {
+ restartNodeWorker(nodeMessage)
+ } catch (e: Exception) {
+ channel.close(e)
+ }
}
}
}
@@ -330,6 +334,6 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
this.graph = graph
this.workflowId = bluePrintRuntimeService.id()
val startMessage = WorkflowExecuteMessage(input, output)
- workflowActor.send(startMessage)
+ workflowActor().send(startMessage)
}
} \ No newline at end of file
diff --git a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt
index 62cb10851..b8d8cea3e 100644
--- a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt
+++ b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt
@@ -22,6 +22,7 @@ import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.runBlocking
import org.junit.Test
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
import org.onap.ccsdk.cds.controllerblueprints.core.toGraph
@@ -134,49 +135,41 @@ class TestBluePrintWorkFlowService
override suspend fun prepareNodeExecutionMessage(node: Graph.Node)
: NodeExecuteMessage<String, String> {
- val deferredNodeOutput = CompletableDeferred<String>()
- val nodeExecuteMessage = NodeExecuteMessage(node, "$node Input", deferredNodeOutput)
- return nodeExecuteMessage
+ return NodeExecuteMessage(node, "$node Input", "")
}
override suspend fun executeNode(node: Graph.Node, nodeInput: String,
- deferredNodeOutput: CompletableDeferred<String>,
- deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+ nodeOutput: String): EdgeLabel {
// val random = (1..10).random() * 1000
// println("will reply in $random ms")
// kotlinx.coroutines.delay(random.toLong())
val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)")
- deferredNodeStatus.complete(status)
- deferredNodeOutput.complete("$node, Output: $nodeInput output")
+ return status
}
- override suspend fun prepareNodeSkipMessage(node: Graph.Node)
- : NodeSkipMessage<String, String> {
- val deferredNodeOutput = CompletableDeferred<String>()
- val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", deferredNodeOutput)
+ override suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<String, String> {
+ val nodeOutput = ""
+ val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", nodeOutput)
return nodeSkipMessage
}
override suspend fun skipNode(node: Graph.Node, nodeInput: String,
- deferredNodeOutput: CompletableDeferred<String>,
- deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+ nodeOutput: String): EdgeLabel {
val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)")
- deferredNodeStatus.complete(status)
+ return status
}
override suspend fun cancelNode(node: Graph.Node, nodeInput: String,
- deferredNodeOutput: CompletableDeferred<String>,
- deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+ nodeOutput: String): EdgeLabel {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override suspend fun restartNode(node: Graph.Node, nodeInput: String,
- deferredNodeOutput: CompletableDeferred<String>,
- deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+ nodeOutput: String): EdgeLabel {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}
- override suspend fun prepareWorkflowOutput(): String {
+ override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): String {
return "Final Response"
}
} \ No newline at end of file