aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt3
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt6
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt1
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt6
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt25
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt24
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt2
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt13
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt20
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt8
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt25
11 files changed, 81 insertions, 52 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt
index a817c0c74..509689eca 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt
@@ -20,7 +20,6 @@ import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.header.internals.RecordHeader
import java.nio.charset.Charset
-
fun <T : Headers> T?.toMap(): MutableMap<String, String> {
val map: MutableMap<String, String> = hashMapOf()
this?.forEach { map[it.key()] = String(it.value(), Charset.defaultCharset()) }
@@ -29,4 +28,4 @@ fun <T : Headers> T?.toMap(): MutableMap<String, String> {
fun Headers.addHeader(key: String, value: String) {
this.add(RecordHeader(key, value.toByteArray()))
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
index ecffa280f..cc4c7fa4a 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
@@ -17,7 +17,6 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message
-
import com.fasterxml.jackson.databind.JsonNode
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
@@ -36,14 +35,13 @@ open class BluePrintMessageLibConfiguration
* Exposed Dependency Service by this Message Lib Module
*/
fun BluePrintDependencyService.messageLibPropertyService(): BluePrintMessageLibPropertyService =
- instance(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY)
+ instance(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY)
/** Extension functions for message producer service **/
fun BluePrintDependencyService.messageProducerService(selector: String): BlueprintMessageProducerService {
return messageLibPropertyService().blueprintMessageProducerService(selector)
}
-
fun BluePrintDependencyService.messageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService {
return messageLibPropertyService().blueprintMessageProducerService(jsonNode)
}
@@ -65,4 +63,4 @@ class MessageLibConstants {
const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth"
const val TYPE_KAFKA_STREAMS_BASIC_AUTH = "kafka-streams-basic-auth"
}
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index d0c3d5ae1..59e3606ea 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -22,7 +22,6 @@ import org.apache.kafka.streams.StreamsConfig
/** Producer Properties **/
open class MessageProducerProperties
-
open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {
lateinit var bootstrapServers: String
var topic: String? = null
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt
index 4c6c0acdd..72a70893a 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt
@@ -30,7 +30,6 @@ abstract class AbstractBluePrintMessageProcessor<K, V> : Processor<K, V> {
lateinit var processorContext: ProcessorContext
-
override fun process(key: K, value: V) = runBlocking(Dispatchers.IO) {
try {
processNB(key, value)
@@ -42,7 +41,6 @@ abstract class AbstractBluePrintMessageProcessor<K, V> : Processor<K, V> {
override fun init(context: ProcessorContext) {
log.info("initializing processor (${this.javaClass.simpleName})")
this.processorContext = context
-
}
override fun close() {
@@ -54,12 +52,12 @@ abstract class AbstractBluePrintMessageProcessor<K, V> : Processor<K, V> {
/** CDS Kafka Stream Punctuator abstract class to implement */
abstract class AbstractBluePrintMessagePunctuator : Punctuator {
- lateinit var processorContext: ProcessorContext
+ lateinit var processorContext: ProcessorContext
override fun punctuate(timestamp: Long) = runBlocking(Dispatchers.IO) {
punctuateNB(timestamp)
}
abstract suspend fun punctuateNB(timestamp: Long)
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
index 853b88bc9..44b50af44 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
@@ -19,7 +19,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import com.fasterxml.jackson.databind.JsonNode
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
-import org.onap.ccsdk.cds.blueprintsprocessor.message.*
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
import org.springframework.stereotype.Service
@@ -62,8 +67,8 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
}
}
- private fun blueprintMessageProducerService(MessageProducerProperties: MessageProducerProperties)
- : BlueprintMessageProducerService {
+ private fun blueprintMessageProducerService(MessageProducerProperties: MessageProducerProperties):
+ BlueprintMessageProducerService {
when (MessageProducerProperties) {
is KafkaBasicAuthMessageProducerProperties -> {
@@ -77,7 +82,8 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
private fun kafkaBasicAuthMessageProducerProperties(prefix: String): KafkaBasicAuthMessageProducerProperties {
return bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaBasicAuthMessageProducerProperties::class.java)
+ prefix, KafkaBasicAuthMessageProducerProperties::class.java
+ )
}
/** Consumer Property Lib Service Implementation **/
@@ -126,8 +132,8 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
}
}
- private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties)
- : BlueprintMessageConsumerService {
+ private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties):
+ BlueprintMessageConsumerService {
when (messageConsumerProperties) {
is KafkaBasicAuthMessageConsumerProperties -> {
@@ -144,12 +150,13 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
private fun kafkaBasicAuthMessageConsumerProperties(prefix: String): KafkaBasicAuthMessageConsumerProperties {
return bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaBasicAuthMessageConsumerProperties::class.java)
+ prefix, KafkaBasicAuthMessageConsumerProperties::class.java
+ )
}
private fun kafkaStreamsBasicAuthMessageConsumerProperties(prefix: String): KafkaStreamsBasicAuthConsumerProperties {
return bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaStreamsBasicAuthConsumerProperties::class.java)
+ prefix, KafkaStreamsBasicAuthConsumerProperties::class.java
+ )
}
-
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
index 716fda609..f74abcdb7 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
@@ -50,21 +50,31 @@ interface BlueprintMessageConsumerService {
}
/** Consume the [topics] with [additionalConfig], so that we can execute dynamic function [consumerFunction] */
- suspend fun consume(topics: List<String>, additionalConfig: Map<String, Any>?,
- consumerFunction: ConsumerFunction) {
+ suspend fun consume(
+ topics: List<String>,
+ additionalConfig: Map<String, Any>?,
+ consumerFunction: ConsumerFunction
+ ) {
throw BluePrintProcessorException("Not Implemented")
}
/** close the channel, consumer and other resources */
suspend fun shutDown()
}
+
/** Consumer dynamic implementation interface */
interface KafkaConsumerRecordsFunction : ConsumerFunction {
- suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, consumer: Consumer<*, *>,
- consumerRecords: ConsumerRecords<*, *>)
+
+ suspend fun invoke(
+ messageConsumerProperties: MessageConsumerProperties,
+ consumer: Consumer<*, *>,
+ consumerRecords: ConsumerRecords<*, *>
+ )
}
interface KafkaStreamConsumerFunction : ConsumerFunction {
- suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties,
- additionalConfig: Map<String, Any>?): Topology
-} \ No newline at end of file
+ suspend fun createTopology(
+ messageConsumerProperties: MessageConsumerProperties,
+ additionalConfig: Map<String, Any>?
+ ): Topology
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
index 7d8138639..cdc65a1c6 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
@@ -47,4 +47,4 @@ interface BlueprintMessageProducerService {
}
suspend fun sendMessageNB(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
index 757846c81..3415c8d0d 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
@@ -34,8 +34,9 @@ import java.time.Duration
import kotlin.concurrent.thread
open class KafkaBasicAuthMessageConsumerService(
- private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties)
- : BlueprintMessageConsumerService {
+ private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties
+) :
+ BlueprintMessageConsumerService {
val log = logger(KafkaBasicAuthMessageConsumerService::class)
val channel = Channel<String>()
@@ -76,7 +77,6 @@ open class KafkaBasicAuthMessageConsumerService(
return subscribe(consumerTopic, additionalConfig)
}
-
override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
/** Create Kafka consumer */
kafkaConsumer = kafkaConsumer(additionalConfig)
@@ -124,8 +124,11 @@ open class KafkaBasicAuthMessageConsumerService(
return consume(topics = consumerTopic, additionalConfig = additionalConfig, consumerFunction = consumerFunction)
}
- override suspend fun consume(topics: List<String>, additionalConfig: Map<String, Any>?,
- consumerFunction: ConsumerFunction) {
+ override suspend fun consume(
+ topics: List<String>,
+ additionalConfig: Map<String, Any>?,
+ consumerFunction: ConsumerFunction
+ ) {
val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
index ad9a594b0..8416282af 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
@@ -20,7 +20,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import org.apache.commons.lang.builder.ToStringBuilder
import org.apache.kafka.clients.producer.Callback
import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.producer.ProducerConfig.*
+import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.serialization.ByteArraySerializer
@@ -32,8 +37,9 @@ import org.slf4j.LoggerFactory
import java.nio.charset.Charset
class KafkaBasicAuthMessageProducerService(
- private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties)
- : BlueprintMessageProducerService {
+ private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties
+) :
+ BlueprintMessageProducerService {
private val log = LoggerFactory.getLogger(KafkaBasicAuthMessageProducerService::class.java)!!
@@ -51,8 +57,11 @@ class KafkaBasicAuthMessageProducerService(
return sendMessageNB(messageProducerProperties.topic!!, message, headers)
}
- override suspend fun sendMessageNB(topic: String, message: Any,
- headers: MutableMap<String, String>?): Boolean {
+ override suspend fun sendMessageNB(
+ topic: String,
+ message: Any,
+ headers: MutableMap<String, String>?
+ ): Boolean {
val byteArrayMessage = when (message) {
is String -> message.toByteArray(Charset.defaultCharset())
else -> message.asJsonString().toByteArray(Charset.defaultCharset())
@@ -95,4 +104,3 @@ class KafkaBasicAuthMessageProducerService(
return kafkaProducer!!
}
}
-
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt
index d0297df4c..0b353d58b 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt
@@ -23,10 +23,10 @@ import org.apache.kafka.streams.StreamsConfig
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import java.util.*
+import java.util.Properties
-open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerProperties: KafkaStreamsBasicAuthConsumerProperties)
- : BlueprintMessageConsumerService {
+open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerProperties: KafkaStreamsBasicAuthConsumerProperties) :
+ BlueprintMessageConsumerService {
val log = logger(KafkaStreamsBasicAuthConsumerService::class)
lateinit var kafkaStreams: KafkaStreams
@@ -68,4 +68,4 @@ open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerPrope
kafkaStreams.close()
}
}
-} \ No newline at end of file
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
index 21bf1b76c..7ec70d91b 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
@@ -31,15 +31,17 @@ import java.time.Instant
import java.time.ZoneOffset
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
-import java.util.*
+import java.util.UUID
class MessageLoggerService {
private val log = logger(MessageLoggerService::class)
fun messageConsuming(headers: CommonHeader, consumerRecord: ConsumerRecord<*, *>) {
- messageConsuming(headers.requestId, headers.subRequestId,
- headers.originatorId, consumerRecord)
+ messageConsuming(
+ headers.requestId, headers.subRequestId,
+ headers.originatorId, consumerRecord
+ )
}
fun messageConsuming(consumerRecord: ConsumerRecord<*, *>) {
@@ -50,14 +52,19 @@ class MessageLoggerService {
messageConsuming(requestID, invocationID, partnerName, consumerRecord)
}
-
- fun messageConsuming(requestID: String, invocationID: String, partnerName: String,
- consumerRecord: ConsumerRecord<*, *>) {
+ fun messageConsuming(
+ requestID: String,
+ invocationID: String,
+ partnerName: String,
+ consumerRecord: ConsumerRecord<*, *>
+ ) {
val headers = consumerRecord.headers().toMap()
val localhost = InetAddress.getLocalHost()
- MDC.put("InvokeTimestamp", ZonedDateTime
+ MDC.put(
+ "InvokeTimestamp", ZonedDateTime
.ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), ZoneOffset.UTC)
- .format(DateTimeFormatter.ISO_INSTANT))
+ .format(DateTimeFormatter.ISO_INSTANT)
+ )
MDC.put("RequestID", requestID)
MDC.put("InvocationID", invocationID)
MDC.put("PartnerName", partnerName)
@@ -85,4 +92,4 @@ class MessageLoggerService {
fun messageConsumingExisting() {
MDC.clear()
}
-} \ No newline at end of file
+}