diff options
11 files changed, 150 insertions, 110 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index b07d64388..d76621c26 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -51,17 +51,18 @@ abstract class MessageProducerProperties : CommonProperties() open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() { var clientId: String? = null - // strongest producing guarantee - var acks: String = "all" - var retries: Int = 0 - // ensure we don't push duplicates - var enableIdempotence: Boolean = true + var acks: String = "all" // strongest producing guarantee + var maxBlockMs: Int = 250 // max blocking time in ms to send a message + var reconnectBackOffMs: Int = 60 * 60 * 1000 // time in ms before retrying connection (1 hour) + var enableIdempotence: Boolean = true // ensure we don't push duplicates override fun getConfig(): HashMap<String, Any> { val configProps = super.getConfig() configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java configProps[ProducerConfig.ACKS_CONFIG] = acks + configProps[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs + configProps[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence if (clientId != null) { configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!! diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt index c659fdb8b..e9bc5d8ad 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt @@ -159,9 +159,13 @@ open class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessagePro fun acks(acks: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::acks, acks) - fun retries(retries: Int) = retries(retries.asJsonPrimitive()) + fun maxBlockMs(maxBlockMs: Int) = maxBlockMs(maxBlockMs.asJsonPrimitive()) - fun retries(retries: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::retries, retries) + fun maxBlockMs(maxBlockMs: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::maxBlockMs, maxBlockMs) + + fun reconnectBackOffMs(reconnectBackOffMs: Int) = reconnectBackOffMs(reconnectBackOffMs.asJsonPrimitive()) + + fun reconnectBackOffMs(reconnectBackOffMs: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::reconnectBackOffMs, reconnectBackOffMs) fun enableIdempotence(enableIdempotence: Boolean) = enableIdempotence(enableIdempotence.asJsonPrimitive()) diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt index 612a57d23..b1af230b9 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt @@ -35,7 +35,8 @@ class MessagePropertiesDSLTest { bootstrapServers("sample-bootstrapServers") clientId("sample-client-id") acks("all") - retries(3) + maxBlockMs(0) + reconnectBackOffMs(60 * 60 * 1000) enableIdempotence(true) topic("sample-topic") truststore("/path/to/truststore.jks") diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index da7394998..537dab1ba 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -100,6 +100,8 @@ open class BlueprintMessageProducerServiceTest { ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java, ProducerConfig.ACKS_CONFIG to "all", + ProducerConfig.MAX_BLOCK_MS_CONFIG to 250, + ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG to 60 * 60 * 1000, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true, ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id", CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(), diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt index e9d0b7b51..6c62aae88 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt @@ -69,13 +69,13 @@ class ExecutionServiceHandler( responseObserver.onCompleted() } else -> { - publishAuditService.publish(executionServiceInput) + publishAuditService.publishExecutionInput(executionServiceInput) val executionServiceOutput = response( executionServiceInput, "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.", true ) - publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput) + publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput) responseObserver.onNext( executionServiceOutput.toProto() ) @@ -93,9 +93,9 @@ class ExecutionServiceHandler( log.info("processing request id $requestId") - try { - publishAuditService.publish(executionServiceInput) + publishAuditService.publishExecutionInput(executionServiceInput) + try { /** Check Blueprint is needed for this request */ if (checkServiceFunction(executionServiceInput)) { executionServiceOutput = executeServiceFunction(executionServiceInput) @@ -121,7 +121,7 @@ class ExecutionServiceHandler( executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true) } - publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput) + publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput) return executionServiceOutput } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt index 6ff217942..fca73981e 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt @@ -24,7 +24,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.Reso import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.common.ApplicationConstants import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService @@ -70,10 +69,17 @@ class KafkaPublishAuditService( * The correlation UUID is used to link the input to its output. * Sensitive data within the request are hidden. */ - override suspend fun publish(executionServiceInput: ExecutionServiceInput) { + override suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) { val secureExecutionServiceInput = hideSensitiveData(executionServiceInput) - this.inputInstance = this.getInputInstance(INPUT_SELECTOR) - this.inputInstance!!.sendMessage(secureExecutionServiceInput) + try { + this.inputInstance = this.getInputInstance(INPUT_SELECTOR) + this.inputInstance!!.sendMessage(secureExecutionServiceInput) + } catch (e: Exception) { + var errMsg = + if (e.message != null) "ERROR : ${e.message}" + else "ERROR : Failed to send execution request to Kafka." + log.error(errMsg) + } } /** @@ -81,10 +87,17 @@ class KafkaPublishAuditService( * The correlation UUID is used to link the output to its input. * A correlation UUID is added to link the input to its output. */ - override fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) { + override suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) { executionServiceOutput.correlationUUID = correlationUUID - this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR) - this.outputInstance!!.sendMessage(executionServiceOutput) + try { + this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR) + this.outputInstance!!.sendMessage(executionServiceOutput) + } catch (e: Exception) { + var errMsg = + if (e.message != null) "ERROR : $e" + else "ERROR : Failed to send execution request to Kafka." + log.error(errMsg) + } } /** @@ -101,15 +114,8 @@ class KafkaPublishAuditService( * Create a kafka producer instance. */ private fun createInstance(selector: String): BlueprintMessageProducerService { - log.info( - "Setting up message producer($selector)..." - ) - return try { - bluePrintMessageLibPropertyService - .blueprintMessageProducerService(selector) - } catch (e: Exception) { - throw BluePrintProcessorException("failed to create producer service ${e.message}") - } + log.info("Setting up message producer($selector)...") + return bluePrintMessageLibPropertyService.blueprintMessageProducerService(selector) } /** @@ -134,66 +140,73 @@ class KafkaPublishAuditService( if (blueprintName == "default") return clonedExecutionServiceInput - if (clonedExecutionServiceInput.payload - .path("$workflowName-request").has("$workflowName-properties")) { - - /** Retrieving sensitive input parameters */ - val requestId = clonedExecutionServiceInput.commonHeader.requestId - val blueprintVersion = clonedExecutionServiceInput.actionIdentifiers.blueprintVersion - - val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion) - - val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString()) - val blueprintContext = blueprintRuntimeService.bluePrintContext() - - /** Looking for node templates defined as component-resource-resolution */ - val nodeTemplates = blueprintContext.nodeTemplates() - nodeTemplates!!.forEach { nodeTemplate -> - val nodeTemplateName = nodeTemplate.key - val nodeTemplateType = blueprintContext.nodeTemplateByName(nodeTemplateName).type - if (nodeTemplateType == BluePrintConstants.NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION) { - val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName) - val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName) - - val propertyAssignments: MutableMap<String, JsonNode> = - blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName) - ?: hashMapOf() - - /** Getting values define in artifact-prefix-names */ - val input = executionServiceInput.payload.get("$workflowName-request") - blueprintRuntimeService.assignWorkflowInputs(workflowName, input) - val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES] - val propertyAssignmentService = PropertyAssignmentService(blueprintRuntimeService) - val artifactPrefixNamesNodeValue = propertyAssignmentService.resolveAssignmentExpression( - BluePrintConstants.MODEL_DEFINITION_TYPE_NODE_TEMPLATE, - nodeTemplateName, - ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES, - artifactPrefixNamesNode!!) - - val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNodeValue!!, String::class.java) - - /** Storing mapping entries with metadata log-protect set to true */ - val sensitiveParameters: List<String> = artifactPrefixNames - .map { "$it-mapping" } - .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) } - .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) } - .filter { PropertyDefinitionUtils.hasLogProtect(it.property) } - .map { it.name } - - /** Hiding sensitive input parameters from the request */ - var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload - .path("$workflowName-request") - .path("$workflowName-properties") as ObjectNode - - sensitiveParameters.forEach { sensitiveParameter -> - if (workflowProperties.has(sensitiveParameter)) { - workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive()) + try { + if (clonedExecutionServiceInput.payload + .path("$workflowName-request").has("$workflowName-properties")) { + + /** Retrieving sensitive input parameters */ + val requestId = clonedExecutionServiceInput.commonHeader.requestId + val blueprintVersion = clonedExecutionServiceInput.actionIdentifiers.blueprintVersion + + val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion) + + val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString()) + val blueprintContext = blueprintRuntimeService.bluePrintContext() + + /** Looking for node templates defined as component-resource-resolution */ + val nodeTemplates = blueprintContext.nodeTemplates() + nodeTemplates!!.forEach { nodeTemplate -> + val nodeTemplateName = nodeTemplate.key + val nodeTemplateType = blueprintContext.nodeTemplateByName(nodeTemplateName).type + if (nodeTemplateType == BluePrintConstants.NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION) { + val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName) + val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName) + + val propertyAssignments: MutableMap<String, JsonNode> = + blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName) + ?: hashMapOf() + + /** Getting values define in artifact-prefix-names */ + val input = executionServiceInput.payload.get("$workflowName-request") + blueprintRuntimeService.assignWorkflowInputs(workflowName, input) + val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES] + val propertyAssignmentService = PropertyAssignmentService(blueprintRuntimeService) + val artifactPrefixNamesNodeValue = propertyAssignmentService.resolveAssignmentExpression( + BluePrintConstants.MODEL_DEFINITION_TYPE_NODE_TEMPLATE, + nodeTemplateName, + ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES, + artifactPrefixNamesNode!!) + + val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNodeValue!!, String::class.java) + + /** Storing mapping entries with metadata log-protect set to true */ + val sensitiveParameters: List<String> = artifactPrefixNames + .map { "$it-mapping" } + .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) } + .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) } + .filter { PropertyDefinitionUtils.hasLogProtect(it.property) } + .map { it.name } + + /** Hiding sensitive input parameters from the request */ + var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload + .path("$workflowName-request") + .path("$workflowName-properties") as ObjectNode + + sensitiveParameters.forEach { sensitiveParameter -> + if (workflowProperties.has(sensitiveParameter)) { + workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive()) + } } } } } + } catch (e: Exception) { + val errMsg = "ERROR : Couldn't hide sensitive data in the execution request." + log.error(errMsg, e) + clonedExecutionServiceInput.payload.replace( + "$workflowName-request", + "$errMsg $e".asJsonPrimitive()) } - return clonedExecutionServiceInput } } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt index eb66e411a..6ad73d88a 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt @@ -39,9 +39,9 @@ class NoPublishAuditService : PublishAuditService { log.info("Audit service is disabled") } - override suspend fun publish(executionServiceInput: ExecutionServiceInput) { + override suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) { } - override fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) { + override suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) { } } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt index 72f493187..67473c807 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt @@ -20,6 +20,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInpu import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput interface PublishAuditService { - suspend fun publish(executionServiceInput: ExecutionServiceInput) - fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) + suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) + suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt index 191456296..70e1ed0fd 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt @@ -16,7 +16,6 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api -import io.mockk.verify import io.mockk.coVerify import io.mockk.Runs import io.mockk.coEvery @@ -102,7 +101,7 @@ class ExecutionServiceHandlerTest { publishAuditService ) - coEvery { publishAuditService.publish(ExecutionServiceInput()) } just Runs + coEvery { publishAuditService.publishExecutionInput(ExecutionServiceInput()) } just Runs var executionServiceOutput: ExecutionServiceOutput? = null runBlocking { @@ -110,11 +109,8 @@ class ExecutionServiceHandlerTest { } coVerify { - publishAuditService.publish(executionServiceInput) - } - - verify { - publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput!!) + publishAuditService.publishExecutionInput(executionServiceInput) + publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput!!) } } } diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt index 211bf76fb..4cd809778 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt @@ -105,7 +105,7 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic /** Resolve and validate lock properties */ implementation.lock?.apply { val resolvedValues = bluePrintRuntimeService.resolvePropertyAssignments( - nodeTemplateName, + BluePrintConstants.MODEL_DEFINITION_TYPE_NODE_TEMPLATE, interfaceName, mutableMapOf("key" to this.key, "acquireTimeout" to this.acquireTimeout)) this.key = resolvedValues["key"] ?: "".asJsonType() @@ -153,21 +153,14 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic } override suspend fun applyNB(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { - prepareRequestNB(executionServiceInput) - return implementation.lock?.let { - bluePrintClusterService.clusterLock("${it.key.textValue()}@$CDS_LOCK_GROUP") - .executeWithLock(it.acquireTimeout.intValue().times(1000).toLong()) { - applyNBWithTimeout(executionServiceInput) - } - } ?: applyNBWithTimeout(executionServiceInput) - } - - private suspend fun applyNBWithTimeout(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { try { - withTimeout((implementation.timeout * 1000).toLong()) { - log.debug("DEBUG::: AbstractComponentFunction.withTimeout section ${implementation.timeout} seconds") - processNB(executionServiceInput) - } + prepareRequestNB(executionServiceInput) + implementation.lock?.let { + bluePrintClusterService.clusterLock("${it.key.textValue()}@$CDS_LOCK_GROUP") + .executeWithLock(it.acquireTimeout.intValue().times(1000).toLong()) { + applyNBWithTimeout(executionServiceInput) + } + } ?: applyNBWithTimeout(executionServiceInput) } catch (runtimeException: RuntimeException) { log.error("failed in ${getName()} : ${runtimeException.message}", runtimeException) recoverNB(runtimeException, executionServiceInput) @@ -175,6 +168,13 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic return prepareResponseNB() } + private suspend fun applyNBWithTimeout(executionServiceInput: ExecutionServiceInput) = + withTimeout((implementation.timeout * 1000).toLong()) { + log.debug("DEBUG::: AbstractComponentFunction.withTimeout " + + "section ${implementation.timeout} seconds") + processNB(executionServiceInput) + } + fun getOperationInput(key: String): JsonNode { return operationInputs[key] ?: throw BluePrintProcessorException("couldn't get the operation input($key) value.") diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/scripts/AbstractComponentFunctionTest.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/scripts/AbstractComponentFunctionTest.kt index e0b690573..0f9dfd157 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/scripts/AbstractComponentFunctionTest.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/scripts/AbstractComponentFunctionTest.kt @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.node.ObjectNode import io.mockk.every import io.mockk.mockk +import io.mockk.spyk import io.mockk.verify import kotlinx.coroutines.runBlocking import org.junit.Test @@ -53,6 +54,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.junit4.SpringRunner +import java.lang.RuntimeException import kotlin.test.BeforeTest import kotlin.test.assertEquals import kotlin.test.assertNotNull @@ -204,6 +206,27 @@ class AbstractComponentFunctionTest { } @Test + fun `applyNB should catch exceptions and call recoverNB`() { + val exception = RuntimeException("Intentional test exception") + every { + bluePrintRuntimeService.resolvePropertyAssignments(any(), any(), any()) + } throws exception + every { + blueprintContext.nodeTemplateOperationImplementation(any(), any(), any()) + } returns Implementation().apply { + this.lock = LockAssignment().apply { this.key = "testing-lock".asJsonType() } + } + + val component: AbstractComponentFunction = spyk(SampleComponent()) + component.bluePrintRuntimeService = bluePrintRuntimeService + component.bluePrintClusterService = blueprintClusterService + val input = getMockedInput(bluePrintRuntimeService) + + runBlocking { component.applyNB(input) } + verify { runBlocking { component.recoverNB(exception, input) } } + } + + @Test fun `applyNB - when lock is present use ClusterLock`() { val lockName = "testing-lock" |