diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/test')
5 files changed, 104 insertions, 81 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index b2accfb4d..823ba7dee 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -23,7 +23,11 @@ import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import org.apache.kafka.clients.consumer.* +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.ConsumerRecords +import org.apache.kafka.clients.consumer.MockConsumer +import org.apache.kafka.clients.consumer.OffsetResetStrategy import org.apache.kafka.common.TopicPartition import org.junit.Test import org.junit.runner.RunWith @@ -40,26 +44,30 @@ import org.springframework.test.context.junit4.SpringRunner import kotlin.test.assertNotNull import kotlin.test.assertTrue - @RunWith(SpringRunner::class) @DirtiesContext -@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class, - BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]) -@TestPropertySource(properties = -["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth", - "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageconsumer.sample.groupId=sample-group", - "blueprintsprocessor.messageconsumer.sample.topic=default-topic", - "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id", - "blueprintsprocessor.messageconsumer.sample.pollMillSec=10", - "blueprintsprocessor.messageconsumer.sample.pollRecords=1", - - "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", - "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageproducer.sample.topic=default-topic", - "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" -]) +@ContextConfiguration( + classes = [BluePrintMessageLibConfiguration::class, + BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class] +) +@TestPropertySource( + properties = + ["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.sample.groupId=sample-group", + "blueprintsprocessor.messageconsumer.sample.topic=default-topic", + "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id", + "blueprintsprocessor.messageconsumer.sample.pollMillSec=10", + "blueprintsprocessor.messageconsumer.sample.pollRecords=1", + + "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.sample.topic=default-topic", + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" + ] +) open class BlueprintMessageConsumerServiceTest { + val log = logger(BlueprintMessageConsumerServiceTest::class) @Autowired @@ -69,7 +77,7 @@ open class BlueprintMessageConsumerServiceTest { fun testKafkaBasicAuthConsumerService() { runBlocking { val blueprintMessageConsumerService = bluePrintMessageLibPropertyService - .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true) @@ -93,8 +101,10 @@ open class BlueprintMessageConsumerServiceTest { mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap) mockKafkaConsumer.updateEndOffsets(partitionsEndMap) for (i in 1..10) { - val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i", - "I am message $i".toByteArray()) + val record = ConsumerRecord<String, ByteArray>( + topic, 1, i.toLong(), "key_$i", + "I am message $i".toByteArray() + ) mockKafkaConsumer.addRecord(record) } @@ -114,7 +124,7 @@ open class BlueprintMessageConsumerServiceTest { fun testKafkaBasicAuthConsumerWithDynamicFunction() { runBlocking { val blueprintMessageConsumerService = bluePrintMessageLibPropertyService - .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true) @@ -138,16 +148,21 @@ open class BlueprintMessageConsumerServiceTest { mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap) mockKafkaConsumer.updateEndOffsets(partitionsEndMap) for (i in 1..10) { - val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i", - "I am message $i".toByteArray()) + val record = ConsumerRecord<String, ByteArray>( + topic, 1, i.toLong(), "key_$i", + "I am message $i".toByteArray() + ) mockKafkaConsumer.addRecord(record) } every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer /** Test Consumer Function implementation */ val consumerFunction = object : KafkaConsumerRecordsFunction { - override suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, - consumer: Consumer<*, *>, consumerRecords: ConsumerRecords<*, *>) { + override suspend fun invoke( + messageConsumerProperties: MessageConsumerProperties, + consumer: Consumer<*, *>, + consumerRecords: ConsumerRecords<*, *> + ) { val count = consumerRecords.count() log.trace("Received Message count($count)") } @@ -159,11 +174,11 @@ open class BlueprintMessageConsumerServiceTest { } /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ - //@Test + // @Test fun testKafkaIntegration() { runBlocking { val blueprintMessageConsumerService = bluePrintMessageLibPropertyService - .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") val channel = blueprintMessageConsumerService.subscribe(null) @@ -175,18 +190,20 @@ open class BlueprintMessageConsumerServiceTest { /** Send message with every 1 sec */ val blueprintMessageProducerService = bluePrintMessageLibPropertyService - .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService + .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService launch { repeat(5) { delay(100) val headers: MutableMap<String, String> = hashMapOf() headers["id"] = it.toString() - blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)", - headers = headers) + blueprintMessageProducerService.sendMessageNB( + message = "this is my message($it)", + headers = headers + ) } } delay(5000) blueprintMessageConsumerService.shutDown() } } -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index 4fe5f5dd1..b824189d2 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -35,17 +35,20 @@ import java.util.concurrent.Future import kotlin.test.Test import kotlin.test.assertTrue - @RunWith(SpringRunner::class) @DirtiesContext -@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class, - BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]) -@TestPropertySource(properties = -["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", - "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageproducer.sample.topic=default-topic", - "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" -]) +@ContextConfiguration( + classes = [BluePrintMessageLibConfiguration::class, + BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class] +) +@TestPropertySource( + properties = + ["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.sample.topic=default-topic", + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" + ] +) open class BlueprintMessageProducerServiceTest { @Autowired @@ -55,7 +58,7 @@ open class BlueprintMessageProducerServiceTest { fun testKafkaBasicAuthProducertService() { runBlocking { val blueprintMessageProducerService = bluePrintMessageLibPropertyService - .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService + .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService val mockKafkaTemplate = mockk<KafkaProducer<String, ByteArray>>() @@ -72,8 +75,4 @@ open class BlueprintMessageProducerServiceTest { assertTrue(response, "failed to get command response") } } - } - - - diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt index 9cd974622..1657d70b4 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt @@ -38,25 +38,29 @@ import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner import kotlin.test.assertNotNull - @RunWith(SpringRunner::class) @DirtiesContext -@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class, - BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]) -@TestPropertySource(properties = -[ - "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", - "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic", - "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", +@ContextConfiguration( + classes = [BluePrintMessageLibConfiguration::class, + BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class] +) +@TestPropertySource( + properties = + [ + "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic", + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", - "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth", - "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application", - "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic" + "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth", + "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application", + "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic" -]) + ] +) class KafkaStreamsBasicAuthConsumerServiceTest { + @Autowired lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService @@ -67,15 +71,17 @@ class KafkaStreamsBasicAuthConsumerServiceTest { } /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ - //@Test + // @Test fun testKafkaStreamingMessageConsumer() { runBlocking { val streamingConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer") // Dynamic Consumer Function to create Topology val consumerFunction = object : KafkaStreamConsumerFunction { - override suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties, - additionalConfig: Map<String, Any>?): Topology { + override suspend fun createTopology( + messageConsumerProperties: MessageConsumerProperties, + additionalConfig: Map<String, Any>? + ): Topology { val topology = Topology() val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties @@ -93,29 +99,34 @@ class KafkaStreamsBasicAuthConsumerServiceTest { // Store Buolder val countStoreSupplier = Stores.keyValueStoreBuilder( - Stores.persistentKeyValueStore("PriorityMessageState"), - Serdes.String(), - PriorityMessageSerde()) - .withLoggingEnabled(changelogConfig) + Stores.persistentKeyValueStore("PriorityMessageState"), + Serdes.String(), + PriorityMessageSerde() + ) + .withLoggingEnabled(changelogConfig) topology.addProcessor("FirstProcessor", firstProcessorSupplier, "Source") topology.addStateStore(countStoreSupplier, "FirstProcessor") - topology.addSink("SINK", "default-stream-topic-out", Serdes.String().serializer(), - PriorityMessageSerde().serializer(), "FirstProcessor") + topology.addSink( + "SINK", "default-stream-topic-out", Serdes.String().serializer(), + PriorityMessageSerde().serializer(), "FirstProcessor" + ) return topology } } /** Send message with every 1 sec */ val blueprintMessageProducerService = bluePrintMessageLibPropertyService - .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService + .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService launch { repeat(5) { delay(1000) val headers: MutableMap<String, String> = hashMapOf() headers["id"] = it.toString() - blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)", - headers = headers) + blueprintMessageProducerService.sendMessageNB( + message = "this is my message($it)", + headers = headers + ) } } streamingConsumerService.consume(null, consumerFunction) @@ -123,4 +134,4 @@ class KafkaStreamsBasicAuthConsumerServiceTest { streamingConsumerService.shutDown() } } -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt index 82e40efd1..3dce3344f 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt @@ -29,7 +29,6 @@ import kotlin.test.assertEquals class MessageLoggerServiceTest { - @Test fun testMessagingHeaders() { val messageLoggerService = MessageLoggerService() @@ -55,7 +54,5 @@ class MessageLoggerServiceTest { assertEquals("1234-12", map[BluePrintConstants.ONAP_REQUEST_ID]) messageLoggerService.messageConsumingExisting() - } - -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt index 4db9c772e..5d77c3746 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt @@ -28,7 +28,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils import java.io.Serializable import java.nio.charset.Charset -import java.util.* +import java.util.UUID class PriorityMessage : Serializable { lateinit var id: String @@ -47,7 +47,7 @@ open class PriorityMessageSerde : Serde<PriorityMessage> { return object : Deserializer<PriorityMessage> { override fun deserialize(topic: String, data: ByteArray): PriorityMessage { return JacksonUtils.readValue(String(data), PriorityMessage::class.java) - ?: throw BluePrintProcessorException("failed to convert") + ?: throw BluePrintProcessorException("failed to convert") } override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { @@ -73,7 +73,6 @@ open class PriorityMessageSerde : Serde<PriorityMessage> { } } - class FirstProcessor : Processor<ByteArray, ByteArray> { private val log = logger(FirstProcessor::class) @@ -100,4 +99,4 @@ class FirstProcessor : Processor<ByteArray, ByteArray> { override fun close() { log.info("Close...") } -}
\ No newline at end of file +} |