aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/inbounds
diff options
context:
space:
mode:
authorJulien Fontaine <julien.fontaine@bell.ca>2020-08-04 11:57:56 -0400
committerJulien Fontaine <julien.fontaine@bell.ca>2020-08-04 18:24:12 -0400
commitf72ff0cff34d17147a5142eb57987b1515b80580 (patch)
tree556ae71f28b4192703bcb1878375aaeea2408122 /ms/blueprintsprocessor/modules/inbounds
parent260b95f2a1fe773bb1e89150164ae2a7c880be04 (diff)
Make use of Kafka Key for Audit service and Kafka listener
* When message is sent by audit service, key will be the CBA name * When sent by kafka listener (self-service api), key is the same as the request message key consumed. If not specified, a random UUID * MessageProducer interface refactoring : * add 'key' parameter to specify a key * add default value null to paramater 'headers' to remove some unnecessary method Issue-ID: CCSDK-2628 Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca> Change-Id: I68580151184c87104c07037f379276dd8c8c71c7
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds')
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt10
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt6
2 files changed, 11 insertions, 5 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
index 1f3dd6547..1ccf230d7 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
@@ -31,6 +31,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Service
+import java.nio.charset.Charset
+import java.util.UUID
import java.util.concurrent.Phaser
import javax.annotation.PreDestroy
@@ -95,10 +97,12 @@ open class BluePrintProcessingKafkaConsumer(
launch {
try {
ph.register()
- log.trace("Consumed Message : $message")
- val executionServiceInput = message.jsonAsType<ExecutionServiceInput>()
+ val key = message.key() ?: UUID.randomUUID().toString()
+ val value = String(message.value(), Charset.defaultCharset())
+ log.trace("Consumed Message : key($key) value($value)")
+ val executionServiceInput = value.jsonAsType<ExecutionServiceInput>()
val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
- blueprintMessageProducerService.sendMessage(executionServiceOutput)
+ blueprintMessageProducerService.sendMessage(key, executionServiceOutput)
} catch (e: Exception) {
log.error("failed in processing the consumed message : $message", e)
} finally {
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
index fca73981e..93885bfa2 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
@@ -71,9 +71,10 @@ class KafkaPublishAuditService(
*/
override suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) {
val secureExecutionServiceInput = hideSensitiveData(executionServiceInput)
+ val key = secureExecutionServiceInput.actionIdentifiers.blueprintName
try {
this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
- this.inputInstance!!.sendMessage(secureExecutionServiceInput)
+ this.inputInstance!!.sendMessage(key, secureExecutionServiceInput)
} catch (e: Exception) {
var errMsg =
if (e.message != null) "ERROR : ${e.message}"
@@ -89,9 +90,10 @@ class KafkaPublishAuditService(
*/
override suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
executionServiceOutput.correlationUUID = correlationUUID
+ val key = executionServiceOutput.actionIdentifiers.blueprintName
try {
this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
- this.outputInstance!!.sendMessage(executionServiceOutput)
+ this.outputInstance!!.sendMessage(key, executionServiceOutput)
} catch (e: Exception) {
var errMsg =
if (e.message != null) "ERROR : $e"