aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin
diff options
context:
space:
mode:
authorBrinda Santh <bs2796@att.com>2019-11-08 16:41:07 -0500
committerBrinda Santh <bs2796@att.com>2019-11-08 17:07:32 -0500
commita0407eb91a2424f847e188796328871b3a339c93 (patch)
treea54ed7c844f7be84f8a4b9da30c31406df0053ea /ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin
parent54eb2f2680d4f2447a4d48b612b2b83e37d90754 (diff)
Add Kafka Streams consumer service
Issue-ID: CCSDK-1914 Signed-off-by: Brinda Santh <bs2796@att.com> Change-Id: I8d2b51c66e1304decadbb55656fe8a0b4c018feb
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt126
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt103
2 files changed, 229 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt
new file mode 100644
index 000000000..e2a31f40a
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt
@@ -0,0 +1,126 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.apache.kafka.common.serialization.Serdes
+import org.apache.kafka.streams.Topology
+import org.apache.kafka.streams.processor.Processor
+import org.apache.kafka.streams.processor.ProcessorSupplier
+import org.apache.kafka.streams.state.Stores
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+import org.springframework.test.context.junit4.SpringRunner
+import kotlin.test.assertNotNull
+
+
+@RunWith(SpringRunner::class)
+@DirtiesContext
+@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
+ BlueprintPropertyConfiguration::class, BluePrintProperties::class])
+@TestPropertySource(properties =
+[
+ "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic",
+ "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+
+ "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth",
+ "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application",
+ "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic"
+
+])
+class KafkaStreamsBasicAuthConsumerServiceTest {
+ @Autowired
+ lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+
+ @Test
+ fun testProperties() {
+ val blueprintMessageConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer")
+ assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageProducerService")
+ }
+
+ /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
+ //@Test
+ fun testKafkaStreamingMessageConsumer() {
+ runBlocking {
+ val streamingConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer")
+
+ // Dynamic Consumer Function to create Topology
+ val consumerFunction = object : KafkaStreamConsumerFunction {
+ override suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties,
+ additionalConfig: Map<String, Any>?): Topology {
+ val topology = Topology()
+ val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
+ as KafkaStreamsBasicAuthConsumerProperties
+
+ val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",")
+ topology.addSource("Source", *topics.toTypedArray())
+ // Processor Supplier
+ val firstProcessorSupplier = object : ProcessorSupplier<ByteArray, ByteArray> {
+ override fun get(): Processor<ByteArray, ByteArray> {
+ return FirstProcessor()
+ }
+ }
+ val changelogConfig: MutableMap<String, String> = hashMapOf()
+ changelogConfig.put("min.insync.replicas", "1")
+
+ // Store Buolder
+ val countStoreSupplier = Stores.keyValueStoreBuilder(
+ Stores.persistentKeyValueStore("PriorityMessageState"),
+ Serdes.String(),
+ PriorityMessageSerde())
+ .withLoggingEnabled(changelogConfig)
+
+ topology.addProcessor("FirstProcessor", firstProcessorSupplier, "Source")
+ topology.addStateStore(countStoreSupplier, "FirstProcessor")
+ topology.addSink("SINK", "default-stream-topic-out", Serdes.String().serializer(),
+ PriorityMessageSerde().serializer(), "FirstProcessor")
+ return topology
+ }
+ }
+
+ /** Send message with every 1 sec */
+ val blueprintMessageProducerService = bluePrintMessageLibPropertyService
+ .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+ launch {
+ repeat(5) {
+ delay(1000)
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.toString()
+ blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)",
+ headers = headers)
+ }
+ }
+ streamingConsumerService.consume(null, consumerFunction)
+ delay(10000)
+ streamingConsumerService.shutDown()
+ }
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt
new file mode 100644
index 000000000..4db9c772e
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import org.apache.kafka.common.serialization.Deserializer
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.common.serialization.Serializer
+import org.apache.kafka.streams.processor.Processor
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.state.KeyValueStore
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+import java.io.Serializable
+import java.nio.charset.Charset
+import java.util.*
+
+class PriorityMessage : Serializable {
+ lateinit var id: String
+ lateinit var requestMessage: String
+}
+
+open class PriorityMessageSerde : Serde<PriorityMessage> {
+
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ }
+
+ override fun close() {
+ }
+
+ override fun deserializer(): Deserializer<PriorityMessage> {
+ return object : Deserializer<PriorityMessage> {
+ override fun deserialize(topic: String, data: ByteArray): PriorityMessage {
+ return JacksonUtils.readValue(String(data), PriorityMessage::class.java)
+ ?: throw BluePrintProcessorException("failed to convert")
+ }
+
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ }
+
+ override fun close() {
+ }
+ }
+ }
+
+ override fun serializer(): Serializer<PriorityMessage> {
+ return object : Serializer<PriorityMessage> {
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ }
+
+ override fun serialize(topic: String?, data: PriorityMessage): ByteArray {
+ return data.asJsonString().toByteArray(Charset.defaultCharset())
+ }
+
+ override fun close() {
+ }
+ }
+ }
+}
+
+
+class FirstProcessor : Processor<ByteArray, ByteArray> {
+
+ private val log = logger(FirstProcessor::class)
+
+ private lateinit var context: ProcessorContext
+ private lateinit var kvStore: KeyValueStore<String, PriorityMessage>
+
+ override fun process(key: ByteArray, value: ByteArray) {
+ log.info("First Processor key(${String(key)} : value(${String(value)})")
+ val newMessage = PriorityMessage().apply {
+ id = UUID.randomUUID().toString()
+ requestMessage = String(value)
+ }
+ kvStore.put(newMessage.id, newMessage)
+ this.context.forward(newMessage.id, newMessage)
+ }
+
+ override fun init(context: ProcessorContext) {
+ log.info("init... ${context.keySerde()}, ${context.valueSerde()}")
+ this.context = context
+ this.kvStore = context.getStateStore("PriorityMessageState") as KeyValueStore<String, PriorityMessage>
+ }
+
+ override fun close() {
+ log.info("Close...")
+ }
+} \ No newline at end of file