summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDan Timoney <dtimoney@att.com>2019-09-05 18:54:33 +0000
committerGerrit Code Review <gerrit@onap.org>2019-09-05 18:54:33 +0000
commitc656c611de770cb531199bf47c35cb41389fc100 (patch)
tree989692ae3fbb8d3e27d4032c0807f1eb27f9a904
parent25d4619d067d788c9c72bd96b52db92b8d7de6b4 (diff)
parent8d94f0bd0a9e4d1c57e98df726841dd0e2978569 (diff)
Merge "Add Kafka message lib consumer services"
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt34
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt17
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt86
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt32
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt113
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt100
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt20
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml2
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt19
9 files changed, 384 insertions, 39 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
index 644c51860..281a970b8 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
@@ -17,6 +17,11 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message
+import com.fasterxml.jackson.databind.JsonNode
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.ComponentScan
import org.springframework.context.annotation.Configuration
@@ -26,10 +31,37 @@ import org.springframework.context.annotation.Configuration
@EnableConfigurationProperties
open class BluePrintMessageLibConfiguration
+/**
+ * Exposed Dependency Service by this Message Lib Module
+ */
+fun BluePrintDependencyService.messageLibPropertyService(): BluePrintMessageLibPropertyService =
+ instance(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY)
+
+/** Extension functions for message producer service **/
+fun BluePrintDependencyService.messageProducerService(selector: String): BlueprintMessageProducerService {
+ return messageLibPropertyService().blueprintMessageProducerService(selector)
+}
+
+
+fun BluePrintDependencyService.messageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService {
+ return messageLibPropertyService().blueprintMessageProducerService(jsonNode)
+}
+
+/** Extension functions for message consumer service **/
+fun BluePrintDependencyService.messageConsumerService(selector: String): BlueprintMessageConsumerService {
+ return messageLibPropertyService().blueprintMessageConsumerService(selector)
+}
+
+fun BluePrintDependencyService.messageConsumerService(jsonNode: JsonNode): BlueprintMessageConsumerService {
+ return messageLibPropertyService().blueprintMessageConsumerService(jsonNode)
+}
+
class MessageLibConstants {
companion object {
const val SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY = "blueprint-message-lib-property-service"
- const val PROPERTY_MESSAGE_CLIENT_PREFIX = "blueprintsprocessor.messageclient."
+ //TODO("Change to .messageconsumer in application.properties")
+ const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageclient."
+ const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer."
const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth"
}
} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index e621ec66f..c77cdfdb5 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -16,7 +16,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message
-
+/** Producer Properties **/
open class MessageProducerProperties
@@ -24,4 +24,17 @@ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties()
lateinit var bootstrapServers: String
var topic: String? = null
var clientId: String? = null
-} \ No newline at end of file
+}
+
+/** Consumer Properties **/
+
+open class MessageConsumerProperties
+
+open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
+ lateinit var bootstrapServers: String
+ lateinit var groupId: String
+ var consumerTopic: String? = null
+ var pollMillSec: Long = 100
+}
+
+open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties()
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
index fb01ce179..7c56ea432 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
@@ -18,9 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import com.fasterxml.jackson.databind.JsonNode
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
-import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.*
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
import org.springframework.stereotype.Service
@@ -28,22 +26,22 @@ import org.springframework.stereotype.Service
@Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY)
open class BluePrintMessageLibPropertyService(private var bluePrintProperties: BluePrintProperties) {
- fun blueprintMessageClientService(jsonNode: JsonNode): BlueprintMessageProducerService {
- val messageClientProperties = messageClientProperties(jsonNode)
- return blueprintMessageClientService(messageClientProperties)
+ fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService {
+ val messageClientProperties = messageProducerProperties(jsonNode)
+ return blueprintMessageProducerService(messageClientProperties)
}
- fun blueprintMessageClientService(selector: String): BlueprintMessageProducerService {
- val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_CLIENT_PREFIX}$selector"
- val messageClientProperties = messageClientProperties(prefix)
- return blueprintMessageClientService(messageClientProperties)
+ fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService {
+ val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector"
+ val messageClientProperties = messageProducerProperties(prefix)
+ return blueprintMessageProducerService(messageClientProperties)
}
- fun messageClientProperties(prefix: String): MessageProducerProperties {
+ fun messageProducerProperties(prefix: String): MessageProducerProperties {
val type = bluePrintProperties.propertyBeanType("$prefix.type", String::class.java)
return when (type) {
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
- kafkaBasicAuthMessageClientProperties(prefix)
+ kafkaBasicAuthMessageProducerProperties(prefix)
}
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
@@ -51,7 +49,7 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B
}
}
- fun messageClientProperties(jsonNode: JsonNode): MessageProducerProperties {
+ fun messageProducerProperties(jsonNode: JsonNode): MessageProducerProperties {
val type = jsonNode.get("type").textValue()
return when (type) {
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
@@ -63,7 +61,7 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B
}
}
- private fun blueprintMessageClientService(MessageProducerProperties: MessageProducerProperties)
+ private fun blueprintMessageProducerService(MessageProducerProperties: MessageProducerProperties)
: BlueprintMessageProducerService {
when (MessageProducerProperties) {
@@ -76,9 +74,67 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B
}
}
- private fun kafkaBasicAuthMessageClientProperties(prefix: String): KafkaBasicAuthMessageProducerProperties {
+ private fun kafkaBasicAuthMessageProducerProperties(prefix: String): KafkaBasicAuthMessageProducerProperties {
return bluePrintProperties.propertyBeanType(
prefix, KafkaBasicAuthMessageProducerProperties::class.java)
}
+ /** Consumer Property Lib Service Implementation **/
+
+ /** Return Message Consumer Service for [jsonNode] definitions. */
+ fun blueprintMessageConsumerService(jsonNode: JsonNode): BlueprintMessageConsumerService {
+ val messageConsumerProperties = messageConsumerProperties(jsonNode)
+ return blueprintMessageConsumerService(messageConsumerProperties)
+ }
+
+ /** Return Message Consumer Service for [selector] definitions. */
+ fun blueprintMessageConsumerService(selector: String): BlueprintMessageConsumerService {
+ val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}$selector"
+ val messageClientProperties = messageConsumerProperties(prefix)
+ return blueprintMessageConsumerService(messageClientProperties)
+ }
+
+ /** Return Message Consumer Properties for [prefix] definitions. */
+ fun messageConsumerProperties(prefix: String): MessageConsumerProperties {
+ val type = bluePrintProperties.propertyBeanType("$prefix.type", String::class.java)
+ return when (type) {
+ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
+ kafkaBasicAuthMessageConsumerProperties(prefix)
+ }
+ else -> {
+ throw BluePrintProcessorException("Message adaptor($type) is not supported")
+ }
+ }
+ }
+
+ fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties {
+ val type = jsonNode.get("type").textValue()
+ return when (type) {
+ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!!
+ }
+ else -> {
+ throw BluePrintProcessorException("Message adaptor($type) is not supported")
+ }
+ }
+ }
+
+ private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties)
+ : BlueprintMessageConsumerService {
+
+ when (messageConsumerProperties) {
+ is KafkaBasicAuthMessageConsumerProperties -> {
+ return KafkaBasicAuthMessageConsumerService(messageConsumerProperties)
+ }
+ else -> {
+ throw BluePrintProcessorException("couldn't get Message client service for")
+ }
+ }
+ }
+
+ private fun kafkaBasicAuthMessageConsumerProperties(prefix: String): KafkaBasicAuthMessageConsumerProperties {
+ return bluePrintProperties.propertyBeanType(
+ prefix, KafkaBasicAuthMessageConsumerProperties::class.java)
+ }
+
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
new file mode 100644
index 000000000..25f0bf44d
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright © 2019 IBM.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import kotlinx.coroutines.channels.Channel
+
+interface BlueprintMessageConsumerService {
+
+ /** Subscribe to the Kafka channel with [additionalConfig] */
+ suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String>
+
+ /** Subscribe to the Kafka channel with [additionalConfig] for dynamic [topics]*/
+ suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<String>
+
+ /** close the channel, consumer and other resources */
+ suspend fun shutDown()
+
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
new file mode 100644
index 000000000..076501eab
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
@@ -0,0 +1,113 @@
+/*
+ * Copyright © 2019 IBM.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import java.time.Duration
+import kotlin.concurrent.thread
+
+class KafkaBasicAuthMessageConsumerService(
+ private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties)
+ : BlueprintMessageConsumerService {
+
+ private val channel = Channel<String>()
+ private var kafkaConsumer: Consumer<String, String>? = null
+ val log = logger(KafkaBasicAuthMessageConsumerService::class)
+
+ @Volatile
+ var keepGoing = true
+
+ fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, String> {
+ val configProperties = hashMapOf<String, Any>()
+ configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers
+ configProperties[ConsumerConfig.GROUP_ID_CONFIG] = messageConsumerProperties.groupId
+ configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
+ configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+ configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+ /** add or override already set properties */
+ additionalConfig?.let { configProperties.putAll(it) }
+ /** Create Kafka consumer */
+ return KafkaConsumer(configProperties)
+ }
+
+ override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> {
+ /** get to topic names */
+ val consumerTopic = messageConsumerProperties.consumerTopic?.split(",")?.map { it.trim() }
+ check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" }
+ return subscribe(consumerTopic, additionalConfig)
+ }
+
+
+ override suspend fun subscribe(consumerTopic: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
+ /** Create Kafka consumer */
+ kafkaConsumer = kafkaConsumer(additionalConfig)
+ checkNotNull(kafkaConsumer) {
+ "failed to create kafka consumer for " +
+ "server(${messageConsumerProperties.bootstrapServers})'s " +
+ "topics(${messageConsumerProperties.bootstrapServers})"
+ }
+
+ kafkaConsumer!!.subscribe(consumerTopic)
+ log.info("Successfully consumed topic($consumerTopic)")
+
+ val listenerThread = thread(start = true, name = "KafkaConsumer") {
+ keepGoing = true
+ kafkaConsumer!!.use { kc ->
+ while (keepGoing) {
+ val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
+ runBlocking {
+ consumerRecords?.forEach { consumerRecord ->
+ /** execute the command block */
+ consumerRecord.value()?.let {
+ launch {
+ if (!channel.isClosedForSend) {
+ channel.send(it)
+ } else {
+ log.error("Channel is closed to receive message")
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ }
+ log.info("Successfully consumed in thread(${listenerThread})")
+ return channel
+ }
+
+ override suspend fun shutDown() {
+ /** Close the Channel */
+ channel.close()
+ /** stop the polling loop */
+ keepGoing = false
+ if (kafkaConsumer != null) {
+ /** sunsubscribe the consumer */
+ kafkaConsumer!!.unsubscribe()
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
new file mode 100644
index 000000000..18b86b8d8
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -0,0 +1,100 @@
+/*
+ * Copyright © 2019 IBM.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import io.mockk.every
+import io.mockk.spyk
+import kotlinx.coroutines.channels.consumeEach
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.MockConsumer
+import org.apache.kafka.clients.consumer.OffsetResetStrategy
+import org.apache.kafka.common.TopicPartition
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+import org.springframework.test.context.junit4.SpringRunner
+import kotlin.test.assertNotNull
+
+
+@RunWith(SpringRunner::class)
+@DirtiesContext
+@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
+ BlueprintPropertyConfiguration::class, BluePrintProperties::class])
+@TestPropertySource(properties =
+["blueprintsprocessor.messageclient.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageclient.sample.bootstrapServers=127:0.0.1:9092",
+ "blueprintsprocessor.messageclient.sample.groupId=sample-group",
+ "blueprintsprocessor.messageclient.sample.consumerTopic=default-topic"
+])
+open class BlueprintMessageConsumerServiceTest {
+
+ @Autowired
+ lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+
+ @Test
+ fun testKafkaBasicAuthConsumerService() {
+ runBlocking {
+ val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
+ .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
+
+ val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
+
+ val topic = "default-topic"
+ val partitions: MutableList<TopicPartition> = arrayListOf()
+ val topicsCollection: MutableList<String> = arrayListOf()
+ partitions.add(TopicPartition(topic, 1))
+ val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
+ val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
+
+ val records: Long = 10
+ partitions.forEach { partition ->
+ partitionsBeginningMap[partition] = 0L
+ partitionsEndMap[partition] = records
+ topicsCollection.add(partition.topic())
+ }
+ val mockKafkaConsumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST)
+ mockKafkaConsumer.subscribe(topicsCollection)
+ mockKafkaConsumer.rebalance(partitions)
+ mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
+ mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
+ for (i in 1..10) {
+ val record = ConsumerRecord<String, String>(topic, 1, i.toLong(), "key_$i",
+ "I am message $i")
+ mockKafkaConsumer.addRecord(record)
+ }
+
+ every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
+ val channel = spyBlueprintMessageConsumerService.subscribe(null)
+ launch {
+ channel.consumeEach {
+ println("Received message : $it")
+ }
+ }
+ //delay(100)
+ spyBlueprintMessageConsumerService.shutDown()
+ }
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index 0f8367d7e..0db62c1df 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -41,10 +41,10 @@ import kotlin.test.assertTrue
@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
BlueprintPropertyConfiguration::class, BluePrintProperties::class])
@TestPropertySource(properties =
-["blueprintsprocessor.messageclient.sample.type=kafka-basic-auth",
- "blueprintsprocessor.messageclient.sample.bootstrapServers=127:0.0.1:9092",
- "blueprintsprocessor.messageclient.sample.topic=default-topic",
- "blueprintsprocessor.messageclient.sample.clientId=default-client-id"
+["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.sample.bootstrapServers=127:0.0.1:9092",
+ "blueprintsprocessor.messageproducer.sample.topic=default-topic",
+ "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
])
open class BlueprintMessageProducerServiceTest {
@@ -52,10 +52,10 @@ open class BlueprintMessageProducerServiceTest {
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@Test
- fun testKafkaBasicAuthClientService() {
+ fun testKafkaBasicAuthProducertService() {
runBlocking {
- val bluePrintMessageClientService = bluePrintMessageLibPropertyService
- .blueprintMessageClientService("sample") as KafkaBasicAuthMessageProducerService
+ val blueprintMessageProducerService = bluePrintMessageLibPropertyService
+ .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
val mockKafkaTemplate = mockk<KafkaTemplate<String, Any>>()
@@ -64,11 +64,11 @@ open class BlueprintMessageProducerServiceTest {
every { mockKafkaTemplate.send(any(), any()) } returns future
- val spyBluePrintMessageClientService = spyk(bluePrintMessageClientService, recordPrivateCalls = true)
+ val spyBluePrintMessageProducerService = spyk(blueprintMessageProducerService, recordPrivateCalls = true)
- every { spyBluePrintMessageClientService.messageTemplate(any()) } returns mockKafkaTemplate
+ every { spyBluePrintMessageProducerService.messageTemplate(any()) } returns mockKafkaTemplate
- val response = spyBluePrintMessageClientService.sendMessage("Testing message")
+ val response = spyBluePrintMessageProducerService.sendMessage("Testing message")
assertTrue(response, "failed to get command response")
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml
index 626b8f911..3868440c7 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml
@@ -19,7 +19,7 @@
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
- <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern>
+ <pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{50} - %msg%n</pattern>
</encoder>
</appender>
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt
index 54cc0c129..e848ed384 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt
@@ -26,6 +26,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Service
+//TODO("Implement with property service and remove spring bindings")
@ConditionalOnProperty(name = ["blueprintsprocessor.messageclient.self-service-api.kafkaEnable"], havingValue = "true")
@Service
open class MessagingController(private val propertyService: BluePrintMessageLibPropertyService,
@@ -56,18 +57,16 @@ open class MessagingController(private val propertyService: BluePrintMessageLibP
val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
- if (executionServiceOutput.status.code == EXECUTION_STATUS) {
- val bluePrintMessageClientService = propertyService
- .blueprintMessageClientService(PREFIX)
+ if (executionServiceOutput.status.code == EXECUTION_STATUS) {
+ val blueprintMessageProducerService = propertyService.blueprintMessageProducerService(PREFIX)
- val payload = executionServiceOutput.payload
+ val payload = executionServiceOutput.payload
- log.info("The payload to publish is {}", payload)
+ log.info("The payload to publish is {}", payload)
- bluePrintMessageClientService.sendMessage(payload)
- }
- else {
- log.error("Fail to process the given event due to {}", executionServiceOutput.status.errorMessage)
- }
+ blueprintMessageProducerService.sendMessage(payload)
+ } else {
+ log.error("Fail to process the given event due to {}", executionServiceOutput.status.errorMessage)
+ }
}
}