aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt9
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt32
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt10
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt24
4 files changed, 47 insertions, 28 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
index 83cc0e022..af689a1f2 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
@@ -1,6 +1,6 @@
/*
* Copyright © 2019 IBM.
- * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -81,6 +81,13 @@ open class KafkaMessageConsumerService(
/** execute the command block */
if (!channel.isClosedForSend) {
channel.send(consumerRecord)
+ log.info(
+ "Channel sent Consumer Record : topic(${consumerRecord.topic()}) " +
+ "partition(${consumerRecord.partition()}) " +
+ "leaderEpoch(${consumerRecord.leaderEpoch().get()}) " +
+ "offset(${consumerRecord.offset()}) " +
+ "key(${consumerRecord.key()})"
+ )
} else {
log.error("Channel is closed to receive message")
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
index eccf75301..88b0dfaae 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
@@ -1,6 +1,6 @@
/*
* Copyright © 2019 IBM.
- * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -77,16 +77,12 @@ class KafkaMessageProducerService(
}
val callback = Callback { metadata, exception ->
if (exception != null)
- log.error("ERROR : ${exception.message}")
+ log.error("Couldn't publish ${clonedMessage::class.simpleName} ${getMessageLogData(clonedMessage)}.", exception)
else {
- var logMessage = when (clonedMessage) {
- is ExecutionServiceInput ->
- "Request published to ${metadata.topic()} for CBA: ${clonedMessage.actionIdentifiers.blueprintName} version: ${clonedMessage.actionIdentifiers.blueprintVersion}"
- is ExecutionServiceOutput ->
- "Response published to ${metadata.topic()} for CBA: ${clonedMessage.actionIdentifiers.blueprintName} version: ${clonedMessage.actionIdentifiers.blueprintVersion}"
- else -> "Message published to(${metadata.topic()}), offset(${metadata.offset()}), headers :$headers"
- }
- log.info(logMessage)
+ val message = "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " +
+ "partition(${metadata.partition()}) " +
+ "offset(${metadata.offset()}) ${getMessageLogData(clonedMessage)}."
+ log.info(message)
}
}
messageTemplate().send(record, callback)
@@ -114,7 +110,7 @@ class KafkaMessageProducerService(
/** Truncation of error messages */
var truncErrMsg = executionServiceOutput.status.errorMessage
if (truncErrMsg != null && truncErrMsg.length > MAX_ERR_MSG_LEN) {
- truncErrMsg = "${truncErrMsg.substring(0, MAX_ERR_MSG_LEN)}" +
+ truncErrMsg = truncErrMsg.substring(0, MAX_ERR_MSG_LEN) +
" [...]. Check Blueprint Processor logs for more information."
}
/** Truncation for Command Executor responses */
@@ -139,4 +135,18 @@ class KafkaMessageProducerService(
stepData = executionServiceOutput.stepData
}
}
+
+ private fun getMessageLogData(message: Any): String {
+ return when (message) {
+ is ExecutionServiceInput -> {
+ val actionIdentifiers = message.actionIdentifiers
+ "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})"
+ }
+ is ExecutionServiceOutput -> {
+ val actionIdentifiers = message.actionIdentifiers
+ "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})"
+ }
+ else -> "message($message)"
+ }
+ }
}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt
index 440490a0a..0ab38c6bd 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2021 Bell Canada.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -104,8 +105,15 @@ open class BlueprintProcessingKafkaConsumer(
ph.register()
val key = message.key() ?: UUID.randomUUID().toString()
val value = String(message.value(), Charset.defaultCharset())
- log.trace("Consumed Message : key($key) value($value)")
val executionServiceInput = value.jsonAsType<ExecutionServiceInput>()
+ log.info(
+ "Consumed Message : topic(${message.topic()}) " +
+ "partition(${message.partition()}) " +
+ "leaderEpoch(${message.leaderEpoch().get()}) " +
+ "offset(${message.offset()}) " +
+ "key(${message.key()}) " +
+ "CBA(${executionServiceInput.actionIdentifiers.blueprintName}/${executionServiceInput.actionIdentifiers.blueprintVersion}/${executionServiceInput.actionIdentifiers.actionName})"
+ )
val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
blueprintMessageProducerService.sendMessage(key, executionServiceOutput)
} catch (e: Exception) {
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
index 5e7f63cb5..8be43cc94 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
@@ -1,5 +1,5 @@
/*
- * Copyright © 2020 Bell Canada
+ * Copyright © 2021 Bell Canada
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -88,11 +88,8 @@ class KafkaPublishAuditService(
try {
this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
this.inputInstance!!.sendMessage(key, secureExecutionServiceInput)
- } catch (e: Exception) {
- var errMsg =
- if (e.message != null) "ERROR : ${e.message}"
- else "ERROR : Failed to send execution request to Kafka."
- log.error(errMsg)
+ } catch (ex: Exception) {
+ log.error("Failed to publish execution request to Kafka.", ex)
}
}
@@ -109,11 +106,8 @@ class KafkaPublishAuditService(
try {
this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
this.outputInstance!!.sendMessage(key, executionServiceOutput)
- } catch (e: Exception) {
- var errMsg =
- if (e.message != null) "ERROR : $e"
- else "ERROR : Failed to send execution request to Kafka."
- log.error(errMsg)
+ } catch (ex: Exception) {
+ log.error("Failed to publish execution response to Kafka.", ex)
}
}
@@ -206,12 +200,12 @@ class KafkaPublishAuditService(
}
}
}
- } catch (e: Exception) {
- val errMsg = "ERROR : Couldn't hide sensitive data in the execution request."
- log.error(errMsg, e)
+ } catch (ex: Exception) {
+ val errMsg = "Couldn't hide sensitive data in the execution request."
+ log.error(errMsg, ex)
clonedExecutionServiceInput.payload.replace(
"$workflowName-request",
- "$errMsg $e".asJsonPrimitive()
+ "$errMsg $ex".asJsonPrimitive()
)
}
return clonedExecutionServiceInput