From 8a26b031f8980b2a93b51e5327df959c6926c034 Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Tue, 29 Oct 2019 10:47:00 -0400 Subject: Add message consumer dynamic functions. Issue-ID: CCSDK-1668 Signed-off-by: Brinda Santh Change-Id: I51be88598557a05ef5db7cd595689a8e4a653fdc --- .../service/BlueprintMessageConsumerServiceTest.kt | 54 ++++++++++++++++++++-- .../src/test/resources/logback-test.xml | 2 +- 2 files changed, 52 insertions(+), 4 deletions(-) (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/test') diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index 86c2ec5ef..bdceec7d3 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,15 +23,14 @@ import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.clients.consumer.MockConsumer -import org.apache.kafka.clients.consumer.OffsetResetStrategy +import org.apache.kafka.clients.consumer.* import org.apache.kafka.common.TopicPartition import org.junit.Test import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.annotation.DirtiesContext @@ -110,6 +110,54 @@ open class BlueprintMessageConsumerServiceTest { } } + @Test + fun testKafkaBasicAuthConsumerWithDynamicFunction() { + runBlocking { + val blueprintMessageConsumerService = bluePrintMessageLibPropertyService + .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") + + val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true) + + val topic = "default-topic" + val partitions: MutableList = arrayListOf() + val topicsCollection: MutableList = arrayListOf() + partitions.add(TopicPartition(topic, 1)) + val partitionsBeginningMap: MutableMap = mutableMapOf() + val partitionsEndMap: MutableMap = mutableMapOf() + + val records: Long = 10 + partitions.forEach { partition -> + partitionsBeginningMap[partition] = 0L + partitionsEndMap[partition] = records + topicsCollection.add(partition.topic()) + } + val mockKafkaConsumer = MockConsumer(OffsetResetStrategy.EARLIEST) + mockKafkaConsumer.subscribe(topicsCollection) + mockKafkaConsumer.rebalance(partitions) + mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap) + mockKafkaConsumer.updateEndOffsets(partitionsEndMap) + for (i in 1..10) { + val record = ConsumerRecord(topic, 1, i.toLong(), "key_$i", + "I am message $i".toByteArray()) + mockKafkaConsumer.addRecord(record) + } + + every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer + /** Test Consumer Function implementation */ + val consumerFunction = object : KafkaConsumerRecordsFunction { + override suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, + consumer: Consumer<*, *>, consumerRecords: ConsumerRecords<*, *>) { + val count = consumerRecords.count() + log.trace("Received Message count($count)") + } + } + spyBlueprintMessageConsumerService.consume(consumerFunction) + delay(10) + spyBlueprintMessageConsumerService.shutDown() + } + } + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ //@Test fun testKafkaIntegration() { diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml index 5f4fb4f73..820041f74 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml @@ -27,7 +27,7 @@ - ${testing} + ${localPattern} -- cgit 1.2.3-korg