diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons')
7 files changed, 305 insertions, 69 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcExtensions.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcExtensions.kt new file mode 100644 index 000000000..55cf0941d --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcExtensions.kt @@ -0,0 +1,27 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.grpc + +import io.grpc.Metadata + +fun Metadata.getStringKey(key: String): String? { + return this.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)) +} + +fun Metadata.putStringKeyValue(key: String, value: String) { + this.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value) +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcClientLoggingInterceptor.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcClientLoggingInterceptor.kt new file mode 100644 index 000000000..f3b14b59f --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcClientLoggingInterceptor.kt @@ -0,0 +1,47 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor + +import io.grpc.* +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.GrpcLoggerService +import org.onap.ccsdk.cds.controllerblueprints.core.logger + + +class GrpcClientLoggingInterceptor : ClientInterceptor { + val log = logger(GrpcClientLoggingInterceptor::class) + + val loggingService = GrpcLoggerService() + + override fun <ReqT, RespT> interceptCall(method: MethodDescriptor<ReqT, RespT>, + callOptions: CallOptions, channel: Channel): ClientCall<ReqT, RespT> { + + return object : ForwardingClientCall + .SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(method, callOptions)) { + + override fun start(responseListener: Listener<RespT>, headers: Metadata) { + val listener = object : ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) { + override fun onMessage(message: RespT) { + loggingService.grpcInvoking(headers) + super.onMessage(message) + } + } + super.start(listener, headers) + } + } + + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcServerLoggingInterceptor.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcServerLoggingInterceptor.kt new file mode 100644 index 000000000..e21d5d3ce --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcServerLoggingInterceptor.kt @@ -0,0 +1,92 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor + +import io.grpc.* +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.GrpcLoggerService +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.management.api.BluePrintDownloadInput +import org.onap.ccsdk.cds.controllerblueprints.management.api.BluePrintRemoveInput +import org.onap.ccsdk.cds.controllerblueprints.management.api.BluePrintUploadInput +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput +import org.slf4j.MDC + +class GrpcServerLoggingInterceptor : ServerInterceptor { + val log = logger(GrpcServerLoggingInterceptor::class) + val loggingService = GrpcLoggerService() + + override fun <ReqT : Any, RespT : Any> interceptCall(call: ServerCall<ReqT, RespT>, + requestHeaders: Metadata, next: ServerCallHandler<ReqT, RespT>) + : ServerCall.Listener<ReqT> { + + val forwardingServerCall = object : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) { + override fun sendHeaders(responseHeaders: Metadata) { + loggingService.grpResponding(requestHeaders, responseHeaders) + super.sendHeaders(responseHeaders) + } + } + + return object + : ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>( + next.startCall(forwardingServerCall, requestHeaders)) { + + override fun onMessage(message: ReqT) { + /** Get the requestId, SubRequestId and Originator Id and set in MDS context + * If you are using other GRPC services, Implement own Logging Interceptors to get tracing. + * */ + when (message) { + is ExecutionServiceInput -> { + val commonHeader = message.commonHeader + ?: throw BluePrintProcessorException("missing common header in request") + loggingService.grpcRequesting(call, commonHeader, next) + } + is BluePrintUploadInput -> { + val commonHeader = message.commonHeader + ?: throw BluePrintProcessorException("missing common header in request") + loggingService.grpcRequesting(call, commonHeader, next) + } + is BluePrintDownloadInput -> { + val commonHeader = message.commonHeader + ?: throw BluePrintProcessorException("missing common header in request") + loggingService.grpcRequesting(call, commonHeader, next) + } + is BluePrintRemoveInput -> { + val commonHeader = message.commonHeader + ?: throw BluePrintProcessorException("missing common header in request") + loggingService.grpcRequesting(call, commonHeader, next) + } + else -> { + loggingService.grpcRequesting(call, requestHeaders, next) + } + } + super.onMessage(message) + } + + override fun onComplete() { + MDC.clear() + super.onComplete() + } + + override fun onCancel() { + MDC.clear() + super.onCancel() + } + + } + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt new file mode 100644 index 000000000..1b932480a --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt @@ -0,0 +1,98 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service + +import io.grpc.Grpc +import io.grpc.Metadata +import io.grpc.ServerCall +import io.grpc.ServerCallHandler +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.getStringKey +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.putStringKeyValue +import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_INVOCATION_ID +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_PARTNER_NAME +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_REQUEST_ID +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.slf4j.MDC +import java.net.InetSocketAddress +import java.time.ZoneOffset +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.util.* + +class GrpcLoggerService { + + private val log = logger(GrpcLoggerService::class) + + /** Used when server receives request */ + fun <ReqT : Any, RespT : Any> grpcRequesting(call: ServerCall<ReqT, RespT>, + headers: Metadata, next: ServerCallHandler<ReqT, RespT>) { + val requestID = headers.getStringKey(ONAP_REQUEST_ID).defaultToUUID() + val invocationID = headers.getStringKey(ONAP_INVOCATION_ID).defaultToUUID() + val partnerName = headers.getStringKey(ONAP_PARTNER_NAME).defaultToUUID() + grpcRequesting(requestID, invocationID, partnerName, call) + } + + fun <ReqT : Any, RespT : Any> grpcRequesting(call: ServerCall<ReqT, RespT>, + headers: CommonHeader, next: ServerCallHandler<ReqT, RespT>) { + val requestID = headers.requestId.defaultToUUID() + val invocationID = headers.subRequestId.defaultToUUID() + val partnerName = headers.originatorId.defaultToUUID() + grpcRequesting(requestID, invocationID, partnerName, call) + } + + fun <ReqT : Any, RespT : Any> grpcRequesting(requestID: String, invocationID: String, partnerName: String, + call: ServerCall<ReqT, RespT>) { + val clientSocketAddress = call.attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR) as? InetSocketAddress + ?: throw BluePrintProcessorException("failed to get client address") + val serviceName = call.methodDescriptor.fullMethodName + + MDC.put("InvokeTimestamp", ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT)) + MDC.put("RequestID", requestID) + MDC.put("InvocationID", invocationID) + MDC.put("PartnerName", partnerName) + MDC.put("ClientIPAddress", clientSocketAddress.address.defaultToEmpty()) + MDC.put("ServerFQDN", clientSocketAddress.address.hostName.defaultToEmpty()) + MDC.put("ServiceName", serviceName) + log.trace("MDC Properties : ${MDC.getCopyOfContextMap()}") + } + + + /** Used before invoking any GRPC outbound request, Inbound Invocation ID is used as request Id + * for outbound Request, If invocation Id is missing then default Request Id will be generated. + */ + fun grpcInvoking(requestHeader: Metadata) { + requestHeader.putStringKeyValue(ONAP_REQUEST_ID, MDC.get("InvocationID").defaultToUUID()) + requestHeader.putStringKeyValue(ONAP_INVOCATION_ID, UUID.randomUUID().toString()) + val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor" + requestHeader.putStringKeyValue(ONAP_PARTNER_NAME, partnerName) + } + + /** Used when server returns response */ + fun grpResponding(requestHeaders: Metadata, responseHeaders: Metadata) { + try { + responseHeaders.putStringKeyValue(ONAP_REQUEST_ID, MDC.get("RequestID").defaultToEmpty()) + responseHeaders.putStringKeyValue(ONAP_INVOCATION_ID, MDC.get("InvocationID").defaultToEmpty()) + responseHeaders.putStringKeyValue(ONAP_PARTNER_NAME, MDC.get("PartnerName").defaultToEmpty()) + } catch (e: Exception) { + log.warn("couldn't set grpc response headers", e) + } + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt index dbff84211..601dc0e33 100644 --- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt @@ -21,6 +21,7 @@ import io.grpc.internal.DnsNameResolverProvider import io.grpc.internal.PickFirstLoadBalancerProvider import io.grpc.netty.NettyChannelBuilder import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor.GrpcClientLoggingInterceptor class TokenAuthGrpcClientService(private val tokenAuthGrpcClientProperties: TokenAuthGrpcClientProperties) : BluePrintGrpcClientService { @@ -30,6 +31,7 @@ class TokenAuthGrpcClientService(private val tokenAuthGrpcClientProperties: Toke .forAddress(tokenAuthGrpcClientProperties.host, tokenAuthGrpcClientProperties.port) .nameResolverFactory(DnsNameResolverProvider()) .loadBalancerFactory(PickFirstLoadBalancerProvider()) + .intercept(GrpcClientLoggingInterceptor()) .intercept(TokenAuthClientInterceptor(tokenAuthGrpcClientProperties)).usePlaintext().build() return managedChannel } diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt deleted file mode 100644 index cdf6ce195..000000000 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.core - -import kotlinx.coroutines.* -import kotlinx.coroutines.reactor.ReactorContext -import kotlinx.coroutines.reactor.asCoroutineContext -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.MonoMDCCoroutine -import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext -import reactor.core.publisher.Mono -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.EmptyCoroutineContext - -/** Used in Rest controller API methods to populate MDC context to nested coroutines from reactor web filter context. */ -@UseExperimental(InternalCoroutinesApi::class) -fun <T> monoMdc(context: CoroutineContext = EmptyCoroutineContext, - block: suspend CoroutineScope.() -> T?): Mono<T> = Mono.create { sink -> - - val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) - ?: sink.currentContext()).asCoroutineContext() - /** Populate MDC context only if present in Reactor Context */ - val newContext = if (!reactorContext.context.isEmpty - && reactorContext.context.hasKey(MDCContext)) { - val mdcContext = reactorContext.context.get<MDCContext>(MDCContext) - GlobalScope.newCoroutineContext(context + reactorContext + mdcContext) - } else GlobalScope.newCoroutineContext(context + reactorContext) - - val coroutine = MonoMDCCoroutine(newContext, sink) - sink.onDispose(coroutine) - coroutine.start(CoroutineStart.DEFAULT, coroutine, block) -}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt b/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt index 4da7dcd0e..2ef5a31bc 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt +++ b/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt @@ -14,43 +14,45 @@ * limitations under the License. */ -package org.onap.ccsdk.cds.blueprintsprocessor.core.service +package org.onap.ccsdk.cds.blueprintsprocessor.rest.service -import kotlinx.coroutines.AbstractCoroutine -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.handleCoroutineException +import kotlinx.coroutines.* +import kotlinx.coroutines.reactor.ReactorContext +import kotlinx.coroutines.reactor.asCoroutineContext +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_INVOCATION_ID +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_PARTNER_NAME +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_REQUEST_ID +import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.slf4j.MDC import org.springframework.http.server.reactive.ServerHttpRequest import org.springframework.http.server.reactive.ServerHttpResponse import reactor.core.Disposable +import reactor.core.publisher.Mono import reactor.core.publisher.MonoSink import java.time.ZoneOffset import java.time.ZonedDateTime import java.time.format.DateTimeFormatter -import java.util.* import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext -class LoggingService { - private val log = logger(LoggingService::class) +class RestLoggerService { + private val log = logger(RestLoggerService::class) - companion object { - const val ONAP_REQUEST_ID = "X-ONAP-RequestID" - const val ONAP_INVOCATION_ID = "X-ONAP-InvocationID" - const val ONAP_PARTNER_NAME = "X-ONAP-PartnerName" - } fun entering(request: ServerHttpRequest) { val headers = request.headers - val requestID = defaultToUUID(headers.getFirst(ONAP_REQUEST_ID)) - val invocationID = defaultToUUID(headers.getFirst(ONAP_INVOCATION_ID)) - val partnerName = defaultToEmpty(headers.getFirst(ONAP_PARTNER_NAME)) + val requestID = headers.getFirst(ONAP_REQUEST_ID).defaultToUUID() + val invocationID = headers.getFirst(ONAP_INVOCATION_ID).defaultToUUID() + val partnerName = headers.getFirst(ONAP_PARTNER_NAME).defaultToEmpty() MDC.put("InvokeTimestamp", ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT)) MDC.put("RequestID", requestID) MDC.put("InvocationID", invocationID) MDC.put("PartnerName", partnerName) - MDC.put("ClientIPAddress", defaultToEmpty(request.remoteAddress?.address?.hostAddress)) - MDC.put("ServerFQDN", defaultToEmpty(request.remoteAddress?.hostString)) + MDC.put("ClientIPAddress", request.remoteAddress?.address?.hostAddress.defaultToEmpty()) + MDC.put("ServerFQDN", request.remoteAddress?.hostString.defaultToEmpty()) if (MDC.get("ServiceName") == null || MDC.get("ServiceName").equals("", ignoreCase = true)) { MDC.put("ServiceName", request.uri.path) } @@ -62,22 +64,35 @@ class LoggingService { val resHeaders = response.headers resHeaders[ONAP_REQUEST_ID] = MDC.get("RequestID") resHeaders[ONAP_INVOCATION_ID] = MDC.get("InvocationID") + val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor" + resHeaders[ONAP_PARTNER_NAME] = partnerName } catch (e: Exception) { log.warn("couldn't set response headers", e) } finally { MDC.clear() } } +} - private fun defaultToEmpty(input: Any?): String { - return input?.toString() ?: "" - } - private fun defaultToUUID(input: String?): String { - return input ?: UUID.randomUUID().toString() - } -} +/** Used in Rest controller API methods to populate MDC context to nested coroutines from reactor web filter context. */ +@UseExperimental(InternalCoroutinesApi::class) +fun <T> monoMdc(context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.() -> T?): Mono<T> = Mono.create { sink -> + + val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) + ?: sink.currentContext()).asCoroutineContext() + /** Populate MDC context only if present in Reactor Context */ + val newContext = if (!reactorContext.context.isEmpty + && reactorContext.context.hasKey(MDCContext)) { + val mdcContext = reactorContext.context.get<MDCContext>(MDCContext) + GlobalScope.newCoroutineContext(context + reactorContext + mdcContext) + } else GlobalScope.newCoroutineContext(context + reactorContext) + val coroutine = MonoMDCCoroutine(newContext, sink) + sink.onDispose(coroutine) + coroutine.start(CoroutineStart.DEFAULT, coroutine, block) +} @InternalCoroutinesApi class MonoMDCCoroutine<in T>( |