aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules
diff options
context:
space:
mode:
authorDan Timoney <dtimoney@att.com>2019-10-31 12:57:09 +0000
committerGerrit Code Review <gerrit@onap.org>2019-10-31 12:57:09 +0000
commit987afaa51413b2b943deb2afdcdc92b89f16fc4c (patch)
tree04e0d3080d766a456b75b74bc999ab9f0b76a3b4 /ms/blueprintsprocessor/modules
parente2619d958f8237f00467da8cacb0fb9fd210a323 (diff)
parent6fd8d3dbd0c6cdd27f9ef975e4a6a45403dfb298 (diff)
Merge "Add GRPC log tracing service."
Diffstat (limited to 'ms/blueprintsprocessor/modules')
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcExtensions.kt27
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcClientLoggingInterceptor.kt47
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcServerLoggingInterceptor.kt92
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt98
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt2
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt45
-rw-r--r--ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt (renamed from ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt)63
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/designer-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/designer/api/BlueprintModelController.kt2
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt2
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt14
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt9
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml9
12 files changed, 335 insertions, 75 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcExtensions.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcExtensions.kt
new file mode 100644
index 000000000..55cf0941d
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcExtensions.kt
@@ -0,0 +1,27 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.grpc
+
+import io.grpc.Metadata
+
+fun Metadata.getStringKey(key: String): String? {
+ return this.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER))
+}
+
+fun Metadata.putStringKeyValue(key: String, value: String) {
+ this.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value)
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcClientLoggingInterceptor.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcClientLoggingInterceptor.kt
new file mode 100644
index 000000000..f3b14b59f
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcClientLoggingInterceptor.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor
+
+import io.grpc.*
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.GrpcLoggerService
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+
+class GrpcClientLoggingInterceptor : ClientInterceptor {
+ val log = logger(GrpcClientLoggingInterceptor::class)
+
+ val loggingService = GrpcLoggerService()
+
+ override fun <ReqT, RespT> interceptCall(method: MethodDescriptor<ReqT, RespT>,
+ callOptions: CallOptions, channel: Channel): ClientCall<ReqT, RespT> {
+
+ return object : ForwardingClientCall
+ .SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(method, callOptions)) {
+
+ override fun start(responseListener: Listener<RespT>, headers: Metadata) {
+ val listener = object : ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
+ override fun onMessage(message: RespT) {
+ loggingService.grpcInvoking(headers)
+ super.onMessage(message)
+ }
+ }
+ super.start(listener, headers)
+ }
+ }
+
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcServerLoggingInterceptor.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcServerLoggingInterceptor.kt
new file mode 100644
index 000000000..e21d5d3ce
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcServerLoggingInterceptor.kt
@@ -0,0 +1,92 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor
+
+import io.grpc.*
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.GrpcLoggerService
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.management.api.BluePrintDownloadInput
+import org.onap.ccsdk.cds.controllerblueprints.management.api.BluePrintRemoveInput
+import org.onap.ccsdk.cds.controllerblueprints.management.api.BluePrintUploadInput
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
+import org.slf4j.MDC
+
+class GrpcServerLoggingInterceptor : ServerInterceptor {
+ val log = logger(GrpcServerLoggingInterceptor::class)
+ val loggingService = GrpcLoggerService()
+
+ override fun <ReqT : Any, RespT : Any> interceptCall(call: ServerCall<ReqT, RespT>,
+ requestHeaders: Metadata, next: ServerCallHandler<ReqT, RespT>)
+ : ServerCall.Listener<ReqT> {
+
+ val forwardingServerCall = object : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
+ override fun sendHeaders(responseHeaders: Metadata) {
+ loggingService.grpResponding(requestHeaders, responseHeaders)
+ super.sendHeaders(responseHeaders)
+ }
+ }
+
+ return object
+ : ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
+ next.startCall(forwardingServerCall, requestHeaders)) {
+
+ override fun onMessage(message: ReqT) {
+ /** Get the requestId, SubRequestId and Originator Id and set in MDS context
+ * If you are using other GRPC services, Implement own Logging Interceptors to get tracing.
+ * */
+ when (message) {
+ is ExecutionServiceInput -> {
+ val commonHeader = message.commonHeader
+ ?: throw BluePrintProcessorException("missing common header in request")
+ loggingService.grpcRequesting(call, commonHeader, next)
+ }
+ is BluePrintUploadInput -> {
+ val commonHeader = message.commonHeader
+ ?: throw BluePrintProcessorException("missing common header in request")
+ loggingService.grpcRequesting(call, commonHeader, next)
+ }
+ is BluePrintDownloadInput -> {
+ val commonHeader = message.commonHeader
+ ?: throw BluePrintProcessorException("missing common header in request")
+ loggingService.grpcRequesting(call, commonHeader, next)
+ }
+ is BluePrintRemoveInput -> {
+ val commonHeader = message.commonHeader
+ ?: throw BluePrintProcessorException("missing common header in request")
+ loggingService.grpcRequesting(call, commonHeader, next)
+ }
+ else -> {
+ loggingService.grpcRequesting(call, requestHeaders, next)
+ }
+ }
+ super.onMessage(message)
+ }
+
+ override fun onComplete() {
+ MDC.clear()
+ super.onComplete()
+ }
+
+ override fun onCancel() {
+ MDC.clear()
+ super.onCancel()
+ }
+
+ }
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt
new file mode 100644
index 000000000..1b932480a
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt
@@ -0,0 +1,98 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service
+
+import io.grpc.Grpc
+import io.grpc.Metadata
+import io.grpc.ServerCall
+import io.grpc.ServerCallHandler
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.getStringKey
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.putStringKeyValue
+import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_INVOCATION_ID
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_PARTNER_NAME
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_REQUEST_ID
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.slf4j.MDC
+import java.net.InetSocketAddress
+import java.time.ZoneOffset
+import java.time.ZonedDateTime
+import java.time.format.DateTimeFormatter
+import java.util.*
+
+class GrpcLoggerService {
+
+ private val log = logger(GrpcLoggerService::class)
+
+ /** Used when server receives request */
+ fun <ReqT : Any, RespT : Any> grpcRequesting(call: ServerCall<ReqT, RespT>,
+ headers: Metadata, next: ServerCallHandler<ReqT, RespT>) {
+ val requestID = headers.getStringKey(ONAP_REQUEST_ID).defaultToUUID()
+ val invocationID = headers.getStringKey(ONAP_INVOCATION_ID).defaultToUUID()
+ val partnerName = headers.getStringKey(ONAP_PARTNER_NAME).defaultToUUID()
+ grpcRequesting(requestID, invocationID, partnerName, call)
+ }
+
+ fun <ReqT : Any, RespT : Any> grpcRequesting(call: ServerCall<ReqT, RespT>,
+ headers: CommonHeader, next: ServerCallHandler<ReqT, RespT>) {
+ val requestID = headers.requestId.defaultToUUID()
+ val invocationID = headers.subRequestId.defaultToUUID()
+ val partnerName = headers.originatorId.defaultToUUID()
+ grpcRequesting(requestID, invocationID, partnerName, call)
+ }
+
+ fun <ReqT : Any, RespT : Any> grpcRequesting(requestID: String, invocationID: String, partnerName: String,
+ call: ServerCall<ReqT, RespT>) {
+ val clientSocketAddress = call.attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR) as? InetSocketAddress
+ ?: throw BluePrintProcessorException("failed to get client address")
+ val serviceName = call.methodDescriptor.fullMethodName
+
+ MDC.put("InvokeTimestamp", ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT))
+ MDC.put("RequestID", requestID)
+ MDC.put("InvocationID", invocationID)
+ MDC.put("PartnerName", partnerName)
+ MDC.put("ClientIPAddress", clientSocketAddress.address.defaultToEmpty())
+ MDC.put("ServerFQDN", clientSocketAddress.address.hostName.defaultToEmpty())
+ MDC.put("ServiceName", serviceName)
+ log.trace("MDC Properties : ${MDC.getCopyOfContextMap()}")
+ }
+
+
+ /** Used before invoking any GRPC outbound request, Inbound Invocation ID is used as request Id
+ * for outbound Request, If invocation Id is missing then default Request Id will be generated.
+ */
+ fun grpcInvoking(requestHeader: Metadata) {
+ requestHeader.putStringKeyValue(ONAP_REQUEST_ID, MDC.get("InvocationID").defaultToUUID())
+ requestHeader.putStringKeyValue(ONAP_INVOCATION_ID, UUID.randomUUID().toString())
+ val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor"
+ requestHeader.putStringKeyValue(ONAP_PARTNER_NAME, partnerName)
+ }
+
+ /** Used when server returns response */
+ fun grpResponding(requestHeaders: Metadata, responseHeaders: Metadata) {
+ try {
+ responseHeaders.putStringKeyValue(ONAP_REQUEST_ID, MDC.get("RequestID").defaultToEmpty())
+ responseHeaders.putStringKeyValue(ONAP_INVOCATION_ID, MDC.get("InvocationID").defaultToEmpty())
+ responseHeaders.putStringKeyValue(ONAP_PARTNER_NAME, MDC.get("PartnerName").defaultToEmpty())
+ } catch (e: Exception) {
+ log.warn("couldn't set grpc response headers", e)
+ }
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt
index dbff84211..601dc0e33 100644
--- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt
@@ -21,6 +21,7 @@ import io.grpc.internal.DnsNameResolverProvider
import io.grpc.internal.PickFirstLoadBalancerProvider
import io.grpc.netty.NettyChannelBuilder
import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor.GrpcClientLoggingInterceptor
class TokenAuthGrpcClientService(private val tokenAuthGrpcClientProperties: TokenAuthGrpcClientProperties)
: BluePrintGrpcClientService {
@@ -30,6 +31,7 @@ class TokenAuthGrpcClientService(private val tokenAuthGrpcClientProperties: Toke
.forAddress(tokenAuthGrpcClientProperties.host, tokenAuthGrpcClientProperties.port)
.nameResolverFactory(DnsNameResolverProvider())
.loadBalancerFactory(PickFirstLoadBalancerProvider())
+ .intercept(GrpcClientLoggingInterceptor())
.intercept(TokenAuthClientInterceptor(tokenAuthGrpcClientProperties)).usePlaintext().build()
return managedChannel
}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt
deleted file mode 100644
index cdf6ce195..000000000
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.core
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.reactor.ReactorContext
-import kotlinx.coroutines.reactor.asCoroutineContext
-import org.onap.ccsdk.cds.blueprintsprocessor.core.service.MonoMDCCoroutine
-import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
-import reactor.core.publisher.Mono
-import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.EmptyCoroutineContext
-
-/** Used in Rest controller API methods to populate MDC context to nested coroutines from reactor web filter context. */
-@UseExperimental(InternalCoroutinesApi::class)
-fun <T> monoMdc(context: CoroutineContext = EmptyCoroutineContext,
- block: suspend CoroutineScope.() -> T?): Mono<T> = Mono.create { sink ->
-
- val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext())
- ?: sink.currentContext()).asCoroutineContext()
- /** Populate MDC context only if present in Reactor Context */
- val newContext = if (!reactorContext.context.isEmpty
- && reactorContext.context.hasKey(MDCContext)) {
- val mdcContext = reactorContext.context.get<MDCContext>(MDCContext)
- GlobalScope.newCoroutineContext(context + reactorContext + mdcContext)
- } else GlobalScope.newCoroutineContext(context + reactorContext)
-
- val coroutine = MonoMDCCoroutine(newContext, sink)
- sink.onDispose(coroutine)
- coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
-} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt b/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt
index 4da7dcd0e..2ef5a31bc 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt
+++ b/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt
@@ -14,43 +14,45 @@
* limitations under the License.
*/
-package org.onap.ccsdk.cds.blueprintsprocessor.core.service
+package org.onap.ccsdk.cds.blueprintsprocessor.rest.service
-import kotlinx.coroutines.AbstractCoroutine
-import kotlinx.coroutines.InternalCoroutinesApi
-import kotlinx.coroutines.handleCoroutineException
+import kotlinx.coroutines.*
+import kotlinx.coroutines.reactor.ReactorContext
+import kotlinx.coroutines.reactor.asCoroutineContext
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_INVOCATION_ID
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_PARTNER_NAME
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_REQUEST_ID
+import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.slf4j.MDC
import org.springframework.http.server.reactive.ServerHttpRequest
import org.springframework.http.server.reactive.ServerHttpResponse
import reactor.core.Disposable
+import reactor.core.publisher.Mono
import reactor.core.publisher.MonoSink
import java.time.ZoneOffset
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
-import java.util.*
import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.EmptyCoroutineContext
-class LoggingService {
- private val log = logger(LoggingService::class)
+class RestLoggerService {
+ private val log = logger(RestLoggerService::class)
- companion object {
- const val ONAP_REQUEST_ID = "X-ONAP-RequestID"
- const val ONAP_INVOCATION_ID = "X-ONAP-InvocationID"
- const val ONAP_PARTNER_NAME = "X-ONAP-PartnerName"
- }
fun entering(request: ServerHttpRequest) {
val headers = request.headers
- val requestID = defaultToUUID(headers.getFirst(ONAP_REQUEST_ID))
- val invocationID = defaultToUUID(headers.getFirst(ONAP_INVOCATION_ID))
- val partnerName = defaultToEmpty(headers.getFirst(ONAP_PARTNER_NAME))
+ val requestID = headers.getFirst(ONAP_REQUEST_ID).defaultToUUID()
+ val invocationID = headers.getFirst(ONAP_INVOCATION_ID).defaultToUUID()
+ val partnerName = headers.getFirst(ONAP_PARTNER_NAME).defaultToEmpty()
MDC.put("InvokeTimestamp", ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT))
MDC.put("RequestID", requestID)
MDC.put("InvocationID", invocationID)
MDC.put("PartnerName", partnerName)
- MDC.put("ClientIPAddress", defaultToEmpty(request.remoteAddress?.address?.hostAddress))
- MDC.put("ServerFQDN", defaultToEmpty(request.remoteAddress?.hostString))
+ MDC.put("ClientIPAddress", request.remoteAddress?.address?.hostAddress.defaultToEmpty())
+ MDC.put("ServerFQDN", request.remoteAddress?.hostString.defaultToEmpty())
if (MDC.get("ServiceName") == null || MDC.get("ServiceName").equals("", ignoreCase = true)) {
MDC.put("ServiceName", request.uri.path)
}
@@ -62,22 +64,35 @@ class LoggingService {
val resHeaders = response.headers
resHeaders[ONAP_REQUEST_ID] = MDC.get("RequestID")
resHeaders[ONAP_INVOCATION_ID] = MDC.get("InvocationID")
+ val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor"
+ resHeaders[ONAP_PARTNER_NAME] = partnerName
} catch (e: Exception) {
log.warn("couldn't set response headers", e)
} finally {
MDC.clear()
}
}
+}
- private fun defaultToEmpty(input: Any?): String {
- return input?.toString() ?: ""
- }
- private fun defaultToUUID(input: String?): String {
- return input ?: UUID.randomUUID().toString()
- }
-}
+/** Used in Rest controller API methods to populate MDC context to nested coroutines from reactor web filter context. */
+@UseExperimental(InternalCoroutinesApi::class)
+fun <T> monoMdc(context: CoroutineContext = EmptyCoroutineContext,
+ block: suspend CoroutineScope.() -> T?): Mono<T> = Mono.create { sink ->
+
+ val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext())
+ ?: sink.currentContext()).asCoroutineContext()
+ /** Populate MDC context only if present in Reactor Context */
+ val newContext = if (!reactorContext.context.isEmpty
+ && reactorContext.context.hasKey(MDCContext)) {
+ val mdcContext = reactorContext.context.get<MDCContext>(MDCContext)
+ GlobalScope.newCoroutineContext(context + reactorContext + mdcContext)
+ } else GlobalScope.newCoroutineContext(context + reactorContext)
+ val coroutine = MonoMDCCoroutine(newContext, sink)
+ sink.onDispose(coroutine)
+ coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+}
@InternalCoroutinesApi
class MonoMDCCoroutine<in T>(
diff --git a/ms/blueprintsprocessor/modules/inbounds/designer-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/designer/api/BlueprintModelController.kt b/ms/blueprintsprocessor/modules/inbounds/designer-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/designer/api/BlueprintModelController.kt
index a6bff7051..bf251f6c3 100644
--- a/ms/blueprintsprocessor/modules/inbounds/designer-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/designer/api/BlueprintModelController.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/designer-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/designer/api/BlueprintModelController.kt
@@ -19,9 +19,9 @@ package org.onap.ccsdk.cds.blueprintsprocessor.designer.api
import io.swagger.annotations.ApiOperation
import io.swagger.annotations.ApiParam
-import org.onap.ccsdk.cds.blueprintsprocessor.core.monoMdc
import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.domain.BlueprintModelSearch
import org.onap.ccsdk.cds.blueprintsprocessor.designer.api.handler.BluePrintModelHandler
+import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.monoMdc
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
import org.springframework.core.io.Resource
import org.springframework.http.MediaType
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt
index f14f61e60..345650686 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt
@@ -23,7 +23,7 @@ import io.swagger.annotations.ApiParam
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
-import org.onap.ccsdk.cds.blueprintsprocessor.core.monoMdc
+import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.monoMdc
import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.determineHttpStatusCode
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.logger
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt
index e291aa78e..6bffffdb5 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt
@@ -18,8 +18,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts
import io.grpc.ServerBuilder
import io.grpc.stub.StreamObserver
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor.GrpcServerLoggingInterceptor
import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
import org.onap.ccsdk.cds.controllerblueprints.common.api.Status
+import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
@@ -36,8 +40,13 @@ class MockBluePrintProcessingServer : BluePrintProcessingServiceGrpc.BluePrintPr
override fun onNext(executionServiceInput: ExecutionServiceInput) {
log.info("Received requestId(${executionServiceInput.commonHeader.requestId}) " +
"subRequestId(${executionServiceInput.commonHeader.subRequestId})")
- responseObserver.onNext(buildNotification(executionServiceInput))
- responseObserver.onNext(buildResponse(executionServiceInput))
+ runBlocking {
+ launch(MDCContext()) {
+ responseObserver.onNext(buildNotification(executionServiceInput))
+ responseObserver.onNext(buildResponse(executionServiceInput))
+ log.info("message has sent successfully...")
+ }
+ }
responseObserver.onCompleted()
}
@@ -85,6 +94,7 @@ fun main() {
try {
val server = ServerBuilder
.forPort(50052)
+ .intercept(GrpcServerLoggingInterceptor())
.addService(MockBluePrintProcessingServer())
.build()
server.start()
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt
index 29d24c6ad..9a5be0151 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt
@@ -16,6 +16,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
+import com.google.protobuf.util.JsonFormat
import io.grpc.inprocess.InProcessChannelBuilder
import io.grpc.inprocess.InProcessServerBuilder
import io.grpc.testing.GrpcCleanupRule
@@ -26,6 +27,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.collect
import org.junit.Rule
import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC
import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants
import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties
import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
@@ -132,12 +134,17 @@ class StreamingRemoteExecutionServiceTest {
.setActionName("SampleScript")
.setBlueprintName("sample-cba")
.setBlueprintVersion("1.0.0")
+ .setMode(ACTION_MODE_SYNC)
.build()
+ val jsonContent = """{ "key1" : "value1" }"""
+ val payloadBuilder = ExecutionServiceInput.newBuilder().payloadBuilder
+ JsonFormat.parser().merge(jsonContent, payloadBuilder)
+
return ExecutionServiceInput.newBuilder()
.setCommonHeader(commonHeader)
.setActionIdentifiers(actionIdentifier)
- //.setPayload(payloadBuilder.build())
+ .setPayload(payloadBuilder.build())
.build()
}
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml
index afe10b39d..8951e1a71 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml
@@ -15,11 +15,18 @@
-->
<configuration>
+
+ <property name="localPattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/>
+ <property name="defaultPattern"
+ value="%date{ISO8601,UTC}|%X{RequestID}|%X{InvocationID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n"/>
+ <property name="testing"
+ value="%X{RequestID}|%X{InvocationID}|%logger{50}| %msg%n"/>
+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
- <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
+ <pattern>${testing}</pattern>
</encoder>
</appender>