From ada372a56d25b41e5a0926a18bf911d20102810f Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Mon, 11 Nov 2019 19:35:39 -0500 Subject: Add message prioritization module Kafka streams based solution for message prioritization using database store. Implement initial Abstract Processors, Puntuations and sample Topology for easy plug and play based on situations Issue-ID: CCSDK-1917 Signed-off-by: Brinda Santh Change-Id: I9c135604574cc3c642186545e076d6a7c60048d4 --- .../kafka/AbstractKafkaTopologyComponents.kt | 65 ++++++++++ .../message/kafka/KafkaJDBCStores.kt | 143 +++++++++++++++++++++ .../KafkaStreamsBasicAuthConsumerService.kt | 1 + 3 files changed, 209 insertions(+) create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/main') diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt new file mode 100644 index 000000000..4c6c0acdd --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt @@ -0,0 +1,65 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.kafka + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.runBlocking +import org.apache.kafka.streams.processor.Processor +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.Punctuator +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +/** CDS Kafka Stream Processor abstract class to implement */ +abstract class AbstractBluePrintMessageProcessor : Processor { + + private val log = logger(AbstractBluePrintMessageProcessor::class) + + lateinit var processorContext: ProcessorContext + + + override fun process(key: K, value: V) = runBlocking(Dispatchers.IO) { + try { + processNB(key, value) + } catch (e: Exception) { + log.error("failed in processor(${this.javaClass.simpleName}) message(${this.javaClass.simpleName} :", e) + } + } + + override fun init(context: ProcessorContext) { + log.info("initializing processor (${this.javaClass.simpleName})") + this.processorContext = context + + } + + override fun close() { + log.info("closing processor (${this.javaClass.simpleName})") + } + + abstract suspend fun processNB(key: K, value: V) +} + +/** CDS Kafka Stream Punctuator abstract class to implement */ +abstract class AbstractBluePrintMessagePunctuator : Punctuator { + lateinit var processorContext: ProcessorContext + + + override fun punctuate(timestamp: Long) = runBlocking(Dispatchers.IO) { + punctuateNB(timestamp) + } + + abstract suspend fun punctuateNB(timestamp: Long) +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt new file mode 100644 index 000000000..86ccd74a2 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt @@ -0,0 +1,143 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.kafka + +/* +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.StateStore +import org.apache.kafka.streams.state.StoreBuilder +import org.apache.kafka.streams.state.StoreSupplier +import org.onap.ccsdk.cds.blueprintsprocessor.db.BluePrintDBLibGenericService +import org.onap.ccsdk.cds.blueprintsprocessor.db.primaryDBLibGenericService +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService +import java.util.* + + +class KafkaJDBCKeyStoreSupplier(private val name: String) : StoreSupplier { + + override fun get(): KafkaJDBCStore { + // Get the DBLibGenericService Instance + val bluePrintDBLibGenericService = BluePrintDependencyService.primaryDBLibGenericService() + return KafkaJDBCStoreImpl(name, bluePrintDBLibGenericService) + } + + override fun name(): String { + return name + } + + override fun metricsScope(): String { + return "jdbc-state" + } +} + +class KafkaJDBCKeyStoreBuilder(private val storeSupplier: KafkaJDBCKeyStoreSupplier) + : StoreBuilder { + + private var logConfig: MutableMap = HashMap() + private var enableCaching: Boolean = false + private var enableLogging = true + + override fun logConfig(): MutableMap { + return logConfig + } + + override fun withCachingDisabled(): StoreBuilder { + enableCaching = false + return this + } + + override fun loggingEnabled(): Boolean { + return enableLogging + } + + override fun withLoggingDisabled(): StoreBuilder { + enableLogging = false + return this + } + + override fun withCachingEnabled(): StoreBuilder { + enableCaching = true + return this + } + + override fun withLoggingEnabled(config: MutableMap?): StoreBuilder { + enableLogging = true + return this + } + + override fun name(): String { + return "KafkaJDBCKeyStoreBuilder" + } + + override fun build(): KafkaJDBCStore { + return storeSupplier.get() + } +} + +interface KafkaJDBCStore : StateStore { + + suspend fun query(sql: String, params: Map): List> + + suspend fun update(sql: String, params: Map): Int +} + + +class KafkaJDBCStoreImpl(private val name: String, + private val bluePrintDBLibGenericService: BluePrintDBLibGenericService) + : KafkaJDBCStore { + + private val log = logger(KafkaJDBCStoreImpl::class) + + override fun isOpen(): Boolean { + log.info("isOpen...") + return true + } + + override fun init(context: ProcessorContext, root: StateStore) { + log.info("init...") + } + + override fun flush() { + log.info("flush...") + } + + override fun close() { + log.info("Close...") + } + + override fun name(): String { + return name + } + + override fun persistent(): Boolean { + return true + } + + override suspend fun query(sql: String, params: Map): List> { + log.info("Query : $sql") + log.info("Params : $params") + return bluePrintDBLibGenericService.query(sql, params) + } + + override suspend fun update(sql: String, params: Map): Int { + log.info("Query : $sql") + log.info("Params : $params") + return bluePrintDBLibGenericService.update(sql, params) + } +} +*/ diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt index 229e462da..d0297df4c 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt @@ -56,6 +56,7 @@ open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerPrope val streamsConfig = streamsConfig(additionalConfig) val kafkaStreamConsumerFunction = consumerFunction as KafkaStreamConsumerFunction val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, additionalConfig) + log.info("Kafka streams topology : ${topology.describe()}") kafkaStreams = KafkaStreams(topology, streamsConfig) kafkaStreams.cleanUp() kafkaStreams.start() -- cgit 1.2.3-korg