aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/test')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt6
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt6
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt126
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt103
4 files changed, 235 insertions, 6 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index bdceec7d3..b2accfb4d 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -27,8 +27,8 @@ import org.apache.kafka.clients.consumer.*
import org.apache.kafka.common.TopicPartition
import org.junit.Test
import org.junit.runner.RunWith
-import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.logger
@@ -44,7 +44,7 @@ import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@DirtiesContext
@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
- BlueprintPropertyConfiguration::class, BluePrintProperties::class])
+ BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class])
@TestPropertySource(properties =
["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
"blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index f23624f7a..4fe5f5dd1 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -23,8 +23,8 @@ import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.RecordMetadata
import org.junit.runner.RunWith
-import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.annotation.DirtiesContext
@@ -39,7 +39,7 @@ import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@DirtiesContext
@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
- BlueprintPropertyConfiguration::class, BluePrintProperties::class])
+ BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class])
@TestPropertySource(properties =
["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
"blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt
new file mode 100644
index 000000000..e2a31f40a
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt
@@ -0,0 +1,126 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.apache.kafka.common.serialization.Serdes
+import org.apache.kafka.streams.Topology
+import org.apache.kafka.streams.processor.Processor
+import org.apache.kafka.streams.processor.ProcessorSupplier
+import org.apache.kafka.streams.state.Stores
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+import org.springframework.test.context.junit4.SpringRunner
+import kotlin.test.assertNotNull
+
+
+@RunWith(SpringRunner::class)
+@DirtiesContext
+@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
+ BlueprintPropertyConfiguration::class, BluePrintProperties::class])
+@TestPropertySource(properties =
+[
+ "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic",
+ "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+
+ "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth",
+ "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application",
+ "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic"
+
+])
+class KafkaStreamsBasicAuthConsumerServiceTest {
+ @Autowired
+ lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+
+ @Test
+ fun testProperties() {
+ val blueprintMessageConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer")
+ assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageProducerService")
+ }
+
+ /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
+ //@Test
+ fun testKafkaStreamingMessageConsumer() {
+ runBlocking {
+ val streamingConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer")
+
+ // Dynamic Consumer Function to create Topology
+ val consumerFunction = object : KafkaStreamConsumerFunction {
+ override suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties,
+ additionalConfig: Map<String, Any>?): Topology {
+ val topology = Topology()
+ val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
+ as KafkaStreamsBasicAuthConsumerProperties
+
+ val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",")
+ topology.addSource("Source", *topics.toTypedArray())
+ // Processor Supplier
+ val firstProcessorSupplier = object : ProcessorSupplier<ByteArray, ByteArray> {
+ override fun get(): Processor<ByteArray, ByteArray> {
+ return FirstProcessor()
+ }
+ }
+ val changelogConfig: MutableMap<String, String> = hashMapOf()
+ changelogConfig.put("min.insync.replicas", "1")
+
+ // Store Buolder
+ val countStoreSupplier = Stores.keyValueStoreBuilder(
+ Stores.persistentKeyValueStore("PriorityMessageState"),
+ Serdes.String(),
+ PriorityMessageSerde())
+ .withLoggingEnabled(changelogConfig)
+
+ topology.addProcessor("FirstProcessor", firstProcessorSupplier, "Source")
+ topology.addStateStore(countStoreSupplier, "FirstProcessor")
+ topology.addSink("SINK", "default-stream-topic-out", Serdes.String().serializer(),
+ PriorityMessageSerde().serializer(), "FirstProcessor")
+ return topology
+ }
+ }
+
+ /** Send message with every 1 sec */
+ val blueprintMessageProducerService = bluePrintMessageLibPropertyService
+ .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+ launch {
+ repeat(5) {
+ delay(1000)
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.toString()
+ blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)",
+ headers = headers)
+ }
+ }
+ streamingConsumerService.consume(null, consumerFunction)
+ delay(10000)
+ streamingConsumerService.shutDown()
+ }
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt
new file mode 100644
index 000000000..4db9c772e
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import org.apache.kafka.common.serialization.Deserializer
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.common.serialization.Serializer
+import org.apache.kafka.streams.processor.Processor
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.state.KeyValueStore
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+import java.io.Serializable
+import java.nio.charset.Charset
+import java.util.*
+
+class PriorityMessage : Serializable {
+ lateinit var id: String
+ lateinit var requestMessage: String
+}
+
+open class PriorityMessageSerde : Serde<PriorityMessage> {
+
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ }
+
+ override fun close() {
+ }
+
+ override fun deserializer(): Deserializer<PriorityMessage> {
+ return object : Deserializer<PriorityMessage> {
+ override fun deserialize(topic: String, data: ByteArray): PriorityMessage {
+ return JacksonUtils.readValue(String(data), PriorityMessage::class.java)
+ ?: throw BluePrintProcessorException("failed to convert")
+ }
+
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ }
+
+ override fun close() {
+ }
+ }
+ }
+
+ override fun serializer(): Serializer<PriorityMessage> {
+ return object : Serializer<PriorityMessage> {
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ }
+
+ override fun serialize(topic: String?, data: PriorityMessage): ByteArray {
+ return data.asJsonString().toByteArray(Charset.defaultCharset())
+ }
+
+ override fun close() {
+ }
+ }
+ }
+}
+
+
+class FirstProcessor : Processor<ByteArray, ByteArray> {
+
+ private val log = logger(FirstProcessor::class)
+
+ private lateinit var context: ProcessorContext
+ private lateinit var kvStore: KeyValueStore<String, PriorityMessage>
+
+ override fun process(key: ByteArray, value: ByteArray) {
+ log.info("First Processor key(${String(key)} : value(${String(value)})")
+ val newMessage = PriorityMessage().apply {
+ id = UUID.randomUUID().toString()
+ requestMessage = String(value)
+ }
+ kvStore.put(newMessage.id, newMessage)
+ this.context.forward(newMessage.id, newMessage)
+ }
+
+ override fun init(context: ProcessorContext) {
+ log.info("init... ${context.keySerde()}, ${context.valueSerde()}")
+ this.context = context
+ this.kvStore = context.getStateStore("PriorityMessageState") as KeyValueStore<String, PriorityMessage>
+ }
+
+ override fun close() {
+ log.info("Close...")
+ }
+} \ No newline at end of file