diff options
author | Julien Fontaine <julien.fontaine@bell.ca> | 2020-05-11 14:38:01 -0400 |
---|---|---|
committer | Julien Fontaine <julien.fontaine@bell.ca> | 2020-05-11 14:43:56 -0400 |
commit | 1f3bc0ca592a7f4245b0f70bfdcf63222dbf6106 (patch) | |
tree | bbc3591875bbf38613b5477bec66ff5271a6d60d | |
parent | 16ced660a671dea28fa56ffa860651d78ebb4501 (diff) |
Kafka Audit Service : CorrelationUUID from request is not matching the correct response in Kafka
Moved out CorrelationUUID linking between the request and the response from the Kafka Audit Service to the ExecutionServiceHandler.
This prevents the race condition happening when several ExecutionServiceOutputs try to set the CorrelationUUID related to their ExecutionServiceInput.
Issue-ID: CCSDK-2370
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Change-Id: I0c5934d4486961fbfcb34fd2d2492cd843350025
5 files changed, 7 insertions, 9 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt index 74c4b00e4..e9d0b7b51 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt @@ -75,7 +75,7 @@ class ExecutionServiceHandler( "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.", true ) - publishAuditService.publish(executionServiceOutput) + publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput) responseObserver.onNext( executionServiceOutput.toProto() ) @@ -121,7 +121,7 @@ class ExecutionServiceHandler( executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true) } - publishAuditService.publish(executionServiceOutput) + publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput) return executionServiceOutput } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt index 1c5d47c27..9f406f7aa 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt @@ -51,7 +51,6 @@ class KafkaPublishAuditService( ) : PublishAuditService { private var inputInstance: BlueprintMessageProducerService? = null private var outputInstance: BlueprintMessageProducerService? = null - private lateinit var correlationUUID: String private val log = LoggerFactory.getLogger(KafkaPublishAuditService::class.toString()) companion object { @@ -70,7 +69,6 @@ class KafkaPublishAuditService( * Sensitive data within the request are hidden. */ override suspend fun publish(executionServiceInput: ExecutionServiceInput) { - this.correlationUUID = executionServiceInput.correlationUUID val secureExecutionServiceInput = hideSensitiveData(executionServiceInput) this.inputInstance = this.getInputInstance(INPUT_SELECTOR) this.inputInstance!!.sendMessage(secureExecutionServiceInput) @@ -81,8 +79,8 @@ class KafkaPublishAuditService( * The correlation UUID is used to link the output to its input. * A correlation UUID is added to link the input to its output. */ - override fun publish(executionServiceOutput: ExecutionServiceOutput) { - executionServiceOutput.correlationUUID = this.correlationUUID + override fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) { + executionServiceOutput.correlationUUID = correlationUUID this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR) this.outputInstance!!.sendMessage(executionServiceOutput) } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt index 3f782000b..eb66e411a 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt @@ -42,6 +42,6 @@ class NoPublishAuditService : PublishAuditService { override suspend fun publish(executionServiceInput: ExecutionServiceInput) { } - override fun publish(executionServiceOutput: ExecutionServiceOutput) { + override fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) { } } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt index 535a5eae0..72f493187 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt @@ -21,5 +21,5 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutp interface PublishAuditService { suspend fun publish(executionServiceInput: ExecutionServiceInput) - fun publish(executionServiceOutput: ExecutionServiceOutput) + fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt index 37f7861be..191456296 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt @@ -114,7 +114,7 @@ class ExecutionServiceHandlerTest { } verify { - publishAuditService.publish(executionServiceOutput!!) + publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput!!) } } } |