From a0407eb91a2424f847e188796328871b3a339c93 Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Fri, 8 Nov 2019 16:41:07 -0500 Subject: Add Kafka Streams consumer service Issue-ID: CCSDK-1914 Signed-off-by: Brinda Santh Change-Id: I8d2b51c66e1304decadbb55656fe8a0b4c018feb --- .../message/BluePrintMessageLibConfiguration.kt | 2 + .../message/BluePrintMessageLibData.kt | 17 ++++++ .../service/BluePrintMessageLibPropertyService.kt | 15 +++++ .../service/BlueprintMessageConsumerService.kt | 6 ++ .../KafkaBasicAuthMessageProducerService.kt | 6 +- .../KafkaStreamsBasicAuthConsumerService.kt | 70 ++++++++++++++++++++++ 6 files changed, 114 insertions(+), 2 deletions(-) create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin') diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt index 27a444bdc..ecffa280f 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -62,5 +63,6 @@ class MessageLibConstants { const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer." const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer." const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth" + const val TYPE_KAFKA_STREAMS_BASIC_AUTH = "kafka-streams-basic-auth" } } \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index 184e85b70..d0c3d5ae1 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -17,6 +17,8 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message +import org.apache.kafka.streams.StreamsConfig + /** Producer Properties **/ open class MessageProducerProperties @@ -25,12 +27,27 @@ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() lateinit var bootstrapServers: String var topic: String? = null var clientId: String? = null + // strongest producing guarantee + var acks: String = "all" + var retries: Int = 0 + // ensure we don't push duplicates + var enableIdempotence: Boolean = true } /** Consumer Properties **/ open class MessageConsumerProperties +open class KafkaStreamsConsumerProperties : MessageConsumerProperties() { + lateinit var bootstrapServers: String + lateinit var applicationId: String + lateinit var topic: String + var autoOffsetReset: String = "latest" + var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE +} + +open class KafkaStreamsBasicAuthConsumerProperties : KafkaStreamsConsumerProperties() + open class KafkaMessageConsumerProperties : MessageConsumerProperties() { lateinit var bootstrapServers: String lateinit var groupId: String diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt index 7c56ea432..563da9435 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -101,6 +102,9 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { kafkaBasicAuthMessageConsumerProperties(prefix) } + MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { + kafkaStreamsBasicAuthMessageConsumerProperties(prefix) + } else -> { throw BluePrintProcessorException("Message adaptor($type) is not supported") } @@ -113,6 +117,9 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> { JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!! } + MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> { + JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!! + } else -> { throw BluePrintProcessorException("Message adaptor($type) is not supported") } @@ -126,6 +133,9 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B is KafkaBasicAuthMessageConsumerProperties -> { return KafkaBasicAuthMessageConsumerService(messageConsumerProperties) } + is KafkaStreamsBasicAuthConsumerProperties -> { + return KafkaStreamsBasicAuthConsumerService(messageConsumerProperties) + } else -> { throw BluePrintProcessorException("couldn't get Message client service for") } @@ -137,4 +147,9 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B prefix, KafkaBasicAuthMessageConsumerProperties::class.java) } + private fun kafkaStreamsBasicAuthMessageConsumerProperties(prefix: String): KafkaStreamsBasicAuthConsumerProperties { + return bluePrintProperties.propertyBeanType( + prefix, KafkaStreamsBasicAuthConsumerProperties::class.java) + } + } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt index 8bcc7580a..716fda609 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt @@ -20,6 +20,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.channels.Channel import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerRecords +import org.apache.kafka.streams.Topology import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException @@ -61,4 +62,9 @@ interface BlueprintMessageConsumerService { interface KafkaConsumerRecordsFunction : ConsumerFunction { suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, consumer: Consumer<*, *>, consumerRecords: ConsumerRecords<*, *>) +} + +interface KafkaStreamConsumerFunction : ConsumerFunction { + suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties, + additionalConfig: Map?): Topology } \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt index 42adcd712..ad9a594b0 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt @@ -65,9 +65,9 @@ class KafkaBasicAuthMessageProducerService( headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) } } val callback = Callback { metadata, exception -> - log.info("message published offset(${metadata.offset()}, headers :$headers )") + log.trace("message published to(${metadata.topic()}), offset(${metadata.offset()}), headers :$headers") } - messageTemplate().send(record, callback).get() + messageTemplate().send(record, callback) return true } @@ -77,6 +77,8 @@ class KafkaBasicAuthMessageProducerService( configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java configProps[VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java + configProps[ACKS_CONFIG] = messageProducerProperties.acks + configProps[ENABLE_IDEMPOTENCE_CONFIG] = messageProducerProperties.enableIdempotence if (messageProducerProperties.clientId != null) { configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!! } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt new file mode 100644 index 000000000..229e462da --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt @@ -0,0 +1,70 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import kotlinx.coroutines.channels.Channel +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.streams.KafkaStreams +import org.apache.kafka.streams.StreamsConfig +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import java.util.* + +open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerProperties: KafkaStreamsBasicAuthConsumerProperties) + : BlueprintMessageConsumerService { + + val log = logger(KafkaStreamsBasicAuthConsumerService::class) + lateinit var kafkaStreams: KafkaStreams + + private fun streamsConfig(additionalConfig: Map? = null): Properties { + val configProperties = Properties() + configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = messageConsumerProperties.applicationId + configProperties[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset + configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = messageConsumerProperties.processingGuarantee + // TODO("Security Implementation based on type") + /** add or override already set properties */ + additionalConfig?.let { configProperties.putAll(it) } + /** Create Kafka consumer */ + return configProperties + } + + override suspend fun subscribe(additionalConfig: Map?): Channel { + throw BluePrintProcessorException("not implemented") + } + + override suspend fun subscribe(topics: List, additionalConfig: Map?): Channel { + throw BluePrintProcessorException("not implemented") + } + + override suspend fun consume(additionalConfig: Map?, consumerFunction: ConsumerFunction) { + val streamsConfig = streamsConfig(additionalConfig) + val kafkaStreamConsumerFunction = consumerFunction as KafkaStreamConsumerFunction + val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, additionalConfig) + kafkaStreams = KafkaStreams(topology, streamsConfig) + kafkaStreams.cleanUp() + kafkaStreams.start() + kafkaStreams.localThreadsMetadata().forEach { data -> log.info("Topology : $data") } + } + + override suspend fun shutDown() { + if (kafkaStreams != null) { + kafkaStreams.close() + } + } +} \ No newline at end of file -- cgit 1.2.3-korg