aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/test')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt81
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt27
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt65
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt5
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt7
5 files changed, 104 insertions, 81 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index b2accfb4d..823ba7dee 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -23,7 +23,11 @@ import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
-import org.apache.kafka.clients.consumer.*
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.ConsumerRecords
+import org.apache.kafka.clients.consumer.MockConsumer
+import org.apache.kafka.clients.consumer.OffsetResetStrategy
import org.apache.kafka.common.TopicPartition
import org.junit.Test
import org.junit.runner.RunWith
@@ -40,26 +44,30 @@ import org.springframework.test.context.junit4.SpringRunner
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
-
@RunWith(SpringRunner::class)
@DirtiesContext
-@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
- BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class])
-@TestPropertySource(properties =
-["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
- "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
- "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
- "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
- "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
- "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
-
- "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
- "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageproducer.sample.topic=default-topic",
- "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
-])
+@ContextConfiguration(
+ classes = [BluePrintMessageLibConfiguration::class,
+ BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]
+)
+@TestPropertySource(
+ properties =
+ ["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
+ "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
+ "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
+ "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
+ "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
+
+ "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.sample.topic=default-topic",
+ "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
+ ]
+)
open class BlueprintMessageConsumerServiceTest {
+
val log = logger(BlueprintMessageConsumerServiceTest::class)
@Autowired
@@ -69,7 +77,7 @@ open class BlueprintMessageConsumerServiceTest {
fun testKafkaBasicAuthConsumerService() {
runBlocking {
val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
@@ -93,8 +101,10 @@ open class BlueprintMessageConsumerServiceTest {
mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
for (i in 1..10) {
- val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i",
- "I am message $i".toByteArray())
+ val record = ConsumerRecord<String, ByteArray>(
+ topic, 1, i.toLong(), "key_$i",
+ "I am message $i".toByteArray()
+ )
mockKafkaConsumer.addRecord(record)
}
@@ -114,7 +124,7 @@ open class BlueprintMessageConsumerServiceTest {
fun testKafkaBasicAuthConsumerWithDynamicFunction() {
runBlocking {
val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
@@ -138,16 +148,21 @@ open class BlueprintMessageConsumerServiceTest {
mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
for (i in 1..10) {
- val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i",
- "I am message $i".toByteArray())
+ val record = ConsumerRecord<String, ByteArray>(
+ topic, 1, i.toLong(), "key_$i",
+ "I am message $i".toByteArray()
+ )
mockKafkaConsumer.addRecord(record)
}
every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
/** Test Consumer Function implementation */
val consumerFunction = object : KafkaConsumerRecordsFunction {
- override suspend fun invoke(messageConsumerProperties: MessageConsumerProperties,
- consumer: Consumer<*, *>, consumerRecords: ConsumerRecords<*, *>) {
+ override suspend fun invoke(
+ messageConsumerProperties: MessageConsumerProperties,
+ consumer: Consumer<*, *>,
+ consumerRecords: ConsumerRecords<*, *>
+ ) {
val count = consumerRecords.count()
log.trace("Received Message count($count)")
}
@@ -159,11 +174,11 @@ open class BlueprintMessageConsumerServiceTest {
}
/** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
- //@Test
+ // @Test
fun testKafkaIntegration() {
runBlocking {
val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
val channel = blueprintMessageConsumerService.subscribe(null)
@@ -175,18 +190,20 @@ open class BlueprintMessageConsumerServiceTest {
/** Send message with every 1 sec */
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
- .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+ .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
launch {
repeat(5) {
delay(100)
val headers: MutableMap<String, String> = hashMapOf()
headers["id"] = it.toString()
- blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)",
- headers = headers)
+ blueprintMessageProducerService.sendMessageNB(
+ message = "this is my message($it)",
+ headers = headers
+ )
}
}
delay(5000)
blueprintMessageConsumerService.shutDown()
}
}
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index 4fe5f5dd1..b824189d2 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -35,17 +35,20 @@ import java.util.concurrent.Future
import kotlin.test.Test
import kotlin.test.assertTrue
-
@RunWith(SpringRunner::class)
@DirtiesContext
-@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
- BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class])
-@TestPropertySource(properties =
-["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
- "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageproducer.sample.topic=default-topic",
- "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
-])
+@ContextConfiguration(
+ classes = [BluePrintMessageLibConfiguration::class,
+ BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]
+)
+@TestPropertySource(
+ properties =
+ ["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.sample.topic=default-topic",
+ "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
+ ]
+)
open class BlueprintMessageProducerServiceTest {
@Autowired
@@ -55,7 +58,7 @@ open class BlueprintMessageProducerServiceTest {
fun testKafkaBasicAuthProducertService() {
runBlocking {
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
- .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+ .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
val mockKafkaTemplate = mockk<KafkaProducer<String, ByteArray>>()
@@ -72,8 +75,4 @@ open class BlueprintMessageProducerServiceTest {
assertTrue(response, "failed to get command response")
}
}
-
}
-
-
-
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt
index 9cd974622..1657d70b4 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt
@@ -38,25 +38,29 @@ import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
import kotlin.test.assertNotNull
-
@RunWith(SpringRunner::class)
@DirtiesContext
-@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
- BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class])
-@TestPropertySource(properties =
-[
- "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
- "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic",
- "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+@ContextConfiguration(
+ classes = [BluePrintMessageLibConfiguration::class,
+ BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]
+)
+@TestPropertySource(
+ properties =
+ [
+ "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic",
+ "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
- "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth",
- "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application",
- "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic"
+ "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth",
+ "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application",
+ "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic"
-])
+ ]
+)
class KafkaStreamsBasicAuthConsumerServiceTest {
+
@Autowired
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@@ -67,15 +71,17 @@ class KafkaStreamsBasicAuthConsumerServiceTest {
}
/** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
- //@Test
+ // @Test
fun testKafkaStreamingMessageConsumer() {
runBlocking {
val streamingConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer")
// Dynamic Consumer Function to create Topology
val consumerFunction = object : KafkaStreamConsumerFunction {
- override suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties,
- additionalConfig: Map<String, Any>?): Topology {
+ override suspend fun createTopology(
+ messageConsumerProperties: MessageConsumerProperties,
+ additionalConfig: Map<String, Any>?
+ ): Topology {
val topology = Topology()
val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
as KafkaStreamsBasicAuthConsumerProperties
@@ -93,29 +99,34 @@ class KafkaStreamsBasicAuthConsumerServiceTest {
// Store Buolder
val countStoreSupplier = Stores.keyValueStoreBuilder(
- Stores.persistentKeyValueStore("PriorityMessageState"),
- Serdes.String(),
- PriorityMessageSerde())
- .withLoggingEnabled(changelogConfig)
+ Stores.persistentKeyValueStore("PriorityMessageState"),
+ Serdes.String(),
+ PriorityMessageSerde()
+ )
+ .withLoggingEnabled(changelogConfig)
topology.addProcessor("FirstProcessor", firstProcessorSupplier, "Source")
topology.addStateStore(countStoreSupplier, "FirstProcessor")
- topology.addSink("SINK", "default-stream-topic-out", Serdes.String().serializer(),
- PriorityMessageSerde().serializer(), "FirstProcessor")
+ topology.addSink(
+ "SINK", "default-stream-topic-out", Serdes.String().serializer(),
+ PriorityMessageSerde().serializer(), "FirstProcessor"
+ )
return topology
}
}
/** Send message with every 1 sec */
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
- .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+ .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
launch {
repeat(5) {
delay(1000)
val headers: MutableMap<String, String> = hashMapOf()
headers["id"] = it.toString()
- blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)",
- headers = headers)
+ blueprintMessageProducerService.sendMessageNB(
+ message = "this is my message($it)",
+ headers = headers
+ )
}
}
streamingConsumerService.consume(null, consumerFunction)
@@ -123,4 +134,4 @@ class KafkaStreamsBasicAuthConsumerServiceTest {
streamingConsumerService.shutDown()
}
}
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt
index 82e40efd1..3dce3344f 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt
@@ -29,7 +29,6 @@ import kotlin.test.assertEquals
class MessageLoggerServiceTest {
-
@Test
fun testMessagingHeaders() {
val messageLoggerService = MessageLoggerService()
@@ -55,7 +54,5 @@ class MessageLoggerServiceTest {
assertEquals("1234-12", map[BluePrintConstants.ONAP_REQUEST_ID])
messageLoggerService.messageConsumingExisting()
-
}
-
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt
index 4db9c772e..5d77c3746 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt
@@ -28,7 +28,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
import java.io.Serializable
import java.nio.charset.Charset
-import java.util.*
+import java.util.UUID
class PriorityMessage : Serializable {
lateinit var id: String
@@ -47,7 +47,7 @@ open class PriorityMessageSerde : Serde<PriorityMessage> {
return object : Deserializer<PriorityMessage> {
override fun deserialize(topic: String, data: ByteArray): PriorityMessage {
return JacksonUtils.readValue(String(data), PriorityMessage::class.java)
- ?: throw BluePrintProcessorException("failed to convert")
+ ?: throw BluePrintProcessorException("failed to convert")
}
override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
@@ -73,7 +73,6 @@ open class PriorityMessageSerde : Serde<PriorityMessage> {
}
}
-
class FirstProcessor : Processor<ByteArray, ByteArray> {
private val log = logger(FirstProcessor::class)
@@ -100,4 +99,4 @@ class FirstProcessor : Processor<ByteArray, ByteArray> {
override fun close() {
log.info("Close...")
}
-} \ No newline at end of file
+}