diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds')
3 files changed, 23 insertions, 18 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt index a2101856a..c722173ec 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt @@ -22,6 +22,7 @@ import io.grpc.stub.StreamObserver import kotlinx.coroutines.runBlocking import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintCoreConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.core.utils.toJava +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.mdcGrpcCoroutineScope import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput @@ -58,7 +59,9 @@ open class BluePrintProcessingGRPCHandler( try { ph.register() runBlocking { - executionServiceHandler.process(executionServiceInput.toJava(), responseObserver) + mdcGrpcCoroutineScope(executionServiceInput) { + executionServiceHandler.process(executionServiceInput.toJava(), responseObserver) + } } } catch (e: Exception) { if (e is BluePrintProcessorException) handleWithErrorCatalog(e) else handleError(e) diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt index ed9b4d57e..0b5d568d4 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt @@ -26,6 +26,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInpu import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.mdcKafkaCoroutineScope import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType @@ -110,16 +111,18 @@ open class BluePrintProcessingKafkaConsumer( val key = message.key() ?: UUID.randomUUID().toString() val value = String(message.value(), Charset.defaultCharset()) val executionServiceInput = value.jsonAsType<ExecutionServiceInput>() - log.info( - "Consumed Message : topic(${message.topic()}) " + - "partition(${message.partition()}) " + - "leaderEpoch(${message.leaderEpoch().get()}) " + - "offset(${message.offset()}) " + - "key(${message.key()}) " + - BlueprintMessageUtils.getMessageLogData(executionServiceInput) - ) - val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) - blueprintMessageProducerService.sendMessage(key, executionServiceOutput) + mdcKafkaCoroutineScope(executionServiceInput) { + log.info( + "Consumed Message : topic(${message.topic()}) " + + "partition(${message.partition()}) " + + "leaderEpoch(${message.leaderEpoch().get()}) " + + "offset(${message.offset()}) " + + "key(${message.key()}) " + + BlueprintMessageUtils.getMessageLogData(executionServiceInput) + ) + val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) + blueprintMessageProducerService.sendMessage(key, executionServiceOutput) + } } catch (e: Exception) { meterRegistry.counter( BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER, diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt index c2c7a60e9..38d2b9030 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt @@ -20,8 +20,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api import io.grpc.stub.StreamObserver import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.Timer -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC @@ -60,17 +59,17 @@ class ExecutionServiceHandler( suspend fun process( executionServiceInput: ExecutionServiceInput, responseObserver: StreamObserver<org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput> - ) { - when { - executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC -> { - GlobalScope.launch(Dispatchers.Default) { + ) = coroutineScope { + when (executionServiceInput.actionIdentifiers.mode) { + ACTION_MODE_ASYNC -> { + launch { val executionServiceOutput = doProcess(executionServiceInput) responseObserver.onNext(executionServiceOutput.toProto()) responseObserver.onCompleted() } responseObserver.onNext(response(executionServiceInput).toProto()) } - executionServiceInput.actionIdentifiers.mode == ACTION_MODE_SYNC -> { + ACTION_MODE_SYNC -> { val executionServiceOutput = doProcess(executionServiceInput) responseObserver.onNext(executionServiceOutput.toProto()) responseObserver.onCompleted() |