diff options
author | Brinda Santh <brindasanth@in.ibm.com> | 2019-08-15 12:43:41 -0400 |
---|---|---|
committer | Brinda Santh Muthuramalingam <brindasanth@in.ibm.com> | 2019-08-16 14:28:55 +0000 |
commit | ad9b4a41a7be5ed8c579a2e96bbb4d2da629c036 (patch) | |
tree | 52d4dc8ef4cafb8f3a81a9901b5e7cc6d2bcd903 /ms/blueprintsprocessor/modules/services/workflow-service/src/main | |
parent | 048aad79ece5b709e65155e2d0c8675b7c2c84a2 (diff) |
Modify workflow execution service options.
Change-Id: I629b30f9ff2b8e84d6ae952946608d9bb3437d4c
Issue-ID: CCSDK-1619
Signed-off-by: Brinda Santh <brindasanth@in.ibm.com>
Diffstat (limited to 'ms/blueprintsprocessor/modules/services/workflow-service/src/main')
2 files changed, 75 insertions, 60 deletions
diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImpl.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImpl.kt index fcf0558c7..cde919ce8 100644 --- a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImpl.kt +++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImpl.kt @@ -29,8 +29,9 @@ import org.springframework.stereotype.Service @Service("bluePrintWorkflowExecutionService") open class BluePrintWorkflowExecutionServiceImpl( - private val componentWorkflowExecutionService: ComponentWorkflowExecutionService, - private val dgWorkflowExecutionService: DGWorkflowExecutionService + private val componentWorkflowExecutionService: ComponentWorkflowExecutionService, + private val dgWorkflowExecutionService: DGWorkflowExecutionService, + private val imperativeWorkflowExecutionService: ImperativeWorkflowExecutionService ) : BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> { private val log = LoggerFactory.getLogger(BluePrintWorkflowExecutionServiceImpl::class.java)!! @@ -51,28 +52,37 @@ open class BluePrintWorkflowExecutionServiceImpl( val input = executionServiceInput.payload.get("$workflowName-request") bluePrintRuntimeService.assignWorkflowInputs(workflowName, input) - // Get the DG Node Template - val nodeTemplateName = bluePrintContext.workflowFirstStepNodeTemplate(workflowName) + val workflow = bluePrintContext.workflowByName(workflowName) - val derivedFrom = bluePrintContext.nodeTemplateNodeType(nodeTemplateName).derivedFrom + val steps = workflow.steps ?: throw BluePrintProcessorException("could't get steps for workflow($workflowName)") - log.info("Executing workflow($workflowName) NodeTemplate($nodeTemplateName), derived from($derivedFrom)") - - val executionServiceOutput: ExecutionServiceOutput = when { - derivedFrom.startsWith(BluePrintConstants.MODEL_TYPE_NODE_COMPONENT, true) -> { - componentWorkflowExecutionService - .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, properties) - } - derivedFrom.startsWith(BluePrintConstants.MODEL_TYPE_NODE_WORKFLOW, true) -> { - dgWorkflowExecutionService + /** If workflow has multiple steps, then it is imperative workflow */ + val executionServiceOutput: ExecutionServiceOutput = if (steps.size > 1) { + imperativeWorkflowExecutionService .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, properties) - } - else -> { - throw BluePrintProcessorException("couldn't execute workflow($workflowName) step mapped " + - "to node template($nodeTemplateName) derived from($derivedFrom)") + } else { + // Get the DG Node Template + val nodeTemplateName = bluePrintContext.workflowFirstStepNodeTemplate(workflowName) + + val derivedFrom = bluePrintContext.nodeTemplateNodeType(nodeTemplateName).derivedFrom + + log.info("Executing workflow($workflowName) NodeTemplate($nodeTemplateName), derived from($derivedFrom)") + /** Return ExecutionServiceOutput based on DG node or Component Node */ + when { + derivedFrom.startsWith(BluePrintConstants.MODEL_TYPE_NODE_COMPONENT, true) -> { + componentWorkflowExecutionService + .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, properties) + } + derivedFrom.startsWith(BluePrintConstants.MODEL_TYPE_NODE_WORKFLOW, true) -> { + dgWorkflowExecutionService + .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, properties) + } + else -> { + throw BluePrintProcessorException("couldn't execute workflow($workflowName) step mapped " + + "to node template($nodeTemplateName) derived from($derivedFrom)") + } } } - executionServiceOutput.commonHeader = executionServiceInput.commonHeader executionServiceOutput.actionIdentifiers = executionServiceInput.actionIdentifiers // Resolve Workflow Outputs diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt index e7e5fe68a..2a14be216 100644 --- a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt +++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt @@ -19,12 +19,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.services.workflow import kotlinx.coroutines.CompletableDeferred import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants -import org.onap.ccsdk.cds.controllerblueprints.core.asGraph +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status +import org.onap.ccsdk.cds.controllerblueprints.core.* import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService -import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.service.* import org.springframework.beans.factory.config.ConfigurableBeanFactory import org.springframework.context.annotation.Scope @@ -32,7 +31,7 @@ import org.springframework.stereotype.Service @Service("imperativeWorkflowExecutionService") class ImperativeWorkflowExecutionService( - private val bluePrintWorkFlowService: BluePrintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>) + private val imperativeBluePrintWorkflowService: BluePrintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>) : BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> { override suspend fun executeBluePrintWorkflow(bluePrintRuntimeService: BluePrintRuntimeService<*>, @@ -46,7 +45,8 @@ class ImperativeWorkflowExecutionService( val graph = bluePrintContext.workflowByName(workflowName).asGraph() val deferredOutput = CompletableDeferred<ExecutionServiceOutput>() - bluePrintWorkFlowService.executeWorkflow(graph, bluePrintRuntimeService, executionServiceInput, deferredOutput) + imperativeBluePrintWorkflowService.executeWorkflow(graph, bluePrintRuntimeService, + executionServiceInput, deferredOutput) return deferredOutput.await() } } @@ -59,6 +59,7 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS lateinit var bluePrintRuntimeService: BluePrintRuntimeService<*> lateinit var executionServiceInput: ExecutionServiceInput + lateinit var workflowName: String lateinit var deferredExecutionServiceOutput: CompletableDeferred<ExecutionServiceOutput> override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, @@ -67,77 +68,81 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS this.graph = graph this.bluePrintRuntimeService = bluePrintRuntimeService this.executionServiceInput = input + this.workflowName = this.executionServiceInput.actionIdentifiers.actionName this.deferredExecutionServiceOutput = output this.workflowId = bluePrintRuntimeService.id() val startMessage = WorkflowExecuteMessage(input, output) - workflowActor.send(startMessage) + workflowActor().send(startMessage) } override suspend fun initializeWorkflow(input: ExecutionServiceInput): EdgeLabel { return EdgeLabel.SUCCESS } - override suspend fun prepareWorkflowOutput(): ExecutionServiceOutput { + override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): ExecutionServiceOutput { + val wfStatus = if (exception != null) { + val status = Status() + status.message = BluePrintConstants.STATUS_FAILURE + status.errorMessage = exception.message + status + } else { + val status = Status() + status.message = BluePrintConstants.STATUS_SUCCESS + status + } return ExecutionServiceOutput().apply { commonHeader = executionServiceInput.commonHeader actionIdentifiers = executionServiceInput.actionIdentifiers + status = wfStatus } } override suspend fun prepareNodeExecutionMessage(node: Graph.Node) : NodeExecuteMessage<ExecutionServiceInput, ExecutionServiceOutput> { - val deferredOutput = CompletableDeferred<ExecutionServiceOutput>() - return NodeExecuteMessage(node, executionServiceInput, deferredOutput) + val nodeOutput = ExecutionServiceOutput().apply { + commonHeader = executionServiceInput.commonHeader + actionIdentifiers = executionServiceInput.actionIdentifiers + } + return NodeExecuteMessage(node, executionServiceInput, nodeOutput) } override suspend fun prepareNodeSkipMessage(node: Graph.Node) : NodeSkipMessage<ExecutionServiceInput, ExecutionServiceOutput> { - val deferredOutput = CompletableDeferred<ExecutionServiceOutput>() - return NodeSkipMessage(node, executionServiceInput, deferredOutput) + val nodeOutput = ExecutionServiceOutput().apply { + commonHeader = executionServiceInput.commonHeader + actionIdentifiers = executionServiceInput.actionIdentifiers + } + return NodeSkipMessage(node, executionServiceInput, nodeOutput) } override suspend fun executeNode(node: Graph.Node, nodeInput: ExecutionServiceInput, - deferredNodeOutput: CompletableDeferred<ExecutionServiceOutput>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) { - try { - val nodeTemplateName = node.id - /** execute node template */ - val executionServiceOutput = nodeTemplateExecutionService - .executeNodeTemplate(bluePrintRuntimeService, nodeTemplateName, nodeInput) - val edgeStatus = when (executionServiceOutput.status.message) { - BluePrintConstants.STATUS_FAILURE -> EdgeLabel.FAILURE - else -> EdgeLabel.SUCCESS - } - /** set deferred output and status */ - deferredNodeOutput.complete(executionServiceOutput) - deferredNodeStatus.complete(edgeStatus) - } catch (e: Exception) { - log.error("failed in executeNode($node)", e) - deferredNodeOutput.completeExceptionally(e) - deferredNodeStatus.complete(EdgeLabel.FAILURE) + nodeOutput: ExecutionServiceOutput): EdgeLabel { + log.info("Executing workflow($workflowName[${this.workflowId}])'s step($${node.id})") + val step = bluePrintRuntimeService.bluePrintContext().workflowStepByName(this.workflowName, node.id) + checkNotEmpty(step.target) { "couldn't get step target for workflow(${this.workflowName})'s step(${node.id})" } + val nodeTemplateName = step.target!! + /** execute node template */ + val executionServiceOutput = nodeTemplateExecutionService + .executeNodeTemplate(bluePrintRuntimeService, nodeTemplateName, nodeInput) + + return when (executionServiceOutput.status.message) { + BluePrintConstants.STATUS_FAILURE -> EdgeLabel.FAILURE + else -> EdgeLabel.SUCCESS } } override suspend fun skipNode(node: Graph.Node, nodeInput: ExecutionServiceInput, - deferredNodeOutput: CompletableDeferred<ExecutionServiceOutput>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) { - val executionServiceOutput = ExecutionServiceOutput().apply { - commonHeader = nodeInput.commonHeader - actionIdentifiers = nodeInput.actionIdentifiers - } - deferredNodeOutput.complete(executionServiceOutput) - deferredNodeStatus.complete(EdgeLabel.SUCCESS) + nodeOutput: ExecutionServiceOutput): EdgeLabel { + return EdgeLabel.SUCCESS } override suspend fun cancelNode(node: Graph.Node, nodeInput: ExecutionServiceInput, - deferredNodeOutput: CompletableDeferred<ExecutionServiceOutput>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) { + nodeOutput: ExecutionServiceOutput): EdgeLabel { TODO("not implemented") } override suspend fun restartNode(node: Graph.Node, nodeInput: ExecutionServiceInput, - deferredNodeOutput: CompletableDeferred<ExecutionServiceOutput>, - deferredNodeStatus: CompletableDeferred<EdgeLabel>) { + nodeOutput: ExecutionServiceOutput): EdgeLabel { TODO("not implemented") } }
\ No newline at end of file |