summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib/src
diff options
context:
space:
mode:
authorDan Timoney <dtimoney@att.com>2019-11-18 14:14:38 +0000
committerGerrit Code Review <gerrit@onap.org>2019-11-18 14:14:38 +0000
commit053895e37df903e11ef29b4e28572a31f896dba9 (patch)
tree51619901063f6ca988558bdeb5a95b87eb581245 /ms/blueprintsprocessor/modules/commons/message-lib/src
parentf69938ae11e0db78d41b8ea3397a51938161f022 (diff)
parentada372a56d25b41e5a0926a18bf911d20102810f (diff)
Merge "Add message prioritization module"
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt65
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt143
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt1
3 files changed, 209 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt
new file mode 100644
index 000000000..4c6c0acdd
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt
@@ -0,0 +1,65 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.kafka
+
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.runBlocking
+import org.apache.kafka.streams.processor.Processor
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.Punctuator
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+/** CDS Kafka Stream Processor abstract class to implement */
+abstract class AbstractBluePrintMessageProcessor<K, V> : Processor<K, V> {
+
+ private val log = logger(AbstractBluePrintMessageProcessor::class)
+
+ lateinit var processorContext: ProcessorContext
+
+
+ override fun process(key: K, value: V) = runBlocking(Dispatchers.IO) {
+ try {
+ processNB(key, value)
+ } catch (e: Exception) {
+ log.error("failed in processor(${this.javaClass.simpleName}) message(${this.javaClass.simpleName} :", e)
+ }
+ }
+
+ override fun init(context: ProcessorContext) {
+ log.info("initializing processor (${this.javaClass.simpleName})")
+ this.processorContext = context
+
+ }
+
+ override fun close() {
+ log.info("closing processor (${this.javaClass.simpleName})")
+ }
+
+ abstract suspend fun processNB(key: K, value: V)
+}
+
+/** CDS Kafka Stream Punctuator abstract class to implement */
+abstract class AbstractBluePrintMessagePunctuator : Punctuator {
+ lateinit var processorContext: ProcessorContext
+
+
+ override fun punctuate(timestamp: Long) = runBlocking(Dispatchers.IO) {
+ punctuateNB(timestamp)
+ }
+
+ abstract suspend fun punctuateNB(timestamp: Long)
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt
new file mode 100644
index 000000000..86ccd74a2
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt
@@ -0,0 +1,143 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.kafka
+
+/*
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.StateStore
+import org.apache.kafka.streams.state.StoreBuilder
+import org.apache.kafka.streams.state.StoreSupplier
+import org.onap.ccsdk.cds.blueprintsprocessor.db.BluePrintDBLibGenericService
+import org.onap.ccsdk.cds.blueprintsprocessor.db.primaryDBLibGenericService
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+import java.util.*
+
+
+class KafkaJDBCKeyStoreSupplier(private val name: String) : StoreSupplier<KafkaJDBCStore> {
+
+ override fun get(): KafkaJDBCStore {
+ // Get the DBLibGenericService Instance
+ val bluePrintDBLibGenericService = BluePrintDependencyService.primaryDBLibGenericService()
+ return KafkaJDBCStoreImpl(name, bluePrintDBLibGenericService)
+ }
+
+ override fun name(): String {
+ return name
+ }
+
+ override fun metricsScope(): String {
+ return "jdbc-state"
+ }
+}
+
+class KafkaJDBCKeyStoreBuilder(private val storeSupplier: KafkaJDBCKeyStoreSupplier)
+ : StoreBuilder<KafkaJDBCStore> {
+
+ private var logConfig: MutableMap<String, String> = HashMap()
+ private var enableCaching: Boolean = false
+ private var enableLogging = true
+
+ override fun logConfig(): MutableMap<String, String> {
+ return logConfig
+ }
+
+ override fun withCachingDisabled(): StoreBuilder<KafkaJDBCStore> {
+ enableCaching = false
+ return this
+ }
+
+ override fun loggingEnabled(): Boolean {
+ return enableLogging
+ }
+
+ override fun withLoggingDisabled(): StoreBuilder<KafkaJDBCStore> {
+ enableLogging = false
+ return this
+ }
+
+ override fun withCachingEnabled(): StoreBuilder<KafkaJDBCStore> {
+ enableCaching = true
+ return this
+ }
+
+ override fun withLoggingEnabled(config: MutableMap<String, String>?): StoreBuilder<KafkaJDBCStore> {
+ enableLogging = true
+ return this
+ }
+
+ override fun name(): String {
+ return "KafkaJDBCKeyStoreBuilder"
+ }
+
+ override fun build(): KafkaJDBCStore {
+ return storeSupplier.get()
+ }
+}
+
+interface KafkaJDBCStore : StateStore {
+
+ suspend fun query(sql: String, params: Map<String, Any>): List<Map<String, Any>>
+
+ suspend fun update(sql: String, params: Map<String, Any>): Int
+}
+
+
+class KafkaJDBCStoreImpl(private val name: String,
+ private val bluePrintDBLibGenericService: BluePrintDBLibGenericService)
+ : KafkaJDBCStore {
+
+ private val log = logger(KafkaJDBCStoreImpl::class)
+
+ override fun isOpen(): Boolean {
+ log.info("isOpen...")
+ return true
+ }
+
+ override fun init(context: ProcessorContext, root: StateStore) {
+ log.info("init...")
+ }
+
+ override fun flush() {
+ log.info("flush...")
+ }
+
+ override fun close() {
+ log.info("Close...")
+ }
+
+ override fun name(): String {
+ return name
+ }
+
+ override fun persistent(): Boolean {
+ return true
+ }
+
+ override suspend fun query(sql: String, params: Map<String, Any>): List<Map<String, Any>> {
+ log.info("Query : $sql")
+ log.info("Params : $params")
+ return bluePrintDBLibGenericService.query(sql, params)
+ }
+
+ override suspend fun update(sql: String, params: Map<String, Any>): Int {
+ log.info("Query : $sql")
+ log.info("Params : $params")
+ return bluePrintDBLibGenericService.update(sql, params)
+ }
+}
+*/
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt
index 229e462da..d0297df4c 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt
@@ -56,6 +56,7 @@ open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerPrope
val streamsConfig = streamsConfig(additionalConfig)
val kafkaStreamConsumerFunction = consumerFunction as KafkaStreamConsumerFunction
val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, additionalConfig)
+ log.info("Kafka streams topology : ${topology.describe()}")
kafkaStreams = KafkaStreams(topology, streamsConfig)
kafkaStreams.cleanUp()
kafkaStreams.start()