diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src')
2 files changed, 29 insertions, 12 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt index 83cc0e022..af689a1f2 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt @@ -1,6 +1,6 @@ /* * Copyright © 2019 IBM. - * Modifications Copyright © 2018-2019 AT&T Intellectual Property. + * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -81,6 +81,13 @@ open class KafkaMessageConsumerService( /** execute the command block */ if (!channel.isClosedForSend) { channel.send(consumerRecord) + log.info( + "Channel sent Consumer Record : topic(${consumerRecord.topic()}) " + + "partition(${consumerRecord.partition()}) " + + "leaderEpoch(${consumerRecord.leaderEpoch().get()}) " + + "offset(${consumerRecord.offset()}) " + + "key(${consumerRecord.key()})" + ) } else { log.error("Channel is closed to receive message") } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt index eccf75301..88b0dfaae 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt @@ -1,6 +1,6 @@ /* * Copyright © 2019 IBM. - * Modifications Copyright © 2018-2019 AT&T Intellectual Property. + * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -77,16 +77,12 @@ class KafkaMessageProducerService( } val callback = Callback { metadata, exception -> if (exception != null) - log.error("ERROR : ${exception.message}") + log.error("Couldn't publish ${clonedMessage::class.simpleName} ${getMessageLogData(clonedMessage)}.", exception) else { - var logMessage = when (clonedMessage) { - is ExecutionServiceInput -> - "Request published to ${metadata.topic()} for CBA: ${clonedMessage.actionIdentifiers.blueprintName} version: ${clonedMessage.actionIdentifiers.blueprintVersion}" - is ExecutionServiceOutput -> - "Response published to ${metadata.topic()} for CBA: ${clonedMessage.actionIdentifiers.blueprintName} version: ${clonedMessage.actionIdentifiers.blueprintVersion}" - else -> "Message published to(${metadata.topic()}), offset(${metadata.offset()}), headers :$headers" - } - log.info(logMessage) + val message = "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " + + "partition(${metadata.partition()}) " + + "offset(${metadata.offset()}) ${getMessageLogData(clonedMessage)}." + log.info(message) } } messageTemplate().send(record, callback) @@ -114,7 +110,7 @@ class KafkaMessageProducerService( /** Truncation of error messages */ var truncErrMsg = executionServiceOutput.status.errorMessage if (truncErrMsg != null && truncErrMsg.length > MAX_ERR_MSG_LEN) { - truncErrMsg = "${truncErrMsg.substring(0, MAX_ERR_MSG_LEN)}" + + truncErrMsg = truncErrMsg.substring(0, MAX_ERR_MSG_LEN) + " [...]. Check Blueprint Processor logs for more information." } /** Truncation for Command Executor responses */ @@ -139,4 +135,18 @@ class KafkaMessageProducerService( stepData = executionServiceOutput.stepData } } + + private fun getMessageLogData(message: Any): String { + return when (message) { + is ExecutionServiceInput -> { + val actionIdentifiers = message.actionIdentifiers + "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})" + } + is ExecutionServiceOutput -> { + val actionIdentifiers = message.actionIdentifiers + "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})" + } + else -> "message($message)" + } + } } |