diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules')
4 files changed, 47 insertions, 28 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt index 83cc0e022..af689a1f2 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt @@ -1,6 +1,6 @@ /* * Copyright © 2019 IBM. - * Modifications Copyright © 2018-2019 AT&T Intellectual Property. + * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -81,6 +81,13 @@ open class KafkaMessageConsumerService( /** execute the command block */ if (!channel.isClosedForSend) { channel.send(consumerRecord) + log.info( + "Channel sent Consumer Record : topic(${consumerRecord.topic()}) " + + "partition(${consumerRecord.partition()}) " + + "leaderEpoch(${consumerRecord.leaderEpoch().get()}) " + + "offset(${consumerRecord.offset()}) " + + "key(${consumerRecord.key()})" + ) } else { log.error("Channel is closed to receive message") } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt index eccf75301..88b0dfaae 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt @@ -1,6 +1,6 @@ /* * Copyright © 2019 IBM. - * Modifications Copyright © 2018-2019 AT&T Intellectual Property. + * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -77,16 +77,12 @@ class KafkaMessageProducerService( } val callback = Callback { metadata, exception -> if (exception != null) - log.error("ERROR : ${exception.message}") + log.error("Couldn't publish ${clonedMessage::class.simpleName} ${getMessageLogData(clonedMessage)}.", exception) else { - var logMessage = when (clonedMessage) { - is ExecutionServiceInput -> - "Request published to ${metadata.topic()} for CBA: ${clonedMessage.actionIdentifiers.blueprintName} version: ${clonedMessage.actionIdentifiers.blueprintVersion}" - is ExecutionServiceOutput -> - "Response published to ${metadata.topic()} for CBA: ${clonedMessage.actionIdentifiers.blueprintName} version: ${clonedMessage.actionIdentifiers.blueprintVersion}" - else -> "Message published to(${metadata.topic()}), offset(${metadata.offset()}), headers :$headers" - } - log.info(logMessage) + val message = "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " + + "partition(${metadata.partition()}) " + + "offset(${metadata.offset()}) ${getMessageLogData(clonedMessage)}." + log.info(message) } } messageTemplate().send(record, callback) @@ -114,7 +110,7 @@ class KafkaMessageProducerService( /** Truncation of error messages */ var truncErrMsg = executionServiceOutput.status.errorMessage if (truncErrMsg != null && truncErrMsg.length > MAX_ERR_MSG_LEN) { - truncErrMsg = "${truncErrMsg.substring(0, MAX_ERR_MSG_LEN)}" + + truncErrMsg = truncErrMsg.substring(0, MAX_ERR_MSG_LEN) + " [...]. Check Blueprint Processor logs for more information." } /** Truncation for Command Executor responses */ @@ -139,4 +135,18 @@ class KafkaMessageProducerService( stepData = executionServiceOutput.stepData } } + + private fun getMessageLogData(message: Any): String { + return when (message) { + is ExecutionServiceInput -> { + val actionIdentifiers = message.actionIdentifiers + "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})" + } + is ExecutionServiceOutput -> { + val actionIdentifiers = message.actionIdentifiers + "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})" + } + else -> "message($message)" + } + } } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt index 440490a0a..0ab38c6bd 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2021 Bell Canada. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -104,8 +105,15 @@ open class BlueprintProcessingKafkaConsumer( ph.register() val key = message.key() ?: UUID.randomUUID().toString() val value = String(message.value(), Charset.defaultCharset()) - log.trace("Consumed Message : key($key) value($value)") val executionServiceInput = value.jsonAsType<ExecutionServiceInput>() + log.info( + "Consumed Message : topic(${message.topic()}) " + + "partition(${message.partition()}) " + + "leaderEpoch(${message.leaderEpoch().get()}) " + + "offset(${message.offset()}) " + + "key(${message.key()}) " + + "CBA(${executionServiceInput.actionIdentifiers.blueprintName}/${executionServiceInput.actionIdentifiers.blueprintVersion}/${executionServiceInput.actionIdentifiers.actionName})" + ) val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) blueprintMessageProducerService.sendMessage(key, executionServiceOutput) } catch (e: Exception) { diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt index 5e7f63cb5..8be43cc94 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt @@ -1,5 +1,5 @@ /* - * Copyright © 2020 Bell Canada + * Copyright © 2021 Bell Canada * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -88,11 +88,8 @@ class KafkaPublishAuditService( try { this.inputInstance = this.getInputInstance(INPUT_SELECTOR) this.inputInstance!!.sendMessage(key, secureExecutionServiceInput) - } catch (e: Exception) { - var errMsg = - if (e.message != null) "ERROR : ${e.message}" - else "ERROR : Failed to send execution request to Kafka." - log.error(errMsg) + } catch (ex: Exception) { + log.error("Failed to publish execution request to Kafka.", ex) } } @@ -109,11 +106,8 @@ class KafkaPublishAuditService( try { this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR) this.outputInstance!!.sendMessage(key, executionServiceOutput) - } catch (e: Exception) { - var errMsg = - if (e.message != null) "ERROR : $e" - else "ERROR : Failed to send execution request to Kafka." - log.error(errMsg) + } catch (ex: Exception) { + log.error("Failed to publish execution response to Kafka.", ex) } } @@ -206,12 +200,12 @@ class KafkaPublishAuditService( } } } - } catch (e: Exception) { - val errMsg = "ERROR : Couldn't hide sensitive data in the execution request." - log.error(errMsg, e) + } catch (ex: Exception) { + val errMsg = "Couldn't hide sensitive data in the execution request." + log.error(errMsg, ex) clonedExecutionServiceInput.payload.replace( "$workflowName-request", - "$errMsg $e".asJsonPrimitive() + "$errMsg $ex".asJsonPrimitive() ) } return clonedExecutionServiceInput |