summaryrefslogtreecommitdiffstats
path: root/ms
diff options
context:
space:
mode:
Diffstat (limited to 'ms')
-rw-r--r--ms/blueprintsprocessor/functions/ansible-awx-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/ansible/executor/ComponentRemoteAnsibleExecutor.kt113
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/ApiDataExtensions.kt28
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt9
-rw-r--r--ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt42
-rw-r--r--ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt20
-rw-r--r--ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt1
-rw-r--r--ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt16
-rw-r--r--ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/data/BluePrintGraph.kt3
-rw-r--r--ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt5
-rw-r--r--ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt67
-rw-r--r--ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt130
-rw-r--r--ms/controllerblueprints/modules/service/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/service/controller/ControllerBlueprintExceptionHandler.kt (renamed from ms/controllerblueprints/modules/service/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/service/controller/ControllerBlueprintExeptionHandler.kt)16
12 files changed, 324 insertions, 126 deletions
diff --git a/ms/blueprintsprocessor/functions/ansible-awx-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/ansible/executor/ComponentRemoteAnsibleExecutor.kt b/ms/blueprintsprocessor/functions/ansible-awx-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/ansible/executor/ComponentRemoteAnsibleExecutor.kt
index 947a9630d..743aa714b 100644
--- a/ms/blueprintsprocessor/functions/ansible-awx-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/ansible/executor/ComponentRemoteAnsibleExecutor.kt
+++ b/ms/blueprintsprocessor/functions/ansible-awx-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/ansible/executor/ComponentRemoteAnsibleExecutor.kt
@@ -24,10 +24,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInpu
import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BluePrintRestLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.BlueprintWebClientService
import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction
-import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
-import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
-import org.onap.ccsdk.cds.controllerblueprints.core.isNotNull
-import org.onap.ccsdk.cds.controllerblueprints.core.rootFieldsToMap
+import org.onap.ccsdk.cds.controllerblueprints.core.*
import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.config.ConfigurableBeanFactory
@@ -68,6 +65,7 @@ open class ComponentRemoteAnsibleExecutor(private val blueprintRestLibPropertySe
// input fields names accepted by this executor
const val INPUT_ENDPOINT_SELECTOR = "endpoint-selector"
const val INPUT_JOB_TEMPLATE_NAME = "job-template-name"
+ const val INPUT_WORKFLOW_JOB_TEMPLATE_NAME = "workflow-job-template-id"
const val INPUT_LIMIT_TO_HOST = "limit"
const val INPUT_INVENTORY = "inventory"
const val INPUT_EXTRA_VARS = "extra-vars"
@@ -85,12 +83,20 @@ open class ComponentRemoteAnsibleExecutor(private val blueprintRestLibPropertySe
try {
val restClientService = getAWXRestClient()
- val jobTemplateName = getOperationInput(INPUT_JOB_TEMPLATE_NAME).asText()
- val jtId = lookupJobTemplateIDByName(restClientService, jobTemplateName)
+ // Get either a job template name or a workflow template name property
+ var workflowURIPrefix = ""
+ var jobTemplateName = getOperationInput(INPUT_JOB_TEMPLATE_NAME).returnNullIfMissing()?.textValue() ?: ""
+ val isWorkflowJT = jobTemplateName.isBlank()
+ if (isWorkflowJT) {
+ jobTemplateName = getOperationInput(INPUT_WORKFLOW_JOB_TEMPLATE_NAME).asText()
+ workflowURIPrefix = "workflow_"
+ }
+
+ val jtId = lookupJobTemplateIDByName(restClientService, jobTemplateName, workflowURIPrefix)
if (jtId.isNotEmpty()) {
- runJobTemplateOnAWX(restClientService, jobTemplateName, jtId)
+ runJobTemplateOnAWX(restClientService, jobTemplateName, jtId, workflowURIPrefix)
} else {
- val message = "Job template ${jobTemplateName} does not exists"
+ val message = "Workflow/Job template ${jobTemplateName} does not exists"
log.error(message)
setNodeOutputErrors(ATTRIBUTE_EXEC_CMD_STATUS_ERROR, message)
}
@@ -135,9 +141,10 @@ open class ComponentRemoteAnsibleExecutor(private val blueprintRestLibPropertySe
/**
* Finds the job template ID based on the job template name provided in the request
*/
- private fun lookupJobTemplateIDByName(awxClient: BlueprintWebClientService, job_template_name: String?): String {
+ private fun lookupJobTemplateIDByName(awxClient: BlueprintWebClientService, job_template_name: String?,
+ workflowPrefix : String) : String {
val encodedJTName = URI(null, null,
- "/api/v2/job_templates/${job_template_name}/",
+ "/api/v2/${workflowPrefix}job_templates/${job_template_name}/",
null, null).rawPath
// Get Job Template details by name
@@ -152,19 +159,20 @@ open class ComponentRemoteAnsibleExecutor(private val blueprintRestLibPropertySe
* its execution. Finally, it retrieves the job results via the stdout api.
* The status and output attributes are populated in the process.
*/
- private fun runJobTemplateOnAWX(awxClient: BlueprintWebClientService, job_template_name: String?, jtId: String) {
+ private fun runJobTemplateOnAWX(awxClient: BlueprintWebClientService, job_template_name: String?, jtId: String,
+ workflowPrefix : String) {
setNodeOutputProperties("preparing".asJsonPrimitive(), "".asJsonPrimitive())
// Get Job Template requirements
- var response = awxClient.exchangeResource(GET, "/api/v2/job_templates/${jtId}/launch/", "")
+ var response = awxClient.exchangeResource(GET, "/api/v2/${workflowPrefix}job_templates/${jtId}/launch/", "")
// FIXME: handle non-successful SC
val jtLaunchReqs: JsonNode = mapper.readTree(response.body)
- val payload = prepareLaunchPayload(awxClient, jtLaunchReqs)
+ val payload = prepareLaunchPayload(awxClient, jtLaunchReqs, workflowPrefix.isBlank())
log.info("Running job with $payload, for requestId $processId.")
// Launch the job for the targeted template
var jtLaunched: JsonNode = JacksonUtils.objectMapper.createObjectNode()
- response = awxClient.exchangeResource(POST, "/api/v2/job_templates/${jtId}/launch/", payload)
+ response = awxClient.exchangeResource(POST, "/api/v2/${workflowPrefix}job_templates/${jtId}/launch/", payload)
if (response.status in HTTP_SUCCESS) {
jtLaunched = mapper.readTree(response.body)
val fieldsIgnored: JsonNode = jtLaunched.at("/ignored_fields")
@@ -180,7 +188,7 @@ open class ComponentRemoteAnsibleExecutor(private val blueprintRestLibPropertySe
var jobStatus = "unknown"
var jobEndTime = "null"
while (jobEndTime == "null") {
- response = awxClient.exchangeResource(GET, "/api/v2/jobs/${jobId}/", "")
+ response = awxClient.exchangeResource(GET, "/api/v2/${workflowPrefix}jobs/${jobId}/", "")
val jobLaunched: JsonNode = mapper.readTree(response.body)
jobStatus = jobLaunched.at("/status").asText()
jobEndTime = jobLaunched.at("/finished").asText()
@@ -189,12 +197,10 @@ open class ComponentRemoteAnsibleExecutor(private val blueprintRestLibPropertySe
log.info("Execution of job template $job_template_name in job #$jobId finished with status ($jobStatus) for requestId $processId")
- // Get job execution results (stdout)
- val plainTextHeaders = mutableMapOf<String, String>()
- plainTextHeaders["Content-Type"] = "text/plain ;utf-8"
- response = awxClient.exchangeResource(GET, "/api/v2/jobs/${jobId}/stdout/?format=txt", "", plainTextHeaders)
+ // Get workflow/job execution results
+ val collectedOutput = extractJobRunResponse(awxClient, jobId, workflowPrefix)
- setNodeOutputProperties(jobStatus.asJsonPrimitive(), response.body.asJsonPrimitive())
+ setNodeOutputProperties(jobStatus.asJsonPrimitive(), collectedOutput.asJsonPrimitive())
} else {
// The job template requirements were not fulfilled with the values passed in. The message below will
// provide more information via the response, like the ignored_fields, or variables_needed_to_start,
@@ -207,42 +213,77 @@ open class ComponentRemoteAnsibleExecutor(private val blueprintRestLibPropertySe
}
/**
+ * Extracts the response from either a job stdout call OR collects the workflow run output
+ */
+ private fun extractJobRunResponse(awxClient: BlueprintWebClientService, jobId: String, workflowPrefix: String): String {
+
+ // First, collect all job ID from either the job template run or the workflow nodes that ran
+ var jobIds : Array<String>
+ var collectedResponses = StringBuilder()
+ if (workflowPrefix.isNotEmpty()) {
+ var response = awxClient.exchangeResource(GET, "/api/v2/${workflowPrefix}jobs/${jobId}/workflow_nodes/", "")
+ val jobDetails = mapper.readTree(response.body).at("/results")
+ jobIds = emptyArray()
+ for (jobDetail in jobDetails.elements()) {
+ jobIds = jobIds.plus( jobDetail.at("/summary_fields/job/id").asText() )
+ }
+ } else {
+ jobIds = arrayOf(jobId)
+ }
+
+ // Then collect the response text from the corresponding jobIds
+ val plainTextHeaders = mutableMapOf<String, String>()
+ plainTextHeaders["Content-Type"] = "text/plain ;utf-8"
+ for (aJobId in jobIds) {
+ var response = awxClient.exchangeResource(GET, "/api/v2/jobs/${aJobId}/stdout/?format=txt", "", plainTextHeaders)
+ collectedResponses.append("Output for job ${aJobId}:")
+ collectedResponses.append(response.body)
+ }
+ return collectedResponses.toString()
+ }
+
+ /**
* Prepares the JSON payload expected by the job template api,
* by applying the overrides that were provided
* and allowed by the template definition flags in jtLaunchReqs
*/
- private fun prepareLaunchPayload(awxClient: BlueprintWebClientService, jtLaunchReqs: JsonNode): String {
+ private fun prepareLaunchPayload(awxClient: BlueprintWebClientService, jtLaunchReqs: JsonNode,
+ isWorkflow : Boolean): String {
val payload = JacksonUtils.objectMapper.createObjectNode()
// Parameter defaults
- val limitProp = getOptionalOperationInput(INPUT_LIMIT_TO_HOST)
- val tagsProp = getOptionalOperationInput(INPUT_TAGS)
- val skipTagsProp = getOptionalOperationInput(INPUT_SKIP_TAGS)
val inventoryProp = getOptionalOperationInput(INPUT_INVENTORY)
val extraArgs = getOperationInput(INPUT_EXTRA_VARS)
- val askLimitOnLaunch = jtLaunchReqs.at("/ask_limit_on_launch").asBoolean()
- if (askLimitOnLaunch && limitProp.isNotNull()) {
- payload.set(INPUT_LIMIT_TO_HOST, limitProp)
- }
- val askTagsOnLaunch = jtLaunchReqs.at("/ask_tags_on_launch").asBoolean()
- if (askTagsOnLaunch && tagsProp.isNotNull()) {
- payload.set(INPUT_TAGS, tagsProp)
- }
- if (askTagsOnLaunch && skipTagsProp.isNotNull()) {
- payload.set("skip_tags", skipTagsProp)
+ if (!isWorkflow) {
+ val limitProp = getOptionalOperationInput(INPUT_LIMIT_TO_HOST)
+ val tagsProp = getOptionalOperationInput(INPUT_TAGS)
+ val skipTagsProp = getOptionalOperationInput(INPUT_SKIP_TAGS)
+
+ val askLimitOnLaunch = jtLaunchReqs.at("/ask_limit_on_launch").asBoolean()
+ if (askLimitOnLaunch && limitProp.isNotNull()) {
+ payload.set(INPUT_LIMIT_TO_HOST, limitProp)
+ }
+ val askTagsOnLaunch = jtLaunchReqs.at("/ask_tags_on_launch").asBoolean()
+ if (askTagsOnLaunch && tagsProp.isNotNull()) {
+ payload.set(INPUT_TAGS, tagsProp)
+ }
+ if (askTagsOnLaunch && skipTagsProp.isNotNull()) {
+ payload.set("skip_tags", skipTagsProp)
+ }
}
+
val askInventoryOnLaunch = jtLaunchReqs.at("/ask_inventory_on_launch").asBoolean()
if (askInventoryOnLaunch && inventoryProp.isNotNull()) {
var inventoryKeyId = if (inventoryProp is TextNode) {
- resolveInventoryIdByName(awxClient, inventoryProp!!.textValue())?.asJsonPrimitive()
+ resolveInventoryIdByName(awxClient, inventoryProp.textValue())?.asJsonPrimitive()
} else {
inventoryProp
}
payload.set(INPUT_INVENTORY, inventoryKeyId)
}
val askVariablesOnLaunch = jtLaunchReqs.at("/ask_variables_on_launch").asBoolean()
- if (askVariablesOnLaunch && extraArgs != null) {
+ if (askVariablesOnLaunch) {
payload.set("extra_vars", extraArgs)
}
return payload.asJsonString(false)
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/ApiDataExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/ApiDataExtensions.kt
new file mode 100644
index 000000000..47b55b018
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/ApiDataExtensions.kt
@@ -0,0 +1,28 @@
+/*
+ * Copyright © 2019 IBM.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core
+
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.controllerblueprints.core.asType
+import kotlin.reflect.KClass
+
+
+fun <T : Any> ExecutionServiceInput.payloadAsType(clazzType: KClass<T>): T {
+ val actionName = this.actionIdentifiers.actionName
+ val requestJsonNode = this.payload.get("$actionName-request")
+ return requestJsonNode.asType(clazzType.java)
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt
index 408bb58ed..8759338b7 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt
@@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
import com.fasterxml.jackson.databind.JsonNode
+import kotlinx.coroutines.withTimeout
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
@@ -47,6 +48,7 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic
lateinit var interfaceName: String
lateinit var operationName: String
lateinit var nodeTemplateName: String
+ var timeout: Int = 180
var operationInputs: MutableMap<String, JsonNode> = hashMapOf()
override fun getName(): String {
@@ -87,6 +89,9 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic
this.operationInputs.putAll(operationResolvedProperties)
+ val timeout = this.operationInputs.getOptionalAsInt(BluePrintConstants.PROPERTY_CURRENT_TIMEOUT)
+ timeout?.let { this.timeout = timeout }
+
return executionRequest
}
@@ -118,7 +123,9 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic
override suspend fun applyNB(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
try {
prepareRequestNB(executionServiceInput)
- processNB(executionServiceInput)
+ withTimeout((timeout * 1000).toLong()) {
+ processNB(executionServiceInput)
+ }
} catch (runtimeException: RuntimeException) {
log.error("failed in ${getName()} : ${runtimeException.message}", runtimeException)
recoverNB(runtimeException, executionServiceInput)
diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt
index 2a14be216..6bee17f4b 100644
--- a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt
+++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt
@@ -44,10 +44,8 @@ class ImperativeWorkflowExecutionService(
val graph = bluePrintContext.workflowByName(workflowName).asGraph()
- val deferredOutput = CompletableDeferred<ExecutionServiceOutput>()
- imperativeBluePrintWorkflowService.executeWorkflow(graph, bluePrintRuntimeService,
- executionServiceInput, deferredOutput)
- return deferredOutput.await()
+ return imperativeBluePrintWorkflowService.executeWorkflow(graph, bluePrintRuntimeService,
+ executionServiceInput)
}
}
@@ -60,35 +58,41 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS
lateinit var bluePrintRuntimeService: BluePrintRuntimeService<*>
lateinit var executionServiceInput: ExecutionServiceInput
lateinit var workflowName: String
- lateinit var deferredExecutionServiceOutput: CompletableDeferred<ExecutionServiceOutput>
override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>,
- input: ExecutionServiceInput,
- output: CompletableDeferred<ExecutionServiceOutput>) {
+ input: ExecutionServiceInput): ExecutionServiceOutput {
this.graph = graph
this.bluePrintRuntimeService = bluePrintRuntimeService
this.executionServiceInput = input
this.workflowName = this.executionServiceInput.actionIdentifiers.actionName
- this.deferredExecutionServiceOutput = output
this.workflowId = bluePrintRuntimeService.id()
+ val output = CompletableDeferred<ExecutionServiceOutput>()
val startMessage = WorkflowExecuteMessage(input, output)
- workflowActor().send(startMessage)
+ val workflowActor = workflowActor()
+ if (!workflowActor.isClosedForSend) {
+ workflowActor.send(startMessage)
+ } else {
+ throw BluePrintProcessorException("workflow($workflowActor) actor is closed")
+ }
+ return output.await()
}
override suspend fun initializeWorkflow(input: ExecutionServiceInput): EdgeLabel {
return EdgeLabel.SUCCESS
}
- override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): ExecutionServiceOutput {
- val wfStatus = if (exception != null) {
- val status = Status()
- status.message = BluePrintConstants.STATUS_FAILURE
- status.errorMessage = exception.message
- status
- } else {
- val status = Status()
- status.message = BluePrintConstants.STATUS_SUCCESS
- status
+ override suspend fun prepareWorkflowOutput(): ExecutionServiceOutput {
+ val wfStatus = Status().apply {
+ if (exceptions.isNotEmpty()) {
+ exceptions.forEach {
+ val errorMessage = it.message ?: ""
+ bluePrintRuntimeService.getBluePrintError().addError(errorMessage)
+ log.error("workflow($workflowId) exception :", it)
+ }
+ message = BluePrintConstants.STATUS_FAILURE
+ } else {
+ message = BluePrintConstants.STATUS_SUCCESS
+ }
}
return ExecutionServiceOutput().apply {
commonHeader = executionServiceInput.commonHeader
diff --git a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt
index 89732e300..b64177aab 100644
--- a/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt
+++ b/ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/NodeTemplateExecutionService.kt
@@ -22,7 +22,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutp
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.StepData
import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
-import org.onap.ccsdk.cds.controllerblueprints.core.putJsonElement
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintRuntimeService
import org.slf4j.LoggerFactory
@@ -37,15 +37,22 @@ open class NodeTemplateExecutionService {
executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
// Get the Blueprint Context
val blueprintContext = bluePrintRuntimeService.bluePrintContext()
+
+ val nodeTemplate = blueprintContext.nodeTemplateByName(nodeTemplateName)
// Get the Component Name, NodeTemplate type is mapped to Component Name
- val componentName = blueprintContext.nodeTemplateByName(nodeTemplateName).type
+ val componentName = nodeTemplate.type
val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName)
val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName)
+ val nodeTemplateImplementation = blueprintContext
+ .nodeTemplateOperationImplementation(nodeTemplateName, interfaceName, operationName)
+
+ val timeout: Int = nodeTemplateImplementation?.timeout ?: 180
+
log.info("executing node template($nodeTemplateName) component($componentName) " +
- "interface($interfaceName) operation($operationName)")
+ "interface($interfaceName) operation($operationName) with timeout($timeout) sec.")
// Get the Component Instance
val plugin = BluePrintDependencyService.instance<AbstractComponentFunction>(componentName)
@@ -62,9 +69,10 @@ open class NodeTemplateExecutionService {
// Populate Step Meta Data
val stepInputs: MutableMap<String, JsonNode> = hashMapOf()
- stepInputs.putJsonElement(BluePrintConstants.PROPERTY_CURRENT_NODE_TEMPLATE, nodeTemplateName)
- stepInputs.putJsonElement(BluePrintConstants.PROPERTY_CURRENT_INTERFACE, interfaceName)
- stepInputs.putJsonElement(BluePrintConstants.PROPERTY_CURRENT_OPERATION, operationName)
+ stepInputs[BluePrintConstants.PROPERTY_CURRENT_NODE_TEMPLATE] = nodeTemplateName.asJsonPrimitive()
+ stepInputs[BluePrintConstants.PROPERTY_CURRENT_INTERFACE] = interfaceName.asJsonPrimitive()
+ stepInputs[BluePrintConstants.PROPERTY_CURRENT_OPERATION] = operationName.asJsonPrimitive()
+ stepInputs[BluePrintConstants.PROPERTY_CURRENT_TIMEOUT] = timeout.asJsonPrimitive()
val stepInputData = StepData().apply {
name = nodeTemplateName
properties = stepInputs
diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
index 064c196ed..ba5815bb6 100644
--- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
+++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
@@ -180,6 +180,7 @@ object BluePrintConstants {
const val PROPERTY_CURRENT_NODE_TEMPLATE = "current-node-template"
const val PROPERTY_CURRENT_INTERFACE = "current-interface"
const val PROPERTY_CURRENT_OPERATION = "current-operation"
+ const val PROPERTY_CURRENT_TIMEOUT = "current-timeout"
const val PROPERTY_CURRENT_IMPLEMENTATION = "current-implementation"
const val PROPERTY_EXECUTION_REQUEST = "execution-request"
diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt
index 93ba15e99..08bc6c3fd 100644
--- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt
+++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt
@@ -242,6 +242,22 @@ fun Map<String, JsonNode>.getAsDouble(key: String): Double {
return this[key]?.asDouble() ?: throw BluePrintException("couldn't find value for key($key)")
}
+fun Map<String, JsonNode>.getOptionalAsString(key: String): String? {
+ return if (this.containsKey(key)) this[key]!!.asText() else null
+}
+
+fun Map<String, JsonNode>.getOptionalAsBoolean(key: String): Boolean? {
+ return if (this.containsKey(key)) this[key]!!.asBoolean() else null
+}
+
+fun Map<String, JsonNode>.getOptionalAsInt(key: String): Int? {
+ return if (this.containsKey(key)) this[key]!!.asInt() else null
+}
+
+fun Map<String, JsonNode>.getOptionalAsDouble(key: String): Double? {
+ return if (this.containsKey(key)) this[key]!!.asDouble() else null
+}
+
// Checks
inline fun checkEquals(value1: String?, value2: String?, lazyMessage: () -> Any): Boolean {
diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/data/BluePrintGraph.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/data/BluePrintGraph.kt
index 9e1b7498e..fc796c9ed 100644
--- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/data/BluePrintGraph.kt
+++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/data/BluePrintGraph.kt
@@ -33,7 +33,8 @@ enum class NodeStatus(val id: String) {
READY("ready"),
EXECUTING("executing"),
EXECUTED("executed"),
- SKIPPED("skipped")
+ SKIPPED("skipped"),
+ TERMINATED("terminated")
}
class Graph {
diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt
index 066516fcc..b368c01aa 100644
--- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt
+++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintContext.kt
@@ -216,6 +216,11 @@ class BluePrintContext(val serviceTemplate: ServiceTemplate) {
?: throw BluePrintException("could't get NodeTemplate($nodeTemplateName)'s first InterfaceAssignment's first OperationAssignment name")
}
+ fun nodeTemplateOperationImplementation(nodeTemplateName: String, interfaceName: String, operationName: String)
+ : Implementation? {
+ return nodeTemplateInterfaceOperation(nodeTemplateName, interfaceName, operationName).implementation
+ }
+
fun nodeTemplateInterfaceOperationInputs(nodeTemplateName: String, interfaceName: String, operationName: String): MutableMap<String, JsonNode>? {
return nodeTemplateInterfaceOperation(nodeTemplateName, interfaceName, operationName).inputs
}
diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt
index 905150213..5cec3c947 100644
--- a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt
+++ b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt
@@ -30,13 +30,12 @@ import kotlin.coroutines.CoroutineContext
interface BluePrintWorkFlowService<In, Out> {
/** Executes imperative workflow graph [graph] for the bluePrintRuntimeService [bluePrintRuntimeService]
- * and workflow input [input], response will be retrieve from output [output]*/
- suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>,
- input: In, output: CompletableDeferred<Out>)
+ * and workflow input [input]*/
+ suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, input: In): Out
suspend fun initializeWorkflow(input: In): EdgeLabel
- suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): Out
+ suspend fun prepareWorkflowOutput(): Out
/** Prepare the message for the Node */
suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out>
@@ -91,6 +90,8 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
lateinit var workflowId: String
+ var exceptions: MutableList<Exception> = arrayListOf()
+
final override val coroutineContext: CoroutineContext
get() = job + CoroutineName("Wf")
@@ -100,7 +101,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
throw CancellationException("Workflow($workflowId) cancelled as requested")
}
- fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
+ suspend fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
/** Process the workflow execution message */
suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) {
@@ -119,13 +120,11 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
// Wait for workflow completion or Error
nodeActor.invokeOnClose { exception ->
launch {
- log.info("End Node Completed, processing completion message")
- val bluePrintProcessorException: BluePrintProcessorException? =
- if (exception != null) BluePrintProcessorException(exception) else null
-
- val workflowOutput = prepareWorkflowOutput(bluePrintProcessorException)
+ if (exception != null) exceptions.add(BluePrintProcessorException(exception))
+ log.info("workflow($workflowId) nodes completed with (${exceptions.size})exceptions")
+ val workflowOutput = prepareWorkflowOutput()
workflowExecuteMessage.output.complete(workflowOutput)
- channel.close(exception)
+ channel.close()
}
}
}
@@ -135,7 +134,11 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
when (message) {
is WorkflowExecuteMessage<In, Out> -> {
launch {
- executeMessageActor(message)
+ try {
+ executeMessageActor(message)
+ } catch (e: Exception) {
+ exceptions.add(e)
+ }
}
}
is WorkflowRestartMessage<In, Out> -> {
@@ -153,7 +156,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
}
- private fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
+ private suspend fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
/** Send message to process from one state to other state */
fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
@@ -164,7 +167,6 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
fun processNextNodes(node: Graph.Node, nodeState: EdgeLabel) {
// Process only Next Success Node
val stateEdges = graph.outgoingEdges(node.id, arrayListOf(nodeState))
- log.debug("Next Edges :$stateEdges")
if (stateEdges.isNotEmpty()) {
stateEdges.forEach { stateEdge ->
// Prepare next node ready message and Send NodeReadyMessage
@@ -213,7 +215,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
}
triggerToExecuteOrSkip(newMessage)
} else {
- log.info("node(${node.id}) waiting for not completed edges($notCompletedEdges)")
+ log.info("node(${node.id}) is waiting for incoming edges($notCompletedEdges)")
}
} else {
triggerToExecuteOrSkip(message)
@@ -233,15 +235,19 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
}
// Update Node Completed
node.status = NodeStatus.EXECUTED
- log.info("Execute Node($node) -> Executed State($nodeState)")
+ log.info("Execute node(${node.id}) -> executed state($nodeState)")
+ // Check if the Node status edge is there, If not close processing
+ val edgePresent = graph.outgoingEdges(node.id, nodeState).isNotEmpty()
// If End Node, Send End Message
if (graph.isEndNode(node)) {
// Close the current channel
channel.close()
+ } else if (!edgePresent) {
+ throw BluePrintProcessorException("node(${node.id}) outgoing edge($nodeState) is missing.")
} else {
val skippingEdges = graph.outgoingEdgesNotInLabels(node.id, arrayListOf(nodeState))
- log.debug("Skipping node($node) outgoing Edges($skippingEdges)")
+ log.debug("Skipping node($node)'s outgoing edges($skippingEdges)")
// Process Skip Edges
skippingEdges.forEach { skippingEdge ->
// Prepare next node ready message and Send NodeReadyMessage
@@ -266,7 +272,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$")
// Call the Extension Function
val nodeState = skipNode(node, message.nodeInput, message.nodeOutput)
- log.info("Skip Node($node) -> Executed State($nodeState)")
+ log.info("Skip node(${node.id}) -> executed state($nodeState)")
// Mark the Current node as Skipped
node.status = NodeStatus.SKIPPED
// Look for next possible skip nodes
@@ -283,7 +289,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
fun cancelNodeWorker(messageWorkflow: WorkflowCancelMessage<In, Out>) = launch {
channel.close()
- throw CancellationException("Workflow($workflowId) actor cancelled as requested ...")
+ throw CancellationException("Workflow($workflowId) actor cancelled as requested.")
}
/** Process each actor message received based on type **/
@@ -294,7 +300,8 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
try {
readyNodeWorker(nodeMessage)
} catch (e: Exception) {
- channel.close(e)
+ exceptions.add(e)
+ channel.close()
}
}
is NodeExecuteMessage<In, Out> -> {
@@ -302,7 +309,9 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
try {
executeNodeWorker(nodeMessage)
} catch (e: Exception) {
- channel.close(e)
+ nodeMessage.node.status = NodeStatus.TERMINATED
+ exceptions.add(e)
+ channel.close()
}
}
}
@@ -311,7 +320,9 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
try {
skipNodeWorker(nodeMessage)
} catch (e: Exception) {
- channel.close(e)
+ nodeMessage.node.status = NodeStatus.TERMINATED
+ exceptions.add(e)
+ channel.close()
}
}
}
@@ -320,20 +331,12 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
try {
restartNodeWorker(nodeMessage)
} catch (e: Exception) {
- channel.close(e)
+ exceptions.add(e)
+ channel.close()
}
}
}
}
}
}
-
- override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>,
- input: In, output: CompletableDeferred<Out>) {
- log.info("Executing Graph : $graph")
- this.graph = graph
- this.workflowId = bluePrintRuntimeService.id()
- val startMessage = WorkflowExecuteMessage(input, output)
- workflowActor().send(startMessage)
- }
} \ No newline at end of file
diff --git a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt
index b8d8cea3e..4d97f8bc3 100644
--- a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt
+++ b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt
@@ -18,13 +18,13 @@ package org.onap.ccsdk.cds.controllerblueprints.core.service
import io.mockk.every
import io.mockk.mockk
-import kotlinx.coroutines.CompletableDeferred
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.*
import org.junit.Test
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.toGraph
import kotlin.test.assertNotNull
@@ -36,10 +36,66 @@ class BluePrintWorkflowServiceTest {
.toGraph()
val simpleWorkflow = TestBluePrintWorkFlowService()
simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null)
- val deferredOutput = CompletableDeferred<String>()
val input = "123456"
- simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput)
- val response = deferredOutput.await()
+ val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input)
+ assertNotNull(response, "failed to get response")
+ }
+ }
+
+ @Test
+ fun testMultipleFlows() {
+ runBlocking {
+ coroutineScope {
+ val wfs = listOf("12345", "12346").map {
+ async {
+ val graph = "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]"
+ .toGraph()
+ val simpleWorkflow = TestBluePrintWorkFlowService()
+ simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D"), null)
+ val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(it), it)
+ assertNotNull(response, "failed to get response")
+ }
+ }
+ wfs.awaitAll()
+ }
+ }
+ }
+
+ @Test
+ fun testMissingEdgeForBFailureState() {
+ runBlocking {
+ val graph = "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]"
+ .toGraph()
+ val simpleWorkflow = TestBluePrintWorkFlowService()
+ simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "C", "D", "E"), arrayListOf("B"))
+ val input = "123456"
+ val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input)
+ assertNotNull(response, "failed to get response")
+ }
+ }
+
+ @Test
+ fun testBExceptionFlow() {
+ runBlocking {
+ val graph = "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]"
+ .toGraph()
+ val simpleWorkflow = TestBluePrintWorkFlowService()
+ simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "C", "D", "E"), null)
+ val input = "123456"
+ val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input)
+ assertNotNull(response, "failed to get response")
+ }
+ }
+
+ @Test
+ fun testTimeoutExceptionFlow() {
+ runBlocking {
+ val graph = "[START>A/SUCCESS, A>TO/SUCCESS, TO>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]"
+ .toGraph()
+ val simpleWorkflow = TestBluePrintWorkFlowService()
+ simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "TO", "C", "D", "E"), null)
+ val input = "123456"
+ val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input)
assertNotNull(response, "failed to get response")
}
}
@@ -51,10 +107,8 @@ class BluePrintWorkflowServiceTest {
.toGraph()
val simpleWorkflow = TestBluePrintWorkFlowService()
simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null)
- val deferredOutput = CompletableDeferred<String>()
val input = "123456"
- simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput)
- val response = deferredOutput.await()
+ val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input)
assertNotNull(response, "failed to get response")
}
}
@@ -68,10 +122,8 @@ class BluePrintWorkflowServiceTest {
val failurePathWorkflow = TestBluePrintWorkFlowService()
failurePathWorkflow.simulatedState = prepareSimulation(arrayListOf("B", "C", "D", "E"),
arrayListOf("A"))
- val failurePathWorkflowDeferredOutput = CompletableDeferred<String>()
val failurePathWorkflowInput = "123456"
- failurePathWorkflow.executeWorkflow(failurePatGraph, mockBluePrintRuntimeService(), failurePathWorkflowInput, failurePathWorkflowDeferredOutput)
- val failurePathResponse = failurePathWorkflowDeferredOutput.await()
+ val failurePathResponse = failurePathWorkflow.executeWorkflow(failurePatGraph, mockBluePrintRuntimeService(), failurePathWorkflowInput)
assertNotNull(failurePathResponse, "failed to get response")
}
}
@@ -83,10 +135,8 @@ class BluePrintWorkflowServiceTest {
.toGraph()
val simpleWorkflow = TestBluePrintWorkFlowService()
simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null)
- val deferredOutput = CompletableDeferred<String>()
val input = "123456"
- simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput)
- val response = deferredOutput.await()
+ val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input)
assertNotNull(response, "failed to get response")
}
}
@@ -98,17 +148,19 @@ class BluePrintWorkflowServiceTest {
.toGraph()
val simpleWorkflow = TestBluePrintWorkFlowService()
simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D"), null)
- val deferredOutput = CompletableDeferred<String>()
val input = "123456"
- simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input, deferredOutput)
- val response = deferredOutput.await()
+ val response = simpleWorkflow.executeWorkflow(graph, mockBluePrintRuntimeService(), input)
assertNotNull(response, "failed to get response")
}
}
private fun mockBluePrintRuntimeService(): BluePrintRuntimeService<*> {
+ return mockBluePrintRuntimeService("123456")
+ }
+
+ private fun mockBluePrintRuntimeService(id: String): BluePrintRuntimeService<*> {
val bluePrintRuntimeService = mockk<BluePrintRuntimeService<*>>()
- every { bluePrintRuntimeService.id() } returns "123456"
+ every { bluePrintRuntimeService.id() } returns id
return bluePrintRuntimeService
}
@@ -126,6 +178,7 @@ class BluePrintWorkflowServiceTest {
class TestBluePrintWorkFlowService
: AbstractBluePrintWorkFlowService<String, String>() {
+ val log = logger(TestBluePrintWorkFlowService::class)
lateinit var simulatedState: MutableMap<String, EdgeLabel>
@@ -133,6 +186,21 @@ class TestBluePrintWorkFlowService
return EdgeLabel.SUCCESS
}
+ override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, input: String): String {
+ log.info("Executing Graph : $graph")
+ this.graph = graph
+ this.workflowId = bluePrintRuntimeService.id()
+ val output = CompletableDeferred<String>()
+ val startMessage = WorkflowExecuteMessage(input, output)
+ val workflowActor = workflowActor()
+ if (!workflowActor.isClosedForSend) {
+ workflowActor().send(startMessage)
+ } else {
+ throw BluePrintProcessorException("workflow actor is closed for send $workflowActor")
+ }
+ return startMessage.output.await()
+ }
+
override suspend fun prepareNodeExecutionMessage(node: Graph.Node)
: NodeExecuteMessage<String, String> {
return NodeExecuteMessage(node, "$node Input", "")
@@ -140,23 +208,26 @@ class TestBluePrintWorkFlowService
override suspend fun executeNode(node: Graph.Node, nodeInput: String,
nodeOutput: String): EdgeLabel {
-// val random = (1..10).random() * 1000
-// println("will reply in $random ms")
+// val random = (1..10).random() * 100
+// log.info("workflow($workflowId) node(${node.id}) will reply in $random ms")
// kotlinx.coroutines.delay(random.toLong())
- val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)")
- return status
+// //Simulation for timeout
+ if (node.id == "TO") {
+ withTimeout(1) {
+ kotlinx.coroutines.delay(2)
+ }
+ }
+ return simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)")
}
override suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<String, String> {
val nodeOutput = ""
- val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", nodeOutput)
- return nodeSkipMessage
+ return NodeSkipMessage(node, "$node Skip Input", nodeOutput)
}
override suspend fun skipNode(node: Graph.Node, nodeInput: String,
nodeOutput: String): EdgeLabel {
- val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)")
- return status
+ return simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)")
}
override suspend fun cancelNode(node: Graph.Node, nodeInput: String,
@@ -169,7 +240,12 @@ class TestBluePrintWorkFlowService
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}
- override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): String {
+ override suspend fun prepareWorkflowOutput(): String {
+ if (exceptions.isNotEmpty()) {
+ exceptions.forEach {
+ log.error("workflow($workflowId) exceptions :", it)
+ }
+ }
return "Final Response"
}
} \ No newline at end of file
diff --git a/ms/controllerblueprints/modules/service/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/service/controller/ControllerBlueprintExeptionHandler.kt b/ms/controllerblueprints/modules/service/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/service/controller/ControllerBlueprintExceptionHandler.kt
index de8ba93e3..5b92369f8 100644
--- a/ms/controllerblueprints/modules/service/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/service/controller/ControllerBlueprintExeptionHandler.kt
+++ b/ms/controllerblueprints/modules/service/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/service/controller/ControllerBlueprintExceptionHandler.kt
@@ -20,6 +20,7 @@ import org.springframework.web.bind.annotation.RestControllerAdvice
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
import org.onap.ccsdk.cds.controllerblueprints.core.data.ErrorCode
import org.onap.ccsdk.cds.controllerblueprints.service.common.ErrorMessage
+import org.slf4j.LoggerFactory
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.ExceptionHandler
@@ -32,19 +33,26 @@ import org.springframework.web.bind.annotation.ExceptionHandler
* @version 1.0
*/
@RestControllerAdvice("org.onap.ccsdk.cds.controllerblueprints")
-open class ControllerBlueprintExeptionHandler {
+open class ControllerBlueprintExceptionHandler {
+
+ companion object ControllerBlueprintExeptionHandler {
+ val LOG = LoggerFactory.getLogger(ControllerBlueprintExceptionHandler::class.java)
+ }
@ExceptionHandler
- fun ControllerBlueprintException(e: BluePrintException): ResponseEntity<ErrorMessage> {
+ fun ControllerBlueprintExceptionHandler(e: BluePrintException): ResponseEntity<ErrorMessage> {
var errorCode = ErrorCode.valueOf(e.code)
val errorMessage = ErrorMessage(errorCode?.message(e.message!!), errorCode?.value, "ControllerBluePrint_Error_Message")
+ LOG.error("Error: $errorCode ${e.message}")
return ResponseEntity(errorMessage, HttpStatus.resolve(errorCode!!.httpCode))
}
@ExceptionHandler
- fun ControllerBlueprintException(e: Exception): ResponseEntity<ErrorMessage> {
+ fun ControllerBlueprintExceptionHandler(e: Exception): ResponseEntity<ErrorMessage> {
var errorCode = ErrorCode.GENERIC_FAILURE
val errorMessage = ErrorMessage(errorCode?.message(e.message!!), errorCode?.value, "ControllerBluePrint_Error_Message")
+ LOG.error("Error: $errorCode ${e.message}")
return ResponseEntity(errorMessage, HttpStatus.resolve(errorCode!!.httpCode))
}
-} \ No newline at end of file
+}
+