summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin
diff options
context:
space:
mode:
authorBrinda Santh <bs2796@att.com>2019-10-28 16:46:47 -0400
committerBrinda Santh <bs2796@att.com>2019-10-28 16:46:47 -0400
commit0d3a0223fd11d431497519f3f9da640aafe00460 (patch)
tree8f211b811e86ebf72d4873957624ded94206d7c9 /ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin
parent6885f5811c515d00c049c87de0797076dfc3686e (diff)
Add Message tracing logger service.
Issue-ID: CCSDK-1046 Signed-off-by: Brinda Santh <bs2796@att.com> Change-Id: I509df82ec558cd96934043a5b41ea53aa040cc81
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt32
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt4
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt7
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt88
4 files changed, 129 insertions, 2 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt
new file mode 100644
index 000000000..a817c0c74
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message
+
+import org.apache.kafka.common.header.Headers
+import org.apache.kafka.common.header.internals.RecordHeader
+import java.nio.charset.Charset
+
+
+fun <T : Headers> T?.toMap(): MutableMap<String, String> {
+ val map: MutableMap<String, String> = hashMapOf()
+ this?.forEach { map[it.key()] = String(it.value(), Charset.defaultCharset()) }
+ return map
+}
+
+fun Headers.addHeader(key: String, value: String) {
+ this.add(RecordHeader(key, value.toByteArray()))
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
index b99be0ae5..a4ccfa901 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -36,9 +37,10 @@ open class KafkaBasicAuthMessageConsumerService(
private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties)
: BlueprintMessageConsumerService {
+ val log = logger(KafkaBasicAuthMessageConsumerService::class)
+
val channel = Channel<String>()
var kafkaConsumer: Consumer<String, ByteArray>? = null
- val log = logger(KafkaBasicAuthMessageConsumerService::class)
@Volatile
var keepGoing = true
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
index 86c04f6be..42adcd712 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,6 +39,8 @@ class KafkaBasicAuthMessageProducerService(
private var kafkaProducer: KafkaProducer<String, ByteArray>? = null
+ private val messageLoggerService = MessageLoggerService()
+
override suspend fun sendMessageNB(message: Any): Boolean {
checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
return sendMessageNB(messageProducerProperties.topic!!, message)
@@ -56,8 +59,10 @@ class KafkaBasicAuthMessageProducerService(
}
val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage)
+ val recordHeaders = record.headers()
+ messageLoggerService.messageProducing(recordHeaders)
headers?.let {
- headers.forEach { (key, value) -> record.headers().add(RecordHeader(key, value.toByteArray())) }
+ headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) }
}
val callback = Callback { metadata, exception ->
log.info("message published offset(${metadata.offset()}, headers :$headers )")
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
new file mode 100644
index 000000000..21bf1b76c
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
@@ -0,0 +1,88 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.header.Headers
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.message.addHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.slf4j.MDC
+import java.net.InetAddress
+import java.time.Instant
+import java.time.ZoneOffset
+import java.time.ZonedDateTime
+import java.time.format.DateTimeFormatter
+import java.util.*
+
+class MessageLoggerService {
+
+ private val log = logger(MessageLoggerService::class)
+
+ fun messageConsuming(headers: CommonHeader, consumerRecord: ConsumerRecord<*, *>) {
+ messageConsuming(headers.requestId, headers.subRequestId,
+ headers.originatorId, consumerRecord)
+ }
+
+ fun messageConsuming(consumerRecord: ConsumerRecord<*, *>) {
+ val headers = consumerRecord.headers().toMap()
+ val requestID = headers[BluePrintConstants.ONAP_REQUEST_ID].defaultToUUID()
+ val invocationID = headers[BluePrintConstants.ONAP_INVOCATION_ID].defaultToUUID()
+ val partnerName = headers[BluePrintConstants.ONAP_PARTNER_NAME] ?: "UNKNOWN"
+ messageConsuming(requestID, invocationID, partnerName, consumerRecord)
+ }
+
+
+ fun messageConsuming(requestID: String, invocationID: String, partnerName: String,
+ consumerRecord: ConsumerRecord<*, *>) {
+ val headers = consumerRecord.headers().toMap()
+ val localhost = InetAddress.getLocalHost()
+ MDC.put("InvokeTimestamp", ZonedDateTime
+ .ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), ZoneOffset.UTC)
+ .format(DateTimeFormatter.ISO_INSTANT))
+ MDC.put("RequestID", requestID)
+ MDC.put("InvocationID", invocationID)
+ MDC.put("PartnerName", partnerName)
+ MDC.put("ClientIPAddress", headers["ClientIPAddress"].defaultToEmpty())
+ MDC.put("ServerFQDN", localhost.hostName.defaultToEmpty())
+ MDC.put("ServiceName", consumerRecord.topic())
+ // Custom MDC for Message Consumers
+ MDC.put("Offset", consumerRecord.offset().toString())
+ MDC.put("MessageKey", consumerRecord.key()?.toString().defaultToEmpty())
+ log.info("Consuming MDC Properties : ${MDC.getCopyOfContextMap()}")
+ }
+
+ /** Used before producing message request, Inbound Invocation ID is used as request Id
+ * for produced message Request, If invocation Id is missing then default Request Id will be generated.
+ */
+ fun messageProducing(requestHeader: Headers) {
+ val localhost = InetAddress.getLocalHost()
+ requestHeader.addHeader(BluePrintConstants.ONAP_REQUEST_ID, MDC.get("InvocationID").defaultToUUID())
+ requestHeader.addHeader(BluePrintConstants.ONAP_INVOCATION_ID, UUID.randomUUID().toString())
+ val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor"
+ requestHeader.addHeader(BluePrintConstants.ONAP_PARTNER_NAME, partnerName)
+ requestHeader.addHeader("ClientIPAddress", localhost.hostAddress)
+ }
+
+ fun messageConsumingExisting() {
+ MDC.clear()
+ }
+} \ No newline at end of file