summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules
diff options
context:
space:
mode:
authorDan Timoney <dtimoney@att.com>2019-10-31 12:59:17 +0000
committerGerrit Code Review <gerrit@onap.org>2019-10-31 12:59:17 +0000
commitbfad46ffc8078b6fe2c8ea382daf9f8f795be4b7 (patch)
tree9cd8e4d822bec01f901af1ff50c8680c47ea00af /ms/blueprintsprocessor/modules
parentb3f56638d438107a201466c1956240805f100889 (diff)
parent0d3a0223fd11d431497519f3f9da640aafe00460 (diff)
Merge "Add Message tracing logger service."
Diffstat (limited to 'ms/blueprintsprocessor/modules')
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt9
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt32
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt4
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt7
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt88
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt61
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml10
-rw-r--r--ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt4
8 files changed, 208 insertions, 7 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt
index 1b932480a..6d2ba43cc 100644
--- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt
@@ -31,6 +31,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty
import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.slf4j.MDC
+import java.net.InetAddress
import java.net.InetSocketAddress
import java.time.ZoneOffset
import java.time.ZonedDateTime
@@ -46,7 +47,7 @@ class GrpcLoggerService {
headers: Metadata, next: ServerCallHandler<ReqT, RespT>) {
val requestID = headers.getStringKey(ONAP_REQUEST_ID).defaultToUUID()
val invocationID = headers.getStringKey(ONAP_INVOCATION_ID).defaultToUUID()
- val partnerName = headers.getStringKey(ONAP_PARTNER_NAME).defaultToUUID()
+ val partnerName = headers.getStringKey(ONAP_PARTNER_NAME) ?: "UNKNOWN"
grpcRequesting(requestID, invocationID, partnerName, call)
}
@@ -54,12 +55,14 @@ class GrpcLoggerService {
headers: CommonHeader, next: ServerCallHandler<ReqT, RespT>) {
val requestID = headers.requestId.defaultToUUID()
val invocationID = headers.subRequestId.defaultToUUID()
- val partnerName = headers.originatorId.defaultToUUID()
+ val partnerName = headers.originatorId ?: "UNKNOWN"
grpcRequesting(requestID, invocationID, partnerName, call)
}
fun <ReqT : Any, RespT : Any> grpcRequesting(requestID: String, invocationID: String, partnerName: String,
call: ServerCall<ReqT, RespT>) {
+ val localhost = InetAddress.getLocalHost()
+
val clientSocketAddress = call.attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR) as? InetSocketAddress
?: throw BluePrintProcessorException("failed to get client address")
val serviceName = call.methodDescriptor.fullMethodName
@@ -69,7 +72,7 @@ class GrpcLoggerService {
MDC.put("InvocationID", invocationID)
MDC.put("PartnerName", partnerName)
MDC.put("ClientIPAddress", clientSocketAddress.address.defaultToEmpty())
- MDC.put("ServerFQDN", clientSocketAddress.address.hostName.defaultToEmpty())
+ MDC.put("ServerFQDN", localhost.hostName.defaultToEmpty())
MDC.put("ServiceName", serviceName)
log.trace("MDC Properties : ${MDC.getCopyOfContextMap()}")
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt
new file mode 100644
index 000000000..a817c0c74
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message
+
+import org.apache.kafka.common.header.Headers
+import org.apache.kafka.common.header.internals.RecordHeader
+import java.nio.charset.Charset
+
+
+fun <T : Headers> T?.toMap(): MutableMap<String, String> {
+ val map: MutableMap<String, String> = hashMapOf()
+ this?.forEach { map[it.key()] = String(it.value(), Charset.defaultCharset()) }
+ return map
+}
+
+fun Headers.addHeader(key: String, value: String) {
+ this.add(RecordHeader(key, value.toByteArray()))
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
index b99be0ae5..a4ccfa901 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -36,9 +37,10 @@ open class KafkaBasicAuthMessageConsumerService(
private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties)
: BlueprintMessageConsumerService {
+ val log = logger(KafkaBasicAuthMessageConsumerService::class)
+
val channel = Channel<String>()
var kafkaConsumer: Consumer<String, ByteArray>? = null
- val log = logger(KafkaBasicAuthMessageConsumerService::class)
@Volatile
var keepGoing = true
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
index 86c04f6be..42adcd712 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,6 +39,8 @@ class KafkaBasicAuthMessageProducerService(
private var kafkaProducer: KafkaProducer<String, ByteArray>? = null
+ private val messageLoggerService = MessageLoggerService()
+
override suspend fun sendMessageNB(message: Any): Boolean {
checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
return sendMessageNB(messageProducerProperties.topic!!, message)
@@ -56,8 +59,10 @@ class KafkaBasicAuthMessageProducerService(
}
val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage)
+ val recordHeaders = record.headers()
+ messageLoggerService.messageProducing(recordHeaders)
headers?.let {
- headers.forEach { (key, value) -> record.headers().add(RecordHeader(key, value.toByteArray())) }
+ headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) }
}
val callback = Callback { metadata, exception ->
log.info("message published offset(${metadata.offset()}, headers :$headers )")
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
new file mode 100644
index 000000000..21bf1b76c
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
@@ -0,0 +1,88 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.header.Headers
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.message.addHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.slf4j.MDC
+import java.net.InetAddress
+import java.time.Instant
+import java.time.ZoneOffset
+import java.time.ZonedDateTime
+import java.time.format.DateTimeFormatter
+import java.util.*
+
+class MessageLoggerService {
+
+ private val log = logger(MessageLoggerService::class)
+
+ fun messageConsuming(headers: CommonHeader, consumerRecord: ConsumerRecord<*, *>) {
+ messageConsuming(headers.requestId, headers.subRequestId,
+ headers.originatorId, consumerRecord)
+ }
+
+ fun messageConsuming(consumerRecord: ConsumerRecord<*, *>) {
+ val headers = consumerRecord.headers().toMap()
+ val requestID = headers[BluePrintConstants.ONAP_REQUEST_ID].defaultToUUID()
+ val invocationID = headers[BluePrintConstants.ONAP_INVOCATION_ID].defaultToUUID()
+ val partnerName = headers[BluePrintConstants.ONAP_PARTNER_NAME] ?: "UNKNOWN"
+ messageConsuming(requestID, invocationID, partnerName, consumerRecord)
+ }
+
+
+ fun messageConsuming(requestID: String, invocationID: String, partnerName: String,
+ consumerRecord: ConsumerRecord<*, *>) {
+ val headers = consumerRecord.headers().toMap()
+ val localhost = InetAddress.getLocalHost()
+ MDC.put("InvokeTimestamp", ZonedDateTime
+ .ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), ZoneOffset.UTC)
+ .format(DateTimeFormatter.ISO_INSTANT))
+ MDC.put("RequestID", requestID)
+ MDC.put("InvocationID", invocationID)
+ MDC.put("PartnerName", partnerName)
+ MDC.put("ClientIPAddress", headers["ClientIPAddress"].defaultToEmpty())
+ MDC.put("ServerFQDN", localhost.hostName.defaultToEmpty())
+ MDC.put("ServiceName", consumerRecord.topic())
+ // Custom MDC for Message Consumers
+ MDC.put("Offset", consumerRecord.offset().toString())
+ MDC.put("MessageKey", consumerRecord.key()?.toString().defaultToEmpty())
+ log.info("Consuming MDC Properties : ${MDC.getCopyOfContextMap()}")
+ }
+
+ /** Used before producing message request, Inbound Invocation ID is used as request Id
+ * for produced message Request, If invocation Id is missing then default Request Id will be generated.
+ */
+ fun messageProducing(requestHeader: Headers) {
+ val localhost = InetAddress.getLocalHost()
+ requestHeader.addHeader(BluePrintConstants.ONAP_REQUEST_ID, MDC.get("InvocationID").defaultToUUID())
+ requestHeader.addHeader(BluePrintConstants.ONAP_INVOCATION_ID, UUID.randomUUID().toString())
+ val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor"
+ requestHeader.addHeader(BluePrintConstants.ONAP_PARTNER_NAME, partnerName)
+ requestHeader.addHeader("ClientIPAddress", localhost.hostAddress)
+ }
+
+ fun messageConsumingExisting() {
+ MDC.clear()
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt
new file mode 100644
index 000000000..82e40efd1
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import io.mockk.every
+import io.mockk.mockk
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.header.internals.RecordHeaders
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.slf4j.MDC
+import kotlin.test.assertEquals
+
+class MessageLoggerServiceTest {
+
+
+ @Test
+ fun testMessagingHeaders() {
+ val messageLoggerService = MessageLoggerService()
+ val commonHeader = CommonHeader().apply {
+ requestId = "1234"
+ subRequestId = "1234-12"
+ originatorId = "cds-test"
+ }
+
+ val consumerRecord = mockk<ConsumerRecord<*, *>>()
+ every { consumerRecord.headers() } returns null
+ every { consumerRecord.key() } returns "1234"
+ every { consumerRecord.offset() } returns 12345
+ every { consumerRecord.topic() } returns "sample-topic"
+ every { consumerRecord.timestamp() } returns System.currentTimeMillis()
+ messageLoggerService.messageConsuming(commonHeader, consumerRecord)
+ assertEquals(commonHeader.requestId, MDC.get("RequestID"))
+ assertEquals(commonHeader.subRequestId, MDC.get("InvocationID"))
+
+ val mockHeaders = RecordHeaders()
+ messageLoggerService.messageProducing(mockHeaders)
+ val map = mockHeaders.toMap()
+ assertEquals("1234-12", map[BluePrintConstants.ONAP_REQUEST_ID])
+
+ messageLoggerService.messageConsumingExisting()
+
+ }
+
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml
index 3868440c7..5f4fb4f73 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml
@@ -1,5 +1,6 @@
<!--
~ Copyright © 2019 IBM.
+ ~ Modifications Copyright © 2018-2019 AT&T Intellectual Property.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
@@ -15,11 +16,18 @@
-->
<configuration>
+
+ <property name="localPattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/>
+ <property name="defaultPattern"
+ value="%date{ISO8601,UTC}|%X{RequestID}|%X{InvocationID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n"/>
+ <property name="testing"
+ value="%X{RequestID}|%X{InvocationID}|%logger{50}| %msg%n"/>
+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
- <pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{50} - %msg%n</pattern>
+ <pattern>${testing}</pattern>
</encoder>
</appender>
diff --git a/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt b/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt
index 2ef5a31bc..cec11ae3c 100644
--- a/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt
@@ -32,6 +32,7 @@ import org.springframework.http.server.reactive.ServerHttpResponse
import reactor.core.Disposable
import reactor.core.publisher.Mono
import reactor.core.publisher.MonoSink
+import java.net.InetAddress
import java.time.ZoneOffset
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
@@ -43,6 +44,7 @@ class RestLoggerService {
fun entering(request: ServerHttpRequest) {
+ val localhost = InetAddress.getLocalHost()
val headers = request.headers
val requestID = headers.getFirst(ONAP_REQUEST_ID).defaultToUUID()
val invocationID = headers.getFirst(ONAP_INVOCATION_ID).defaultToUUID()
@@ -52,7 +54,7 @@ class RestLoggerService {
MDC.put("InvocationID", invocationID)
MDC.put("PartnerName", partnerName)
MDC.put("ClientIPAddress", request.remoteAddress?.address?.hostAddress.defaultToEmpty())
- MDC.put("ServerFQDN", request.remoteAddress?.hostString.defaultToEmpty())
+ MDC.put("ServerFQDN",localhost.hostName.defaultToEmpty())
if (MDC.get("ServiceName") == null || MDC.get("ServiceName").equals("", ignoreCase = true)) {
MDC.put("ServiceName", request.uri.path)
}