diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib')
16 files changed, 185 insertions, 133 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt index a817c0c74..509689eca 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt @@ -20,7 +20,6 @@ import org.apache.kafka.common.header.Headers import org.apache.kafka.common.header.internals.RecordHeader import java.nio.charset.Charset - fun <T : Headers> T?.toMap(): MutableMap<String, String> { val map: MutableMap<String, String> = hashMapOf() this?.forEach { map[it.key()] = String(it.value(), Charset.defaultCharset()) } @@ -29,4 +28,4 @@ fun <T : Headers> T?.toMap(): MutableMap<String, String> { fun Headers.addHeader(key: String, value: String) { this.add(RecordHeader(key, value.toByteArray())) -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt index ecffa280f..cc4c7fa4a 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt @@ -17,7 +17,6 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message - import com.fasterxml.jackson.databind.JsonNode import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService @@ -36,14 +35,13 @@ open class BluePrintMessageLibConfiguration * Exposed Dependency Service by this Message Lib Module */ fun BluePrintDependencyService.messageLibPropertyService(): BluePrintMessageLibPropertyService = - instance(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY) + instance(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY) /** Extension functions for message producer service **/ fun BluePrintDependencyService.messageProducerService(selector: String): BlueprintMessageProducerService { return messageLibPropertyService().blueprintMessageProducerService(selector) } - fun BluePrintDependencyService.messageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService { return messageLibPropertyService().blueprintMessageProducerService(jsonNode) } @@ -65,4 +63,4 @@ class MessageLibConstants { const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth" const val TYPE_KAFKA_STREAMS_BASIC_AUTH = "kafka-streams-basic-auth" } -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index d0c3d5ae1..59e3606ea 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -22,7 +22,6 @@ import org.apache.kafka.streams.StreamsConfig /** Producer Properties **/ open class MessageProducerProperties - open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() { lateinit var bootstrapServers: String var topic: String? = null diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt index 4c6c0acdd..72a70893a 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt @@ -30,7 +30,6 @@ abstract class AbstractBluePrintMessageProcessor<K, V> : Processor<K, V> { lateinit var processorContext: ProcessorContext - override fun process(key: K, value: V) = runBlocking(Dispatchers.IO) { try { processNB(key, value) @@ -42,7 +41,6 @@ abstract class AbstractBluePrintMessageProcessor<K, V> : Processor<K, V> { override fun init(context: ProcessorContext) { log.info("initializing processor (${this.javaClass.simpleName})") this.processorContext = context - } override fun close() { @@ -54,12 +52,12 @@ abstract class AbstractBluePrintMessageProcessor<K, V> : Processor<K, V> { /** CDS Kafka Stream Punctuator abstract class to implement */ abstract class AbstractBluePrintMessagePunctuator : Punctuator { - lateinit var processorContext: ProcessorContext + lateinit var processorContext: ProcessorContext override fun punctuate(timestamp: Long) = runBlocking(Dispatchers.IO) { punctuateNB(timestamp) } abstract suspend fun punctuateNB(timestamp: Long) -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt index 853b88bc9..44b50af44 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt @@ -19,7 +19,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import com.fasterxml.jackson.databind.JsonNode import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService -import org.onap.ccsdk.cds.blueprintsprocessor.message.* +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils import org.springframework.stereotype.Service @@ -62,8 +67,8 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer } } - private fun blueprintMessageProducerService(MessageProducerProperties: MessageProducerProperties) - : BlueprintMessageProducerService { + private fun blueprintMessageProducerService(MessageProducerProperties: MessageProducerProperties): + BlueprintMessageProducerService { when (MessageProducerProperties) { is KafkaBasicAuthMessageProducerProperties -> { @@ -77,7 +82,8 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer private fun kafkaBasicAuthMessageProducerProperties(prefix: String): KafkaBasicAuthMessageProducerProperties { return bluePrintPropertiesService.propertyBeanType( - prefix, KafkaBasicAuthMessageProducerProperties::class.java) + prefix, KafkaBasicAuthMessageProducerProperties::class.java + ) } /** Consumer Property Lib Service Implementation **/ @@ -126,8 +132,8 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer } } - private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties) - : BlueprintMessageConsumerService { + private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties): + BlueprintMessageConsumerService { when (messageConsumerProperties) { is KafkaBasicAuthMessageConsumerProperties -> { @@ -144,12 +150,13 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer private fun kafkaBasicAuthMessageConsumerProperties(prefix: String): KafkaBasicAuthMessageConsumerProperties { return bluePrintPropertiesService.propertyBeanType( - prefix, KafkaBasicAuthMessageConsumerProperties::class.java) + prefix, KafkaBasicAuthMessageConsumerProperties::class.java + ) } private fun kafkaStreamsBasicAuthMessageConsumerProperties(prefix: String): KafkaStreamsBasicAuthConsumerProperties { return bluePrintPropertiesService.propertyBeanType( - prefix, KafkaStreamsBasicAuthConsumerProperties::class.java) + prefix, KafkaStreamsBasicAuthConsumerProperties::class.java + ) } - } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt index 716fda609..f74abcdb7 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt @@ -50,21 +50,31 @@ interface BlueprintMessageConsumerService { } /** Consume the [topics] with [additionalConfig], so that we can execute dynamic function [consumerFunction] */ - suspend fun consume(topics: List<String>, additionalConfig: Map<String, Any>?, - consumerFunction: ConsumerFunction) { + suspend fun consume( + topics: List<String>, + additionalConfig: Map<String, Any>?, + consumerFunction: ConsumerFunction + ) { throw BluePrintProcessorException("Not Implemented") } /** close the channel, consumer and other resources */ suspend fun shutDown() } + /** Consumer dynamic implementation interface */ interface KafkaConsumerRecordsFunction : ConsumerFunction { - suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, consumer: Consumer<*, *>, - consumerRecords: ConsumerRecords<*, *>) + + suspend fun invoke( + messageConsumerProperties: MessageConsumerProperties, + consumer: Consumer<*, *>, + consumerRecords: ConsumerRecords<*, *> + ) } interface KafkaStreamConsumerFunction : ConsumerFunction { - suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties, - additionalConfig: Map<String, Any>?): Topology -}
\ No newline at end of file + suspend fun createTopology( + messageConsumerProperties: MessageConsumerProperties, + additionalConfig: Map<String, Any>? + ): Topology +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt index 7d8138639..cdc65a1c6 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt @@ -47,4 +47,4 @@ interface BlueprintMessageProducerService { } suspend fun sendMessageNB(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt index 757846c81..3415c8d0d 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt @@ -34,8 +34,9 @@ import java.time.Duration import kotlin.concurrent.thread open class KafkaBasicAuthMessageConsumerService( - private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties) - : BlueprintMessageConsumerService { + private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties +) : + BlueprintMessageConsumerService { val log = logger(KafkaBasicAuthMessageConsumerService::class) val channel = Channel<String>() @@ -76,7 +77,6 @@ open class KafkaBasicAuthMessageConsumerService( return subscribe(consumerTopic, additionalConfig) } - override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> { /** Create Kafka consumer */ kafkaConsumer = kafkaConsumer(additionalConfig) @@ -124,8 +124,11 @@ open class KafkaBasicAuthMessageConsumerService( return consume(topics = consumerTopic, additionalConfig = additionalConfig, consumerFunction = consumerFunction) } - override suspend fun consume(topics: List<String>, additionalConfig: Map<String, Any>?, - consumerFunction: ConsumerFunction) { + override suspend fun consume( + topics: List<String>, + additionalConfig: Map<String, Any>?, + consumerFunction: ConsumerFunction + ) { val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt index ad9a594b0..8416282af 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt @@ -20,7 +20,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import org.apache.commons.lang.builder.ToStringBuilder import org.apache.kafka.clients.producer.Callback import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.producer.ProducerConfig.* +import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG +import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG +import org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG +import org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG +import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG +import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.header.internals.RecordHeader import org.apache.kafka.common.serialization.ByteArraySerializer @@ -32,8 +37,9 @@ import org.slf4j.LoggerFactory import java.nio.charset.Charset class KafkaBasicAuthMessageProducerService( - private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties) - : BlueprintMessageProducerService { + private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties +) : + BlueprintMessageProducerService { private val log = LoggerFactory.getLogger(KafkaBasicAuthMessageProducerService::class.java)!! @@ -51,8 +57,11 @@ class KafkaBasicAuthMessageProducerService( return sendMessageNB(messageProducerProperties.topic!!, message, headers) } - override suspend fun sendMessageNB(topic: String, message: Any, - headers: MutableMap<String, String>?): Boolean { + override suspend fun sendMessageNB( + topic: String, + message: Any, + headers: MutableMap<String, String>? + ): Boolean { val byteArrayMessage = when (message) { is String -> message.toByteArray(Charset.defaultCharset()) else -> message.asJsonString().toByteArray(Charset.defaultCharset()) @@ -95,4 +104,3 @@ class KafkaBasicAuthMessageProducerService( return kafkaProducer!! } } - diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt index d0297df4c..0b353d58b 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt @@ -23,10 +23,10 @@ import org.apache.kafka.streams.StreamsConfig import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.logger -import java.util.* +import java.util.Properties -open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerProperties: KafkaStreamsBasicAuthConsumerProperties) - : BlueprintMessageConsumerService { +open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerProperties: KafkaStreamsBasicAuthConsumerProperties) : + BlueprintMessageConsumerService { val log = logger(KafkaStreamsBasicAuthConsumerService::class) lateinit var kafkaStreams: KafkaStreams @@ -68,4 +68,4 @@ open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerPrope kafkaStreams.close() } } -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt index 21bf1b76c..7ec70d91b 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt @@ -31,15 +31,17 @@ import java.time.Instant import java.time.ZoneOffset import java.time.ZonedDateTime import java.time.format.DateTimeFormatter -import java.util.* +import java.util.UUID class MessageLoggerService { private val log = logger(MessageLoggerService::class) fun messageConsuming(headers: CommonHeader, consumerRecord: ConsumerRecord<*, *>) { - messageConsuming(headers.requestId, headers.subRequestId, - headers.originatorId, consumerRecord) + messageConsuming( + headers.requestId, headers.subRequestId, + headers.originatorId, consumerRecord + ) } fun messageConsuming(consumerRecord: ConsumerRecord<*, *>) { @@ -50,14 +52,19 @@ class MessageLoggerService { messageConsuming(requestID, invocationID, partnerName, consumerRecord) } - - fun messageConsuming(requestID: String, invocationID: String, partnerName: String, - consumerRecord: ConsumerRecord<*, *>) { + fun messageConsuming( + requestID: String, + invocationID: String, + partnerName: String, + consumerRecord: ConsumerRecord<*, *> + ) { val headers = consumerRecord.headers().toMap() val localhost = InetAddress.getLocalHost() - MDC.put("InvokeTimestamp", ZonedDateTime + MDC.put( + "InvokeTimestamp", ZonedDateTime .ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), ZoneOffset.UTC) - .format(DateTimeFormatter.ISO_INSTANT)) + .format(DateTimeFormatter.ISO_INSTANT) + ) MDC.put("RequestID", requestID) MDC.put("InvocationID", invocationID) MDC.put("PartnerName", partnerName) @@ -85,4 +92,4 @@ class MessageLoggerService { fun messageConsumingExisting() { MDC.clear() } -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index b2accfb4d..823ba7dee 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -23,7 +23,11 @@ import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import org.apache.kafka.clients.consumer.* +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.ConsumerRecords +import org.apache.kafka.clients.consumer.MockConsumer +import org.apache.kafka.clients.consumer.OffsetResetStrategy import org.apache.kafka.common.TopicPartition import org.junit.Test import org.junit.runner.RunWith @@ -40,26 +44,30 @@ import org.springframework.test.context.junit4.SpringRunner import kotlin.test.assertNotNull import kotlin.test.assertTrue - @RunWith(SpringRunner::class) @DirtiesContext -@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class, - BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]) -@TestPropertySource(properties = -["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth", - "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageconsumer.sample.groupId=sample-group", - "blueprintsprocessor.messageconsumer.sample.topic=default-topic", - "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id", - "blueprintsprocessor.messageconsumer.sample.pollMillSec=10", - "blueprintsprocessor.messageconsumer.sample.pollRecords=1", - - "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", - "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageproducer.sample.topic=default-topic", - "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" -]) +@ContextConfiguration( + classes = [BluePrintMessageLibConfiguration::class, + BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class] +) +@TestPropertySource( + properties = + ["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.sample.groupId=sample-group", + "blueprintsprocessor.messageconsumer.sample.topic=default-topic", + "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id", + "blueprintsprocessor.messageconsumer.sample.pollMillSec=10", + "blueprintsprocessor.messageconsumer.sample.pollRecords=1", + + "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.sample.topic=default-topic", + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" + ] +) open class BlueprintMessageConsumerServiceTest { + val log = logger(BlueprintMessageConsumerServiceTest::class) @Autowired @@ -69,7 +77,7 @@ open class BlueprintMessageConsumerServiceTest { fun testKafkaBasicAuthConsumerService() { runBlocking { val blueprintMessageConsumerService = bluePrintMessageLibPropertyService - .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true) @@ -93,8 +101,10 @@ open class BlueprintMessageConsumerServiceTest { mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap) mockKafkaConsumer.updateEndOffsets(partitionsEndMap) for (i in 1..10) { - val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i", - "I am message $i".toByteArray()) + val record = ConsumerRecord<String, ByteArray>( + topic, 1, i.toLong(), "key_$i", + "I am message $i".toByteArray() + ) mockKafkaConsumer.addRecord(record) } @@ -114,7 +124,7 @@ open class BlueprintMessageConsumerServiceTest { fun testKafkaBasicAuthConsumerWithDynamicFunction() { runBlocking { val blueprintMessageConsumerService = bluePrintMessageLibPropertyService - .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true) @@ -138,16 +148,21 @@ open class BlueprintMessageConsumerServiceTest { mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap) mockKafkaConsumer.updateEndOffsets(partitionsEndMap) for (i in 1..10) { - val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i", - "I am message $i".toByteArray()) + val record = ConsumerRecord<String, ByteArray>( + topic, 1, i.toLong(), "key_$i", + "I am message $i".toByteArray() + ) mockKafkaConsumer.addRecord(record) } every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer /** Test Consumer Function implementation */ val consumerFunction = object : KafkaConsumerRecordsFunction { - override suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, - consumer: Consumer<*, *>, consumerRecords: ConsumerRecords<*, *>) { + override suspend fun invoke( + messageConsumerProperties: MessageConsumerProperties, + consumer: Consumer<*, *>, + consumerRecords: ConsumerRecords<*, *> + ) { val count = consumerRecords.count() log.trace("Received Message count($count)") } @@ -159,11 +174,11 @@ open class BlueprintMessageConsumerServiceTest { } /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ - //@Test + // @Test fun testKafkaIntegration() { runBlocking { val blueprintMessageConsumerService = bluePrintMessageLibPropertyService - .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") val channel = blueprintMessageConsumerService.subscribe(null) @@ -175,18 +190,20 @@ open class BlueprintMessageConsumerServiceTest { /** Send message with every 1 sec */ val blueprintMessageProducerService = bluePrintMessageLibPropertyService - .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService + .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService launch { repeat(5) { delay(100) val headers: MutableMap<String, String> = hashMapOf() headers["id"] = it.toString() - blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)", - headers = headers) + blueprintMessageProducerService.sendMessageNB( + message = "this is my message($it)", + headers = headers + ) } } delay(5000) blueprintMessageConsumerService.shutDown() } } -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index 4fe5f5dd1..b824189d2 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -35,17 +35,20 @@ import java.util.concurrent.Future import kotlin.test.Test import kotlin.test.assertTrue - @RunWith(SpringRunner::class) @DirtiesContext -@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class, - BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]) -@TestPropertySource(properties = -["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", - "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageproducer.sample.topic=default-topic", - "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" -]) +@ContextConfiguration( + classes = [BluePrintMessageLibConfiguration::class, + BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class] +) +@TestPropertySource( + properties = + ["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.sample.topic=default-topic", + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id" + ] +) open class BlueprintMessageProducerServiceTest { @Autowired @@ -55,7 +58,7 @@ open class BlueprintMessageProducerServiceTest { fun testKafkaBasicAuthProducertService() { runBlocking { val blueprintMessageProducerService = bluePrintMessageLibPropertyService - .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService + .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService val mockKafkaTemplate = mockk<KafkaProducer<String, ByteArray>>() @@ -72,8 +75,4 @@ open class BlueprintMessageProducerServiceTest { assertTrue(response, "failed to get command response") } } - } - - - diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt index 9cd974622..1657d70b4 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt @@ -38,25 +38,29 @@ import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner import kotlin.test.assertNotNull - @RunWith(SpringRunner::class) @DirtiesContext -@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class, - BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]) -@TestPropertySource(properties = -[ - "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", - "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic", - "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", +@ContextConfiguration( + classes = [BluePrintMessageLibConfiguration::class, + BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class] +) +@TestPropertySource( + properties = + [ + "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", + "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic", + "blueprintsprocessor.messageproducer.sample.clientId=default-client-id", - "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth", - "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092", - "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application", - "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic" + "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth", + "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092", + "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application", + "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic" -]) + ] +) class KafkaStreamsBasicAuthConsumerServiceTest { + @Autowired lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService @@ -67,15 +71,17 @@ class KafkaStreamsBasicAuthConsumerServiceTest { } /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ - //@Test + // @Test fun testKafkaStreamingMessageConsumer() { runBlocking { val streamingConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer") // Dynamic Consumer Function to create Topology val consumerFunction = object : KafkaStreamConsumerFunction { - override suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties, - additionalConfig: Map<String, Any>?): Topology { + override suspend fun createTopology( + messageConsumerProperties: MessageConsumerProperties, + additionalConfig: Map<String, Any>? + ): Topology { val topology = Topology() val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties @@ -93,29 +99,34 @@ class KafkaStreamsBasicAuthConsumerServiceTest { // Store Buolder val countStoreSupplier = Stores.keyValueStoreBuilder( - Stores.persistentKeyValueStore("PriorityMessageState"), - Serdes.String(), - PriorityMessageSerde()) - .withLoggingEnabled(changelogConfig) + Stores.persistentKeyValueStore("PriorityMessageState"), + Serdes.String(), + PriorityMessageSerde() + ) + .withLoggingEnabled(changelogConfig) topology.addProcessor("FirstProcessor", firstProcessorSupplier, "Source") topology.addStateStore(countStoreSupplier, "FirstProcessor") - topology.addSink("SINK", "default-stream-topic-out", Serdes.String().serializer(), - PriorityMessageSerde().serializer(), "FirstProcessor") + topology.addSink( + "SINK", "default-stream-topic-out", Serdes.String().serializer(), + PriorityMessageSerde().serializer(), "FirstProcessor" + ) return topology } } /** Send message with every 1 sec */ val blueprintMessageProducerService = bluePrintMessageLibPropertyService - .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService + .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService launch { repeat(5) { delay(1000) val headers: MutableMap<String, String> = hashMapOf() headers["id"] = it.toString() - blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)", - headers = headers) + blueprintMessageProducerService.sendMessageNB( + message = "this is my message($it)", + headers = headers + ) } } streamingConsumerService.consume(null, consumerFunction) @@ -123,4 +134,4 @@ class KafkaStreamsBasicAuthConsumerServiceTest { streamingConsumerService.shutDown() } } -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt index 82e40efd1..3dce3344f 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt @@ -29,7 +29,6 @@ import kotlin.test.assertEquals class MessageLoggerServiceTest { - @Test fun testMessagingHeaders() { val messageLoggerService = MessageLoggerService() @@ -55,7 +54,5 @@ class MessageLoggerServiceTest { assertEquals("1234-12", map[BluePrintConstants.ONAP_REQUEST_ID]) messageLoggerService.messageConsumingExisting() - } - -}
\ No newline at end of file +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt index 4db9c772e..5d77c3746 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt @@ -28,7 +28,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils import java.io.Serializable import java.nio.charset.Charset -import java.util.* +import java.util.UUID class PriorityMessage : Serializable { lateinit var id: String @@ -47,7 +47,7 @@ open class PriorityMessageSerde : Serde<PriorityMessage> { return object : Deserializer<PriorityMessage> { override fun deserialize(topic: String, data: ByteArray): PriorityMessage { return JacksonUtils.readValue(String(data), PriorityMessage::class.java) - ?: throw BluePrintProcessorException("failed to convert") + ?: throw BluePrintProcessorException("failed to convert") } override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { @@ -73,7 +73,6 @@ open class PriorityMessageSerde : Serde<PriorityMessage> { } } - class FirstProcessor : Processor<ByteArray, ByteArray> { private val log = logger(FirstProcessor::class) @@ -100,4 +99,4 @@ class FirstProcessor : Processor<ByteArray, ByteArray> { override fun close() { log.info("Close...") } -}
\ No newline at end of file +} |