diff options
author | Brinda Santh <bs2796@att.com> | 2019-11-08 16:41:07 -0500 |
---|---|---|
committer | Brinda Santh <bs2796@att.com> | 2019-11-08 17:07:32 -0500 |
commit | a0407eb91a2424f847e188796328871b3a339c93 (patch) | |
tree | a54ed7c844f7be84f8a4b9da30c31406df0053ea /ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin | |
parent | 54eb2f2680d4f2447a4d48b612b2b83e37d90754 (diff) |
Add Kafka Streams consumer service
Issue-ID: CCSDK-1914
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: I8d2b51c66e1304decadbb55656fe8a0b4c018feb
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin')
2 files changed, 229 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt new file mode 100644 index 000000000..e2a31f40a --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt @@ -0,0 +1,126 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.Topology +import org.apache.kafka.streams.processor.Processor +import org.apache.kafka.streams.processor.ProcessorSupplier +import org.apache.kafka.streams.state.Stores +import org.junit.Test +import org.junit.runner.RunWith +import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties +import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import org.springframework.test.context.junit4.SpringRunner +import kotlin.test.assertNotNull + + +@RunWith(SpringRunner::class) +@DirtiesContext +@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class, + BlueprintPropertyConfiguration::class, BluePrintProperties::class]) +@TestPropertySource(properties = +[ + "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic", + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", + + "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth", + "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application", + "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic" + +]) +class KafkaStreamsBasicAuthConsumerServiceTest { + @Autowired + lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService + + @Test + fun testProperties() { + val blueprintMessageConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer") + assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageProducerService") + } + + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ + //@Test + fun testKafkaStreamingMessageConsumer() { + runBlocking { + val streamingConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer") + + // Dynamic Consumer Function to create Topology + val consumerFunction = object : KafkaStreamConsumerFunction { + override suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties, + additionalConfig: Map<String, Any>?): Topology { + val topology = Topology() + val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties + as KafkaStreamsBasicAuthConsumerProperties + + val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",") + topology.addSource("Source", *topics.toTypedArray()) + // Processor Supplier + val firstProcessorSupplier = object : ProcessorSupplier<ByteArray, ByteArray> { + override fun get(): Processor<ByteArray, ByteArray> { + return FirstProcessor() + } + } + val changelogConfig: MutableMap<String, String> = hashMapOf() + changelogConfig.put("min.insync.replicas", "1") + + // Store Buolder + val countStoreSupplier = Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore("PriorityMessageState"), + Serdes.String(), + PriorityMessageSerde()) + .withLoggingEnabled(changelogConfig) + + topology.addProcessor("FirstProcessor", firstProcessorSupplier, "Source") + topology.addStateStore(countStoreSupplier, "FirstProcessor") + topology.addSink("SINK", "default-stream-topic-out", Serdes.String().serializer(), + PriorityMessageSerde().serializer(), "FirstProcessor") + return topology + } + } + + /** Send message with every 1 sec */ + val blueprintMessageProducerService = bluePrintMessageLibPropertyService + .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService + launch { + repeat(5) { + delay(1000) + val headers: MutableMap<String, String> = hashMapOf() + headers["id"] = it.toString() + blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)", + headers = headers) + } + } + streamingConsumerService.consume(null, consumerFunction) + delay(10000) + streamingConsumerService.shutDown() + } + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt new file mode 100644 index 000000000..4db9c772e --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt @@ -0,0 +1,103 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import org.apache.kafka.common.serialization.Deserializer +import org.apache.kafka.common.serialization.Serde +import org.apache.kafka.common.serialization.Serializer +import org.apache.kafka.streams.processor.Processor +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.state.KeyValueStore +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils +import java.io.Serializable +import java.nio.charset.Charset +import java.util.* + +class PriorityMessage : Serializable { + lateinit var id: String + lateinit var requestMessage: String +} + +open class PriorityMessageSerde : Serde<PriorityMessage> { + + override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { + } + + override fun close() { + } + + override fun deserializer(): Deserializer<PriorityMessage> { + return object : Deserializer<PriorityMessage> { + override fun deserialize(topic: String, data: ByteArray): PriorityMessage { + return JacksonUtils.readValue(String(data), PriorityMessage::class.java) + ?: throw BluePrintProcessorException("failed to convert") + } + + override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { + } + + override fun close() { + } + } + } + + override fun serializer(): Serializer<PriorityMessage> { + return object : Serializer<PriorityMessage> { + override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { + } + + override fun serialize(topic: String?, data: PriorityMessage): ByteArray { + return data.asJsonString().toByteArray(Charset.defaultCharset()) + } + + override fun close() { + } + } + } +} + + +class FirstProcessor : Processor<ByteArray, ByteArray> { + + private val log = logger(FirstProcessor::class) + + private lateinit var context: ProcessorContext + private lateinit var kvStore: KeyValueStore<String, PriorityMessage> + + override fun process(key: ByteArray, value: ByteArray) { + log.info("First Processor key(${String(key)} : value(${String(value)})") + val newMessage = PriorityMessage().apply { + id = UUID.randomUUID().toString() + requestMessage = String(value) + } + kvStore.put(newMessage.id, newMessage) + this.context.forward(newMessage.id, newMessage) + } + + override fun init(context: ProcessorContext) { + log.info("init... ${context.keySerde()}, ${context.valueSerde()}") + this.context = context + this.kvStore = context.getStateStore("PriorityMessageState") as KeyValueStore<String, PriorityMessage> + } + + override fun close() { + log.info("Close...") + } +}
\ No newline at end of file |