summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/inbounds/selfservice-api
diff options
context:
space:
mode:
authorKAPIL SINGAL <ks220y@att.com>2020-08-10 11:30:27 +0000
committerGerrit Code Review <gerrit@onap.org>2020-08-10 11:30:27 +0000
commit6bd8ce3a79f4caefa0b810a71285d96edb389da4 (patch)
treeece5212dfa668743034ce887f1775feeadacbbf4 /ms/blueprintsprocessor/modules/inbounds/selfservice-api
parentbe701b483f4c775285b372bca0cfeb7f0332e931 (diff)
parentf72ff0cff34d17147a5142eb57987b1515b80580 (diff)
Merge "Make use of Kafka Key for Audit service and Kafka listener"
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds/selfservice-api')
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt10
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt4
2 files changed, 9 insertions, 5 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
index 1f3dd6547..1ccf230d7 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
@@ -31,6 +31,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Service
+import java.nio.charset.Charset
+import java.util.UUID
import java.util.concurrent.Phaser
import javax.annotation.PreDestroy
@@ -95,10 +97,12 @@ open class BluePrintProcessingKafkaConsumer(
launch {
try {
ph.register()
- log.trace("Consumed Message : $message")
- val executionServiceInput = message.jsonAsType<ExecutionServiceInput>()
+ val key = message.key() ?: UUID.randomUUID().toString()
+ val value = String(message.value(), Charset.defaultCharset())
+ log.trace("Consumed Message : key($key) value($value)")
+ val executionServiceInput = value.jsonAsType<ExecutionServiceInput>()
val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
- blueprintMessageProducerService.sendMessage(executionServiceOutput)
+ blueprintMessageProducerService.sendMessage(key, executionServiceOutput)
} catch (e: Exception) {
log.error("failed in processing the consumed message : $message", e)
} finally {
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
index 12cadfa66..145c37b01 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
@@ -85,7 +85,7 @@ class KafkaPublishAuditService(
val key = secureExecutionServiceInput.actionIdentifiers.blueprintName
try {
this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
- this.inputInstance!!.sendMessage(secureExecutionServiceInput)
+ this.inputInstance!!.sendMessage(key, secureExecutionServiceInput)
} catch (e: Exception) {
var errMsg =
if (e.message != null) "ERROR : ${e.message}"
@@ -106,7 +106,7 @@ class KafkaPublishAuditService(
val key = executionServiceOutput.actionIdentifiers.blueprintName
try {
this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
- this.outputInstance!!.sendMessage(executionServiceOutput)
+ this.outputInstance!!.sendMessage(key, executionServiceOutput)
} catch (e: Exception) {
var errMsg =
if (e.message != null) "ERROR : $e"