diff options
author | Dan Timoney <dtimoney@att.com> | 2019-11-18 14:14:38 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2019-11-18 14:14:38 +0000 |
commit | 053895e37df903e11ef29b4e28572a31f896dba9 (patch) | |
tree | 51619901063f6ca988558bdeb5a95b87eb581245 /ms/blueprintsprocessor/modules/commons/message-lib/src | |
parent | f69938ae11e0db78d41b8ea3397a51938161f022 (diff) | |
parent | ada372a56d25b41e5a0926a18bf911d20102810f (diff) |
Merge "Add message prioritization module"
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src')
3 files changed, 209 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt new file mode 100644 index 000000000..4c6c0acdd --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt @@ -0,0 +1,65 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.kafka + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.runBlocking +import org.apache.kafka.streams.processor.Processor +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.Punctuator +import org.onap.ccsdk.cds.controllerblueprints.core.logger + +/** CDS Kafka Stream Processor abstract class to implement */ +abstract class AbstractBluePrintMessageProcessor<K, V> : Processor<K, V> { + + private val log = logger(AbstractBluePrintMessageProcessor::class) + + lateinit var processorContext: ProcessorContext + + + override fun process(key: K, value: V) = runBlocking(Dispatchers.IO) { + try { + processNB(key, value) + } catch (e: Exception) { + log.error("failed in processor(${this.javaClass.simpleName}) message(${this.javaClass.simpleName} :", e) + } + } + + override fun init(context: ProcessorContext) { + log.info("initializing processor (${this.javaClass.simpleName})") + this.processorContext = context + + } + + override fun close() { + log.info("closing processor (${this.javaClass.simpleName})") + } + + abstract suspend fun processNB(key: K, value: V) +} + +/** CDS Kafka Stream Punctuator abstract class to implement */ +abstract class AbstractBluePrintMessagePunctuator : Punctuator { + lateinit var processorContext: ProcessorContext + + + override fun punctuate(timestamp: Long) = runBlocking(Dispatchers.IO) { + punctuateNB(timestamp) + } + + abstract suspend fun punctuateNB(timestamp: Long) +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt new file mode 100644 index 000000000..86ccd74a2 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt @@ -0,0 +1,143 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.kafka + +/* +import org.apache.kafka.streams.processor.ProcessorContext +import org.apache.kafka.streams.processor.StateStore +import org.apache.kafka.streams.state.StoreBuilder +import org.apache.kafka.streams.state.StoreSupplier +import org.onap.ccsdk.cds.blueprintsprocessor.db.BluePrintDBLibGenericService +import org.onap.ccsdk.cds.blueprintsprocessor.db.primaryDBLibGenericService +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService +import java.util.* + + +class KafkaJDBCKeyStoreSupplier(private val name: String) : StoreSupplier<KafkaJDBCStore> { + + override fun get(): KafkaJDBCStore { + // Get the DBLibGenericService Instance + val bluePrintDBLibGenericService = BluePrintDependencyService.primaryDBLibGenericService() + return KafkaJDBCStoreImpl(name, bluePrintDBLibGenericService) + } + + override fun name(): String { + return name + } + + override fun metricsScope(): String { + return "jdbc-state" + } +} + +class KafkaJDBCKeyStoreBuilder(private val storeSupplier: KafkaJDBCKeyStoreSupplier) + : StoreBuilder<KafkaJDBCStore> { + + private var logConfig: MutableMap<String, String> = HashMap() + private var enableCaching: Boolean = false + private var enableLogging = true + + override fun logConfig(): MutableMap<String, String> { + return logConfig + } + + override fun withCachingDisabled(): StoreBuilder<KafkaJDBCStore> { + enableCaching = false + return this + } + + override fun loggingEnabled(): Boolean { + return enableLogging + } + + override fun withLoggingDisabled(): StoreBuilder<KafkaJDBCStore> { + enableLogging = false + return this + } + + override fun withCachingEnabled(): StoreBuilder<KafkaJDBCStore> { + enableCaching = true + return this + } + + override fun withLoggingEnabled(config: MutableMap<String, String>?): StoreBuilder<KafkaJDBCStore> { + enableLogging = true + return this + } + + override fun name(): String { + return "KafkaJDBCKeyStoreBuilder" + } + + override fun build(): KafkaJDBCStore { + return storeSupplier.get() + } +} + +interface KafkaJDBCStore : StateStore { + + suspend fun query(sql: String, params: Map<String, Any>): List<Map<String, Any>> + + suspend fun update(sql: String, params: Map<String, Any>): Int +} + + +class KafkaJDBCStoreImpl(private val name: String, + private val bluePrintDBLibGenericService: BluePrintDBLibGenericService) + : KafkaJDBCStore { + + private val log = logger(KafkaJDBCStoreImpl::class) + + override fun isOpen(): Boolean { + log.info("isOpen...") + return true + } + + override fun init(context: ProcessorContext, root: StateStore) { + log.info("init...") + } + + override fun flush() { + log.info("flush...") + } + + override fun close() { + log.info("Close...") + } + + override fun name(): String { + return name + } + + override fun persistent(): Boolean { + return true + } + + override suspend fun query(sql: String, params: Map<String, Any>): List<Map<String, Any>> { + log.info("Query : $sql") + log.info("Params : $params") + return bluePrintDBLibGenericService.query(sql, params) + } + + override suspend fun update(sql: String, params: Map<String, Any>): Int { + log.info("Query : $sql") + log.info("Params : $params") + return bluePrintDBLibGenericService.update(sql, params) + } +} +*/ diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt index 229e462da..d0297df4c 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt @@ -56,6 +56,7 @@ open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerPrope val streamsConfig = streamsConfig(additionalConfig) val kafkaStreamConsumerFunction = consumerFunction as KafkaStreamConsumerFunction val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, additionalConfig) + log.info("Kafka streams topology : ${topology.describe()}") kafkaStreams = KafkaStreams(topology, streamsConfig) kafkaStreams.cleanUp() kafkaStreams.start() |