diff options
author | Brinda Santh <brindasanth@in.ibm.com> | 2019-09-04 21:44:59 -0400 |
---|---|---|
committer | Brinda Santh <brindasanth@in.ibm.com> | 2019-09-04 21:44:59 -0400 |
commit | 8d94f0bd0a9e4d1c57e98df726841dd0e2978569 (patch) | |
tree | 6e0b91fbad6874a802d557916df7e3d53a776789 /ms/blueprintsprocessor/modules/commons/message-lib/src/test | |
parent | 1a7d92d35a4fdadd162b2eece4a75c04037b9c7e (diff) |
Add Kafka message lib consumer services
Change-Id: Iaf1df07a0d8f4fb54d5d06047520010d3bfe5465
Issue-ID: CCSDK-1668
Signed-off-by: Brinda Santh <brindasanth@in.ibm.com>
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/test')
3 files changed, 111 insertions, 11 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt new file mode 100644 index 000000000..18b86b8d8 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -0,0 +1,100 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import io.mockk.every +import io.mockk.spyk +import kotlinx.coroutines.channels.consumeEach +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.MockConsumer +import org.apache.kafka.clients.consumer.OffsetResetStrategy +import org.apache.kafka.common.TopicPartition +import org.junit.Test +import org.junit.runner.RunWith +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties +import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import org.springframework.test.context.junit4.SpringRunner +import kotlin.test.assertNotNull + + +@RunWith(SpringRunner::class) +@DirtiesContext +@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class, + BlueprintPropertyConfiguration::class, BluePrintProperties::class]) +@TestPropertySource(properties = +["blueprintsprocessor.messageclient.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageclient.sample.bootstrapServers=127:0.0.1:9092", + "blueprintsprocessor.messageclient.sample.groupId=sample-group", + "blueprintsprocessor.messageclient.sample.consumerTopic=default-topic" +]) +open class BlueprintMessageConsumerServiceTest { + + @Autowired + lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService + + @Test + fun testKafkaBasicAuthConsumerService() { + runBlocking { + val blueprintMessageConsumerService = bluePrintMessageLibPropertyService + .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") + + val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true) + + val topic = "default-topic" + val partitions: MutableList<TopicPartition> = arrayListOf() + val topicsCollection: MutableList<String> = arrayListOf() + partitions.add(TopicPartition(topic, 1)) + val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf() + val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf() + + val records: Long = 10 + partitions.forEach { partition -> + partitionsBeginningMap[partition] = 0L + partitionsEndMap[partition] = records + topicsCollection.add(partition.topic()) + } + val mockKafkaConsumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST) + mockKafkaConsumer.subscribe(topicsCollection) + mockKafkaConsumer.rebalance(partitions) + mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap) + mockKafkaConsumer.updateEndOffsets(partitionsEndMap) + for (i in 1..10) { + val record = ConsumerRecord<String, String>(topic, 1, i.toLong(), "key_$i", + "I am message $i") + mockKafkaConsumer.addRecord(record) + } + + every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer + val channel = spyBlueprintMessageConsumerService.subscribe(null) + launch { + channel.consumeEach { + println("Received message : $it") + } + } + //delay(100) + spyBlueprintMessageConsumerService.shutDown() + } + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index 0f8367d7e..0db62c1df 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -41,10 +41,10 @@ import kotlin.test.assertTrue @ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class, BlueprintPropertyConfiguration::class, BluePrintProperties::class]) @TestPropertySource(properties = -["blueprintsprocessor.messageclient.sample.type=kafka-basic-auth", - "blueprintsprocessor.messageclient.sample.bootstrapServers=127:0.0.1:9092", - "blueprintsprocessor.messageclient.sample.topic=default-topic", - "blueprintsprocessor.messageclient.sample.clientId=default-client-id" +["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.sample.bootstrapServers=127:0.0.1:9092", + "blueprintsprocessor.messageproducer.sample.topic=default-topic", + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" ]) open class BlueprintMessageProducerServiceTest { @@ -52,10 +52,10 @@ open class BlueprintMessageProducerServiceTest { lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService @Test - fun testKafkaBasicAuthClientService() { + fun testKafkaBasicAuthProducertService() { runBlocking { - val bluePrintMessageClientService = bluePrintMessageLibPropertyService - .blueprintMessageClientService("sample") as KafkaBasicAuthMessageProducerService + val blueprintMessageProducerService = bluePrintMessageLibPropertyService + .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService val mockKafkaTemplate = mockk<KafkaTemplate<String, Any>>() @@ -64,11 +64,11 @@ open class BlueprintMessageProducerServiceTest { every { mockKafkaTemplate.send(any(), any()) } returns future - val spyBluePrintMessageClientService = spyk(bluePrintMessageClientService, recordPrivateCalls = true) + val spyBluePrintMessageProducerService = spyk(blueprintMessageProducerService, recordPrivateCalls = true) - every { spyBluePrintMessageClientService.messageTemplate(any()) } returns mockKafkaTemplate + every { spyBluePrintMessageProducerService.messageTemplate(any()) } returns mockKafkaTemplate - val response = spyBluePrintMessageClientService.sendMessage("Testing message") + val response = spyBluePrintMessageProducerService.sendMessage("Testing message") assertTrue(response, "failed to get command response") } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml index 626b8f911..3868440c7 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml @@ -19,7 +19,7 @@ <!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> <encoder> - <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern> + <pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{50} - %msg%n</pattern> </encoder> </appender> |