diff options
author | KAPIL SINGAL <ks220y@att.com> | 2020-08-10 11:30:27 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2020-08-10 11:30:27 +0000 |
commit | 6bd8ce3a79f4caefa0b810a71285d96edb389da4 (patch) | |
tree | ece5212dfa668743034ce887f1775feeadacbbf4 /ms/blueprintsprocessor/modules/inbounds | |
parent | be701b483f4c775285b372bca0cfeb7f0332e931 (diff) | |
parent | f72ff0cff34d17147a5142eb57987b1515b80580 (diff) |
Merge "Make use of Kafka Key for Audit service and Kafka listener"
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds')
2 files changed, 9 insertions, 5 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt index 1f3dd6547..1ccf230d7 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt @@ -31,6 +31,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.event.ApplicationReadyEvent import org.springframework.context.event.EventListener import org.springframework.stereotype.Service +import java.nio.charset.Charset +import java.util.UUID import java.util.concurrent.Phaser import javax.annotation.PreDestroy @@ -95,10 +97,12 @@ open class BluePrintProcessingKafkaConsumer( launch { try { ph.register() - log.trace("Consumed Message : $message") - val executionServiceInput = message.jsonAsType<ExecutionServiceInput>() + val key = message.key() ?: UUID.randomUUID().toString() + val value = String(message.value(), Charset.defaultCharset()) + log.trace("Consumed Message : key($key) value($value)") + val executionServiceInput = value.jsonAsType<ExecutionServiceInput>() val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) - blueprintMessageProducerService.sendMessage(executionServiceOutput) + blueprintMessageProducerService.sendMessage(key, executionServiceOutput) } catch (e: Exception) { log.error("failed in processing the consumed message : $message", e) } finally { diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt index 12cadfa66..145c37b01 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt @@ -85,7 +85,7 @@ class KafkaPublishAuditService( val key = secureExecutionServiceInput.actionIdentifiers.blueprintName try { this.inputInstance = this.getInputInstance(INPUT_SELECTOR) - this.inputInstance!!.sendMessage(secureExecutionServiceInput) + this.inputInstance!!.sendMessage(key, secureExecutionServiceInput) } catch (e: Exception) { var errMsg = if (e.message != null) "ERROR : ${e.message}" @@ -106,7 +106,7 @@ class KafkaPublishAuditService( val key = executionServiceOutput.actionIdentifiers.blueprintName try { this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR) - this.outputInstance!!.sendMessage(executionServiceOutput) + this.outputInstance!!.sendMessage(key, executionServiceOutput) } catch (e: Exception) { var errMsg = if (e.message != null) "ERROR : $e" |