From 1f3bc0ca592a7f4245b0f70bfdcf63222dbf6106 Mon Sep 17 00:00:00 2001 From: Julien Fontaine Date: Mon, 11 May 2020 14:38:01 -0400 Subject: Kafka Audit Service : CorrelationUUID from request is not matching the correct response in Kafka Moved out CorrelationUUID linking between the request and the response from the Kafka Audit Service to the ExecutionServiceHandler. This prevents the race condition happening when several ExecutionServiceOutputs try to set the CorrelationUUID related to their ExecutionServiceInput. Issue-ID: CCSDK-2370 Signed-off-by: Julien Fontaine Change-Id: I0c5934d4486961fbfcb34fd2d2492cd843350025 --- .../blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt | 4 ++-- .../blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt | 6 ++---- .../blueprintsprocessor/selfservice/api/NoPublishAuditService.kt | 2 +- .../cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt | 2 +- .../selfservice/api/ExecutionServiceHandlerTest.kt | 2 +- 5 files changed, 7 insertions(+), 9 deletions(-) (limited to 'ms/blueprintsprocessor') diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt index 74c4b00e4..e9d0b7b51 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt @@ -75,7 +75,7 @@ class ExecutionServiceHandler( "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.", true ) - publishAuditService.publish(executionServiceOutput) + publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput) responseObserver.onNext( executionServiceOutput.toProto() ) @@ -121,7 +121,7 @@ class ExecutionServiceHandler( executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true) } - publishAuditService.publish(executionServiceOutput) + publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput) return executionServiceOutput } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt index 1c5d47c27..9f406f7aa 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt @@ -51,7 +51,6 @@ class KafkaPublishAuditService( ) : PublishAuditService { private var inputInstance: BlueprintMessageProducerService? = null private var outputInstance: BlueprintMessageProducerService? = null - private lateinit var correlationUUID: String private val log = LoggerFactory.getLogger(KafkaPublishAuditService::class.toString()) companion object { @@ -70,7 +69,6 @@ class KafkaPublishAuditService( * Sensitive data within the request are hidden. */ override suspend fun publish(executionServiceInput: ExecutionServiceInput) { - this.correlationUUID = executionServiceInput.correlationUUID val secureExecutionServiceInput = hideSensitiveData(executionServiceInput) this.inputInstance = this.getInputInstance(INPUT_SELECTOR) this.inputInstance!!.sendMessage(secureExecutionServiceInput) @@ -81,8 +79,8 @@ class KafkaPublishAuditService( * The correlation UUID is used to link the output to its input. * A correlation UUID is added to link the input to its output. */ - override fun publish(executionServiceOutput: ExecutionServiceOutput) { - executionServiceOutput.correlationUUID = this.correlationUUID + override fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) { + executionServiceOutput.correlationUUID = correlationUUID this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR) this.outputInstance!!.sendMessage(executionServiceOutput) } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt index 3f782000b..eb66e411a 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt @@ -42,6 +42,6 @@ class NoPublishAuditService : PublishAuditService { override suspend fun publish(executionServiceInput: ExecutionServiceInput) { } - override fun publish(executionServiceOutput: ExecutionServiceOutput) { + override fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) { } } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt index 535a5eae0..72f493187 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt @@ -21,5 +21,5 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutp interface PublishAuditService { suspend fun publish(executionServiceInput: ExecutionServiceInput) - fun publish(executionServiceOutput: ExecutionServiceOutput) + fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt index 37f7861be..191456296 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt @@ -114,7 +114,7 @@ class ExecutionServiceHandlerTest { } verify { - publishAuditService.publish(executionServiceOutput!!) + publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput!!) } } } -- cgit 1.2.3-korg