From e02f72bcdafdb85fc93a3b21668569d59613ca07 Mon Sep 17 00:00:00 2001 From: prathameshmo Date: Mon, 22 Jul 2019 13:48:51 -0400 Subject: Fixed Kafka consumer behaviour on failed deserialization -Added ErrorHandlingDeserializer -Updated the integration test. Issue-ID: CCSDK-1514 Change-Id: I69112df850dfae2d4a3bd967b1dcfa541ea1523a Signed-off-by: prathameshmo --- .../selfservice/api/MessagingConfig.kt | 21 ++++++++++++++++----- .../selfservice/api/MessagingController.kt | 11 +++++------ .../api/messaginglib/MessagingControllerTest.kt | 10 ++++++---- .../test/resources/cba-for-kafka-integration.zip | Bin 20700 -> 0 bytes .../cba-for-kafka-integration_enriched.zip | Bin 0 -> 9781 bytes 5 files changed, 27 insertions(+), 15 deletions(-) delete mode 100644 ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip create mode 100755 ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration_enriched.zip diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt index a04a79921..17e157d15 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt @@ -1,5 +1,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api +import com.fasterxml.jackson.databind.DeserializationFeature +import com.fasterxml.jackson.databind.ObjectMapper import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.StringDeserializer @@ -7,10 +9,10 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInpu import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.kafka.annotation.EnableKafka import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory import org.springframework.kafka.core.ConsumerFactory import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2 import org.springframework.kafka.support.serializer.JsonDeserializer @Configuration @@ -26,11 +28,20 @@ open class MessagingConfig { val configProperties = hashMapOf() configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId - configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name - configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java.name - configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest" + configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer2::class.java + configProperties[ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java.name - return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(), JsonDeserializer(ExecutionServiceInput::class.java)) + val deserializer = JsonDeserializer() + deserializer.setRemoveTypeHeaders(true) + deserializer.addTrustedPackages("*") + + val jsonDeserializer = JsonDeserializer(ExecutionServiceInput::class.java, + ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)) + + return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(), + ErrorHandlingDeserializer2(jsonDeserializer)) } /** diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt index 1d219a83e..54cc0c129 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt @@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api import kotlinx.coroutines.async import kotlinx.coroutines.runBlocking import org.apache.commons.lang3.builder.ToStringBuilder +import org.apache.kafka.clients.consumer.ConsumerRecord import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.slf4j.LoggerFactory @@ -39,17 +40,15 @@ open class MessagingController(private val propertyService: BluePrintMessageLibP } @KafkaListener(topics = ["\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}"]) - open fun receive(input: ExecutionServiceInput) { - - log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input)) + open fun receive(record: ConsumerRecord) { runBlocking { - log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input)) + log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(record.value())) // Process the message. async { - processMessage(input) - } + processMessage(record.value()) + }.await() } } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt index f7459f522..602033ad9 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt @@ -90,7 +90,7 @@ class MessagingControllerTest { @Autowired lateinit var webTestClient: WebTestClient - var receivedEvent: String? = null + var event: ExecutionServiceInput? = null @Before fun setup() { @@ -142,11 +142,13 @@ class MessagingControllerTest { log.info("test-sender sent message='{}'", ToStringBuilder.reflectionToString(input)) Thread.sleep(1000) + + assertNotNull(event) } @KafkaListener(topicPartitions = [TopicPartition(topic = "\${blueprintsprocessor.messageclient.self-service-api.topic}", partitionOffsets = [PartitionOffset(partition = "0", initialOffset = "0")])]) - fun receivedEventFromBluePrintProducer(event: ExecutionServiceInput) { - assertNotNull(event) + fun receivedEventFromBluePrintProducer(receivedEvent: ExecutionServiceInput) { + event = receivedEvent } private fun uploadBluePrint() { @@ -172,7 +174,7 @@ class MessagingControllerTest { } private fun loadCbaArchive():File { - return Paths.get("./src/test/resources/cba-for-kafka-integration.zip").toFile() + return Paths.get("./src/test/resources/cba-for-kafka-integration_enriched.zip").toFile() } @Configuration diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip deleted file mode 100644 index 23070380c..000000000 Binary files a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip and /dev/null differ diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration_enriched.zip b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration_enriched.zip new file mode 100755 index 000000000..9581191d7 Binary files /dev/null and b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration_enriched.zip differ -- cgit 1.2.3-korg