summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt1
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt5
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt5
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt45
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt109
5 files changed, 163 insertions, 2 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index ab04054fe..1cd8a2af7 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -36,6 +36,7 @@ open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
var clientId: String? = null
var topic: String? = null
var pollMillSec: Long = 1000
+ var pollRecords: Int = -1
}
open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties()
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
index 5a9e61bfd..b5d444a49 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
@@ -51,6 +51,10 @@ class KafkaBasicAuthMessageConsumerService(
if (messageConsumerProperties.clientId != null) {
configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!!
}
+ /** To handle Back pressure, Get only configured record for processing */
+ if (messageConsumerProperties.pollRecords > 0) {
+ configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords
+ }
// TODO("Security Implementation based on type")
/** add or override already set properties */
additionalConfig?.let { configProperties.putAll(it) }
@@ -84,6 +88,7 @@ class KafkaBasicAuthMessageConsumerService(
kafkaConsumer!!.use { kc ->
while (keepGoing) {
val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
+ log.info("Consumed Records : ${consumerRecords.count()}")
runBlocking {
consumerRecords?.forEach { consumerRecord ->
/** execute the command block */
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index 2b84eaa78..f4e85a94b 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -52,6 +52,7 @@ import kotlin.test.assertTrue
"blueprintsprocessor.messageconsumer.sample.topic=default-topic",
"blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
"blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
+ "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
"blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
"blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
@@ -129,11 +130,11 @@ open class BlueprintMessageConsumerServiceTest {
.blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
launch {
repeat(5) {
- delay(1000)
+ delay(100)
blueprintMessageProducerService.sendMessage("this is my message($it)")
}
}
- delay(10000)
+ delay(5000)
blueprintMessageConsumerService.shutDown()
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt
new file mode 100644
index 000000000..cdf6ce195
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.reactor.ReactorContext
+import kotlinx.coroutines.reactor.asCoroutineContext
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.MonoMDCCoroutine
+import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
+import reactor.core.publisher.Mono
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.EmptyCoroutineContext
+
+/** Used in Rest controller API methods to populate MDC context to nested coroutines from reactor web filter context. */
+@UseExperimental(InternalCoroutinesApi::class)
+fun <T> monoMdc(context: CoroutineContext = EmptyCoroutineContext,
+ block: suspend CoroutineScope.() -> T?): Mono<T> = Mono.create { sink ->
+
+ val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext())
+ ?: sink.currentContext()).asCoroutineContext()
+ /** Populate MDC context only if present in Reactor Context */
+ val newContext = if (!reactorContext.context.isEmpty
+ && reactorContext.context.hasKey(MDCContext)) {
+ val mdcContext = reactorContext.context.get<MDCContext>(MDCContext)
+ GlobalScope.newCoroutineContext(context + reactorContext + mdcContext)
+ } else GlobalScope.newCoroutineContext(context + reactorContext)
+
+ val coroutine = MonoMDCCoroutine(newContext, sink)
+ sink.onDispose(coroutine)
+ coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt
new file mode 100644
index 000000000..4da7dcd0e
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt
@@ -0,0 +1,109 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core.service
+
+import kotlinx.coroutines.AbstractCoroutine
+import kotlinx.coroutines.InternalCoroutinesApi
+import kotlinx.coroutines.handleCoroutineException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.slf4j.MDC
+import org.springframework.http.server.reactive.ServerHttpRequest
+import org.springframework.http.server.reactive.ServerHttpResponse
+import reactor.core.Disposable
+import reactor.core.publisher.MonoSink
+import java.time.ZoneOffset
+import java.time.ZonedDateTime
+import java.time.format.DateTimeFormatter
+import java.util.*
+import kotlin.coroutines.CoroutineContext
+
+class LoggingService {
+ private val log = logger(LoggingService::class)
+
+ companion object {
+ const val ONAP_REQUEST_ID = "X-ONAP-RequestID"
+ const val ONAP_INVOCATION_ID = "X-ONAP-InvocationID"
+ const val ONAP_PARTNER_NAME = "X-ONAP-PartnerName"
+ }
+
+ fun entering(request: ServerHttpRequest) {
+ val headers = request.headers
+ val requestID = defaultToUUID(headers.getFirst(ONAP_REQUEST_ID))
+ val invocationID = defaultToUUID(headers.getFirst(ONAP_INVOCATION_ID))
+ val partnerName = defaultToEmpty(headers.getFirst(ONAP_PARTNER_NAME))
+ MDC.put("InvokeTimestamp", ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT))
+ MDC.put("RequestID", requestID)
+ MDC.put("InvocationID", invocationID)
+ MDC.put("PartnerName", partnerName)
+ MDC.put("ClientIPAddress", defaultToEmpty(request.remoteAddress?.address?.hostAddress))
+ MDC.put("ServerFQDN", defaultToEmpty(request.remoteAddress?.hostString))
+ if (MDC.get("ServiceName") == null || MDC.get("ServiceName").equals("", ignoreCase = true)) {
+ MDC.put("ServiceName", request.uri.path)
+ }
+ }
+
+ fun exiting(request: ServerHttpRequest, response: ServerHttpResponse) {
+ try {
+ val reqHeaders = request.headers
+ val resHeaders = response.headers
+ resHeaders[ONAP_REQUEST_ID] = MDC.get("RequestID")
+ resHeaders[ONAP_INVOCATION_ID] = MDC.get("InvocationID")
+ } catch (e: Exception) {
+ log.warn("couldn't set response headers", e)
+ } finally {
+ MDC.clear()
+ }
+ }
+
+ private fun defaultToEmpty(input: Any?): String {
+ return input?.toString() ?: ""
+ }
+
+ private fun defaultToUUID(input: String?): String {
+ return input ?: UUID.randomUUID().toString()
+ }
+}
+
+
+@InternalCoroutinesApi
+class MonoMDCCoroutine<in T>(
+ parentContext: CoroutineContext,
+ private val sink: MonoSink<T>
+) : AbstractCoroutine<T>(parentContext, true), Disposable {
+ private var disposed = false
+
+ override fun onCompleted(value: T) {
+ if (!disposed) {
+ if (value == null) sink.success() else sink.success(value)
+ }
+ }
+
+ override fun onCancelled(cause: Throwable, handled: Boolean) {
+ if (!disposed) {
+ sink.error(cause)
+ } else if (!handled) {
+ handleCoroutineException(context, cause)
+ }
+ }
+
+ override fun dispose() {
+ disposed = true
+ cancel()
+ }
+
+ override fun isDisposed(): Boolean = disposed
+}