aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBrinda Santh Muthuramalingam <brindasanth@in.ibm.com>2019-07-23 19:59:49 +0000
committerGerrit Code Review <gerrit@onap.org>2019-07-23 19:59:49 +0000
commitd9e06dc28322405298fd823ddd6f737b05d79502 (patch)
tree4e0e17a2243710f17783e9ab8f65be08d82f1827
parent4b745783a12ef5e03d90488c1db5673baadf47f8 (diff)
parente02f72bcdafdb85fc93a3b21668569d59613ca07 (diff)
Merge "Fixed Kafka consumer behaviour on failed deserialization"
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt21
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt11
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt10
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zipbin20700 -> 0 bytes
-rwxr-xr-xms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration_enriched.zipbin0 -> 9781 bytes
5 files changed, 27 insertions, 15 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt
index a04a79921..17e157d15 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt
@@ -1,5 +1,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+import com.fasterxml.jackson.databind.DeserializationFeature
+import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
@@ -7,10 +9,10 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInpu
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
-import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
+import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
import org.springframework.kafka.support.serializer.JsonDeserializer
@Configuration
@@ -26,11 +28,20 @@ open class MessagingConfig {
val configProperties = hashMapOf<String, Any>()
configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
- configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
- configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java.name
- configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
+ configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+ configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer2::class.java
+ configProperties[ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java.name
- return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(), JsonDeserializer(ExecutionServiceInput::class.java))
+ val deserializer = JsonDeserializer<ExecutionServiceInput>()
+ deserializer.setRemoveTypeHeaders(true)
+ deserializer.addTrustedPackages("*")
+
+ val jsonDeserializer = JsonDeserializer(ExecutionServiceInput::class.java,
+ ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false))
+
+ return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(),
+ ErrorHandlingDeserializer2<ExecutionServiceInput>(jsonDeserializer))
}
/**
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt
index 1d219a83e..54cc0c129 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt
@@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import org.apache.commons.lang3.builder.ToStringBuilder
+import org.apache.kafka.clients.consumer.ConsumerRecord
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.slf4j.LoggerFactory
@@ -39,17 +40,15 @@ open class MessagingController(private val propertyService: BluePrintMessageLibP
}
@KafkaListener(topics = ["\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}"])
- open fun receive(input: ExecutionServiceInput) {
-
- log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input))
+ open fun receive(record: ConsumerRecord<String, ExecutionServiceInput>) {
runBlocking {
- log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input))
+ log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(record.value()))
// Process the message.
async {
- processMessage(input)
- }
+ processMessage(record.value())
+ }.await()
}
}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt
index f7459f522..602033ad9 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt
@@ -90,7 +90,7 @@ class MessagingControllerTest {
@Autowired
lateinit var webTestClient: WebTestClient
- var receivedEvent: String? = null
+ var event: ExecutionServiceInput? = null
@Before
fun setup() {
@@ -142,11 +142,13 @@ class MessagingControllerTest {
log.info("test-sender sent message='{}'", ToStringBuilder.reflectionToString(input))
Thread.sleep(1000)
+
+ assertNotNull(event)
}
@KafkaListener(topicPartitions = [TopicPartition(topic = "\${blueprintsprocessor.messageclient.self-service-api.topic}", partitionOffsets = [PartitionOffset(partition = "0", initialOffset = "0")])])
- fun receivedEventFromBluePrintProducer(event: ExecutionServiceInput) {
- assertNotNull(event)
+ fun receivedEventFromBluePrintProducer(receivedEvent: ExecutionServiceInput) {
+ event = receivedEvent
}
private fun uploadBluePrint() {
@@ -172,7 +174,7 @@ class MessagingControllerTest {
}
private fun loadCbaArchive():File {
- return Paths.get("./src/test/resources/cba-for-kafka-integration.zip").toFile()
+ return Paths.get("./src/test/resources/cba-for-kafka-integration_enriched.zip").toFile()
}
@Configuration
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip
deleted file mode 100644
index 23070380c..000000000
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip
+++ /dev/null
Binary files differ
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration_enriched.zip b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration_enriched.zip
new file mode 100755
index 000000000..9581191d7
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration_enriched.zip
Binary files differ