diff options
author | Brinda Santh <brindasanth@in.ibm.com> | 2019-08-15 12:43:41 -0400 |
---|---|---|
committer | Brinda Santh Muthuramalingam <brindasanth@in.ibm.com> | 2019-08-16 14:28:55 +0000 |
commit | ad9b4a41a7be5ed8c579a2e96bbb4d2da629c036 (patch) | |
tree | 52d4dc8ef4cafb8f3a81a9901b5e7cc6d2bcd903 /ms | |
parent | 048aad79ece5b709e65155e2d0c8675b7c2c84a2 (diff) |
Modify workflow execution service options.
Change-Id: I629b30f9ff2b8e84d6ae952946608d9bb3437d4c
Issue-ID: CCSDK-1619
Signed-off-by: Brinda Santh <brindasanth@in.ibm.com>
Diffstat (limited to 'ms')
8 files changed, 206 insertions, 163 deletions
diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImpl.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImpl.kt index fcf0558c7..cde919ce8 100644 --- a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImpl.kt +++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImpl.kt @@ -29,8 +29,9 @@ import org.springframework.stereotype.Service @Service("bluePrintWorkflowExecutionService") open class BluePrintWorkflowExecutionServiceImpl( - private val componentWorkflowExecutionService: ComponentWorkflowExecutionService, - private val dgWorkflowExecutionService: DGWorkflowExecutionService + private val componentWorkflowExecutionService: ComponentWorkflowExecutionService, + private val dgWorkflowExecutionService: DGWorkflowExecutionService, + private val imperativeWorkflowExecutionService: ImperativeWorkflowExecutionService ) : BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> { private val log = LoggerFactory.getLogger(BluePrintWorkflowExecutionServiceImpl::class.java)!! @@ -51,28 +52,37 @@ open class BluePrintWorkflowExecutionServiceImpl( val input = executionServiceInput.payload.get("$workflowName-request") bluePrintRuntimeService.assignWorkflowInputs(workflowName, input) - // Get the DG Node Template - val nodeTemplateName = bluePrintContext.workflowFirstStepNodeTemplate(workflowName) + val workflow = bluePrintContext.workflowByName(workflowName) - val derivedFrom = bluePrintContext.nodeTemplateNodeType(nodeTemplateName).derivedFrom + val steps = workflow.steps ?: throw BluePrintProcessorException("could't get steps for workflow($workflowName)") - log.info("Executing workflow($workflowName) NodeTemplate($nodeTemplateName), derived from($derivedFrom)") - - val executionServiceOutput: ExecutionServiceOutput = when { - derivedFrom.startsWith(BluePrintConstants.MODEL_TYPE_NODE_COMPONENT, true) -> { - componentWorkflowExecutionService - .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, properties) - } - derivedFrom.startsWith(BluePrintConstants.MODEL_TYPE_NODE_WORKFLOW, true) -> { - dgWorkflowExecutionService + /** If workflow has multiple steps, then it is imperative workflow */ + val executionServiceOutput: ExecutionServiceOutput = if (steps.size > 1) { + imperativeWorkflowExecutionService .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, properties) - } - else -> { - throw BluePrintProcessorException("couldn't execute workflow($workflowName) step mapped " + - "to node template($nodeTemplateName) derived from($derivedFrom)") + } else { + // Get the DG Node Template + val nodeTemplateName = bluePrintContext.workflowFirstStepNodeTemplate(workflowName) + + val derivedFrom = bluePrintContext.nodeTemplateNodeType(nodeTemplateName).derivedFrom + + log.info("Executing workflow($workflowName) NodeTemplate($nodeTemplateName), derived from($derivedFrom)") + /** Return ExecutionServiceOutput based on DG node or Component Node */ + when { + derivedFrom.startsWith(BluePrintConstants.MODEL_TYPE_NODE_COMPONENT, true) -> { + componentWorkflowExecutionService + .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, properties) + } + derivedFrom.startsWith(BluePrintConstants.MODEL_TYPE_NODE_WORKFLOW, true) -> { + dgWorkflowExecutionService + .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, properties) + } + else -> { + throw BluePrintProcessorException("couldn't execute workflow($workflowName) step mapped " + + "to node template($nodeTemplateName) derived from($derivedFrom)") + } } } - executionServiceOutput.commonHeader = executionServiceInput.commonHeader executionServiceOutput.actionIdentifiers = executionServiceInput.actionIdentifiers // Resolve Workflow Outputs diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt index e7e5fe68a..2a14be216 100644 --- a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt +++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt @@ -19,12 +19,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.services.workflow import kotlinx.coroutines.CompletableDeferred import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants -import org.onap.ccsdk.cds.controllerblueprints.core.asGraph +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status +import org.onap.ccsdk.cds.controllerblueprints.core.* import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService -import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.service.* import org.springframework.beans.factory.config.ConfigurableBeanFactory import org.springframework.context.annotation.Scope @@ -32,7 +31,7 @@ import org.springframework.stereotype.Service @Service("imperativeWorkflowExecutionService") class ImperativeWorkflowExecutionService( - private val bluePrintWorkFlowService: BluePrintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>) + private val imperativeBluePrintWorkflowService: BluePrintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>) : BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> { override suspend fun executeBluePrintWorkflow(bluePrintRuntimeService: BluePrintRuntimeService<*>, @@ -46,7 +45,8 @@ class ImperativeWorkflowExecutionService( val graph = bluePrintContext.workflowByName(workflowName).asGraph() val deferredOutput = CompletableDeferred<ExecutionServiceOutput>() - bluePrintWorkFlowService.executeWorkflow(graph, bluePrintRuntimeService, executionServiceInput, deferredOutput) + imperativeBluePrintWorkflowService.executeWorkflow(graph, bluePrintRuntimeService, + executionServiceInput, deferredOutput) return deferredOutput.await() } } @@ -59,6 +59,7 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS lateinit var bluePrintRuntimeService: BluePrintRuntimeService<*> lateinit var executionServiceInput: ExecutionServiceInput + lateinit var workflowName: String lateinit var deferredExecutionServiceOutput: CompletableDeferred<ExecutionServiceOutput> override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, @@ -67,77 +68,81 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS this.graph = graph this.bluePrintRuntimeService = bluePrintRuntimeService this.executionServiceInput = input + this.workflowName = this.executionServiceInput.actionIdentifiers.actionName this.deferredExecutionServiceOutput = output this.workflowId = bluePrintRuntimeService.id() val startMessage = WorkflowExecuteMessage(input, output) - workflowActor.send(startMessage) + workflowActor().send(startMessage) } override suspend fun initializeWorkflow(input: ExecutionServiceInput): EdgeLabel { return EdgeLabel.SUCCESS } - override suspend fun prepareWorkflowOutput(): ExecutionServiceOutput { + override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): ExecutionServiceOutput { + val wfStatus = if (exception != null) { + val status = Status() + status.message = BluePrintConstants.STATUS_FAILURE + status.errorMessage = exception.message + status + } else { + val status = Status() + status.message = BluePrintConstants.STATUS_SUCCESS + status + } return ExecutionServiceOutput().apply { commonHeader = executionServiceInput.commonHeader actionIdentifiers = executionServiceInput.actionIdentifiers + status = wfStatus } } override suspend fun prepareNodeExecutionMessage(node: Graph.Node) : NodeExecuteMessage<ExecutionServiceInput, ExecutionServiceOutput> { - val deferredOutput = CompletableDeferred<ExecutionServiceOutput>() - return NodeExecuteMessage(node, executionServiceInput, deferredOutput) + val nodeOutput = ExecutionServiceOutput().apply { + commonHeader = executionServiceInput.commonHeader + actionIdentifiers = executionServiceInput.actionIdentifiers + } + return NodeExecuteMessage(node, executionServiceInput, nodeOutput) } override suspend fun prepareNodeSkipMessage(node: Graph.Node) : NodeSkipMessage<ExecutionServiceInput, ExecutionServiceOutput> { - val deferredOutput = CompletableDeferred<ExecutionServiceOutput>() - return NodeSkipMessage(node, executionServiceInput, deferredOutput) + val nodeOutput = ExecutionServiceOutput().apply { + commonHeader = executionServiceInput.commonHeader + actionIdentifiers = executionServiceInput.actionIdentifiers + } + return NodeSkipMessage(node, executionServiceInput, nodeOutput) } override suspend fun executeNode(node: Graph.Node, nodeInput: ExecutionServiceInput, - deferredNodeOutput: CompletableDeferred<ExecutionServiceOutput>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) { - try { - val nodeTemplateName = node.id - /** execute node template */ - val executionServiceOutput = nodeTemplateExecutionService - .executeNodeTemplate(bluePrintRuntimeService, nodeTemplateName, nodeInput) - val edgeStatus = when (executionServiceOutput.status.message) { - BluePrintConstants.STATUS_FAILURE -> EdgeLabel.FAILURE - else -> EdgeLabel.SUCCESS - } - /** set deferred output and status */ - deferredNodeOutput.complete(executionServiceOutput) - deferredNodeStatus.complete(edgeStatus) - } catch (e: Exception) { - log.error("failed in executeNode($node)", e) - deferredNodeOutput.completeExceptionally(e) - deferredNodeStatus.complete(EdgeLabel.FAILURE) + nodeOutput: ExecutionServiceOutput): EdgeLabel { + log.info("Executing workflow($workflowName[${this.workflowId}])'s step($${node.id})") + val step = bluePrintRuntimeService.bluePrintContext().workflowStepByName(this.workflowName, node.id) + checkNotEmpty(step.target) { "couldn't get step target for workflow(${this.workflowName})'s step(${node.id})" } + val nodeTemplateName = step.target!! + /** execute node template */ + val executionServiceOutput = nodeTemplateExecutionService + .executeNodeTemplate(bluePrintRuntimeService, nodeTemplateName, nodeInput) + + return when (executionServiceOutput.status.message) { + BluePrintConstants.STATUS_FAILURE -> EdgeLabel.FAILURE + else -> EdgeLabel.SUCCESS } } override suspend fun skipNode(node: Graph.Node, nodeInput: ExecutionServiceInput, - deferredNodeOutput: CompletableDeferred<ExecutionServiceOutput>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) { - val executionServiceOutput = ExecutionServiceOutput().apply { - commonHeader = nodeInput.commonHeader - actionIdentifiers = nodeInput.actionIdentifiers - } - deferredNodeOutput.complete(executionServiceOutput) - deferredNodeStatus.complete(EdgeLabel.SUCCESS) + nodeOutput: ExecutionServiceOutput): EdgeLabel { + return EdgeLabel.SUCCESS } override suspend fun cancelNode(node: Graph.Node, nodeInput: ExecutionServiceInput, - deferredNodeOutput: CompletableDeferred<ExecutionServiceOutput>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) { + nodeOutput: ExecutionServiceOutput): EdgeLabel { TODO("not implemented") } override suspend fun restartNode(node: Graph.Node, nodeInput: ExecutionServiceInput, - deferredNodeOutput: CompletableDeferred<ExecutionServiceOutput>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) { + nodeOutput: ExecutionServiceOutput): EdgeLabel { TODO("not implemented") } }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImplTest.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImplTest.kt index 3c740725e..436de1b56 100644 --- a/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImplTest.kt +++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImplTest.kt @@ -16,12 +16,17 @@ package org.onap.ccsdk.cds.blueprintsprocessor.services.workflow +import io.mockk.every +import io.mockk.mockkObject +import io.mockk.unmockkAll import kotlinx.coroutines.runBlocking +import org.junit.After import org.junit.Before import org.junit.Test import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput +import org.onap.ccsdk.cds.blueprintsprocessor.services.workflow.mock.MockComponentFunction import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService @@ -29,7 +34,6 @@ import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyS import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils import org.springframework.beans.factory.annotation.Autowired -import org.springframework.context.ApplicationContext import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.junit4.SpringRunner import kotlin.test.assertEquals @@ -42,31 +46,52 @@ import kotlin.test.assertNotNull class BluePrintWorkflowExecutionServiceImplTest { @Autowired - lateinit var applicationContext: ApplicationContext - - @Autowired lateinit var bluePrintWorkflowExecutionService: BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> @Before fun init() { - BluePrintDependencyService.inject(applicationContext) + mockkObject(BluePrintDependencyService) + every { BluePrintDependencyService.applicationContext.getBean(any()) } returns MockComponentFunction() + } + + @After + fun afterTests() { + unmockkAll() } @Test fun testBluePrintWorkflowExecutionService() { runBlocking { val bluePrintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime("1234", - "./../../../../../components/model-catalog/blueprint-model/test-blueprint/baseconfiguration") + "./../../../../../components/model-catalog/blueprint-model/test-blueprint/baseconfiguration") val executionServiceInput = JacksonUtils.readValueFromClassPathFile("execution-input/resource-assignment-input.json", - ExecutionServiceInput::class.java)!! + ExecutionServiceInput::class.java)!! val executionServiceOutput = bluePrintWorkflowExecutionService - .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, hashMapOf()) + .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, hashMapOf()) assertNotNull(executionServiceOutput, "failed to get response") assertEquals(BluePrintConstants.STATUS_SUCCESS, executionServiceOutput.status.message, - "failed to get successful response") + "failed to get successful response") + } + } + + @Test + fun testImperativeBluePrintWorkflowExecutionService() { + runBlocking { + val bluePrintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime("1234", + "./../../../../../components/model-catalog/blueprint-model/test-blueprint/baseconfiguration") + + val executionServiceInput = JacksonUtils.readValueFromClassPathFile("execution-input/imperative-test-input.json", + ExecutionServiceInput::class.java)!! + + val executionServiceOutput = bluePrintWorkflowExecutionService + .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, hashMapOf()) + + assertNotNull(executionServiceOutput, "failed to get response") + assertEquals(BluePrintConstants.STATUS_SUCCESS, executionServiceOutput.status.message, + "failed to get successful response") } } @@ -75,13 +100,13 @@ class BluePrintWorkflowExecutionServiceImplTest { assertFailsWith(exceptionClass = BluePrintProcessorException::class) { runBlocking { val bluePrintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime("1234", - "./../../../../../components/model-catalog/blueprint-model/test-blueprint/baseconfiguration") + "./../../../../../components/model-catalog/blueprint-model/test-blueprint/baseconfiguration") //service input will have a mislabeled input params, we are expecting to get an error when that happens with a useful error message val executionServiceInput = JacksonUtils.readValueFromClassPathFile("execution-input/resource-assignment-input-missing-resource_assignment_request.json", - ExecutionServiceInput::class.java)!! + ExecutionServiceInput::class.java)!! val executionServiceOutput = bluePrintWorkflowExecutionService - .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, hashMapOf()) + .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, hashMapOf()) } } } diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionServiceTest.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionServiceTest.kt index 301fc34c0..becd22857 100644 --- a/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionServiceTest.kt +++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionServiceTest.kt @@ -27,7 +27,9 @@ import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.nodeTypeCompone import org.onap.ccsdk.cds.blueprintsprocessor.services.workflow.mock.MockComponentFunction import org.onap.ccsdk.cds.blueprintsprocessor.services.workflow.mock.mockNodeTemplateComponentScriptExecutor import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintTypes +import org.onap.ccsdk.cds.controllerblueprints.core.data.ServiceTemplate import org.onap.ccsdk.cds.controllerblueprints.core.dsl.serviceTemplate +import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.normalizedPathName import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintContext import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService @@ -37,6 +39,7 @@ import kotlin.test.Test import kotlin.test.assertNotNull class ImperativeWorkflowExecutionServiceTest { + val log = logger(ImperativeWorkflowExecutionServiceTest::class) @Before fun init() { @@ -49,37 +52,40 @@ class ImperativeWorkflowExecutionServiceTest { unmockkAll() } - @Test - fun testImperativeExecutionService() { - runBlocking { - val serviceTemplate = serviceTemplate("imperative-test", "1.0.0", - "brindasanth@onap.com", "tosca") { + fun mockServiceTemplate(): ServiceTemplate { + return serviceTemplate("imperative-test", "1.0.0", + "brindasanth@onap.com", "tosca") { - topologyTemplate { - nodeTemplate(mockNodeTemplateComponentScriptExecutor("resolve-config", - "cba.wt.imperative.test.ResolveConfig")) - nodeTemplate(mockNodeTemplateComponentScriptExecutor("activate-config", - "cba.wt.imperative.test.ActivateConfig")) - nodeTemplate(mockNodeTemplateComponentScriptExecutor("activate-config-rollback", - "cba.wt.imperative.test.ActivateConfigRollback")) - nodeTemplate(mockNodeTemplateComponentScriptExecutor("activate-licence", - "cba.wt.imperative.test.ActivateLicence")) + topologyTemplate { + nodeTemplate(mockNodeTemplateComponentScriptExecutor("resolve-config", + "cba.wt.imperative.test.ResolveConfig")) + nodeTemplate(mockNodeTemplateComponentScriptExecutor("activate-config", + "cba.wt.imperative.test.ActivateConfig")) + nodeTemplate(mockNodeTemplateComponentScriptExecutor("activate-config-rollback", + "cba.wt.imperative.test.ActivateConfigRollback")) + nodeTemplate(mockNodeTemplateComponentScriptExecutor("activate-licence", + "cba.wt.imperative.test.ActivateLicence")) - workflow("test-wf", "Test Imperative flow") { - step("resolve-config", "resolve-config", "") { - success("activate-config") - } - step("activate-config", "activate-config", "") { - success("activate-licence") - failure("activate-config-rollback") - } - step("activate-config-rollback", "activate-config-rollback", "") - step("activate-licence", "activate-licence", "") + workflow("imperative-test-wf", "Test Imperative flow") { + step("resolve-config", "resolve-config", "") { + success("activate-config") } + step("activate-config", "activate-config", "") { + success("activate-licence") + failure("activate-config-rollback") + } + step("activate-config-rollback", "activate-config-rollback", "") + step("activate-licence", "activate-licence", "") } - nodeType(BluePrintTypes.nodeTypeComponentScriptExecutor()) } + nodeType(BluePrintTypes.nodeTypeComponentScriptExecutor()) + } + } + @Test + fun testImperativeExecutionService() { + runBlocking { + val serviceTemplate = mockServiceTemplate() val bluePrintContext = BluePrintContext(serviceTemplate) bluePrintContext.rootPath = normalizedPathName(".") bluePrintContext.entryDefinition = "cba.imperative.test.ImperativeTestDefinitions.kt" diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/test/resources/execution-input/imperative-test-input.json b/ms/blueprintsprocessor/modules/services/workflow-service/src/test/resources/execution-input/imperative-test-input.json index 188e84083..d3495c456 100644 --- a/ms/blueprintsprocessor/modules/services/workflow-service/src/test/resources/execution-input/imperative-test-input.json +++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/test/resources/execution-input/imperative-test-input.json @@ -7,11 +7,11 @@ "actionIdentifiers": { "blueprintName": "imperative-test", "blueprintVersion": "1.0.0", - "actionName": "test-wf", + "actionName": "imperative-test-wf", "mode": "sync" }, "payload": { - "test-wf-request": { + "imperative-test-wf-request": { "hostname": "localhost" } } diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt index 06d3421c0..259efbf0b 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt @@ -26,7 +26,7 @@ class ServiceTemplateBuilder(private val name: String, private val author: String, private val tags: String) { private var serviceTemplate = ServiceTemplate() - private lateinit var topologyTemplate: TopologyTemplate + private var topologyTemplate: TopologyTemplate? = null private var metadata: MutableMap<String, String> = hashMapOf() private var dslDefinitions: MutableMap<String, JsonNode>? = null private var imports: MutableList<ImportDefinition> = mutableListOf() diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt index 91c2bcbbb..905150213 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt @@ -36,24 +36,20 @@ interface BluePrintWorkFlowService<In, Out> { suspend fun initializeWorkflow(input: In): EdgeLabel - suspend fun prepareWorkflowOutput(): Out + suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): Out /** Prepare the message for the Node */ suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out> suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<In, Out> - suspend fun executeNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) + suspend fun executeNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel - suspend fun skipNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) + suspend fun skipNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel - suspend fun cancelNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) + suspend fun cancelNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel - suspend fun restartNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) + suspend fun restartNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel } @@ -71,17 +67,13 @@ sealed class NodeMessage<In, Out> class NodeReadyMessage<In, Out>(val fromEdge: Graph.Edge, val edgeAction: EdgeAction) : NodeMessage<In, Out>() -class NodeExecuteMessage<In, Out>(val node: Graph.Node, val nodeInput: In, - val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>() +class NodeExecuteMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>() -class NodeRestartMessage<In, Out>(val node: Graph.Node, val nodeInput: In, - val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>() +class NodeRestartMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>() -class NodeSkipMessage<In, Out>(val node: Graph.Node, val nodeInput: In, - val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>() +class NodeSkipMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>() -class NodeCancelMessage<In, Out>(val node: Graph.Node, val nodeInput: In, - val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>() +class NodeCancelMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>() enum class EdgeAction(val id: String) { EXECUTE("execute"), @@ -105,18 +97,14 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP fun cancel() { log.info("Received workflow($workflowId) cancel request") job.cancel() - throw CancellationException("Workflow($workflowId) cancelled as requested ...") + throw CancellationException("Workflow($workflowId) cancelled as requested") } - val workflowActor = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { - - /** Send message from workflow actor to node actor */ - fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch { - nodeActor.send(nodeMessage) - } - + fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { /** Process the workflow execution message */ suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) { + + val nodeActor = nodeActor() // Prepare Workflow and Populate the Initial store initializeWorkflow(workflowExecuteMessage.input) @@ -124,14 +112,18 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP // Prepare first node message and Send NodeExecuteMessage // Start node doesn't wait for any nodes, so we can pass Execute message directly val nodeExecuteMessage = prepareNodeExecutionMessage(startNode) - sendNodeMessage(nodeExecuteMessage) - log.debug("First node triggered successfully, waiting for response") - + /** Send message from workflow actor to node actor */ + launch { + nodeActor.send(nodeExecuteMessage) + } // Wait for workflow completion or Error nodeActor.invokeOnClose { exception -> launch { - log.debug("End Node Completed, processing completion message") - val workflowOutput = prepareWorkflowOutput() + log.info("End Node Completed, processing completion message") + val bluePrintProcessorException: BluePrintProcessorException? = + if (exception != null) BluePrintProcessorException(exception) else null + + val workflowOutput = prepareWorkflowOutput(bluePrintProcessorException) workflowExecuteMessage.output.complete(workflowOutput) channel.close(exception) } @@ -161,7 +153,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP } - private val nodeActor = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { + private fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) { /** Send message to process from one state to other state */ fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch { @@ -228,7 +220,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP } } - fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) = launch { + suspend fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) { val node = message.node node.status = NodeStatus.EXECUTING val nodeState = if (node.id == BluePrintConstants.GRAPH_START_NODE_NAME @@ -237,9 +229,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP } else { log.debug("##### Processing workflow($workflowId) node($node) #####") // Call the Extension function and get the next Edge state. - val deferredNodeState = CompletableDeferred<EdgeLabel>() - executeNode(node, message.nodeInput, message.nodeOutput, deferredNodeState) - deferredNodeState.await() + executeNode(node, message.nodeInput, message.nodeOutput) } // Update Node Completed node.status = NodeStatus.EXECUTED @@ -263,7 +253,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP } } - fun skipNodeWorker(message: NodeSkipMessage<In, Out>) = launch { + suspend fun skipNodeWorker(message: NodeSkipMessage<In, Out>) { val node = message.node val incomingEdges = graph.incomingEdges(node.id) // Check All Incoming Nodes Skipped @@ -275,9 +265,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP if (nonSkippedEdges.isEmpty()) { log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$") // Call the Extension Function - val deferredNodeState = CompletableDeferred<EdgeLabel>() - skipNode(node, message.nodeInput, message.nodeOutput, deferredNodeState) - val nodeState = deferredNodeState.await() + val nodeState = skipNode(node, message.nodeInput, message.nodeOutput) log.info("Skip Node($node) -> Executed State($nodeState)") // Mark the Current node as Skipped node.status = NodeStatus.SKIPPED @@ -303,21 +291,37 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP when (nodeMessage) { is NodeReadyMessage<In, Out> -> { // Blocking call - readyNodeWorker(nodeMessage) + try { + readyNodeWorker(nodeMessage) + } catch (e: Exception) { + channel.close(e) + } } is NodeExecuteMessage<In, Out> -> { launch { - executeNodeWorker(nodeMessage) + try { + executeNodeWorker(nodeMessage) + } catch (e: Exception) { + channel.close(e) + } } } is NodeSkipMessage<In, Out> -> { launch { - skipNodeWorker(nodeMessage) + try { + skipNodeWorker(nodeMessage) + } catch (e: Exception) { + channel.close(e) + } } } is NodeRestartMessage<In, Out> -> { launch { - restartNodeWorker(nodeMessage) + try { + restartNodeWorker(nodeMessage) + } catch (e: Exception) { + channel.close(e) + } } } } @@ -330,6 +334,6 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP this.graph = graph this.workflowId = bluePrintRuntimeService.id() val startMessage = WorkflowExecuteMessage(input, output) - workflowActor.send(startMessage) + workflowActor().send(startMessage) } }
\ No newline at end of file diff --git a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt index 62cb10851..b8d8cea3e 100644 --- a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt +++ b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt @@ -22,6 +22,7 @@ import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.runBlocking import org.junit.Test import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph import org.onap.ccsdk.cds.controllerblueprints.core.toGraph @@ -134,49 +135,41 @@ class TestBluePrintWorkFlowService override suspend fun prepareNodeExecutionMessage(node: Graph.Node) : NodeExecuteMessage<String, String> { - val deferredNodeOutput = CompletableDeferred<String>() - val nodeExecuteMessage = NodeExecuteMessage(node, "$node Input", deferredNodeOutput) - return nodeExecuteMessage + return NodeExecuteMessage(node, "$node Input", "") } override suspend fun executeNode(node: Graph.Node, nodeInput: String, - deferredNodeOutput: CompletableDeferred<String>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) { + nodeOutput: String): EdgeLabel { // val random = (1..10).random() * 1000 // println("will reply in $random ms") // kotlinx.coroutines.delay(random.toLong()) val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") - deferredNodeStatus.complete(status) - deferredNodeOutput.complete("$node, Output: $nodeInput output") + return status } - override suspend fun prepareNodeSkipMessage(node: Graph.Node) - : NodeSkipMessage<String, String> { - val deferredNodeOutput = CompletableDeferred<String>() - val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", deferredNodeOutput) + override suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<String, String> { + val nodeOutput = "" + val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", nodeOutput) return nodeSkipMessage } override suspend fun skipNode(node: Graph.Node, nodeInput: String, - deferredNodeOutput: CompletableDeferred<String>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) { + nodeOutput: String): EdgeLabel { val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)") - deferredNodeStatus.complete(status) + return status } override suspend fun cancelNode(node: Graph.Node, nodeInput: String, - deferredNodeOutput: CompletableDeferred<String>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) { + nodeOutput: String): EdgeLabel { TODO("not implemented") //To change body of created functions use File | Settings | File Templates. } override suspend fun restartNode(node: Graph.Node, nodeInput: String, - deferredNodeOutput: CompletableDeferred<String>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) { + nodeOutput: String): EdgeLabel { TODO("not implemented") //To change body of created functions use File | Settings | File Templates. } - override suspend fun prepareWorkflowOutput(): String { + override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): String { return "Final Response" } }
\ No newline at end of file |