diff options
author | Brinda Santh Muthuramalingam <brindasanth@in.ibm.com> | 2019-07-23 19:59:49 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2019-07-23 19:59:49 +0000 |
commit | d9e06dc28322405298fd823ddd6f737b05d79502 (patch) | |
tree | 4e0e17a2243710f17783e9ab8f65be08d82f1827 /ms/blueprintsprocessor/modules | |
parent | 4b745783a12ef5e03d90488c1db5673baadf47f8 (diff) | |
parent | e02f72bcdafdb85fc93a3b21668569d59613ca07 (diff) |
Merge "Fixed Kafka consumer behaviour on failed deserialization"
Diffstat (limited to 'ms/blueprintsprocessor/modules')
5 files changed, 27 insertions, 15 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt index a04a79921..17e157d15 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt @@ -1,5 +1,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api +import com.fasterxml.jackson.databind.DeserializationFeature +import com.fasterxml.jackson.databind.ObjectMapper import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.StringDeserializer @@ -7,10 +9,10 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInpu import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.kafka.annotation.EnableKafka import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory import org.springframework.kafka.core.ConsumerFactory import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2 import org.springframework.kafka.support.serializer.JsonDeserializer @Configuration @@ -26,11 +28,20 @@ open class MessagingConfig { val configProperties = hashMapOf<String, Any>() configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId - configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name - configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java.name - configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest" + configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer2::class.java + configProperties[ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java.name - return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(), JsonDeserializer(ExecutionServiceInput::class.java)) + val deserializer = JsonDeserializer<ExecutionServiceInput>() + deserializer.setRemoveTypeHeaders(true) + deserializer.addTrustedPackages("*") + + val jsonDeserializer = JsonDeserializer(ExecutionServiceInput::class.java, + ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)) + + return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(), + ErrorHandlingDeserializer2<ExecutionServiceInput>(jsonDeserializer)) } /** diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt index 1d219a83e..54cc0c129 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt @@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api import kotlinx.coroutines.async import kotlinx.coroutines.runBlocking import org.apache.commons.lang3.builder.ToStringBuilder +import org.apache.kafka.clients.consumer.ConsumerRecord import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.slf4j.LoggerFactory @@ -39,17 +40,15 @@ open class MessagingController(private val propertyService: BluePrintMessageLibP } @KafkaListener(topics = ["\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}"]) - open fun receive(input: ExecutionServiceInput) { - - log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input)) + open fun receive(record: ConsumerRecord<String, ExecutionServiceInput>) { runBlocking { - log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input)) + log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(record.value())) // Process the message. async { - processMessage(input) - } + processMessage(record.value()) + }.await() } } diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt index f7459f522..602033ad9 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt @@ -90,7 +90,7 @@ class MessagingControllerTest { @Autowired lateinit var webTestClient: WebTestClient - var receivedEvent: String? = null + var event: ExecutionServiceInput? = null @Before fun setup() { @@ -142,11 +142,13 @@ class MessagingControllerTest { log.info("test-sender sent message='{}'", ToStringBuilder.reflectionToString(input)) Thread.sleep(1000) + + assertNotNull(event) } @KafkaListener(topicPartitions = [TopicPartition(topic = "\${blueprintsprocessor.messageclient.self-service-api.topic}", partitionOffsets = [PartitionOffset(partition = "0", initialOffset = "0")])]) - fun receivedEventFromBluePrintProducer(event: ExecutionServiceInput) { - assertNotNull(event) + fun receivedEventFromBluePrintProducer(receivedEvent: ExecutionServiceInput) { + event = receivedEvent } private fun uploadBluePrint() { @@ -172,7 +174,7 @@ class MessagingControllerTest { } private fun loadCbaArchive():File { - return Paths.get("./src/test/resources/cba-for-kafka-integration.zip").toFile() + return Paths.get("./src/test/resources/cba-for-kafka-integration_enriched.zip").toFile() } @Configuration diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip Binary files differdeleted file mode 100644 index 23070380c..000000000 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip +++ /dev/null diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration_enriched.zip b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration_enriched.zip Binary files differnew file mode 100755 index 000000000..9581191d7 --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration_enriched.zip |