aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/test')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt54
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml2
2 files changed, 52 insertions, 4 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index 86c2ec5ef..bdceec7d3 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,15 +23,14 @@ import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.apache.kafka.clients.consumer.MockConsumer
-import org.apache.kafka.clients.consumer.OffsetResetStrategy
+import org.apache.kafka.clients.consumer.*
import org.apache.kafka.common.TopicPartition
import org.junit.Test
import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.annotation.DirtiesContext
@@ -110,6 +110,54 @@ open class BlueprintMessageConsumerServiceTest {
}
}
+ @Test
+ fun testKafkaBasicAuthConsumerWithDynamicFunction() {
+ runBlocking {
+ val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
+ .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
+
+ val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
+
+ val topic = "default-topic"
+ val partitions: MutableList<TopicPartition> = arrayListOf()
+ val topicsCollection: MutableList<String> = arrayListOf()
+ partitions.add(TopicPartition(topic, 1))
+ val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
+ val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
+
+ val records: Long = 10
+ partitions.forEach { partition ->
+ partitionsBeginningMap[partition] = 0L
+ partitionsEndMap[partition] = records
+ topicsCollection.add(partition.topic())
+ }
+ val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
+ mockKafkaConsumer.subscribe(topicsCollection)
+ mockKafkaConsumer.rebalance(partitions)
+ mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
+ mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
+ for (i in 1..10) {
+ val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i",
+ "I am message $i".toByteArray())
+ mockKafkaConsumer.addRecord(record)
+ }
+
+ every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
+ /** Test Consumer Function implementation */
+ val consumerFunction = object : KafkaConsumerRecordsFunction {
+ override suspend fun invoke(messageConsumerProperties: MessageConsumerProperties,
+ consumer: Consumer<*, *>, consumerRecords: ConsumerRecords<*, *>) {
+ val count = consumerRecords.count()
+ log.trace("Received Message count($count)")
+ }
+ }
+ spyBlueprintMessageConsumerService.consume(consumerFunction)
+ delay(10)
+ spyBlueprintMessageConsumerService.shutDown()
+ }
+ }
+
/** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
//@Test
fun testKafkaIntegration() {
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml
index 5f4fb4f73..820041f74 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml
@@ -27,7 +27,7 @@
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
- <pattern>${testing}</pattern>
+ <pattern>${localPattern}</pattern>
</encoder>
</appender>