summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor')
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt19
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt3
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt20
-rw-r--r--ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt2
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt5
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt23
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt13
7 files changed, 67 insertions, 18 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt
index 732f4fecf..987ac447a 100644
--- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt
@@ -20,6 +20,10 @@ import io.grpc.Grpc
import io.grpc.Metadata
import io.grpc.ServerCall
import io.grpc.ServerCallHandler
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.newCoroutineContext
+import kotlinx.coroutines.withContext
import org.onap.ccsdk.cds.blueprintsprocessor.grpc.getStringKey
import org.onap.ccsdk.cds.blueprintsprocessor.grpc.putStringKeyValue
import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
@@ -28,9 +32,11 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_INVO
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_PARTNER_NAME
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_REQUEST_ID
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty
import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
import org.slf4j.MDC
import java.net.InetAddress
import java.net.InetSocketAddress
@@ -108,3 +114,16 @@ class GrpcLoggerService {
}
}
}
+suspend fun <T> mdcGrpcCoroutineScope(
+ executionServiceInput: ExecutionServiceInput,
+ block: suspend CoroutineScope.() -> T
+) = coroutineScope {
+
+ MDC.put("RequestID", executionServiceInput.commonHeader.requestId)
+ MDC.put("SubRequestID", executionServiceInput.commonHeader.subRequestId)
+ MDC.put("OriginatorID", executionServiceInput.commonHeader.originatorId)
+
+ withContext(newCoroutineContext(this.coroutineContext + MDCContext(MDC.getCopyOfContextMap()))) {
+ block()
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
index cccc61f40..e5064957d 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
@@ -32,6 +32,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtil
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
import org.slf4j.LoggerFactory
+import org.slf4j.MDC
import java.nio.charset.Charset
class KafkaMessageProducerService(
@@ -78,7 +79,9 @@ class KafkaMessageProducerService(
headers?.let {
headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) }
}
+ val context = MDC.getCopyOfContextMap()
val callback = Callback { metadata, exception ->
+ MDC.setContextMap(context)
meterRegistry.counter(
BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_COUNTER,
BlueprintMessageUtils.kafkaMetricTag(topic)
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
index 90b850017..ff2d43fcd 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
@@ -16,12 +16,18 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.newCoroutineContext
+import kotlinx.coroutines.withContext
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.header.Headers
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.message.addHeader
import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty
import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
import org.onap.ccsdk.cds.controllerblueprints.core.logger
@@ -93,3 +99,17 @@ class MessageLoggerService {
MDC.clear()
}
}
+
+suspend fun <T> mdcKafkaCoroutineScope(
+ executionServiceInput: ExecutionServiceInput,
+ block: suspend CoroutineScope.() -> T
+) = coroutineScope {
+
+ MDC.put("RequestID", executionServiceInput.commonHeader.requestId)
+ MDC.put("SubRequestID", executionServiceInput.commonHeader.subRequestId)
+ MDC.put("OriginatorID", executionServiceInput.commonHeader.originatorId)
+
+ withContext(newCoroutineContext(this.coroutineContext + MDCContext(MDC.getCopyOfContextMap()))) {
+ block()
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt b/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt
index b1d8abd16..4b9bbb19f 100644
--- a/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt
@@ -126,6 +126,8 @@ suspend fun <T> mdcWebCoroutineScope(
val mdcContext = if (executionServiceInput != null) {
// MDC Context with overridden request ID
MDC.put("RequestID", executionServiceInput.commonHeader.requestId)
+ MDC.put("SubRequestID", executionServiceInput.commonHeader.subRequestId)
+ MDC.put("OriginatorID", executionServiceInput.commonHeader.originatorId)
MDCContext(MDC.getCopyOfContextMap())
} else {
// Default MDC Context
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt
index a2101856a..c722173ec 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt
@@ -22,6 +22,7 @@ import io.grpc.stub.StreamObserver
import kotlinx.coroutines.runBlocking
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintCoreConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.core.utils.toJava
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.mdcGrpcCoroutineScope
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
@@ -58,7 +59,9 @@ open class BluePrintProcessingGRPCHandler(
try {
ph.register()
runBlocking {
- executionServiceHandler.process(executionServiceInput.toJava(), responseObserver)
+ mdcGrpcCoroutineScope(executionServiceInput) {
+ executionServiceHandler.process(executionServiceInput.toJava(), responseObserver)
+ }
}
} catch (e: Exception) {
if (e is BluePrintProcessorException) handleWithErrorCatalog(e) else handleError(e)
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
index ed9b4d57e..0b5d568d4 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
@@ -26,6 +26,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInpu
import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.mdcKafkaCoroutineScope
import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType
@@ -110,16 +111,18 @@ open class BluePrintProcessingKafkaConsumer(
val key = message.key() ?: UUID.randomUUID().toString()
val value = String(message.value(), Charset.defaultCharset())
val executionServiceInput = value.jsonAsType<ExecutionServiceInput>()
- log.info(
- "Consumed Message : topic(${message.topic()}) " +
- "partition(${message.partition()}) " +
- "leaderEpoch(${message.leaderEpoch().get()}) " +
- "offset(${message.offset()}) " +
- "key(${message.key()}) " +
- BlueprintMessageUtils.getMessageLogData(executionServiceInput)
- )
- val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
- blueprintMessageProducerService.sendMessage(key, executionServiceOutput)
+ mdcKafkaCoroutineScope(executionServiceInput) {
+ log.info(
+ "Consumed Message : topic(${message.topic()}) " +
+ "partition(${message.partition()}) " +
+ "leaderEpoch(${message.leaderEpoch().get()}) " +
+ "offset(${message.offset()}) " +
+ "key(${message.key()}) " +
+ BlueprintMessageUtils.getMessageLogData(executionServiceInput)
+ )
+ val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
+ blueprintMessageProducerService.sendMessage(key, executionServiceOutput)
+ }
} catch (e: Exception) {
meterRegistry.counter(
BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER,
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt
index c2c7a60e9..38d2b9030 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt
@@ -20,8 +20,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
import io.grpc.stub.StreamObserver
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Timer
-import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC
@@ -60,17 +59,17 @@ class ExecutionServiceHandler(
suspend fun process(
executionServiceInput: ExecutionServiceInput,
responseObserver: StreamObserver<org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput>
- ) {
- when {
- executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC -> {
- GlobalScope.launch(Dispatchers.Default) {
+ ) = coroutineScope {
+ when (executionServiceInput.actionIdentifiers.mode) {
+ ACTION_MODE_ASYNC -> {
+ launch {
val executionServiceOutput = doProcess(executionServiceInput)
responseObserver.onNext(executionServiceOutput.toProto())
responseObserver.onCompleted()
}
responseObserver.onNext(response(executionServiceInput).toProto())
}
- executionServiceInput.actionIdentifiers.mode == ACTION_MODE_SYNC -> {
+ ACTION_MODE_SYNC -> {
val executionServiceOutput = doProcess(executionServiceInput)
responseObserver.onNext(executionServiceOutput.toProto())
responseObserver.onCompleted()