From 8a26b031f8980b2a93b51e5327df959c6926c034 Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Tue, 29 Oct 2019 10:47:00 -0400 Subject: Add message consumer dynamic functions. Issue-ID: CCSDK-1668 Signed-off-by: Brinda Santh Change-Id: I51be88598557a05ef5db7cd595689a8e4a653fdc --- .../message/BluePrintMessageLibData.kt | 3 +- .../service/BlueprintMessageConsumerService.kt | 34 ++++++++++++- .../KafkaBasicAuthMessageConsumerService.kt | 56 ++++++++++++++++++---- 3 files changed, 82 insertions(+), 11 deletions(-) (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin') diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index 0b899f2a3..184e85b70 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,7 +34,7 @@ open class MessageConsumerProperties open class KafkaMessageConsumerProperties : MessageConsumerProperties() { lateinit var bootstrapServers: String lateinit var groupId: String - var clientId: String? = null + lateinit var clientId: String var topic: String? = null var autoCommit: Boolean = true var autoOffsetReset: String = "latest" diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt index 25f0bf44d..8bcc7580a 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,16 +18,47 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.channels.Channel +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecords +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException + +/** Consumer Function Interfaces */ +interface ConsumerFunction interface BlueprintMessageConsumerService { + suspend fun subscribe(): Channel { + return subscribe(null) + } + /** Subscribe to the Kafka channel with [additionalConfig] */ suspend fun subscribe(additionalConfig: Map?): Channel /** Subscribe to the Kafka channel with [additionalConfig] for dynamic [topics]*/ suspend fun subscribe(topics: List, additionalConfig: Map? = null): Channel + /** Consume and execute dynamic function [consumerFunction] */ + suspend fun consume(consumerFunction: ConsumerFunction) { + consume(null, consumerFunction) + } + + /** Consume with [additionalConfig], so that we can execute dynamic function [consumerFunction] */ + suspend fun consume(additionalConfig: Map?, consumerFunction: ConsumerFunction) { + throw BluePrintProcessorException("Not Implemented") + } + + /** Consume the [topics] with [additionalConfig], so that we can execute dynamic function [consumerFunction] */ + suspend fun consume(topics: List, additionalConfig: Map?, + consumerFunction: ConsumerFunction) { + throw BluePrintProcessorException("Not Implemented") + } + /** close the channel, consumer and other resources */ suspend fun shutDown() - +} +/** Consumer dynamic implementation interface */ +interface KafkaConsumerRecordsFunction : ConsumerFunction { + suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, consumer: Consumer<*, *>, + consumerRecords: ConsumerRecords<*, *>) } \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt index a4ccfa901..757846c81 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt @@ -38,7 +38,6 @@ open class KafkaBasicAuthMessageConsumerService( : BlueprintMessageConsumerService { val log = logger(KafkaBasicAuthMessageConsumerService::class) - val channel = Channel() var kafkaConsumer: Consumer? = null @@ -57,9 +56,8 @@ open class KafkaBasicAuthMessageConsumerService( configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java - if (messageConsumerProperties.clientId != null) { - configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!! - } + configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId + /** To handle Back pressure, Get only configured record for processing */ if (messageConsumerProperties.pollRecords > 0) { configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords @@ -79,7 +77,7 @@ open class KafkaBasicAuthMessageConsumerService( } - override suspend fun subscribe(consumerTopic: List, additionalConfig: Map?): Channel { + override suspend fun subscribe(topics: List, additionalConfig: Map?): Channel { /** Create Kafka consumer */ kafkaConsumer = kafkaConsumer(additionalConfig) @@ -89,15 +87,15 @@ open class KafkaBasicAuthMessageConsumerService( "topics(${messageConsumerProperties.bootstrapServers})" } - kafkaConsumer!!.subscribe(consumerTopic) - log.info("Successfully consumed topic($consumerTopic)") + kafkaConsumer!!.subscribe(topics) + log.info("Successfully consumed topic($topics)") - thread(start = true, name = "KafkaConsumer") { + thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") { keepGoing = true kafkaConsumer!!.use { kc -> while (keepGoing) { val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) - log.info("Consumed Records : ${consumerRecords.count()}") + log.trace("Consumed Records : ${consumerRecords.count()}") runBlocking { consumerRecords?.forEach { consumerRecord -> /** execute the command block */ @@ -119,6 +117,46 @@ open class KafkaBasicAuthMessageConsumerService( return channel } + override suspend fun consume(additionalConfig: Map?, consumerFunction: ConsumerFunction) { + /** get to topic names */ + val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() } + check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" } + return consume(topics = consumerTopic, additionalConfig = additionalConfig, consumerFunction = consumerFunction) + } + + override suspend fun consume(topics: List, additionalConfig: Map?, + consumerFunction: ConsumerFunction) { + + val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction + + /** Create Kafka consumer */ + kafkaConsumer = kafkaConsumer(additionalConfig) + + checkNotNull(kafkaConsumer) { + "failed to create kafka consumer for " + + "server(${messageConsumerProperties.bootstrapServers})'s " + + "topics(${messageConsumerProperties.bootstrapServers})" + } + + kafkaConsumer!!.subscribe(topics) + log.info("Successfully consumed topic($topics)") + + thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") { + keepGoing = true + kafkaConsumer!!.use { kc -> + while (keepGoing) { + val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) + log.trace("Consumed Records : ${consumerRecords.count()}") + runBlocking { + /** Execute dynamic consumer Block substitution */ + kafkaConsumerFunction.invoke(messageConsumerProperties, kc, consumerRecords) + } + } + log.info("message listener shutting down.....") + } + } + } + override suspend fun shutDown() { /** stop the polling loop */ keepGoing = false -- cgit 1.2.3-korg