aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor
diff options
context:
space:
mode:
authorJulien Fontaine <julien.fontaine@bell.ca>2021-01-26 14:47:32 -0500
committerJulien Fontaine <julien.fontaine@bell.ca>2021-02-11 14:10:20 -0500
commit136e3e19a1821a694d799e7c8d670407720c9e66 (patch)
treef11360bbc111b779e6b603aff115b5a10e797f4f /ms/blueprintsprocessor
parentc99c15f6f91fb296f2b44c6406795ac0583d632d (diff)
Improve logging for CDS Kafka workers
Modified CDS Kafka consumersand producers logs to provide more details about the topic of the consumer record being consumed or published. Refactored the publish callback to make it more readable. Refactored audit service log error messages. Issue-ID: CCSDK-3154 Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca> Change-Id: I7b42930e926bc15ce175974a78d3bfe2f219b0a8
Diffstat (limited to 'ms/blueprintsprocessor')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt9
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt32
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt10
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt24
4 files changed, 47 insertions, 28 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
index 83cc0e022..af689a1f2 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
@@ -1,6 +1,6 @@
/*
* Copyright © 2019 IBM.
- * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -81,6 +81,13 @@ open class KafkaMessageConsumerService(
/** execute the command block */
if (!channel.isClosedForSend) {
channel.send(consumerRecord)
+ log.info(
+ "Channel sent Consumer Record : topic(${consumerRecord.topic()}) " +
+ "partition(${consumerRecord.partition()}) " +
+ "leaderEpoch(${consumerRecord.leaderEpoch().get()}) " +
+ "offset(${consumerRecord.offset()}) " +
+ "key(${consumerRecord.key()})"
+ )
} else {
log.error("Channel is closed to receive message")
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
index eccf75301..88b0dfaae 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
@@ -1,6 +1,6 @@
/*
* Copyright © 2019 IBM.
- * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -77,16 +77,12 @@ class KafkaMessageProducerService(
}
val callback = Callback { metadata, exception ->
if (exception != null)
- log.error("ERROR : ${exception.message}")
+ log.error("Couldn't publish ${clonedMessage::class.simpleName} ${getMessageLogData(clonedMessage)}.", exception)
else {
- var logMessage = when (clonedMessage) {
- is ExecutionServiceInput ->
- "Request published to ${metadata.topic()} for CBA: ${clonedMessage.actionIdentifiers.blueprintName} version: ${clonedMessage.actionIdentifiers.blueprintVersion}"
- is ExecutionServiceOutput ->
- "Response published to ${metadata.topic()} for CBA: ${clonedMessage.actionIdentifiers.blueprintName} version: ${clonedMessage.actionIdentifiers.blueprintVersion}"
- else -> "Message published to(${metadata.topic()}), offset(${metadata.offset()}), headers :$headers"
- }
- log.info(logMessage)
+ val message = "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " +
+ "partition(${metadata.partition()}) " +
+ "offset(${metadata.offset()}) ${getMessageLogData(clonedMessage)}."
+ log.info(message)
}
}
messageTemplate().send(record, callback)
@@ -114,7 +110,7 @@ class KafkaMessageProducerService(
/** Truncation of error messages */
var truncErrMsg = executionServiceOutput.status.errorMessage
if (truncErrMsg != null && truncErrMsg.length > MAX_ERR_MSG_LEN) {
- truncErrMsg = "${truncErrMsg.substring(0, MAX_ERR_MSG_LEN)}" +
+ truncErrMsg = truncErrMsg.substring(0, MAX_ERR_MSG_LEN) +
" [...]. Check Blueprint Processor logs for more information."
}
/** Truncation for Command Executor responses */
@@ -139,4 +135,18 @@ class KafkaMessageProducerService(
stepData = executionServiceOutput.stepData
}
}
+
+ private fun getMessageLogData(message: Any): String {
+ return when (message) {
+ is ExecutionServiceInput -> {
+ val actionIdentifiers = message.actionIdentifiers
+ "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})"
+ }
+ is ExecutionServiceOutput -> {
+ val actionIdentifiers = message.actionIdentifiers
+ "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})"
+ }
+ else -> "message($message)"
+ }
+ }
}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt
index 440490a0a..0ab38c6bd 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2021 Bell Canada.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -104,8 +105,15 @@ open class BlueprintProcessingKafkaConsumer(
ph.register()
val key = message.key() ?: UUID.randomUUID().toString()
val value = String(message.value(), Charset.defaultCharset())
- log.trace("Consumed Message : key($key) value($value)")
val executionServiceInput = value.jsonAsType<ExecutionServiceInput>()
+ log.info(
+ "Consumed Message : topic(${message.topic()}) " +
+ "partition(${message.partition()}) " +
+ "leaderEpoch(${message.leaderEpoch().get()}) " +
+ "offset(${message.offset()}) " +
+ "key(${message.key()}) " +
+ "CBA(${executionServiceInput.actionIdentifiers.blueprintName}/${executionServiceInput.actionIdentifiers.blueprintVersion}/${executionServiceInput.actionIdentifiers.actionName})"
+ )
val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
blueprintMessageProducerService.sendMessage(key, executionServiceOutput)
} catch (e: Exception) {
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
index 5e7f63cb5..8be43cc94 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
@@ -1,5 +1,5 @@
/*
- * Copyright © 2020 Bell Canada
+ * Copyright © 2021 Bell Canada
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -88,11 +88,8 @@ class KafkaPublishAuditService(
try {
this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
this.inputInstance!!.sendMessage(key, secureExecutionServiceInput)
- } catch (e: Exception) {
- var errMsg =
- if (e.message != null) "ERROR : ${e.message}"
- else "ERROR : Failed to send execution request to Kafka."
- log.error(errMsg)
+ } catch (ex: Exception) {
+ log.error("Failed to publish execution request to Kafka.", ex)
}
}
@@ -109,11 +106,8 @@ class KafkaPublishAuditService(
try {
this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
this.outputInstance!!.sendMessage(key, executionServiceOutput)
- } catch (e: Exception) {
- var errMsg =
- if (e.message != null) "ERROR : $e"
- else "ERROR : Failed to send execution request to Kafka."
- log.error(errMsg)
+ } catch (ex: Exception) {
+ log.error("Failed to publish execution response to Kafka.", ex)
}
}
@@ -206,12 +200,12 @@ class KafkaPublishAuditService(
}
}
}
- } catch (e: Exception) {
- val errMsg = "ERROR : Couldn't hide sensitive data in the execution request."
- log.error(errMsg, e)
+ } catch (ex: Exception) {
+ val errMsg = "Couldn't hide sensitive data in the execution request."
+ log.error(errMsg, ex)
clonedExecutionServiceInput.payload.replace(
"$workflowName-request",
- "$errMsg $e".asJsonPrimitive()
+ "$errMsg $ex".asJsonPrimitive()
)
}
return clonedExecutionServiceInput