From 0d3a0223fd11d431497519f3f9da640aafe00460 Mon Sep 17 00:00:00 2001 From: Brinda Santh Date: Mon, 28 Oct 2019 16:46:47 -0400 Subject: Add Message tracing logger service. Issue-ID: CCSDK-1046 Signed-off-by: Brinda Santh Change-Id: I509df82ec558cd96934043a5b41ea53aa040cc81 --- .../message/BluePrintMessageExtensions.kt | 32 ++++++++ .../KafkaBasicAuthMessageConsumerService.kt | 4 +- .../KafkaBasicAuthMessageProducerService.kt | 7 +- .../message/service/MessageLoggerService.kt | 88 ++++++++++++++++++++++ 4 files changed, 129 insertions(+), 2 deletions(-) create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt create mode 100644 ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin') diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt new file mode 100644 index 000000000..a817c0c74 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt @@ -0,0 +1,32 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message + +import org.apache.kafka.common.header.Headers +import org.apache.kafka.common.header.internals.RecordHeader +import java.nio.charset.Charset + + +fun T?.toMap(): MutableMap { + val map: MutableMap = hashMapOf() + this?.forEach { map[it.key()] = String(it.value(), Charset.defaultCharset()) } + return map +} + +fun Headers.addHeader(key: String, value: String) { + this.add(RecordHeader(key, value.toByteArray())) +} \ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt index b99be0ae5..a4ccfa901 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,9 +37,10 @@ open class KafkaBasicAuthMessageConsumerService( private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties) : BlueprintMessageConsumerService { + val log = logger(KafkaBasicAuthMessageConsumerService::class) + val channel = Channel() var kafkaConsumer: Consumer? = null - val log = logger(KafkaBasicAuthMessageConsumerService::class) @Volatile var keepGoing = true diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt index 86c04f6be..42adcd712 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +39,8 @@ class KafkaBasicAuthMessageProducerService( private var kafkaProducer: KafkaProducer? = null + private val messageLoggerService = MessageLoggerService() + override suspend fun sendMessageNB(message: Any): Boolean { checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } return sendMessageNB(messageProducerProperties.topic!!, message) @@ -56,8 +59,10 @@ class KafkaBasicAuthMessageProducerService( } val record = ProducerRecord(topic, defaultToUUID(), byteArrayMessage) + val recordHeaders = record.headers() + messageLoggerService.messageProducing(recordHeaders) headers?.let { - headers.forEach { (key, value) -> record.headers().add(RecordHeader(key, value.toByteArray())) } + headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) } } val callback = Callback { metadata, exception -> log.info("message published offset(${metadata.offset()}, headers :$headers )") diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt new file mode 100644 index 000000000..21bf1b76c --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt @@ -0,0 +1,88 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.Headers +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader +import org.onap.ccsdk.cds.blueprintsprocessor.message.addHeader +import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.slf4j.MDC +import java.net.InetAddress +import java.time.Instant +import java.time.ZoneOffset +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.util.* + +class MessageLoggerService { + + private val log = logger(MessageLoggerService::class) + + fun messageConsuming(headers: CommonHeader, consumerRecord: ConsumerRecord<*, *>) { + messageConsuming(headers.requestId, headers.subRequestId, + headers.originatorId, consumerRecord) + } + + fun messageConsuming(consumerRecord: ConsumerRecord<*, *>) { + val headers = consumerRecord.headers().toMap() + val requestID = headers[BluePrintConstants.ONAP_REQUEST_ID].defaultToUUID() + val invocationID = headers[BluePrintConstants.ONAP_INVOCATION_ID].defaultToUUID() + val partnerName = headers[BluePrintConstants.ONAP_PARTNER_NAME] ?: "UNKNOWN" + messageConsuming(requestID, invocationID, partnerName, consumerRecord) + } + + + fun messageConsuming(requestID: String, invocationID: String, partnerName: String, + consumerRecord: ConsumerRecord<*, *>) { + val headers = consumerRecord.headers().toMap() + val localhost = InetAddress.getLocalHost() + MDC.put("InvokeTimestamp", ZonedDateTime + .ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), ZoneOffset.UTC) + .format(DateTimeFormatter.ISO_INSTANT)) + MDC.put("RequestID", requestID) + MDC.put("InvocationID", invocationID) + MDC.put("PartnerName", partnerName) + MDC.put("ClientIPAddress", headers["ClientIPAddress"].defaultToEmpty()) + MDC.put("ServerFQDN", localhost.hostName.defaultToEmpty()) + MDC.put("ServiceName", consumerRecord.topic()) + // Custom MDC for Message Consumers + MDC.put("Offset", consumerRecord.offset().toString()) + MDC.put("MessageKey", consumerRecord.key()?.toString().defaultToEmpty()) + log.info("Consuming MDC Properties : ${MDC.getCopyOfContextMap()}") + } + + /** Used before producing message request, Inbound Invocation ID is used as request Id + * for produced message Request, If invocation Id is missing then default Request Id will be generated. + */ + fun messageProducing(requestHeader: Headers) { + val localhost = InetAddress.getLocalHost() + requestHeader.addHeader(BluePrintConstants.ONAP_REQUEST_ID, MDC.get("InvocationID").defaultToUUID()) + requestHeader.addHeader(BluePrintConstants.ONAP_INVOCATION_ID, UUID.randomUUID().toString()) + val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor" + requestHeader.addHeader(BluePrintConstants.ONAP_PARTNER_NAME, partnerName) + requestHeader.addHeader("ClientIPAddress", localhost.hostAddress) + } + + fun messageConsumingExisting() { + MDC.clear() + } +} \ No newline at end of file -- cgit 1.2.3-korg