summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib/src
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt32
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt4
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt7
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt88
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt61
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml10
6 files changed, 199 insertions, 3 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt
new file mode 100644
index 000000000..a817c0c74
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message
+
+import org.apache.kafka.common.header.Headers
+import org.apache.kafka.common.header.internals.RecordHeader
+import java.nio.charset.Charset
+
+
+fun <T : Headers> T?.toMap(): MutableMap<String, String> {
+ val map: MutableMap<String, String> = hashMapOf()
+ this?.forEach { map[it.key()] = String(it.value(), Charset.defaultCharset()) }
+ return map
+}
+
+fun Headers.addHeader(key: String, value: String) {
+ this.add(RecordHeader(key, value.toByteArray()))
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
index b99be0ae5..a4ccfa901 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -36,9 +37,10 @@ open class KafkaBasicAuthMessageConsumerService(
private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties)
: BlueprintMessageConsumerService {
+ val log = logger(KafkaBasicAuthMessageConsumerService::class)
+
val channel = Channel<String>()
var kafkaConsumer: Consumer<String, ByteArray>? = null
- val log = logger(KafkaBasicAuthMessageConsumerService::class)
@Volatile
var keepGoing = true
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
index 86c04f6be..42adcd712 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,6 +39,8 @@ class KafkaBasicAuthMessageProducerService(
private var kafkaProducer: KafkaProducer<String, ByteArray>? = null
+ private val messageLoggerService = MessageLoggerService()
+
override suspend fun sendMessageNB(message: Any): Boolean {
checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
return sendMessageNB(messageProducerProperties.topic!!, message)
@@ -56,8 +59,10 @@ class KafkaBasicAuthMessageProducerService(
}
val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage)
+ val recordHeaders = record.headers()
+ messageLoggerService.messageProducing(recordHeaders)
headers?.let {
- headers.forEach { (key, value) -> record.headers().add(RecordHeader(key, value.toByteArray())) }
+ headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) }
}
val callback = Callback { metadata, exception ->
log.info("message published offset(${metadata.offset()}, headers :$headers )")
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
new file mode 100644
index 000000000..21bf1b76c
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
@@ -0,0 +1,88 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.header.Headers
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.message.addHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.slf4j.MDC
+import java.net.InetAddress
+import java.time.Instant
+import java.time.ZoneOffset
+import java.time.ZonedDateTime
+import java.time.format.DateTimeFormatter
+import java.util.*
+
+class MessageLoggerService {
+
+ private val log = logger(MessageLoggerService::class)
+
+ fun messageConsuming(headers: CommonHeader, consumerRecord: ConsumerRecord<*, *>) {
+ messageConsuming(headers.requestId, headers.subRequestId,
+ headers.originatorId, consumerRecord)
+ }
+
+ fun messageConsuming(consumerRecord: ConsumerRecord<*, *>) {
+ val headers = consumerRecord.headers().toMap()
+ val requestID = headers[BluePrintConstants.ONAP_REQUEST_ID].defaultToUUID()
+ val invocationID = headers[BluePrintConstants.ONAP_INVOCATION_ID].defaultToUUID()
+ val partnerName = headers[BluePrintConstants.ONAP_PARTNER_NAME] ?: "UNKNOWN"
+ messageConsuming(requestID, invocationID, partnerName, consumerRecord)
+ }
+
+
+ fun messageConsuming(requestID: String, invocationID: String, partnerName: String,
+ consumerRecord: ConsumerRecord<*, *>) {
+ val headers = consumerRecord.headers().toMap()
+ val localhost = InetAddress.getLocalHost()
+ MDC.put("InvokeTimestamp", ZonedDateTime
+ .ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), ZoneOffset.UTC)
+ .format(DateTimeFormatter.ISO_INSTANT))
+ MDC.put("RequestID", requestID)
+ MDC.put("InvocationID", invocationID)
+ MDC.put("PartnerName", partnerName)
+ MDC.put("ClientIPAddress", headers["ClientIPAddress"].defaultToEmpty())
+ MDC.put("ServerFQDN", localhost.hostName.defaultToEmpty())
+ MDC.put("ServiceName", consumerRecord.topic())
+ // Custom MDC for Message Consumers
+ MDC.put("Offset", consumerRecord.offset().toString())
+ MDC.put("MessageKey", consumerRecord.key()?.toString().defaultToEmpty())
+ log.info("Consuming MDC Properties : ${MDC.getCopyOfContextMap()}")
+ }
+
+ /** Used before producing message request, Inbound Invocation ID is used as request Id
+ * for produced message Request, If invocation Id is missing then default Request Id will be generated.
+ */
+ fun messageProducing(requestHeader: Headers) {
+ val localhost = InetAddress.getLocalHost()
+ requestHeader.addHeader(BluePrintConstants.ONAP_REQUEST_ID, MDC.get("InvocationID").defaultToUUID())
+ requestHeader.addHeader(BluePrintConstants.ONAP_INVOCATION_ID, UUID.randomUUID().toString())
+ val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor"
+ requestHeader.addHeader(BluePrintConstants.ONAP_PARTNER_NAME, partnerName)
+ requestHeader.addHeader("ClientIPAddress", localhost.hostAddress)
+ }
+
+ fun messageConsumingExisting() {
+ MDC.clear()
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt
new file mode 100644
index 000000000..82e40efd1
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import io.mockk.every
+import io.mockk.mockk
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.header.internals.RecordHeaders
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.slf4j.MDC
+import kotlin.test.assertEquals
+
+class MessageLoggerServiceTest {
+
+
+ @Test
+ fun testMessagingHeaders() {
+ val messageLoggerService = MessageLoggerService()
+ val commonHeader = CommonHeader().apply {
+ requestId = "1234"
+ subRequestId = "1234-12"
+ originatorId = "cds-test"
+ }
+
+ val consumerRecord = mockk<ConsumerRecord<*, *>>()
+ every { consumerRecord.headers() } returns null
+ every { consumerRecord.key() } returns "1234"
+ every { consumerRecord.offset() } returns 12345
+ every { consumerRecord.topic() } returns "sample-topic"
+ every { consumerRecord.timestamp() } returns System.currentTimeMillis()
+ messageLoggerService.messageConsuming(commonHeader, consumerRecord)
+ assertEquals(commonHeader.requestId, MDC.get("RequestID"))
+ assertEquals(commonHeader.subRequestId, MDC.get("InvocationID"))
+
+ val mockHeaders = RecordHeaders()
+ messageLoggerService.messageProducing(mockHeaders)
+ val map = mockHeaders.toMap()
+ assertEquals("1234-12", map[BluePrintConstants.ONAP_REQUEST_ID])
+
+ messageLoggerService.messageConsumingExisting()
+
+ }
+
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml
index 3868440c7..5f4fb4f73 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml
@@ -1,5 +1,6 @@
<!--
~ Copyright © 2019 IBM.
+ ~ Modifications Copyright © 2018-2019 AT&T Intellectual Property.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
@@ -15,11 +16,18 @@
-->
<configuration>
+
+ <property name="localPattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/>
+ <property name="defaultPattern"
+ value="%date{ISO8601,UTC}|%X{RequestID}|%X{InvocationID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n"/>
+ <property name="testing"
+ value="%X{RequestID}|%X{InvocationID}|%logger{50}| %msg%n"/>
+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
- <pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{50} - %msg%n</pattern>
+ <pattern>${testing}</pattern>
</encoder>
</appender>