From 507ce6db6765cc8215a05446a7777d97af176898 Mon Sep 17 00:00:00 2001 From: Julien Fontaine Date: Wed, 29 Jul 2020 19:06:00 -0400 Subject: Improve Kafka Producer callback messages Check if message sent is a BP request/response to print CBA name and version in the callback message. Issue-ID: CCSDK-2623 Signed-off-by: Julien Fontaine Change-Id: I526a01299a3bfca3d1c4a10ae19dead46393a9ae --- .../message/service/KafkaMessageProducerService.kt | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) (limited to 'ms/blueprintsprocessor/modules') diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt index 8de1f05be..8958d4f0c 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt @@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.Callback import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.header.internals.RecordHeader +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties @@ -79,8 +80,18 @@ class KafkaMessageProducerService( headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) } } val callback = Callback { metadata, exception -> - if (exception == null) log.trace("message published to(${metadata.topic()}), offset(${metadata.offset()}), headers :$headers") - else log.error("ERROR : ${exception.message}") + if (exception != null) + log.error("ERROR : ${exception.message}") + else { + var logMessage = when (clonedMessage) { + is ExecutionServiceInput -> + "Request published to ${metadata.topic()} for CBA: ${clonedMessage.actionIdentifiers.blueprintName} version: ${clonedMessage.actionIdentifiers.blueprintVersion}" + is ExecutionServiceOutput -> + "Response published to ${metadata.topic()} for CBA: ${clonedMessage.actionIdentifiers.blueprintName} version: ${clonedMessage.actionIdentifiers.blueprintVersion}" + else -> "Message published to(${metadata.topic()}), offset(${metadata.offset()}), headers :$headers" + } + log.info(logMessage) + } } messageTemplate().send(record, callback) return true -- cgit 1.2.3-korg