aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib/src
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt2
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt17
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt65
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt143
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt27
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt6
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt6
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt71
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt6
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt6
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt126
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt103
12 files changed, 564 insertions, 14 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
index 27a444bdc..ecffa280f 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -62,5 +63,6 @@ class MessageLibConstants {
const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer."
const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer."
const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth"
+ const val TYPE_KAFKA_STREAMS_BASIC_AUTH = "kafka-streams-basic-auth"
}
} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index 184e85b70..d0c3d5ae1 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -17,6 +17,8 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message
+import org.apache.kafka.streams.StreamsConfig
+
/** Producer Properties **/
open class MessageProducerProperties
@@ -25,12 +27,27 @@ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties()
lateinit var bootstrapServers: String
var topic: String? = null
var clientId: String? = null
+ // strongest producing guarantee
+ var acks: String = "all"
+ var retries: Int = 0
+ // ensure we don't push duplicates
+ var enableIdempotence: Boolean = true
}
/** Consumer Properties **/
open class MessageConsumerProperties
+open class KafkaStreamsConsumerProperties : MessageConsumerProperties() {
+ lateinit var bootstrapServers: String
+ lateinit var applicationId: String
+ lateinit var topic: String
+ var autoOffsetReset: String = "latest"
+ var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE
+}
+
+open class KafkaStreamsBasicAuthConsumerProperties : KafkaStreamsConsumerProperties()
+
open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
lateinit var bootstrapServers: String
lateinit var groupId: String
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt
new file mode 100644
index 000000000..4c6c0acdd
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt
@@ -0,0 +1,65 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.kafka
+
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.runBlocking
+import org.apache.kafka.streams.processor.Processor
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.Punctuator
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+/** CDS Kafka Stream Processor abstract class to implement */
+abstract class AbstractBluePrintMessageProcessor<K, V> : Processor<K, V> {
+
+ private val log = logger(AbstractBluePrintMessageProcessor::class)
+
+ lateinit var processorContext: ProcessorContext
+
+
+ override fun process(key: K, value: V) = runBlocking(Dispatchers.IO) {
+ try {
+ processNB(key, value)
+ } catch (e: Exception) {
+ log.error("failed in processor(${this.javaClass.simpleName}) message(${this.javaClass.simpleName} :", e)
+ }
+ }
+
+ override fun init(context: ProcessorContext) {
+ log.info("initializing processor (${this.javaClass.simpleName})")
+ this.processorContext = context
+
+ }
+
+ override fun close() {
+ log.info("closing processor (${this.javaClass.simpleName})")
+ }
+
+ abstract suspend fun processNB(key: K, value: V)
+}
+
+/** CDS Kafka Stream Punctuator abstract class to implement */
+abstract class AbstractBluePrintMessagePunctuator : Punctuator {
+ lateinit var processorContext: ProcessorContext
+
+
+ override fun punctuate(timestamp: Long) = runBlocking(Dispatchers.IO) {
+ punctuateNB(timestamp)
+ }
+
+ abstract suspend fun punctuateNB(timestamp: Long)
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt
new file mode 100644
index 000000000..86ccd74a2
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt
@@ -0,0 +1,143 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.kafka
+
+/*
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.StateStore
+import org.apache.kafka.streams.state.StoreBuilder
+import org.apache.kafka.streams.state.StoreSupplier
+import org.onap.ccsdk.cds.blueprintsprocessor.db.BluePrintDBLibGenericService
+import org.onap.ccsdk.cds.blueprintsprocessor.db.primaryDBLibGenericService
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+import java.util.*
+
+
+class KafkaJDBCKeyStoreSupplier(private val name: String) : StoreSupplier<KafkaJDBCStore> {
+
+ override fun get(): KafkaJDBCStore {
+ // Get the DBLibGenericService Instance
+ val bluePrintDBLibGenericService = BluePrintDependencyService.primaryDBLibGenericService()
+ return KafkaJDBCStoreImpl(name, bluePrintDBLibGenericService)
+ }
+
+ override fun name(): String {
+ return name
+ }
+
+ override fun metricsScope(): String {
+ return "jdbc-state"
+ }
+}
+
+class KafkaJDBCKeyStoreBuilder(private val storeSupplier: KafkaJDBCKeyStoreSupplier)
+ : StoreBuilder<KafkaJDBCStore> {
+
+ private var logConfig: MutableMap<String, String> = HashMap()
+ private var enableCaching: Boolean = false
+ private var enableLogging = true
+
+ override fun logConfig(): MutableMap<String, String> {
+ return logConfig
+ }
+
+ override fun withCachingDisabled(): StoreBuilder<KafkaJDBCStore> {
+ enableCaching = false
+ return this
+ }
+
+ override fun loggingEnabled(): Boolean {
+ return enableLogging
+ }
+
+ override fun withLoggingDisabled(): StoreBuilder<KafkaJDBCStore> {
+ enableLogging = false
+ return this
+ }
+
+ override fun withCachingEnabled(): StoreBuilder<KafkaJDBCStore> {
+ enableCaching = true
+ return this
+ }
+
+ override fun withLoggingEnabled(config: MutableMap<String, String>?): StoreBuilder<KafkaJDBCStore> {
+ enableLogging = true
+ return this
+ }
+
+ override fun name(): String {
+ return "KafkaJDBCKeyStoreBuilder"
+ }
+
+ override fun build(): KafkaJDBCStore {
+ return storeSupplier.get()
+ }
+}
+
+interface KafkaJDBCStore : StateStore {
+
+ suspend fun query(sql: String, params: Map<String, Any>): List<Map<String, Any>>
+
+ suspend fun update(sql: String, params: Map<String, Any>): Int
+}
+
+
+class KafkaJDBCStoreImpl(private val name: String,
+ private val bluePrintDBLibGenericService: BluePrintDBLibGenericService)
+ : KafkaJDBCStore {
+
+ private val log = logger(KafkaJDBCStoreImpl::class)
+
+ override fun isOpen(): Boolean {
+ log.info("isOpen...")
+ return true
+ }
+
+ override fun init(context: ProcessorContext, root: StateStore) {
+ log.info("init...")
+ }
+
+ override fun flush() {
+ log.info("flush...")
+ }
+
+ override fun close() {
+ log.info("Close...")
+ }
+
+ override fun name(): String {
+ return name
+ }
+
+ override fun persistent(): Boolean {
+ return true
+ }
+
+ override suspend fun query(sql: String, params: Map<String, Any>): List<Map<String, Any>> {
+ log.info("Query : $sql")
+ log.info("Params : $params")
+ return bluePrintDBLibGenericService.query(sql, params)
+ }
+
+ override suspend fun update(sql: String, params: Map<String, Any>): Int {
+ log.info("Query : $sql")
+ log.info("Params : $params")
+ return bluePrintDBLibGenericService.update(sql, params)
+ }
+}
+*/
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
index 7c56ea432..97da7285d 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,14 +18,14 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import com.fasterxml.jackson.databind.JsonNode
-import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.message.*
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
import org.springframework.stereotype.Service
@Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY)
-open class BluePrintMessageLibPropertyService(private var bluePrintProperties: BluePrintProperties) {
+open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesService: BluePrintPropertiesService) {
fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService {
val messageClientProperties = messageProducerProperties(jsonNode)
@@ -38,7 +39,7 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B
}
fun messageProducerProperties(prefix: String): MessageProducerProperties {
- val type = bluePrintProperties.propertyBeanType("$prefix.type", String::class.java)
+ val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
return when (type) {
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
kafkaBasicAuthMessageProducerProperties(prefix)
@@ -75,7 +76,7 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B
}
private fun kafkaBasicAuthMessageProducerProperties(prefix: String): KafkaBasicAuthMessageProducerProperties {
- return bluePrintProperties.propertyBeanType(
+ return bluePrintPropertiesService.propertyBeanType(
prefix, KafkaBasicAuthMessageProducerProperties::class.java)
}
@@ -96,11 +97,14 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B
/** Return Message Consumer Properties for [prefix] definitions. */
fun messageConsumerProperties(prefix: String): MessageConsumerProperties {
- val type = bluePrintProperties.propertyBeanType("$prefix.type", String::class.java)
+ val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
return when (type) {
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
kafkaBasicAuthMessageConsumerProperties(prefix)
}
+ MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
+ kafkaStreamsBasicAuthMessageConsumerProperties(prefix)
+ }
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
}
@@ -113,6 +117,9 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!!
}
+ MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!!
+ }
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
}
@@ -126,6 +133,9 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B
is KafkaBasicAuthMessageConsumerProperties -> {
return KafkaBasicAuthMessageConsumerService(messageConsumerProperties)
}
+ is KafkaStreamsBasicAuthConsumerProperties -> {
+ return KafkaStreamsBasicAuthConsumerService(messageConsumerProperties)
+ }
else -> {
throw BluePrintProcessorException("couldn't get Message client service for")
}
@@ -133,8 +143,13 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B
}
private fun kafkaBasicAuthMessageConsumerProperties(prefix: String): KafkaBasicAuthMessageConsumerProperties {
- return bluePrintProperties.propertyBeanType(
+ return bluePrintPropertiesService.propertyBeanType(
prefix, KafkaBasicAuthMessageConsumerProperties::class.java)
}
+ private fun kafkaStreamsBasicAuthMessageConsumerProperties(prefix: String): KafkaStreamsBasicAuthConsumerProperties {
+ return bluePrintProperties.propertyBeanType(
+ prefix, KafkaStreamsBasicAuthConsumerProperties::class.java)
+ }
+
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
index 8bcc7580a..716fda609 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
@@ -20,6 +20,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import kotlinx.coroutines.channels.Channel
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecords
+import org.apache.kafka.streams.Topology
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
@@ -61,4 +62,9 @@ interface BlueprintMessageConsumerService {
interface KafkaConsumerRecordsFunction : ConsumerFunction {
suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, consumer: Consumer<*, *>,
consumerRecords: ConsumerRecords<*, *>)
+}
+
+interface KafkaStreamConsumerFunction : ConsumerFunction {
+ suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties,
+ additionalConfig: Map<String, Any>?): Topology
} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
index 42adcd712..ad9a594b0 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
@@ -65,9 +65,9 @@ class KafkaBasicAuthMessageProducerService(
headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) }
}
val callback = Callback { metadata, exception ->
- log.info("message published offset(${metadata.offset()}, headers :$headers )")
+ log.trace("message published to(${metadata.topic()}), offset(${metadata.offset()}), headers :$headers")
}
- messageTemplate().send(record, callback).get()
+ messageTemplate().send(record, callback)
return true
}
@@ -77,6 +77,8 @@ class KafkaBasicAuthMessageProducerService(
configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers
configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
configProps[VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
+ configProps[ACKS_CONFIG] = messageProducerProperties.acks
+ configProps[ENABLE_IDEMPOTENCE_CONFIG] = messageProducerProperties.enableIdempotence
if (messageProducerProperties.clientId != null) {
configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!!
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt
new file mode 100644
index 000000000..d0297df4c
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt
@@ -0,0 +1,71 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import kotlinx.coroutines.channels.Channel
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.streams.KafkaStreams
+import org.apache.kafka.streams.StreamsConfig
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import java.util.*
+
+open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerProperties: KafkaStreamsBasicAuthConsumerProperties)
+ : BlueprintMessageConsumerService {
+
+ val log = logger(KafkaStreamsBasicAuthConsumerService::class)
+ lateinit var kafkaStreams: KafkaStreams
+
+ private fun streamsConfig(additionalConfig: Map<String, Any>? = null): Properties {
+ val configProperties = Properties()
+ configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = messageConsumerProperties.applicationId
+ configProperties[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers
+ configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset
+ configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = messageConsumerProperties.processingGuarantee
+ // TODO("Security Implementation based on type")
+ /** add or override already set properties */
+ additionalConfig?.let { configProperties.putAll(it) }
+ /** Create Kafka consumer */
+ return configProperties
+ }
+
+ override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> {
+ throw BluePrintProcessorException("not implemented")
+ }
+
+ override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
+ throw BluePrintProcessorException("not implemented")
+ }
+
+ override suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) {
+ val streamsConfig = streamsConfig(additionalConfig)
+ val kafkaStreamConsumerFunction = consumerFunction as KafkaStreamConsumerFunction
+ val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, additionalConfig)
+ log.info("Kafka streams topology : ${topology.describe()}")
+ kafkaStreams = KafkaStreams(topology, streamsConfig)
+ kafkaStreams.cleanUp()
+ kafkaStreams.start()
+ kafkaStreams.localThreadsMetadata().forEach { data -> log.info("Topology : $data") }
+ }
+
+ override suspend fun shutDown() {
+ if (kafkaStreams != null) {
+ kafkaStreams.close()
+ }
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index bdceec7d3..b2accfb4d 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -27,8 +27,8 @@ import org.apache.kafka.clients.consumer.*
import org.apache.kafka.common.TopicPartition
import org.junit.Test
import org.junit.runner.RunWith
-import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.logger
@@ -44,7 +44,7 @@ import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@DirtiesContext
@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
- BlueprintPropertyConfiguration::class, BluePrintProperties::class])
+ BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class])
@TestPropertySource(properties =
["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
"blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index f23624f7a..4fe5f5dd1 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -23,8 +23,8 @@ import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.RecordMetadata
import org.junit.runner.RunWith
-import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.annotation.DirtiesContext
@@ -39,7 +39,7 @@ import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@DirtiesContext
@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
- BlueprintPropertyConfiguration::class, BluePrintProperties::class])
+ BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class])
@TestPropertySource(properties =
["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
"blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt
new file mode 100644
index 000000000..e2a31f40a
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt
@@ -0,0 +1,126 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.apache.kafka.common.serialization.Serdes
+import org.apache.kafka.streams.Topology
+import org.apache.kafka.streams.processor.Processor
+import org.apache.kafka.streams.processor.ProcessorSupplier
+import org.apache.kafka.streams.state.Stores
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+import org.springframework.test.context.junit4.SpringRunner
+import kotlin.test.assertNotNull
+
+
+@RunWith(SpringRunner::class)
+@DirtiesContext
+@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
+ BlueprintPropertyConfiguration::class, BluePrintProperties::class])
+@TestPropertySource(properties =
+[
+ "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic",
+ "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+
+ "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth",
+ "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application",
+ "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic"
+
+])
+class KafkaStreamsBasicAuthConsumerServiceTest {
+ @Autowired
+ lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+
+ @Test
+ fun testProperties() {
+ val blueprintMessageConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer")
+ assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageProducerService")
+ }
+
+ /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
+ //@Test
+ fun testKafkaStreamingMessageConsumer() {
+ runBlocking {
+ val streamingConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer")
+
+ // Dynamic Consumer Function to create Topology
+ val consumerFunction = object : KafkaStreamConsumerFunction {
+ override suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties,
+ additionalConfig: Map<String, Any>?): Topology {
+ val topology = Topology()
+ val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
+ as KafkaStreamsBasicAuthConsumerProperties
+
+ val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",")
+ topology.addSource("Source", *topics.toTypedArray())
+ // Processor Supplier
+ val firstProcessorSupplier = object : ProcessorSupplier<ByteArray, ByteArray> {
+ override fun get(): Processor<ByteArray, ByteArray> {
+ return FirstProcessor()
+ }
+ }
+ val changelogConfig: MutableMap<String, String> = hashMapOf()
+ changelogConfig.put("min.insync.replicas", "1")
+
+ // Store Buolder
+ val countStoreSupplier = Stores.keyValueStoreBuilder(
+ Stores.persistentKeyValueStore("PriorityMessageState"),
+ Serdes.String(),
+ PriorityMessageSerde())
+ .withLoggingEnabled(changelogConfig)
+
+ topology.addProcessor("FirstProcessor", firstProcessorSupplier, "Source")
+ topology.addStateStore(countStoreSupplier, "FirstProcessor")
+ topology.addSink("SINK", "default-stream-topic-out", Serdes.String().serializer(),
+ PriorityMessageSerde().serializer(), "FirstProcessor")
+ return topology
+ }
+ }
+
+ /** Send message with every 1 sec */
+ val blueprintMessageProducerService = bluePrintMessageLibPropertyService
+ .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+ launch {
+ repeat(5) {
+ delay(1000)
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.toString()
+ blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)",
+ headers = headers)
+ }
+ }
+ streamingConsumerService.consume(null, consumerFunction)
+ delay(10000)
+ streamingConsumerService.shutDown()
+ }
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt
new file mode 100644
index 000000000..4db9c772e
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt
@@ -0,0 +1,103 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import org.apache.kafka.common.serialization.Deserializer
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.common.serialization.Serializer
+import org.apache.kafka.streams.processor.Processor
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.state.KeyValueStore
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+import java.io.Serializable
+import java.nio.charset.Charset
+import java.util.*
+
+class PriorityMessage : Serializable {
+ lateinit var id: String
+ lateinit var requestMessage: String
+}
+
+open class PriorityMessageSerde : Serde<PriorityMessage> {
+
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ }
+
+ override fun close() {
+ }
+
+ override fun deserializer(): Deserializer<PriorityMessage> {
+ return object : Deserializer<PriorityMessage> {
+ override fun deserialize(topic: String, data: ByteArray): PriorityMessage {
+ return JacksonUtils.readValue(String(data), PriorityMessage::class.java)
+ ?: throw BluePrintProcessorException("failed to convert")
+ }
+
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ }
+
+ override fun close() {
+ }
+ }
+ }
+
+ override fun serializer(): Serializer<PriorityMessage> {
+ return object : Serializer<PriorityMessage> {
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ }
+
+ override fun serialize(topic: String?, data: PriorityMessage): ByteArray {
+ return data.asJsonString().toByteArray(Charset.defaultCharset())
+ }
+
+ override fun close() {
+ }
+ }
+ }
+}
+
+
+class FirstProcessor : Processor<ByteArray, ByteArray> {
+
+ private val log = logger(FirstProcessor::class)
+
+ private lateinit var context: ProcessorContext
+ private lateinit var kvStore: KeyValueStore<String, PriorityMessage>
+
+ override fun process(key: ByteArray, value: ByteArray) {
+ log.info("First Processor key(${String(key)} : value(${String(value)})")
+ val newMessage = PriorityMessage().apply {
+ id = UUID.randomUUID().toString()
+ requestMessage = String(value)
+ }
+ kvStore.put(newMessage.id, newMessage)
+ this.context.forward(newMessage.id, newMessage)
+ }
+
+ override fun init(context: ProcessorContext) {
+ log.info("init... ${context.keySerde()}, ${context.valueSerde()}")
+ this.context = context
+ this.kvStore = context.getStateStore("PriorityMessageState") as KeyValueStore<String, PriorityMessage>
+ }
+
+ override fun close() {
+ log.info("Close...")
+ }
+} \ No newline at end of file