diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib')
4 files changed, 54 insertions, 8 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index ac35fbf2c..b07d64388 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -78,7 +78,7 @@ open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducer var keystore: String? = null var keystorePassword: String? = null var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE - var sslEndpointIdentificationAlgorithm: String = "" + var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM override fun getConfig(): HashMap<String, Any> { val configProps = super.getConfig() @@ -142,7 +142,7 @@ open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumer var keystore: String? = null var keystorePassword: String? = null var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE - var sslEndpointIdentificationAlgorithm: String = "" + var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM override fun getConfig(): HashMap<String, Any> { val configProps = super.getConfig() @@ -218,7 +218,7 @@ open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumer var keystore: String? = null var keystorePassword: String? = null var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE - var sslEndpointIdentificationAlgorithm: String = "" + var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM override fun getConfig(): HashMap<String, Any> { val configProps = super.getConfig() diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt index 931f052ed..e4991d2d8 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt @@ -17,12 +17,16 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service +import com.fasterxml.jackson.databind.node.ObjectNode import org.apache.commons.lang.builder.ToStringBuilder import org.apache.kafka.clients.producer.Callback import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.header.internals.RecordHeader +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID import org.slf4j.LoggerFactory @@ -39,6 +43,10 @@ class KafkaMessageProducerService( private val messageLoggerService = MessageLoggerService() + companion object { + const val MAX_ERR_MSG_LEN = 128 + } + override suspend fun sendMessageNB(message: Any): Boolean { checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } return sendMessageNB(messageProducerProperties.topic!!, message) @@ -54,9 +62,14 @@ class KafkaMessageProducerService( message: Any, headers: MutableMap<String, String>? ): Boolean { - val byteArrayMessage = when (message) { - is String -> message.toByteArray(Charset.defaultCharset()) - else -> message.asJsonString().toByteArray(Charset.defaultCharset()) + var clonedMessage = message + if (clonedMessage is ExecutionServiceOutput) { + clonedMessage = truncateResponse(clonedMessage) + } + + val byteArrayMessage = when (clonedMessage) { + is String -> clonedMessage.toByteArray(Charset.defaultCharset()) + else -> clonedMessage.asJsonString().toByteArray(Charset.defaultCharset()) } val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage) @@ -85,4 +98,37 @@ class KafkaMessageProducerService( return kafkaProducer!! } + + /** + * Truncation of BP responses + */ + private fun truncateResponse(executionServiceOutput: ExecutionServiceOutput): ExecutionServiceOutput { + /** Truncation of error messages */ + var truncErrMsg = executionServiceOutput.status.errorMessage + if (truncErrMsg != null && truncErrMsg.length > MAX_ERR_MSG_LEN) { + truncErrMsg = "${truncErrMsg.substring(0,MAX_ERR_MSG_LEN)}" + + " [...]. Check Blueprint Processor logs for more information." + } + /** Truncation for Command Executor responses */ + var truncPayload = executionServiceOutput.payload.deepCopy() + val workflowName = executionServiceOutput.actionIdentifiers.actionName + if (truncPayload.path("$workflowName-response").has("execute-command-logs")) { + var cmdExecLogNode = truncPayload.path("$workflowName-response") as ObjectNode + cmdExecLogNode.replace("execute-command-logs", "Check Command Executor logs for more information.".asJsonPrimitive()) + } + return ExecutionServiceOutput().apply { + correlationUUID = executionServiceOutput.correlationUUID + commonHeader = executionServiceOutput.commonHeader + actionIdentifiers = executionServiceOutput.actionIdentifiers + status = Status().apply { + code = executionServiceOutput.status.code + eventType = executionServiceOutput.status.eventType + timestamp = executionServiceOutput.status.timestamp + errorMessage = truncErrMsg + message = executionServiceOutput.status.message + } + payload = truncPayload + stepData = executionServiceOutput.stepData + } + } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index ac08dc7b7..fdf6e48e7 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -214,7 +214,7 @@ open class BlueprintMessageConsumerServiceTest { SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks", SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS", SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword", - SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "", + SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512", SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " + "username=\"sample-user\" " + diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index 72a47ed56..da7394998 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -109,7 +109,7 @@ open class BlueprintMessageProducerServiceTest { SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks", SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS", SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword", - SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "", + SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512", SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " + "username=\"sample-user\" " + |