summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main
diff options
context:
space:
mode:
authorprathameshmo <prathamesh.morde@bell.ca>2019-07-22 13:48:51 -0400
committerprathameshmo <prathamesh_morde@yahoo.ca>2019-07-23 09:56:27 -0400
commite02f72bcdafdb85fc93a3b21668569d59613ca07 (patch)
treeba24dbdb7a65dce3c68ca7da0eccdf4880ff8730 /ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main
parent6434d38dc045a10f17beec4d015348580ac09ddf (diff)
Fixed Kafka consumer behaviour on failed deserialization
-Added ErrorHandlingDeserializer -Updated the integration test. Issue-ID: CCSDK-1514 Change-Id: I69112df850dfae2d4a3bd967b1dcfa541ea1523a Signed-off-by: prathameshmo <prathamesh_morde@yahoo.ca>
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main')
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt21
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt11
2 files changed, 21 insertions, 11 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt
index a04a79921..17e157d15 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt
@@ -1,5 +1,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+import com.fasterxml.jackson.databind.DeserializationFeature
+import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
@@ -7,10 +9,10 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInpu
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
-import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
+import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
import org.springframework.kafka.support.serializer.JsonDeserializer
@Configuration
@@ -26,11 +28,20 @@ open class MessagingConfig {
val configProperties = hashMapOf<String, Any>()
configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
- configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
- configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java.name
- configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
+ configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+ configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer2::class.java
+ configProperties[ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java.name
- return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(), JsonDeserializer(ExecutionServiceInput::class.java))
+ val deserializer = JsonDeserializer<ExecutionServiceInput>()
+ deserializer.setRemoveTypeHeaders(true)
+ deserializer.addTrustedPackages("*")
+
+ val jsonDeserializer = JsonDeserializer(ExecutionServiceInput::class.java,
+ ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false))
+
+ return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(),
+ ErrorHandlingDeserializer2<ExecutionServiceInput>(jsonDeserializer))
}
/**
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt
index 1d219a83e..54cc0c129 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt
@@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import org.apache.commons.lang3.builder.ToStringBuilder
+import org.apache.kafka.clients.consumer.ConsumerRecord
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.slf4j.LoggerFactory
@@ -39,17 +40,15 @@ open class MessagingController(private val propertyService: BluePrintMessageLibP
}
@KafkaListener(topics = ["\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}"])
- open fun receive(input: ExecutionServiceInput) {
-
- log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input))
+ open fun receive(record: ConsumerRecord<String, ExecutionServiceInput>) {
runBlocking {
- log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input))
+ log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(record.value()))
// Process the message.
async {
- processMessage(input)
- }
+ processMessage(record.value())
+ }.await()
}
}