From 8d94f0bd0a9e4d1c57e98df726841dd0e2978569 Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Wed, 4 Sep 2019 21:44:59 -0400 Subject: Add Kafka message lib consumer services Change-Id: Iaf1df07a0d8f4fb54d5d06047520010d3bfe5465 Issue-ID: CCSDK-1668 Signed-off-by: Brinda Santh --- .../message/BluePrintMessageLibConfiguration.kt | 34 ++++++- .../message/BluePrintMessageLibData.kt | 17 +++- .../service/BluePrintMessageLibPropertyService.kt | 86 +++++++++++++--- .../service/BlueprintMessageConsumerService.kt | 32 ++++++ .../KafkaBasicAuthMessageConsumerService.kt | 113 +++++++++++++++++++++ 5 files changed, 264 insertions(+), 18 deletions(-) create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin') diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt index 644c51860..281a970b8 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt @@ -17,6 +17,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message +import com.fasterxml.jackson.databind.JsonNode +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.ComponentScan import org.springframework.context.annotation.Configuration @@ -26,10 +31,37 @@ import org.springframework.context.annotation.Configuration @EnableConfigurationProperties open class BluePrintMessageLibConfiguration +/** + * Exposed Dependency Service by this Message Lib Module + */ +fun BluePrintDependencyService.messageLibPropertyService(): BluePrintMessageLibPropertyService = + instance(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY) + +/** Extension functions for message producer service **/ +fun BluePrintDependencyService.messageProducerService(selector: String): BlueprintMessageProducerService { + return messageLibPropertyService().blueprintMessageProducerService(selector) +} + + +fun BluePrintDependencyService.messageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService { + return messageLibPropertyService().blueprintMessageProducerService(jsonNode) +} + +/** Extension functions for message consumer service **/ +fun BluePrintDependencyService.messageConsumerService(selector: String): BlueprintMessageConsumerService { + return messageLibPropertyService().blueprintMessageConsumerService(selector) +} + +fun BluePrintDependencyService.messageConsumerService(jsonNode: JsonNode): BlueprintMessageConsumerService { + return messageLibPropertyService().blueprintMessageConsumerService(jsonNode) +} + class MessageLibConstants { companion object { const val SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY = "blueprint-message-lib-property-service" - const val PROPERTY_MESSAGE_CLIENT_PREFIX = "blueprintsprocessor.messageclient." + //TODO("Change to .messageconsumer in application.properties") + const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageclient." + const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer." const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth" } } \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index e621ec66f..c77cdfdb5 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -16,7 +16,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message - +/** Producer Properties **/ open class MessageProducerProperties @@ -24,4 +24,17 @@ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() lateinit var bootstrapServers: String var topic: String? = null var clientId: String? = null -} \ No newline at end of file +} + +/** Consumer Properties **/ + +open class MessageConsumerProperties + +open class KafkaMessageConsumerProperties : MessageConsumerProperties() { + lateinit var bootstrapServers: String + lateinit var groupId: String + var consumerTopic: String? = null + var pollMillSec: Long = 100 +} + +open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties() diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt index fb01ce179..7c56ea432 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt @@ -18,9 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import com.fasterxml.jackson.databind.JsonNode import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties -import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants -import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.* import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils import org.springframework.stereotype.Service @@ -28,22 +26,22 @@ import org.springframework.stereotype.Service @Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY) open class BluePrintMessageLibPropertyService(private var bluePrintProperties: BluePrintProperties) { - fun blueprintMessageClientService(jsonNode: JsonNode): BlueprintMessageProducerService { - val messageClientProperties = messageClientProperties(jsonNode) - return blueprintMessageClientService(messageClientProperties) + fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService { + val messageClientProperties = messageProducerProperties(jsonNode) + return blueprintMessageProducerService(messageClientProperties) } - fun blueprintMessageClientService(selector: String): BlueprintMessageProducerService { - val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_CLIENT_PREFIX}$selector" - val messageClientProperties = messageClientProperties(prefix) - return blueprintMessageClientService(messageClientProperties) + fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService { + val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector" + val messageClientProperties = messageProducerProperties(prefix) + return blueprintMessageProducerService(messageClientProperties) } - fun messageClientProperties(prefix: String): MessageProducerProperties { + fun messageProducerProperties(prefix: String): MessageProducerProperties { val type = bluePrintProperties.propertyBeanType("$prefix.type", String::class.java) return when (type) { MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { - kafkaBasicAuthMessageClientProperties(prefix) + kafkaBasicAuthMessageProducerProperties(prefix) } else -> { throw BluePrintProcessorException("Message adaptor($type) is not supported") @@ -51,7 +49,7 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B } } - fun messageClientProperties(jsonNode: JsonNode): MessageProducerProperties { + fun messageProducerProperties(jsonNode: JsonNode): MessageProducerProperties { val type = jsonNode.get("type").textValue() return when (type) { MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { @@ -63,7 +61,7 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B } } - private fun blueprintMessageClientService(MessageProducerProperties: MessageProducerProperties) + private fun blueprintMessageProducerService(MessageProducerProperties: MessageProducerProperties) : BlueprintMessageProducerService { when (MessageProducerProperties) { @@ -76,9 +74,67 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B } } - private fun kafkaBasicAuthMessageClientProperties(prefix: String): KafkaBasicAuthMessageProducerProperties { + private fun kafkaBasicAuthMessageProducerProperties(prefix: String): KafkaBasicAuthMessageProducerProperties { return bluePrintProperties.propertyBeanType( prefix, KafkaBasicAuthMessageProducerProperties::class.java) } + /** Consumer Property Lib Service Implementation **/ + + /** Return Message Consumer Service for [jsonNode] definitions. */ + fun blueprintMessageConsumerService(jsonNode: JsonNode): BlueprintMessageConsumerService { + val messageConsumerProperties = messageConsumerProperties(jsonNode) + return blueprintMessageConsumerService(messageConsumerProperties) + } + + /** Return Message Consumer Service for [selector] definitions. */ + fun blueprintMessageConsumerService(selector: String): BlueprintMessageConsumerService { + val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}$selector" + val messageClientProperties = messageConsumerProperties(prefix) + return blueprintMessageConsumerService(messageClientProperties) + } + + /** Return Message Consumer Properties for [prefix] definitions. */ + fun messageConsumerProperties(prefix: String): MessageConsumerProperties { + val type = bluePrintProperties.propertyBeanType("$prefix.type", String::class.java) + return when (type) { + MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { + kafkaBasicAuthMessageConsumerProperties(prefix) + } + else -> { + throw BluePrintProcessorException("Message adaptor($type) is not supported") + } + } + } + + fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties { + val type = jsonNode.get("type").textValue() + return when (type) { + MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!! + } + else -> { + throw BluePrintProcessorException("Message adaptor($type) is not supported") + } + } + } + + private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties) + : BlueprintMessageConsumerService { + + when (messageConsumerProperties) { + is KafkaBasicAuthMessageConsumerProperties -> { + return KafkaBasicAuthMessageConsumerService(messageConsumerProperties) + } + else -> { + throw BluePrintProcessorException("couldn't get Message client service for") + } + } + } + + private fun kafkaBasicAuthMessageConsumerProperties(prefix: String): KafkaBasicAuthMessageConsumerProperties { + return bluePrintProperties.propertyBeanType( + prefix, KafkaBasicAuthMessageConsumerProperties::class.java) + } + } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt new file mode 100644 index 000000000..25f0bf44d --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt @@ -0,0 +1,32 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import kotlinx.coroutines.channels.Channel + +interface BlueprintMessageConsumerService { + + /** Subscribe to the Kafka channel with [additionalConfig] */ + suspend fun subscribe(additionalConfig: Map?): Channel + + /** Subscribe to the Kafka channel with [additionalConfig] for dynamic [topics]*/ + suspend fun subscribe(topics: List, additionalConfig: Map? = null): Channel + + /** close the channel, consumer and other resources */ + suspend fun shutDown() + +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt new file mode 100644 index 000000000..076501eab --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt @@ -0,0 +1,113 @@ +/* + * Copyright © 2019 IBM. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.serialization.StringDeserializer +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import java.time.Duration +import kotlin.concurrent.thread + +class KafkaBasicAuthMessageConsumerService( + private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties) + : BlueprintMessageConsumerService { + + private val channel = Channel() + private var kafkaConsumer: Consumer? = null + val log = logger(KafkaBasicAuthMessageConsumerService::class) + + @Volatile + var keepGoing = true + + fun kafkaConsumer(additionalConfig: Map? = null): Consumer { + val configProperties = hashMapOf() + configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers + configProperties[ConsumerConfig.GROUP_ID_CONFIG] = messageConsumerProperties.groupId + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest" + configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + /** add or override already set properties */ + additionalConfig?.let { configProperties.putAll(it) } + /** Create Kafka consumer */ + return KafkaConsumer(configProperties) + } + + override suspend fun subscribe(additionalConfig: Map?): Channel { + /** get to topic names */ + val consumerTopic = messageConsumerProperties.consumerTopic?.split(",")?.map { it.trim() } + check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" } + return subscribe(consumerTopic, additionalConfig) + } + + + override suspend fun subscribe(consumerTopic: List, additionalConfig: Map?): Channel { + /** Create Kafka consumer */ + kafkaConsumer = kafkaConsumer(additionalConfig) + checkNotNull(kafkaConsumer) { + "failed to create kafka consumer for " + + "server(${messageConsumerProperties.bootstrapServers})'s " + + "topics(${messageConsumerProperties.bootstrapServers})" + } + + kafkaConsumer!!.subscribe(consumerTopic) + log.info("Successfully consumed topic($consumerTopic)") + + val listenerThread = thread(start = true, name = "KafkaConsumer") { + keepGoing = true + kafkaConsumer!!.use { kc -> + while (keepGoing) { + val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) + runBlocking { + consumerRecords?.forEach { consumerRecord -> + /** execute the command block */ + consumerRecord.value()?.let { + launch { + if (!channel.isClosedForSend) { + channel.send(it) + } else { + log.error("Channel is closed to receive message") + } + } + } + } + } + } + } + + } + log.info("Successfully consumed in thread(${listenerThread})") + return channel + } + + override suspend fun shutDown() { + /** Close the Channel */ + channel.close() + /** stop the polling loop */ + keepGoing = false + if (kafkaConsumer != null) { + /** sunsubscribe the consumer */ + kafkaConsumer!!.unsubscribe() + } + } +} -- cgit 1.2.3-korg