diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib')
11 files changed, 415 insertions, 73 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt new file mode 100644 index 000000000..a817c0c74 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt @@ -0,0 +1,32 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message + +import org.apache.kafka.common.header.Headers +import org.apache.kafka.common.header.internals.RecordHeader +import java.nio.charset.Charset + + +fun <T : Headers> T?.toMap(): MutableMap<String, String> { + val map: MutableMap<String, String> = hashMapOf() + this?.forEach { map[it.key()] = String(it.value(), Charset.defaultCharset()) } + return map +} + +fun Headers.addHeader(key: String, value: String) { + this.add(RecordHeader(key, value.toByteArray())) +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index 1cd8a2af7..184e85b70 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,8 +34,10 @@ open class MessageConsumerProperties open class KafkaMessageConsumerProperties : MessageConsumerProperties() { lateinit var bootstrapServers: String lateinit var groupId: String - var clientId: String? = null + lateinit var clientId: String var topic: String? = null + var autoCommit: Boolean = true + var autoOffsetReset: String = "latest" var pollMillSec: Long = 1000 var pollRecords: Int = -1 } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt index 25f0bf44d..8bcc7580a 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,16 +18,47 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.channels.Channel +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecords +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException + +/** Consumer Function Interfaces */ +interface ConsumerFunction interface BlueprintMessageConsumerService { + suspend fun subscribe(): Channel<String> { + return subscribe(null) + } + /** Subscribe to the Kafka channel with [additionalConfig] */ suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> /** Subscribe to the Kafka channel with [additionalConfig] for dynamic [topics]*/ suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<String> + /** Consume and execute dynamic function [consumerFunction] */ + suspend fun consume(consumerFunction: ConsumerFunction) { + consume(null, consumerFunction) + } + + /** Consume with [additionalConfig], so that we can execute dynamic function [consumerFunction] */ + suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) { + throw BluePrintProcessorException("Not Implemented") + } + + /** Consume the [topics] with [additionalConfig], so that we can execute dynamic function [consumerFunction] */ + suspend fun consume(topics: List<String>, additionalConfig: Map<String, Any>?, + consumerFunction: ConsumerFunction) { + throw BluePrintProcessorException("Not Implemented") + } + /** close the channel, consumer and other resources */ suspend fun shutDown() - +} +/** Consumer dynamic implementation interface */ +interface KafkaConsumerRecordsFunction : ConsumerFunction { + suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, consumer: Consumer<*, *>, + consumerRecords: ConsumerRecords<*, *>) }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt index e33d41c09..7d8138639 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt @@ -20,15 +20,31 @@ import kotlinx.coroutines.runBlocking interface BlueprintMessageProducerService { - fun sendMessage(message: Any): Boolean = runBlocking { - sendMessageNB(message) + fun sendMessage(message: Any): Boolean { + return sendMessage(message = message, headers = null) } - fun sendMessage(topic: String, message: Any): Boolean = runBlocking { - sendMessageNB(topic, message) + fun sendMessage(topic: String, message: Any): Boolean { + return sendMessage(topic, message, null) } - suspend fun sendMessageNB(message: Any): Boolean + fun sendMessage(message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking { + sendMessageNB(message = message, headers = headers) + } + + fun sendMessage(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking { + sendMessageNB(topic, message, headers) + } + + suspend fun sendMessageNB(message: Any): Boolean { + return sendMessageNB(message = message, headers = null) + } + + suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean + + suspend fun sendMessageNB(topic: String, message: Any): Boolean { + return sendMessageNB(topic, message, null) + } - suspend fun sendMessageNB(topic: String, message: Any): Boolean + suspend fun sendMessageNB(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt index b5d444a49..757846c81 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,33 +25,39 @@ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.serialization.StringDeserializer import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties import org.onap.ccsdk.cds.controllerblueprints.core.logger +import java.nio.charset.Charset import java.time.Duration import kotlin.concurrent.thread -class KafkaBasicAuthMessageConsumerService( +open class KafkaBasicAuthMessageConsumerService( private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties) : BlueprintMessageConsumerService { - private val channel = Channel<String>() - private var kafkaConsumer: Consumer<String, String>? = null val log = logger(KafkaBasicAuthMessageConsumerService::class) + val channel = Channel<String>() + var kafkaConsumer: Consumer<String, ByteArray>? = null @Volatile var keepGoing = true - fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, String> { + fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> { val configProperties = hashMapOf<String, Any>() configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers configProperties[ConsumerConfig.GROUP_ID_CONFIG] = messageConsumerProperties.groupId - configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest" + configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = messageConsumerProperties.autoCommit + /** + * earliest: automatically reset the offset to the earliest offset + * latest: automatically reset the offset to the latest offset + */ + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java - configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java - if (messageConsumerProperties.clientId != null) { - configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!! - } + configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java + configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId + /** To handle Back pressure, Get only configured record for processing */ if (messageConsumerProperties.pollRecords > 0) { configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords @@ -70,7 +77,7 @@ class KafkaBasicAuthMessageConsumerService( } - override suspend fun subscribe(consumerTopic: List<String>, additionalConfig: Map<String, Any>?): Channel<String> { + override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> { /** Create Kafka consumer */ kafkaConsumer = kafkaConsumer(additionalConfig) @@ -80,22 +87,22 @@ class KafkaBasicAuthMessageConsumerService( "topics(${messageConsumerProperties.bootstrapServers})" } - kafkaConsumer!!.subscribe(consumerTopic) - log.info("Successfully consumed topic($consumerTopic)") + kafkaConsumer!!.subscribe(topics) + log.info("Successfully consumed topic($topics)") - thread(start = true, name = "KafkaConsumer") { + thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") { keepGoing = true kafkaConsumer!!.use { kc -> while (keepGoing) { val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) - log.info("Consumed Records : ${consumerRecords.count()}") + log.trace("Consumed Records : ${consumerRecords.count()}") runBlocking { consumerRecords?.forEach { consumerRecord -> /** execute the command block */ consumerRecord.value()?.let { launch { if (!channel.isClosedForSend) { - channel.send(it) + channel.send(String(it, Charset.defaultCharset())) } else { log.error("Channel is closed to receive message") } @@ -110,6 +117,46 @@ class KafkaBasicAuthMessageConsumerService( return channel } + override suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) { + /** get to topic names */ + val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() } + check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" } + return consume(topics = consumerTopic, additionalConfig = additionalConfig, consumerFunction = consumerFunction) + } + + override suspend fun consume(topics: List<String>, additionalConfig: Map<String, Any>?, + consumerFunction: ConsumerFunction) { + + val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction + + /** Create Kafka consumer */ + kafkaConsumer = kafkaConsumer(additionalConfig) + + checkNotNull(kafkaConsumer) { + "failed to create kafka consumer for " + + "server(${messageConsumerProperties.bootstrapServers})'s " + + "topics(${messageConsumerProperties.bootstrapServers})" + } + + kafkaConsumer!!.subscribe(topics) + log.info("Successfully consumed topic($topics)") + + thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") { + keepGoing = true + kafkaConsumer!!.use { kc -> + while (keepGoing) { + val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) + log.trace("Consumed Records : ${consumerRecords.count()}") + runBlocking { + /** Execute dynamic consumer Block substitution */ + kafkaConsumerFunction.invoke(messageConsumerProperties, kc, consumerRecords) + } + } + log.info("message listener shutting down.....") + } + } + } + override suspend fun shutDown() { /** stop the polling loop */ keepGoing = false diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt index 1c93bb0fc..42adcd712 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,16 +18,18 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import org.apache.commons.lang.builder.ToStringBuilder +import org.apache.kafka.clients.producer.Callback +import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerConfig.* +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.header.internals.RecordHeader +import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.kafka.common.serialization.StringSerializer import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID import org.slf4j.LoggerFactory -import org.springframework.kafka.core.DefaultKafkaProducerFactory -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.core.ProducerFactory -import org.springframework.kafka.support.SendResult -import org.springframework.util.concurrent.ListenableFutureCallback +import java.nio.charset.Charset class KafkaBasicAuthMessageProducerService( private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties) @@ -34,42 +37,46 @@ class KafkaBasicAuthMessageProducerService( private val log = LoggerFactory.getLogger(KafkaBasicAuthMessageProducerService::class.java)!! - private var kafkaTemplate: KafkaTemplate<String, Any>? = null + private var kafkaProducer: KafkaProducer<String, ByteArray>? = null + + private val messageLoggerService = MessageLoggerService() override suspend fun sendMessageNB(message: Any): Boolean { checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } - return sendMessage(messageProducerProperties.topic!!, message) + return sendMessageNB(messageProducerProperties.topic!!, message) } - override suspend fun sendMessageNB(topic: String, message: Any): Boolean { - val serializedMessage = when (message) { - is String -> { - message - } - else -> { - message.asJsonType().toString() - } - } - val future = messageTemplate().send(topic, serializedMessage) + override suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean { + checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } + return sendMessageNB(messageProducerProperties.topic!!, message, headers) + } - future.addCallback(object : ListenableFutureCallback<SendResult<String, Any>> { - override fun onSuccess(result: SendResult<String, Any>) { - log.info("message sent successfully with offset=[${result.recordMetadata.offset()}]") - } + override suspend fun sendMessageNB(topic: String, message: Any, + headers: MutableMap<String, String>?): Boolean { + val byteArrayMessage = when (message) { + is String -> message.toByteArray(Charset.defaultCharset()) + else -> message.asJsonString().toByteArray(Charset.defaultCharset()) + } - override fun onFailure(ex: Throwable) { - log.error("Unable to send message", ex) - } - }) + val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage) + val recordHeaders = record.headers() + messageLoggerService.messageProducing(recordHeaders) + headers?.let { + headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) } + } + val callback = Callback { metadata, exception -> + log.info("message published offset(${metadata.offset()}, headers :$headers )") + } + messageTemplate().send(record, callback).get() return true } - private fun producerFactory(additionalConfig: Map<String, Any>? = null): ProducerFactory<String, Any> { - log.info("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}") + fun messageTemplate(additionalConfig: Map<String, ByteArray>? = null): KafkaProducer<String, ByteArray> { + log.trace("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}") val configProps = hashMapOf<String, Any>() configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java - configProps[VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java + configProps[VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java if (messageProducerProperties.clientId != null) { configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!! } @@ -79,14 +86,11 @@ class KafkaBasicAuthMessageProducerService( if (additionalConfig != null) { configProps.putAll(additionalConfig) } - return DefaultKafkaProducerFactory(configProps) - } - fun messageTemplate(additionalConfig: Map<String, Any>? = null): KafkaTemplate<String, Any> { - if (kafkaTemplate == null) { - kafkaTemplate = KafkaTemplate(producerFactory(additionalConfig)) + if (kafkaProducer == null) { + kafkaProducer = KafkaProducer(configProps) } - return kafkaTemplate!! + return kafkaProducer!! } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt new file mode 100644 index 000000000..21bf1b76c --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt @@ -0,0 +1,88 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.Headers +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader +import org.onap.ccsdk.cds.blueprintsprocessor.message.addHeader +import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.slf4j.MDC +import java.net.InetAddress +import java.time.Instant +import java.time.ZoneOffset +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.util.* + +class MessageLoggerService { + + private val log = logger(MessageLoggerService::class) + + fun messageConsuming(headers: CommonHeader, consumerRecord: ConsumerRecord<*, *>) { + messageConsuming(headers.requestId, headers.subRequestId, + headers.originatorId, consumerRecord) + } + + fun messageConsuming(consumerRecord: ConsumerRecord<*, *>) { + val headers = consumerRecord.headers().toMap() + val requestID = headers[BluePrintConstants.ONAP_REQUEST_ID].defaultToUUID() + val invocationID = headers[BluePrintConstants.ONAP_INVOCATION_ID].defaultToUUID() + val partnerName = headers[BluePrintConstants.ONAP_PARTNER_NAME] ?: "UNKNOWN" + messageConsuming(requestID, invocationID, partnerName, consumerRecord) + } + + + fun messageConsuming(requestID: String, invocationID: String, partnerName: String, + consumerRecord: ConsumerRecord<*, *>) { + val headers = consumerRecord.headers().toMap() + val localhost = InetAddress.getLocalHost() + MDC.put("InvokeTimestamp", ZonedDateTime + .ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), ZoneOffset.UTC) + .format(DateTimeFormatter.ISO_INSTANT)) + MDC.put("RequestID", requestID) + MDC.put("InvocationID", invocationID) + MDC.put("PartnerName", partnerName) + MDC.put("ClientIPAddress", headers["ClientIPAddress"].defaultToEmpty()) + MDC.put("ServerFQDN", localhost.hostName.defaultToEmpty()) + MDC.put("ServiceName", consumerRecord.topic()) + // Custom MDC for Message Consumers + MDC.put("Offset", consumerRecord.offset().toString()) + MDC.put("MessageKey", consumerRecord.key()?.toString().defaultToEmpty()) + log.info("Consuming MDC Properties : ${MDC.getCopyOfContextMap()}") + } + + /** Used before producing message request, Inbound Invocation ID is used as request Id + * for produced message Request, If invocation Id is missing then default Request Id will be generated. + */ + fun messageProducing(requestHeader: Headers) { + val localhost = InetAddress.getLocalHost() + requestHeader.addHeader(BluePrintConstants.ONAP_REQUEST_ID, MDC.get("InvocationID").defaultToUUID()) + requestHeader.addHeader(BluePrintConstants.ONAP_INVOCATION_ID, UUID.randomUUID().toString()) + val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor" + requestHeader.addHeader(BluePrintConstants.ONAP_PARTNER_NAME, partnerName) + requestHeader.addHeader("ClientIPAddress", localhost.hostAddress) + } + + fun messageConsumingExisting() { + MDC.clear() + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index f4e85a94b..bdceec7d3 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,15 +23,14 @@ import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.clients.consumer.MockConsumer -import org.apache.kafka.clients.consumer.OffsetResetStrategy +import org.apache.kafka.clients.consumer.* import org.apache.kafka.common.TopicPartition import org.junit.Test import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.annotation.DirtiesContext @@ -87,14 +87,14 @@ open class BlueprintMessageConsumerServiceTest { partitionsEndMap[partition] = records topicsCollection.add(partition.topic()) } - val mockKafkaConsumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST) + val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST) mockKafkaConsumer.subscribe(topicsCollection) mockKafkaConsumer.rebalance(partitions) mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap) mockKafkaConsumer.updateEndOffsets(partitionsEndMap) for (i in 1..10) { - val record = ConsumerRecord<String, String>(topic, 1, i.toLong(), "key_$i", - "I am message $i") + val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i", + "I am message $i".toByteArray()) mockKafkaConsumer.addRecord(record) } @@ -110,6 +110,54 @@ open class BlueprintMessageConsumerServiceTest { } } + @Test + fun testKafkaBasicAuthConsumerWithDynamicFunction() { + runBlocking { + val blueprintMessageConsumerService = bluePrintMessageLibPropertyService + .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") + + val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true) + + val topic = "default-topic" + val partitions: MutableList<TopicPartition> = arrayListOf() + val topicsCollection: MutableList<String> = arrayListOf() + partitions.add(TopicPartition(topic, 1)) + val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf() + val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf() + + val records: Long = 10 + partitions.forEach { partition -> + partitionsBeginningMap[partition] = 0L + partitionsEndMap[partition] = records + topicsCollection.add(partition.topic()) + } + val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST) + mockKafkaConsumer.subscribe(topicsCollection) + mockKafkaConsumer.rebalance(partitions) + mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap) + mockKafkaConsumer.updateEndOffsets(partitionsEndMap) + for (i in 1..10) { + val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i", + "I am message $i".toByteArray()) + mockKafkaConsumer.addRecord(record) + } + + every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer + /** Test Consumer Function implementation */ + val consumerFunction = object : KafkaConsumerRecordsFunction { + override suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, + consumer: Consumer<*, *>, consumerRecords: ConsumerRecords<*, *>) { + val count = consumerRecords.count() + log.trace("Received Message count($count)") + } + } + spyBlueprintMessageConsumerService.consume(consumerFunction) + delay(10) + spyBlueprintMessageConsumerService.shutDown() + } + } + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ //@Test fun testKafkaIntegration() { @@ -131,7 +179,10 @@ open class BlueprintMessageConsumerServiceTest { launch { repeat(5) { delay(100) - blueprintMessageProducerService.sendMessage("this is my message($it)") + val headers: MutableMap<String, String> = hashMapOf() + headers["id"] = it.toString() + blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)", + headers = headers) } } delay(5000) diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index 31bcc1517..f23624f7a 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -20,18 +20,18 @@ import io.mockk.every import io.mockk.mockk import io.mockk.spyk import kotlinx.coroutines.runBlocking +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.RecordMetadata import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration import org.springframework.beans.factory.annotation.Autowired -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.support.SendResult import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner -import org.springframework.util.concurrent.SettableListenableFuture +import java.util.concurrent.Future import kotlin.test.Test import kotlin.test.assertTrue @@ -57,12 +57,12 @@ open class BlueprintMessageProducerServiceTest { val blueprintMessageProducerService = bluePrintMessageLibPropertyService .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService - val mockKafkaTemplate = mockk<KafkaTemplate<String, Any>>() + val mockKafkaTemplate = mockk<KafkaProducer<String, ByteArray>>() - val future = SettableListenableFuture<SendResult<String, Any>>() - //future.setException(BluePrintException("failed sending")) + val responseMock = mockk<Future<RecordMetadata>>() + every { responseMock.get() } returns mockk() - every { mockKafkaTemplate.send(any(), any()) } returns future + every { mockKafkaTemplate.send(any(), any()) } returns responseMock val spyBluePrintMessageProducerService = spyk(blueprintMessageProducerService, recordPrivateCalls = true) diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt new file mode 100644 index 000000000..82e40efd1 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt @@ -0,0 +1,61 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import io.mockk.every +import io.mockk.mockk +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.internals.RecordHeaders +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader +import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.slf4j.MDC +import kotlin.test.assertEquals + +class MessageLoggerServiceTest { + + + @Test + fun testMessagingHeaders() { + val messageLoggerService = MessageLoggerService() + val commonHeader = CommonHeader().apply { + requestId = "1234" + subRequestId = "1234-12" + originatorId = "cds-test" + } + + val consumerRecord = mockk<ConsumerRecord<*, *>>() + every { consumerRecord.headers() } returns null + every { consumerRecord.key() } returns "1234" + every { consumerRecord.offset() } returns 12345 + every { consumerRecord.topic() } returns "sample-topic" + every { consumerRecord.timestamp() } returns System.currentTimeMillis() + messageLoggerService.messageConsuming(commonHeader, consumerRecord) + assertEquals(commonHeader.requestId, MDC.get("RequestID")) + assertEquals(commonHeader.subRequestId, MDC.get("InvocationID")) + + val mockHeaders = RecordHeaders() + messageLoggerService.messageProducing(mockHeaders) + val map = mockHeaders.toMap() + assertEquals("1234-12", map[BluePrintConstants.ONAP_REQUEST_ID]) + + messageLoggerService.messageConsumingExisting() + + } + +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml index 3868440c7..820041f74 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml @@ -1,5 +1,6 @@ <!-- ~ Copyright © 2019 IBM. + ~ Modifications Copyright © 2018-2019 AT&T Intellectual Property. ~ ~ Licensed under the Apache License, Version 2.0 (the "License"); ~ you may not use this file except in compliance with the License. @@ -15,11 +16,18 @@ --> <configuration> + + <property name="localPattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/> + <property name="defaultPattern" + value="%date{ISO8601,UTC}|%X{RequestID}|%X{InvocationID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n"/> + <property name="testing" + value="%X{RequestID}|%X{InvocationID}|%logger{50}| %msg%n"/> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> <encoder> - <pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{50} - %msg%n</pattern> + <pattern>${localPattern}</pattern> </encoder> </appender> |