diff options
Diffstat (limited to 'ms/blueprintsprocessor')
7 files changed, 67 insertions, 18 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt index 732f4fecf..987ac447a 100644 --- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt @@ -20,6 +20,10 @@ import io.grpc.Grpc import io.grpc.Metadata import io.grpc.ServerCall import io.grpc.ServerCallHandler +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.newCoroutineContext +import kotlinx.coroutines.withContext import org.onap.ccsdk.cds.blueprintsprocessor.grpc.getStringKey import org.onap.ccsdk.cds.blueprintsprocessor.grpc.putStringKeyValue import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader @@ -28,9 +32,11 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_INVO import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_PARTNER_NAME import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_REQUEST_ID import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput import org.slf4j.MDC import java.net.InetAddress import java.net.InetSocketAddress @@ -108,3 +114,16 @@ class GrpcLoggerService { } } } +suspend fun <T> mdcGrpcCoroutineScope( + executionServiceInput: ExecutionServiceInput, + block: suspend CoroutineScope.() -> T +) = coroutineScope { + + MDC.put("RequestID", executionServiceInput.commonHeader.requestId) + MDC.put("SubRequestID", executionServiceInput.commonHeader.subRequestId) + MDC.put("OriginatorID", executionServiceInput.commonHeader.originatorId) + + withContext(newCoroutineContext(this.coroutineContext + MDCContext(MDC.getCopyOfContextMap()))) { + block() + } +} diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt index cccc61f40..e5064957d 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt @@ -32,6 +32,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtil import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString import org.slf4j.LoggerFactory +import org.slf4j.MDC import java.nio.charset.Charset class KafkaMessageProducerService( @@ -78,7 +79,9 @@ class KafkaMessageProducerService( headers?.let { headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) } } + val context = MDC.getCopyOfContextMap() val callback = Callback { metadata, exception -> + MDC.setContextMap(context) meterRegistry.counter( BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_COUNTER, BlueprintMessageUtils.kafkaMetricTag(topic) diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt index 90b850017..ff2d43fcd 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt @@ -16,12 +16,18 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.newCoroutineContext +import kotlinx.coroutines.withContext import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.header.Headers import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.message.addHeader import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID import org.onap.ccsdk.cds.controllerblueprints.core.logger @@ -93,3 +99,17 @@ class MessageLoggerService { MDC.clear() } } + +suspend fun <T> mdcKafkaCoroutineScope( + executionServiceInput: ExecutionServiceInput, + block: suspend CoroutineScope.() -> T +) = coroutineScope { + + MDC.put("RequestID", executionServiceInput.commonHeader.requestId) + MDC.put("SubRequestID", executionServiceInput.commonHeader.subRequestId) + MDC.put("OriginatorID", executionServiceInput.commonHeader.originatorId) + + withContext(newCoroutineContext(this.coroutineContext + MDCContext(MDC.getCopyOfContextMap()))) { + block() + } +} diff --git a/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt b/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt index b1d8abd16..4b9bbb19f 100644 --- a/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt +++ b/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt @@ -126,6 +126,8 @@ suspend fun <T> mdcWebCoroutineScope( val mdcContext = if (executionServiceInput != null) { // MDC Context with overridden request ID MDC.put("RequestID", executionServiceInput.commonHeader.requestId) + MDC.put("SubRequestID", executionServiceInput.commonHeader.subRequestId) + MDC.put("OriginatorID", executionServiceInput.commonHeader.originatorId) MDCContext(MDC.getCopyOfContextMap()) } else { // Default MDC Context diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt index a2101856a..c722173ec 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt @@ -22,6 +22,7 @@ import io.grpc.stub.StreamObserver import kotlinx.coroutines.runBlocking import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintCoreConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.core.utils.toJava +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.mdcGrpcCoroutineScope import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput @@ -58,7 +59,9 @@ open class BluePrintProcessingGRPCHandler( try { ph.register() runBlocking { - executionServiceHandler.process(executionServiceInput.toJava(), responseObserver) + mdcGrpcCoroutineScope(executionServiceInput) { + executionServiceHandler.process(executionServiceInput.toJava(), responseObserver) + } } } catch (e: Exception) { if (e is BluePrintProcessorException) handleWithErrorCatalog(e) else handleError(e) diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt index ed9b4d57e..0b5d568d4 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt @@ -26,6 +26,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInpu import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.mdcKafkaCoroutineScope import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType @@ -110,16 +111,18 @@ open class BluePrintProcessingKafkaConsumer( val key = message.key() ?: UUID.randomUUID().toString() val value = String(message.value(), Charset.defaultCharset()) val executionServiceInput = value.jsonAsType<ExecutionServiceInput>() - log.info( - "Consumed Message : topic(${message.topic()}) " + - "partition(${message.partition()}) " + - "leaderEpoch(${message.leaderEpoch().get()}) " + - "offset(${message.offset()}) " + - "key(${message.key()}) " + - BlueprintMessageUtils.getMessageLogData(executionServiceInput) - ) - val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) - blueprintMessageProducerService.sendMessage(key, executionServiceOutput) + mdcKafkaCoroutineScope(executionServiceInput) { + log.info( + "Consumed Message : topic(${message.topic()}) " + + "partition(${message.partition()}) " + + "leaderEpoch(${message.leaderEpoch().get()}) " + + "offset(${message.offset()}) " + + "key(${message.key()}) " + + BlueprintMessageUtils.getMessageLogData(executionServiceInput) + ) + val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) + blueprintMessageProducerService.sendMessage(key, executionServiceOutput) + } } catch (e: Exception) { meterRegistry.counter( BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER, diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt index c2c7a60e9..38d2b9030 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt @@ -20,8 +20,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api import io.grpc.stub.StreamObserver import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.Timer -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC @@ -60,17 +59,17 @@ class ExecutionServiceHandler( suspend fun process( executionServiceInput: ExecutionServiceInput, responseObserver: StreamObserver<org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput> - ) { - when { - executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC -> { - GlobalScope.launch(Dispatchers.Default) { + ) = coroutineScope { + when (executionServiceInput.actionIdentifiers.mode) { + ACTION_MODE_ASYNC -> { + launch { val executionServiceOutput = doProcess(executionServiceInput) responseObserver.onNext(executionServiceOutput.toProto()) responseObserver.onCompleted() } responseObserver.onNext(response(executionServiceInput).toProto()) } - executionServiceInput.actionIdentifiers.mode == ACTION_MODE_SYNC -> { + ACTION_MODE_SYNC -> { val executionServiceOutput = doProcess(executionServiceInput) responseObserver.onNext(executionServiceOutput.toProto()) responseObserver.onCompleted() |