summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt9
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt32
2 files changed, 29 insertions, 12 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
index 83cc0e022..af689a1f2 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
@@ -1,6 +1,6 @@
/*
* Copyright © 2019 IBM.
- * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -81,6 +81,13 @@ open class KafkaMessageConsumerService(
/** execute the command block */
if (!channel.isClosedForSend) {
channel.send(consumerRecord)
+ log.info(
+ "Channel sent Consumer Record : topic(${consumerRecord.topic()}) " +
+ "partition(${consumerRecord.partition()}) " +
+ "leaderEpoch(${consumerRecord.leaderEpoch().get()}) " +
+ "offset(${consumerRecord.offset()}) " +
+ "key(${consumerRecord.key()})"
+ )
} else {
log.error("Channel is closed to receive message")
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
index eccf75301..88b0dfaae 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
@@ -1,6 +1,6 @@
/*
* Copyright © 2019 IBM.
- * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -77,16 +77,12 @@ class KafkaMessageProducerService(
}
val callback = Callback { metadata, exception ->
if (exception != null)
- log.error("ERROR : ${exception.message}")
+ log.error("Couldn't publish ${clonedMessage::class.simpleName} ${getMessageLogData(clonedMessage)}.", exception)
else {
- var logMessage = when (clonedMessage) {
- is ExecutionServiceInput ->
- "Request published to ${metadata.topic()} for CBA: ${clonedMessage.actionIdentifiers.blueprintName} version: ${clonedMessage.actionIdentifiers.blueprintVersion}"
- is ExecutionServiceOutput ->
- "Response published to ${metadata.topic()} for CBA: ${clonedMessage.actionIdentifiers.blueprintName} version: ${clonedMessage.actionIdentifiers.blueprintVersion}"
- else -> "Message published to(${metadata.topic()}), offset(${metadata.offset()}), headers :$headers"
- }
- log.info(logMessage)
+ val message = "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " +
+ "partition(${metadata.partition()}) " +
+ "offset(${metadata.offset()}) ${getMessageLogData(clonedMessage)}."
+ log.info(message)
}
}
messageTemplate().send(record, callback)
@@ -114,7 +110,7 @@ class KafkaMessageProducerService(
/** Truncation of error messages */
var truncErrMsg = executionServiceOutput.status.errorMessage
if (truncErrMsg != null && truncErrMsg.length > MAX_ERR_MSG_LEN) {
- truncErrMsg = "${truncErrMsg.substring(0, MAX_ERR_MSG_LEN)}" +
+ truncErrMsg = truncErrMsg.substring(0, MAX_ERR_MSG_LEN) +
" [...]. Check Blueprint Processor logs for more information."
}
/** Truncation for Command Executor responses */
@@ -139,4 +135,18 @@ class KafkaMessageProducerService(
stepData = executionServiceOutput.stepData
}
}
+
+ private fun getMessageLogData(message: Any): String {
+ return when (message) {
+ is ExecutionServiceInput -> {
+ val actionIdentifiers = message.actionIdentifiers
+ "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})"
+ }
+ is ExecutionServiceOutput -> {
+ val actionIdentifiers = message.actionIdentifiers
+ "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})"
+ }
+ else -> "message($message)"
+ }
+ }
}