diff options
author | Julien Fontaine <julien.fontaine@bell.ca> | 2020-06-18 18:16:38 -0400 |
---|---|---|
committer | Julien Fontaine <julien.fontaine@bell.ca> | 2020-06-22 14:46:22 -0400 |
commit | c2046c09fd65a9ba4d000beacb3836869db771f2 (patch) | |
tree | 847f9bb3824b0a906589567e819673b4c7360207 /ms/blueprintsprocessor/modules/inbounds | |
parent | 214c8dfe0e56aff0fe0c2a3a5d7f76f2a4972710 (diff) |
Kafka Audit Service : Improve error handling and miscellaneous refactoring
When Kafka Audit Service fails it no longer stops Blueprint Processor execution
* Add error handling when trying to hide sensitive data
* Add error handling when trying to send kafka message
* Set timeout for blocking loop when sending messages with kafka producer
-> When broker is not available producer tries to reconnect in a blocking loop
* Refactor Audit Service interface to give more explict name for publish methods
* Modify publishExecutionOutput() to a non-blocking function
Issue-ID: CCSDK-2459
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Change-Id: I809a5f34f81889aa9eed499608348f149984bc38
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds')
5 files changed, 96 insertions, 87 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt index e9d0b7b51..6c62aae88 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt @@ -69,13 +69,13 @@ class ExecutionServiceHandler( responseObserver.onCompleted() } else -> { - publishAuditService.publish(executionServiceInput) + publishAuditService.publishExecutionInput(executionServiceInput) val executionServiceOutput = response( executionServiceInput, "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.", true ) - publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput) + publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput) responseObserver.onNext( executionServiceOutput.toProto() ) @@ -93,9 +93,9 @@ class ExecutionServiceHandler( log.info("processing request id $requestId") - try { - publishAuditService.publish(executionServiceInput) + publishAuditService.publishExecutionInput(executionServiceInput) + try { /** Check Blueprint is needed for this request */ if (checkServiceFunction(executionServiceInput)) { executionServiceOutput = executeServiceFunction(executionServiceInput) @@ -121,7 +121,7 @@ class ExecutionServiceHandler( executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true) } - publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput) + publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput) return executionServiceOutput } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt index 6ff217942..fca73981e 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt @@ -24,7 +24,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.Reso import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants -import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.common.ApplicationConstants import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService @@ -70,10 +69,17 @@ class KafkaPublishAuditService( * The correlation UUID is used to link the input to its output. * Sensitive data within the request are hidden. */ - override suspend fun publish(executionServiceInput: ExecutionServiceInput) { + override suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) { val secureExecutionServiceInput = hideSensitiveData(executionServiceInput) - this.inputInstance = this.getInputInstance(INPUT_SELECTOR) - this.inputInstance!!.sendMessage(secureExecutionServiceInput) + try { + this.inputInstance = this.getInputInstance(INPUT_SELECTOR) + this.inputInstance!!.sendMessage(secureExecutionServiceInput) + } catch (e: Exception) { + var errMsg = + if (e.message != null) "ERROR : ${e.message}" + else "ERROR : Failed to send execution request to Kafka." + log.error(errMsg) + } } /** @@ -81,10 +87,17 @@ class KafkaPublishAuditService( * The correlation UUID is used to link the output to its input. * A correlation UUID is added to link the input to its output. */ - override fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) { + override suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) { executionServiceOutput.correlationUUID = correlationUUID - this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR) - this.outputInstance!!.sendMessage(executionServiceOutput) + try { + this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR) + this.outputInstance!!.sendMessage(executionServiceOutput) + } catch (e: Exception) { + var errMsg = + if (e.message != null) "ERROR : $e" + else "ERROR : Failed to send execution request to Kafka." + log.error(errMsg) + } } /** @@ -101,15 +114,8 @@ class KafkaPublishAuditService( * Create a kafka producer instance. */ private fun createInstance(selector: String): BlueprintMessageProducerService { - log.info( - "Setting up message producer($selector)..." - ) - return try { - bluePrintMessageLibPropertyService - .blueprintMessageProducerService(selector) - } catch (e: Exception) { - throw BluePrintProcessorException("failed to create producer service ${e.message}") - } + log.info("Setting up message producer($selector)...") + return bluePrintMessageLibPropertyService.blueprintMessageProducerService(selector) } /** @@ -134,66 +140,73 @@ class KafkaPublishAuditService( if (blueprintName == "default") return clonedExecutionServiceInput - if (clonedExecutionServiceInput.payload - .path("$workflowName-request").has("$workflowName-properties")) { - - /** Retrieving sensitive input parameters */ - val requestId = clonedExecutionServiceInput.commonHeader.requestId - val blueprintVersion = clonedExecutionServiceInput.actionIdentifiers.blueprintVersion - - val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion) - - val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString()) - val blueprintContext = blueprintRuntimeService.bluePrintContext() - - /** Looking for node templates defined as component-resource-resolution */ - val nodeTemplates = blueprintContext.nodeTemplates() - nodeTemplates!!.forEach { nodeTemplate -> - val nodeTemplateName = nodeTemplate.key - val nodeTemplateType = blueprintContext.nodeTemplateByName(nodeTemplateName).type - if (nodeTemplateType == BluePrintConstants.NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION) { - val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName) - val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName) - - val propertyAssignments: MutableMap<String, JsonNode> = - blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName) - ?: hashMapOf() - - /** Getting values define in artifact-prefix-names */ - val input = executionServiceInput.payload.get("$workflowName-request") - blueprintRuntimeService.assignWorkflowInputs(workflowName, input) - val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES] - val propertyAssignmentService = PropertyAssignmentService(blueprintRuntimeService) - val artifactPrefixNamesNodeValue = propertyAssignmentService.resolveAssignmentExpression( - BluePrintConstants.MODEL_DEFINITION_TYPE_NODE_TEMPLATE, - nodeTemplateName, - ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES, - artifactPrefixNamesNode!!) - - val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNodeValue!!, String::class.java) - - /** Storing mapping entries with metadata log-protect set to true */ - val sensitiveParameters: List<String> = artifactPrefixNames - .map { "$it-mapping" } - .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) } - .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) } - .filter { PropertyDefinitionUtils.hasLogProtect(it.property) } - .map { it.name } - - /** Hiding sensitive input parameters from the request */ - var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload - .path("$workflowName-request") - .path("$workflowName-properties") as ObjectNode - - sensitiveParameters.forEach { sensitiveParameter -> - if (workflowProperties.has(sensitiveParameter)) { - workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive()) + try { + if (clonedExecutionServiceInput.payload + .path("$workflowName-request").has("$workflowName-properties")) { + + /** Retrieving sensitive input parameters */ + val requestId = clonedExecutionServiceInput.commonHeader.requestId + val blueprintVersion = clonedExecutionServiceInput.actionIdentifiers.blueprintVersion + + val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion) + + val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString()) + val blueprintContext = blueprintRuntimeService.bluePrintContext() + + /** Looking for node templates defined as component-resource-resolution */ + val nodeTemplates = blueprintContext.nodeTemplates() + nodeTemplates!!.forEach { nodeTemplate -> + val nodeTemplateName = nodeTemplate.key + val nodeTemplateType = blueprintContext.nodeTemplateByName(nodeTemplateName).type + if (nodeTemplateType == BluePrintConstants.NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION) { + val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName) + val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName) + + val propertyAssignments: MutableMap<String, JsonNode> = + blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName) + ?: hashMapOf() + + /** Getting values define in artifact-prefix-names */ + val input = executionServiceInput.payload.get("$workflowName-request") + blueprintRuntimeService.assignWorkflowInputs(workflowName, input) + val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES] + val propertyAssignmentService = PropertyAssignmentService(blueprintRuntimeService) + val artifactPrefixNamesNodeValue = propertyAssignmentService.resolveAssignmentExpression( + BluePrintConstants.MODEL_DEFINITION_TYPE_NODE_TEMPLATE, + nodeTemplateName, + ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES, + artifactPrefixNamesNode!!) + + val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNodeValue!!, String::class.java) + + /** Storing mapping entries with metadata log-protect set to true */ + val sensitiveParameters: List<String> = artifactPrefixNames + .map { "$it-mapping" } + .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) } + .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) } + .filter { PropertyDefinitionUtils.hasLogProtect(it.property) } + .map { it.name } + + /** Hiding sensitive input parameters from the request */ + var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload + .path("$workflowName-request") + .path("$workflowName-properties") as ObjectNode + + sensitiveParameters.forEach { sensitiveParameter -> + if (workflowProperties.has(sensitiveParameter)) { + workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive()) + } } } } } + } catch (e: Exception) { + val errMsg = "ERROR : Couldn't hide sensitive data in the execution request." + log.error(errMsg, e) + clonedExecutionServiceInput.payload.replace( + "$workflowName-request", + "$errMsg $e".asJsonPrimitive()) } - return clonedExecutionServiceInput } } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt index eb66e411a..6ad73d88a 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt @@ -39,9 +39,9 @@ class NoPublishAuditService : PublishAuditService { log.info("Audit service is disabled") } - override suspend fun publish(executionServiceInput: ExecutionServiceInput) { + override suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) { } - override fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) { + override suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) { } } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt index 72f493187..67473c807 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt @@ -20,6 +20,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInpu import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput interface PublishAuditService { - suspend fun publish(executionServiceInput: ExecutionServiceInput) - fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) + suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) + suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt index 191456296..70e1ed0fd 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt @@ -16,7 +16,6 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api -import io.mockk.verify import io.mockk.coVerify import io.mockk.Runs import io.mockk.coEvery @@ -102,7 +101,7 @@ class ExecutionServiceHandlerTest { publishAuditService ) - coEvery { publishAuditService.publish(ExecutionServiceInput()) } just Runs + coEvery { publishAuditService.publishExecutionInput(ExecutionServiceInput()) } just Runs var executionServiceOutput: ExecutionServiceOutput? = null runBlocking { @@ -110,11 +109,8 @@ class ExecutionServiceHandlerTest { } coVerify { - publishAuditService.publish(executionServiceInput) - } - - verify { - publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput!!) + publishAuditService.publishExecutionInput(executionServiceInput) + publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput!!) } } } |