summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons
diff options
context:
space:
mode:
authorBrinda Santh <bs2796@att.com>2019-11-11 19:35:39 -0500
committerBrinda Santh <bs2796@att.com>2019-11-13 15:18:32 -0500
commitada372a56d25b41e5a0926a18bf911d20102810f (patch)
treedac9be4e51fc2e74aefde4e1cf4222267539eee8 /ms/blueprintsprocessor/modules/commons
parenta0407eb91a2424f847e188796328871b3a339c93 (diff)
Add message prioritization module
Kafka streams based solution for message prioritization using database store. Implement initial Abstract Processors, Puntuations and sample Topology for easy plug and play based on situations Issue-ID: CCSDK-1917 Signed-off-by: Brinda Santh <bs2796@att.com> Change-Id: I9c135604574cc3c642186545e076d6a7c60048d4
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt65
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt143
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt1
3 files changed, 209 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt
new file mode 100644
index 000000000..4c6c0acdd
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt
@@ -0,0 +1,65 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.kafka
+
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.runBlocking
+import org.apache.kafka.streams.processor.Processor
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.Punctuator
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+/** CDS Kafka Stream Processor abstract class to implement */
+abstract class AbstractBluePrintMessageProcessor<K, V> : Processor<K, V> {
+
+ private val log = logger(AbstractBluePrintMessageProcessor::class)
+
+ lateinit var processorContext: ProcessorContext
+
+
+ override fun process(key: K, value: V) = runBlocking(Dispatchers.IO) {
+ try {
+ processNB(key, value)
+ } catch (e: Exception) {
+ log.error("failed in processor(${this.javaClass.simpleName}) message(${this.javaClass.simpleName} :", e)
+ }
+ }
+
+ override fun init(context: ProcessorContext) {
+ log.info("initializing processor (${this.javaClass.simpleName})")
+ this.processorContext = context
+
+ }
+
+ override fun close() {
+ log.info("closing processor (${this.javaClass.simpleName})")
+ }
+
+ abstract suspend fun processNB(key: K, value: V)
+}
+
+/** CDS Kafka Stream Punctuator abstract class to implement */
+abstract class AbstractBluePrintMessagePunctuator : Punctuator {
+ lateinit var processorContext: ProcessorContext
+
+
+ override fun punctuate(timestamp: Long) = runBlocking(Dispatchers.IO) {
+ punctuateNB(timestamp)
+ }
+
+ abstract suspend fun punctuateNB(timestamp: Long)
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt
new file mode 100644
index 000000000..86ccd74a2
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt
@@ -0,0 +1,143 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.kafka
+
+/*
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.StateStore
+import org.apache.kafka.streams.state.StoreBuilder
+import org.apache.kafka.streams.state.StoreSupplier
+import org.onap.ccsdk.cds.blueprintsprocessor.db.BluePrintDBLibGenericService
+import org.onap.ccsdk.cds.blueprintsprocessor.db.primaryDBLibGenericService
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+import java.util.*
+
+
+class KafkaJDBCKeyStoreSupplier(private val name: String) : StoreSupplier<KafkaJDBCStore> {
+
+ override fun get(): KafkaJDBCStore {
+ // Get the DBLibGenericService Instance
+ val bluePrintDBLibGenericService = BluePrintDependencyService.primaryDBLibGenericService()
+ return KafkaJDBCStoreImpl(name, bluePrintDBLibGenericService)
+ }
+
+ override fun name(): String {
+ return name
+ }
+
+ override fun metricsScope(): String {
+ return "jdbc-state"
+ }
+}
+
+class KafkaJDBCKeyStoreBuilder(private val storeSupplier: KafkaJDBCKeyStoreSupplier)
+ : StoreBuilder<KafkaJDBCStore> {
+
+ private var logConfig: MutableMap<String, String> = HashMap()
+ private var enableCaching: Boolean = false
+ private var enableLogging = true
+
+ override fun logConfig(): MutableMap<String, String> {
+ return logConfig
+ }
+
+ override fun withCachingDisabled(): StoreBuilder<KafkaJDBCStore> {
+ enableCaching = false
+ return this
+ }
+
+ override fun loggingEnabled(): Boolean {
+ return enableLogging
+ }
+
+ override fun withLoggingDisabled(): StoreBuilder<KafkaJDBCStore> {
+ enableLogging = false
+ return this
+ }
+
+ override fun withCachingEnabled(): StoreBuilder<KafkaJDBCStore> {
+ enableCaching = true
+ return this
+ }
+
+ override fun withLoggingEnabled(config: MutableMap<String, String>?): StoreBuilder<KafkaJDBCStore> {
+ enableLogging = true
+ return this
+ }
+
+ override fun name(): String {
+ return "KafkaJDBCKeyStoreBuilder"
+ }
+
+ override fun build(): KafkaJDBCStore {
+ return storeSupplier.get()
+ }
+}
+
+interface KafkaJDBCStore : StateStore {
+
+ suspend fun query(sql: String, params: Map<String, Any>): List<Map<String, Any>>
+
+ suspend fun update(sql: String, params: Map<String, Any>): Int
+}
+
+
+class KafkaJDBCStoreImpl(private val name: String,
+ private val bluePrintDBLibGenericService: BluePrintDBLibGenericService)
+ : KafkaJDBCStore {
+
+ private val log = logger(KafkaJDBCStoreImpl::class)
+
+ override fun isOpen(): Boolean {
+ log.info("isOpen...")
+ return true
+ }
+
+ override fun init(context: ProcessorContext, root: StateStore) {
+ log.info("init...")
+ }
+
+ override fun flush() {
+ log.info("flush...")
+ }
+
+ override fun close() {
+ log.info("Close...")
+ }
+
+ override fun name(): String {
+ return name
+ }
+
+ override fun persistent(): Boolean {
+ return true
+ }
+
+ override suspend fun query(sql: String, params: Map<String, Any>): List<Map<String, Any>> {
+ log.info("Query : $sql")
+ log.info("Params : $params")
+ return bluePrintDBLibGenericService.query(sql, params)
+ }
+
+ override suspend fun update(sql: String, params: Map<String, Any>): Int {
+ log.info("Query : $sql")
+ log.info("Params : $params")
+ return bluePrintDBLibGenericService.update(sql, params)
+ }
+}
+*/
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt
index 229e462da..d0297df4c 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt
@@ -56,6 +56,7 @@ open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerPrope
val streamsConfig = streamsConfig(additionalConfig)
val kafkaStreamConsumerFunction = consumerFunction as KafkaStreamConsumerFunction
val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, additionalConfig)
+ log.info("Kafka streams topology : ${topology.describe()}")
kafkaStreams = KafkaStreams(topology, streamsConfig)
kafkaStreams.cleanUp()
kafkaStreams.start()