From f72ff0cff34d17147a5142eb57987b1515b80580 Mon Sep 17 00:00:00 2001 From: Julien Fontaine Date: Tue, 4 Aug 2020 11:57:56 -0400 Subject: Make use of Kafka Key for Audit service and Kafka listener * When message is sent by audit service, key will be the CBA name * When sent by kafka listener (self-service api), key is the same as the request message key consumed. If not specified, a random UUID * MessageProducer interface refactoring : * add 'key' parameter to specify a key * add default value null to paramater 'headers' to remove some unnecessary method Issue-ID: CCSDK-2628 Signed-off-by: Julien Fontaine Change-Id: I68580151184c87104c07037f379276dd8c8c71c7 --- .../selfservice/api/BluePrintProcessingKafkaConsumer.kt | 10 +++++++--- .../selfservice/api/KafkaPublishAuditService.kt | 6 ++++-- 2 files changed, 11 insertions(+), 5 deletions(-) (limited to 'ms/blueprintsprocessor/modules/inbounds/selfservice-api/src') diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt index 1f3dd6547..1ccf230d7 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt @@ -31,6 +31,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.event.ApplicationReadyEvent import org.springframework.context.event.EventListener import org.springframework.stereotype.Service +import java.nio.charset.Charset +import java.util.UUID import java.util.concurrent.Phaser import javax.annotation.PreDestroy @@ -95,10 +97,12 @@ open class BluePrintProcessingKafkaConsumer( launch { try { ph.register() - log.trace("Consumed Message : $message") - val executionServiceInput = message.jsonAsType() + val key = message.key() ?: UUID.randomUUID().toString() + val value = String(message.value(), Charset.defaultCharset()) + log.trace("Consumed Message : key($key) value($value)") + val executionServiceInput = value.jsonAsType() val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) - blueprintMessageProducerService.sendMessage(executionServiceOutput) + blueprintMessageProducerService.sendMessage(key, executionServiceOutput) } catch (e: Exception) { log.error("failed in processing the consumed message : $message", e) } finally { diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt index fca73981e..93885bfa2 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt @@ -71,9 +71,10 @@ class KafkaPublishAuditService( */ override suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) { val secureExecutionServiceInput = hideSensitiveData(executionServiceInput) + val key = secureExecutionServiceInput.actionIdentifiers.blueprintName try { this.inputInstance = this.getInputInstance(INPUT_SELECTOR) - this.inputInstance!!.sendMessage(secureExecutionServiceInput) + this.inputInstance!!.sendMessage(key, secureExecutionServiceInput) } catch (e: Exception) { var errMsg = if (e.message != null) "ERROR : ${e.message}" @@ -89,9 +90,10 @@ class KafkaPublishAuditService( */ override suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) { executionServiceOutput.correlationUUID = correlationUUID + val key = executionServiceOutput.actionIdentifiers.blueprintName try { this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR) - this.outputInstance!!.sendMessage(executionServiceOutput) + this.outputInstance!!.sendMessage(key, executionServiceOutput) } catch (e: Exception) { var errMsg = if (e.message != null) "ERROR : $e" -- cgit 1.2.3-korg