aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons')
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/pom.xml5
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcExtensions.kt27
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibConfiguration.kt20
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibData.kt26
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcClientLoggingInterceptor.kt47
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcServerLoggingInterceptor.kt92
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt70
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcService.kt (renamed from ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcClientService.kt)6
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt101
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TLSAuthGrpcClientService.kt54
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TLSAuthGrpcServerService.kt49
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt2
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyServiceTest.kt70
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcServerTest.kt109
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/MockTLSBluePrintProcessingServer.kt90
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/README5
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/py-executor-chain.pem27
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/py-executor-key.pem52
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt32
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt5
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt34
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt28
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt77
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt74
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt88
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt65
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt14
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt61
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml10
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt45
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintRemoteProcessorData.kt3
-rw-r--r--ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt (renamed from ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt)65
32 files changed, 1298 insertions, 155 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/pom.xml b/ms/blueprintsprocessor/modules/commons/grpc-lib/pom.xml
index 5945e29fd..15cabb260 100644
--- a/ms/blueprintsprocessor/modules/commons/grpc-lib/pom.xml
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/pom.xml
@@ -1,6 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright © 2019 IBM.
+ ~ Modifications Copyright © 2018-2019 AT&T Intellectual Property.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
@@ -42,5 +43,9 @@
<groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
<artifactId>processor-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-testing</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcExtensions.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcExtensions.kt
new file mode 100644
index 000000000..55cf0941d
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcExtensions.kt
@@ -0,0 +1,27 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.grpc
+
+import io.grpc.Metadata
+
+fun Metadata.getStringKey(key: String): String? {
+ return this.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER))
+}
+
+fun Metadata.putStringKeyValue(key: String, value: String) {
+ this.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value)
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibConfiguration.kt
index 1bef3a0f2..0ec049a3c 100644
--- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibConfiguration.kt
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibConfiguration.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +17,10 @@
package org.onap.ccsdk.cds.blueprintsprocessor.grpc
+import com.fasterxml.jackson.databind.JsonNode
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
import org.springframework.context.annotation.ComponentScan
import org.springframework.context.annotation.Configuration
@@ -23,10 +28,25 @@ import org.springframework.context.annotation.Configuration
@ComponentScan
open class BluePrintGrpcLibConfiguration
+/**
+ * Exposed Dependency Service by this GRPC Lib Module
+ */
+fun BluePrintDependencyService.grpcLibPropertyService(): BluePrintGrpcLibPropertyService =
+ instance(GRPCLibConstants.SERVICE_BLUEPRINT_GRPC_LIB_PROPERTY)
+
+fun BluePrintDependencyService.grpcClientService(selector: String): BluePrintGrpcClientService {
+ return grpcLibPropertyService().blueprintGrpcClientService(selector)
+}
+
+fun BluePrintDependencyService.grpcClientService(jsonNode: JsonNode): BluePrintGrpcClientService {
+ return grpcLibPropertyService().blueprintGrpcClientService(jsonNode)
+}
+
class GRPCLibConstants {
companion object {
const val SERVICE_BLUEPRINT_GRPC_LIB_PROPERTY = "blueprint-grpc-lib-property-service"
const val TYPE_TOKEN_AUTH = "token-auth"
const val TYPE_BASIC_AUTH = "basic-auth"
+ const val TYPE_TLS_AUTH = "tls-auth"
}
} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibData.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibData.kt
index 76e60bd0d..47d16fbc7 100644
--- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibData.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +17,24 @@
package org.onap.ccsdk.cds.blueprintsprocessor.grpc
+/** GRPC Server Properties */
+open class GrpcServerProperties {
+ lateinit var type: String
+ var port: Int = -1
+}
+
+open class TokenAuthGrpcServerProperties : GrpcServerProperties() {
+ lateinit var token: String
+}
+
+open class TLSAuthGrpcServerProperties : GrpcServerProperties() {
+ lateinit var certChain: String
+ lateinit var privateKey: String
+ /** Below Used only for Mutual TLS */
+ var trustCertCollection: String? = null
+}
+
+/** GRPC Client Properties */
open class GrpcClientProperties {
lateinit var type: String
lateinit var host: String
@@ -26,6 +45,13 @@ open class TokenAuthGrpcClientProperties : GrpcClientProperties() {
lateinit var token: String
}
+open class TLSAuthGrpcClientProperties : GrpcClientProperties() {
+ var trustCertCollection: String? = null
+ /** Below Used only for Mutual TLS */
+ var clientCertChain: String? = null
+ var clientPrivateKey: String? = null
+}
+
open class BasicAuthGrpcClientProperties : GrpcClientProperties() {
lateinit var username: String
lateinit var password: String
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcClientLoggingInterceptor.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcClientLoggingInterceptor.kt
new file mode 100644
index 000000000..f3b14b59f
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcClientLoggingInterceptor.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor
+
+import io.grpc.*
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.GrpcLoggerService
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+
+class GrpcClientLoggingInterceptor : ClientInterceptor {
+ val log = logger(GrpcClientLoggingInterceptor::class)
+
+ val loggingService = GrpcLoggerService()
+
+ override fun <ReqT, RespT> interceptCall(method: MethodDescriptor<ReqT, RespT>,
+ callOptions: CallOptions, channel: Channel): ClientCall<ReqT, RespT> {
+
+ return object : ForwardingClientCall
+ .SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(method, callOptions)) {
+
+ override fun start(responseListener: Listener<RespT>, headers: Metadata) {
+ val listener = object : ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
+ override fun onMessage(message: RespT) {
+ loggingService.grpcInvoking(headers)
+ super.onMessage(message)
+ }
+ }
+ super.start(listener, headers)
+ }
+ }
+
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcServerLoggingInterceptor.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcServerLoggingInterceptor.kt
new file mode 100644
index 000000000..e21d5d3ce
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcServerLoggingInterceptor.kt
@@ -0,0 +1,92 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor
+
+import io.grpc.*
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.GrpcLoggerService
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.management.api.BluePrintDownloadInput
+import org.onap.ccsdk.cds.controllerblueprints.management.api.BluePrintRemoveInput
+import org.onap.ccsdk.cds.controllerblueprints.management.api.BluePrintUploadInput
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
+import org.slf4j.MDC
+
+class GrpcServerLoggingInterceptor : ServerInterceptor {
+ val log = logger(GrpcServerLoggingInterceptor::class)
+ val loggingService = GrpcLoggerService()
+
+ override fun <ReqT : Any, RespT : Any> interceptCall(call: ServerCall<ReqT, RespT>,
+ requestHeaders: Metadata, next: ServerCallHandler<ReqT, RespT>)
+ : ServerCall.Listener<ReqT> {
+
+ val forwardingServerCall = object : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
+ override fun sendHeaders(responseHeaders: Metadata) {
+ loggingService.grpResponding(requestHeaders, responseHeaders)
+ super.sendHeaders(responseHeaders)
+ }
+ }
+
+ return object
+ : ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
+ next.startCall(forwardingServerCall, requestHeaders)) {
+
+ override fun onMessage(message: ReqT) {
+ /** Get the requestId, SubRequestId and Originator Id and set in MDS context
+ * If you are using other GRPC services, Implement own Logging Interceptors to get tracing.
+ * */
+ when (message) {
+ is ExecutionServiceInput -> {
+ val commonHeader = message.commonHeader
+ ?: throw BluePrintProcessorException("missing common header in request")
+ loggingService.grpcRequesting(call, commonHeader, next)
+ }
+ is BluePrintUploadInput -> {
+ val commonHeader = message.commonHeader
+ ?: throw BluePrintProcessorException("missing common header in request")
+ loggingService.grpcRequesting(call, commonHeader, next)
+ }
+ is BluePrintDownloadInput -> {
+ val commonHeader = message.commonHeader
+ ?: throw BluePrintProcessorException("missing common header in request")
+ loggingService.grpcRequesting(call, commonHeader, next)
+ }
+ is BluePrintRemoveInput -> {
+ val commonHeader = message.commonHeader
+ ?: throw BluePrintProcessorException("missing common header in request")
+ loggingService.grpcRequesting(call, commonHeader, next)
+ }
+ else -> {
+ loggingService.grpcRequesting(call, requestHeaders, next)
+ }
+ }
+ super.onMessage(message)
+ }
+
+ override fun onComplete() {
+ MDC.clear()
+ super.onComplete()
+ }
+
+ override fun onCancel() {
+ MDC.clear()
+ super.onCancel()
+ }
+
+ }
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt
index a1d2188ab..f4933a3ad 100644
--- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,17 +19,56 @@ package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service
import com.fasterxml.jackson.databind.JsonNode
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.grpc.BasicAuthGrpcClientProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants
-import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.*
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.returnNullIfMissing
import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
import org.springframework.stereotype.Service
@Service(GRPCLibConstants.SERVICE_BLUEPRINT_GRPC_LIB_PROPERTY)
open class BluePrintGrpcLibPropertyService(private var bluePrintProperties: BluePrintProperties) {
+ /** GRPC Server Lib Property Service */
+ fun grpcServerProperties(jsonNode: JsonNode): GrpcServerProperties {
+ return when (val type = jsonNode.get("type").textValue()) {
+ GRPCLibConstants.TYPE_TOKEN_AUTH -> {
+ JacksonUtils.readValue(jsonNode, TokenAuthGrpcServerProperties::class.java)!!
+ }
+ GRPCLibConstants.TYPE_TLS_AUTH -> {
+ JacksonUtils.readValue(jsonNode, TLSAuthGrpcServerProperties::class.java)!!
+ }
+ else -> {
+ throw BluePrintProcessorException("Grpc type($type) not supported")
+ }
+ }
+ }
+
+ fun grpcServerProperties(prefix: String): GrpcServerProperties {
+ val type = bluePrintProperties.propertyBeanType(
+ "$prefix.type", String::class.java)
+ return when (type) {
+ GRPCLibConstants.TYPE_TOKEN_AUTH -> {
+ tokenAuthGrpcServerProperties(prefix)
+ }
+ GRPCLibConstants.TYPE_TLS_AUTH -> {
+ tlsAuthGrpcServerProperties(prefix)
+ }
+ else -> {
+ throw BluePrintProcessorException("Grpc type($type) not supported")
+ }
+ }
+ }
+
+ private fun tokenAuthGrpcServerProperties(prefix: String): TokenAuthGrpcServerProperties {
+ return bluePrintProperties.propertyBeanType(prefix, TokenAuthGrpcServerProperties::class.java)
+ }
+
+ private fun tlsAuthGrpcServerProperties(prefix: String): TLSAuthGrpcServerProperties {
+ return bluePrintProperties.propertyBeanType(prefix, TLSAuthGrpcServerProperties::class.java)
+ }
+
+ /** GRPC Client Lib Property Service */
+
fun blueprintGrpcClientService(jsonNode: JsonNode): BluePrintGrpcClientService {
val restClientProperties = grpcClientProperties(jsonNode)
return blueprintGrpcClientService(restClientProperties)
@@ -42,11 +82,15 @@ open class BluePrintGrpcLibPropertyService(private var bluePrintProperties: Blue
fun grpcClientProperties(jsonNode: JsonNode): GrpcClientProperties {
- val type = jsonNode.get("type").textValue()
+ val type = jsonNode.get("type").returnNullIfMissing()?.textValue()
+ ?: BluePrintProcessorException("missing type property")
return when (type) {
GRPCLibConstants.TYPE_TOKEN_AUTH -> {
JacksonUtils.readValue(jsonNode, TokenAuthGrpcClientProperties::class.java)!!
}
+ GRPCLibConstants.TYPE_TLS_AUTH -> {
+ JacksonUtils.readValue(jsonNode, TLSAuthGrpcClientProperties::class.java)!!
+ }
GRPCLibConstants.TYPE_BASIC_AUTH -> {
JacksonUtils.readValue(jsonNode, BasicAuthGrpcClientProperties::class.java)!!
}
@@ -63,6 +107,9 @@ open class BluePrintGrpcLibPropertyService(private var bluePrintProperties: Blue
GRPCLibConstants.TYPE_TOKEN_AUTH -> {
tokenAuthGrpcClientProperties(prefix)
}
+ GRPCLibConstants.TYPE_TLS_AUTH -> {
+ tlsAuthGrpcClientProperties(prefix)
+ }
GRPCLibConstants.TYPE_BASIC_AUTH -> {
basicAuthGrpcClientProperties(prefix)
}
@@ -75,12 +122,15 @@ open class BluePrintGrpcLibPropertyService(private var bluePrintProperties: Blue
fun blueprintGrpcClientService(grpcClientProperties: GrpcClientProperties):
BluePrintGrpcClientService {
- when (grpcClientProperties) {
+ return when (grpcClientProperties) {
is TokenAuthGrpcClientProperties -> {
- return TokenAuthGrpcClientService(grpcClientProperties)
+ TokenAuthGrpcClientService(grpcClientProperties)
+ }
+ is TLSAuthGrpcClientProperties -> {
+ TLSAuthGrpcClientService(grpcClientProperties)
}
is BasicAuthGrpcClientProperties -> {
- return BasicAuthGrpcClientService(grpcClientProperties)
+ BasicAuthGrpcClientService(grpcClientProperties)
}
else -> {
throw BluePrintProcessorException("couldn't get grpc service for type(${grpcClientProperties.type})")
@@ -92,6 +142,10 @@ open class BluePrintGrpcLibPropertyService(private var bluePrintProperties: Blue
return bluePrintProperties.propertyBeanType(prefix, TokenAuthGrpcClientProperties::class.java)
}
+ private fun tlsAuthGrpcClientProperties(prefix: String): TLSAuthGrpcClientProperties {
+ return bluePrintProperties.propertyBeanType(prefix, TLSAuthGrpcClientProperties::class.java)
+ }
+
private fun basicAuthGrpcClientProperties(prefix: String): BasicAuthGrpcClientProperties {
return bluePrintProperties.propertyBeanType(prefix, BasicAuthGrpcClientProperties::class.java)
}
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcClientService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcService.kt
index 016c05035..0d9291615 100644
--- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcClientService.kt
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcService.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +18,11 @@
package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service
import io.grpc.ManagedChannel
+import io.grpc.netty.NettyServerBuilder
+
+interface BluePrintGrpcServerService {
+ fun serverBuilder(): NettyServerBuilder
+}
interface BluePrintGrpcClientService {
suspend fun channel(): ManagedChannel
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt
new file mode 100644
index 000000000..6d2ba43cc
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt
@@ -0,0 +1,101 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service
+
+import io.grpc.Grpc
+import io.grpc.Metadata
+import io.grpc.ServerCall
+import io.grpc.ServerCallHandler
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.getStringKey
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.putStringKeyValue
+import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_INVOCATION_ID
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_PARTNER_NAME
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_REQUEST_ID
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.slf4j.MDC
+import java.net.InetAddress
+import java.net.InetSocketAddress
+import java.time.ZoneOffset
+import java.time.ZonedDateTime
+import java.time.format.DateTimeFormatter
+import java.util.*
+
+class GrpcLoggerService {
+
+ private val log = logger(GrpcLoggerService::class)
+
+ /** Used when server receives request */
+ fun <ReqT : Any, RespT : Any> grpcRequesting(call: ServerCall<ReqT, RespT>,
+ headers: Metadata, next: ServerCallHandler<ReqT, RespT>) {
+ val requestID = headers.getStringKey(ONAP_REQUEST_ID).defaultToUUID()
+ val invocationID = headers.getStringKey(ONAP_INVOCATION_ID).defaultToUUID()
+ val partnerName = headers.getStringKey(ONAP_PARTNER_NAME) ?: "UNKNOWN"
+ grpcRequesting(requestID, invocationID, partnerName, call)
+ }
+
+ fun <ReqT : Any, RespT : Any> grpcRequesting(call: ServerCall<ReqT, RespT>,
+ headers: CommonHeader, next: ServerCallHandler<ReqT, RespT>) {
+ val requestID = headers.requestId.defaultToUUID()
+ val invocationID = headers.subRequestId.defaultToUUID()
+ val partnerName = headers.originatorId ?: "UNKNOWN"
+ grpcRequesting(requestID, invocationID, partnerName, call)
+ }
+
+ fun <ReqT : Any, RespT : Any> grpcRequesting(requestID: String, invocationID: String, partnerName: String,
+ call: ServerCall<ReqT, RespT>) {
+ val localhost = InetAddress.getLocalHost()
+
+ val clientSocketAddress = call.attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR) as? InetSocketAddress
+ ?: throw BluePrintProcessorException("failed to get client address")
+ val serviceName = call.methodDescriptor.fullMethodName
+
+ MDC.put("InvokeTimestamp", ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT))
+ MDC.put("RequestID", requestID)
+ MDC.put("InvocationID", invocationID)
+ MDC.put("PartnerName", partnerName)
+ MDC.put("ClientIPAddress", clientSocketAddress.address.defaultToEmpty())
+ MDC.put("ServerFQDN", localhost.hostName.defaultToEmpty())
+ MDC.put("ServiceName", serviceName)
+ log.trace("MDC Properties : ${MDC.getCopyOfContextMap()}")
+ }
+
+
+ /** Used before invoking any GRPC outbound request, Inbound Invocation ID is used as request Id
+ * for outbound Request, If invocation Id is missing then default Request Id will be generated.
+ */
+ fun grpcInvoking(requestHeader: Metadata) {
+ requestHeader.putStringKeyValue(ONAP_REQUEST_ID, MDC.get("InvocationID").defaultToUUID())
+ requestHeader.putStringKeyValue(ONAP_INVOCATION_ID, UUID.randomUUID().toString())
+ val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor"
+ requestHeader.putStringKeyValue(ONAP_PARTNER_NAME, partnerName)
+ }
+
+ /** Used when server returns response */
+ fun grpResponding(requestHeaders: Metadata, responseHeaders: Metadata) {
+ try {
+ responseHeaders.putStringKeyValue(ONAP_REQUEST_ID, MDC.get("RequestID").defaultToEmpty())
+ responseHeaders.putStringKeyValue(ONAP_INVOCATION_ID, MDC.get("InvocationID").defaultToEmpty())
+ responseHeaders.putStringKeyValue(ONAP_PARTNER_NAME, MDC.get("PartnerName").defaultToEmpty())
+ } catch (e: Exception) {
+ log.warn("couldn't set grpc response headers", e)
+ }
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TLSAuthGrpcClientService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TLSAuthGrpcClientService.kt
new file mode 100644
index 000000000..a70cbbce0
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TLSAuthGrpcClientService.kt
@@ -0,0 +1,54 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service
+
+import io.grpc.ManagedChannel
+import io.grpc.internal.DnsNameResolverProvider
+import io.grpc.internal.PickFirstLoadBalancerProvider
+import io.grpc.netty.GrpcSslContexts
+import io.grpc.netty.NettyChannelBuilder
+import io.netty.handler.ssl.SslContext
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TLSAuthGrpcClientProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor.GrpcClientLoggingInterceptor
+import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
+
+class TLSAuthGrpcClientService(private val tlsAuthGrpcClientProperties: TLSAuthGrpcClientProperties)
+ : BluePrintGrpcClientService {
+
+ override suspend fun channel(): ManagedChannel {
+ return NettyChannelBuilder
+ .forAddress(tlsAuthGrpcClientProperties.host, tlsAuthGrpcClientProperties.port)
+ .nameResolverFactory(DnsNameResolverProvider())
+ .loadBalancerFactory(PickFirstLoadBalancerProvider())
+ .intercept(GrpcClientLoggingInterceptor())
+ .sslContext(sslContext())
+ .build()
+ }
+
+ fun sslContext(): SslContext {
+ val builder = GrpcSslContexts.forClient()
+ if (tlsAuthGrpcClientProperties.trustCertCollection != null) {
+ builder.trustManager(normalizedFile(tlsAuthGrpcClientProperties.trustCertCollection!!))
+ }
+ if (tlsAuthGrpcClientProperties.clientCertChain != null
+ && tlsAuthGrpcClientProperties.clientPrivateKey != null) {
+ builder.keyManager(normalizedFile(tlsAuthGrpcClientProperties.clientCertChain!!),
+ normalizedFile(tlsAuthGrpcClientProperties.clientPrivateKey!!))
+ }
+ return builder.build()
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TLSAuthGrpcServerService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TLSAuthGrpcServerService.kt
new file mode 100644
index 000000000..fc73d43f9
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TLSAuthGrpcServerService.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service
+
+import io.grpc.netty.GrpcSslContexts
+import io.grpc.netty.NettyServerBuilder
+import io.netty.handler.ssl.ClientAuth
+import io.netty.handler.ssl.SslContext
+import io.netty.handler.ssl.SslContextBuilder
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TLSAuthGrpcServerProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
+
+
+class TLSAuthGrpcServerService(private val tlsAuthGrpcServerProperties: TLSAuthGrpcServerProperties)
+ : BluePrintGrpcServerService {
+
+ override fun serverBuilder(): NettyServerBuilder {
+ return NettyServerBuilder
+ .forPort(tlsAuthGrpcServerProperties.port)
+ .sslContext(sslContext())
+ }
+
+ fun sslContext(): SslContext {
+ val sslClientContextBuilder = SslContextBuilder
+ .forServer(normalizedFile(tlsAuthGrpcServerProperties.certChain),
+ normalizedFile(tlsAuthGrpcServerProperties.privateKey))
+
+ tlsAuthGrpcServerProperties.trustCertCollection?.let { trustCertFile ->
+ sslClientContextBuilder.trustManager(normalizedFile(trustCertFile))
+ sslClientContextBuilder.clientAuth(ClientAuth.REQUIRE)
+ }
+ return GrpcSslContexts.configure(sslClientContextBuilder).build()
+ }
+
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt
index dbff84211..601dc0e33 100644
--- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt
@@ -21,6 +21,7 @@ import io.grpc.internal.DnsNameResolverProvider
import io.grpc.internal.PickFirstLoadBalancerProvider
import io.grpc.netty.NettyChannelBuilder
import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor.GrpcClientLoggingInterceptor
class TokenAuthGrpcClientService(private val tokenAuthGrpcClientProperties: TokenAuthGrpcClientProperties)
: BluePrintGrpcClientService {
@@ -30,6 +31,7 @@ class TokenAuthGrpcClientService(private val tokenAuthGrpcClientProperties: Toke
.forAddress(tokenAuthGrpcClientProperties.host, tokenAuthGrpcClientProperties.port)
.nameResolverFactory(DnsNameResolverProvider())
.loadBalancerFactory(PickFirstLoadBalancerProvider())
+ .intercept(GrpcClientLoggingInterceptor())
.intercept(TokenAuthClientInterceptor(tokenAuthGrpcClientProperties)).usePlaintext().build()
return managedChannel
}
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyServiceTest.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyServiceTest.kt
index 8df218fe9..b7ddc1569 100644
--- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyServiceTest.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,9 +23,8 @@ import org.junit.Test
import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.grpc.BasicAuthGrpcClientProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.grpc.BluePrintGrpcLibConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.*
+import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
@@ -42,11 +42,25 @@ import kotlin.test.assertTrue
"blueprintsprocessor.grpcclient.sample.port=50505",
"blueprintsprocessor.grpcclient.sample.username=sampleuser",
"blueprintsprocessor.grpcclient.sample.password=sampleuser",
+
"blueprintsprocessor.grpcclient.token.type=token-auth",
"blueprintsprocessor.grpcclient.token.host=127.0.0.1",
"blueprintsprocessor.grpcclient.token.port=50505",
"blueprintsprocessor.grpcclient.token.username=sampleuser",
- "blueprintsprocessor.grpcclient.token.password=sampleuser"
+ "blueprintsprocessor.grpcclient.token.password=sampleuser",
+
+ "blueprintsprocessor.grpcserver.tls-sample.type=tls-auth",
+ "blueprintsprocessor.grpcserver.tls-sample.port=50505",
+ "blueprintsprocessor.grpcserver.tls-sample.certChain=server1.pem",
+ "blueprintsprocessor.grpcserver.tls-sample.privateKey=server1.key",
+ "blueprintsprocessor.grpcserver.tls-sample.trustCertCollection=ca.pem",
+
+ "blueprintsprocessor.grpcclient.tls-sample.type=tls-auth",
+ "blueprintsprocessor.grpcclient.tls-sample.host=127.0.0.1",
+ "blueprintsprocessor.grpcclient.tls-sample.port=50505",
+ "blueprintsprocessor.grpcclient.tls-sample.trustCertCollection=ca.pem",
+ "blueprintsprocessor.grpcclient.tls-sample.clientCertChain=client.pem",
+ "blueprintsprocessor.grpcclient.tls-sample.clientPrivateKey=client.key"
])
class BluePrintGrpcLibPropertyServiceTest {
@@ -129,4 +143,52 @@ class BluePrintGrpcLibPropertyServiceTest {
.blueprintGrpcClientService(actualObj)
assertTrue(svc is BasicAuthGrpcClientService)
}
+
+ @Test
+ fun testGrpcClientTLSProperties() {
+ val properties = bluePrintGrpcLibPropertyService
+ .grpcClientProperties("blueprintsprocessor.grpcclient.tls-sample") as TLSAuthGrpcClientProperties
+ assertNotNull(properties, "failed to create property bean")
+ assertNotNull(properties.host, "failed to get host property in property bean")
+ assertNotNull(properties.port, "failed to get host property in property bean")
+ assertNotNull(properties.trustCertCollection, "failed to get trustCertCollection property in property bean")
+ assertNotNull(properties.clientCertChain, "failed to get clientCertChain property in property bean")
+ assertNotNull(properties.clientPrivateKey, "failed to get clientPrivateKey property in property bean")
+
+ val configDsl = """{
+ "type" : "tls-auth",
+ "host" : "localhost",
+ "port" : "50505",
+ "trustCertCollection" : "server1.pem",
+ "clientCertChain" : "server1.key",
+ "clientPrivateKey" : "ca.pem"
+ }
+ """.trimIndent()
+ val jsonProperties = bluePrintGrpcLibPropertyService
+ .grpcClientProperties(configDsl.jsonAsJsonType()) as TLSAuthGrpcClientProperties
+ assertNotNull(jsonProperties, "failed to create property bean from json")
+ }
+
+ @Test
+ fun testGrpcServerTLSProperties() {
+ val properties = bluePrintGrpcLibPropertyService
+ .grpcServerProperties("blueprintsprocessor.grpcserver.tls-sample") as TLSAuthGrpcServerProperties
+ assertNotNull(properties, "failed to create property bean")
+ assertNotNull(properties.port, "failed to get host property in property bean")
+ assertNotNull(properties.trustCertCollection, "failed to get trustCertCollection property in property bean")
+ assertNotNull(properties.certChain, "failed to get certChain property in property bean")
+ assertNotNull(properties.privateKey, "failed to get privateKey property in property bean")
+
+ val configDsl = """{
+ "type" : "tls-auth",
+ "port" : "50505",
+ "certChain" : "server1.pem",
+ "privateKey" : "server1.key",
+ "trustCertCollection" : "ca.pem"
+ }
+ """.trimIndent()
+ val jsonProperties = bluePrintGrpcLibPropertyService
+ .grpcServerProperties(configDsl.jsonAsJsonType()) as TLSAuthGrpcServerProperties
+ assertNotNull(jsonProperties, "failed to create property bean from json")
+ }
} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcServerTest.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcServerTest.kt
new file mode 100644
index 000000000..8154d3747
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcServerTest.kt
@@ -0,0 +1,109 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service
+
+import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming
+import com.google.protobuf.util.JsonFormat
+import kotlinx.coroutines.channels.consumeEach
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TLSAuthGrpcClientProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TLSAuthGrpcServerProperties
+import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers
+import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
+import java.util.*
+import kotlin.test.Test
+import kotlin.test.assertNotNull
+
+class BluePrintGrpcServerTest {
+
+ private val tlsAuthGrpcServerProperties = TLSAuthGrpcServerProperties().apply {
+ port = 50052
+ type = GRPCLibConstants.TYPE_TLS_AUTH
+ certChain = "src/test/resources/tls-manual/py-executor-chain.pem"
+ privateKey = "src/test/resources/tls-manual/py-executor-key.pem"
+ }
+
+ private val tlsAuthGrpcClientProperties = TLSAuthGrpcClientProperties().apply {
+ host = "localhost"
+ port = 50052
+ type = GRPCLibConstants.TYPE_TLS_AUTH
+ trustCertCollection = "src/test/resources/tls-manual/py-executor-chain.pem"
+ }
+
+ @Test
+ fun testGrpcTLSContext() {
+ val tlsAuthGrpcServerService = TLSAuthGrpcServerService(tlsAuthGrpcServerProperties)
+ val sslContext = tlsAuthGrpcServerService.sslContext()
+ assertNotNull(sslContext, "failed to create grpc server ssl context")
+
+ val tlsAuthGrpcClientService = TLSAuthGrpcClientService(tlsAuthGrpcClientProperties)
+ val clientSslContext = tlsAuthGrpcClientService.sslContext()
+ assertNotNull(clientSslContext, "failed to create grpc client ssl context")
+ }
+
+ /** TLS Client Integration testing, GRPC TLS Junit testing is not supported. */
+ //@Test
+ fun testGrpcTLSServerIntegration() {
+ runBlocking {
+ val tlsAuthGrpcClientService = TLSAuthGrpcClientService(tlsAuthGrpcClientProperties)
+ val grpcChannel = tlsAuthGrpcClientService.channel()
+ /** Get Send and Receive Channel for bidirectional process method*/
+ val (reqChannel, resChannel) = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(),
+ grpcChannel)
+ launch {
+ resChannel.consumeEach {
+ log.info("Received Response")
+ if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
+ resChannel.cancel()
+ }
+ }
+ }
+ val request = getRequest("12345")
+ reqChannel.send(request)
+ }
+ }
+
+ private fun getRequest(requestId: String): ExecutionServiceInput {
+ val commonHeader = CommonHeader.newBuilder()
+ .setTimestamp("2012-04-23T18:25:43.511Z")
+ .setOriginatorId("System")
+ .setRequestId(requestId)
+ .setSubRequestId("$requestId-" + UUID.randomUUID().toString()).build()
+ val actionIdentifier = ActionIdentifiers.newBuilder()
+ .setActionName("SampleScript")
+ .setBlueprintName("sample-cba")
+ .setBlueprintVersion("1.0.0")
+ .setMode(ACTION_MODE_SYNC)
+ .build()
+ val jsonContent = """{ "key1" : "value1" }"""
+ val payloadBuilder = ExecutionServiceInput.newBuilder().payloadBuilder
+ JsonFormat.parser().merge(jsonContent, payloadBuilder)
+
+ return ExecutionServiceInput.newBuilder()
+ .setCommonHeader(commonHeader)
+ .setActionIdentifiers(actionIdentifier)
+ .setPayload(payloadBuilder.build())
+ .build()
+ }
+
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/MockTLSBluePrintProcessingServer.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/MockTLSBluePrintProcessingServer.kt
new file mode 100644
index 000000000..d5bc70c48
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/MockTLSBluePrintProcessingServer.kt
@@ -0,0 +1,90 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service
+
+import io.grpc.stub.StreamObserver
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TLSAuthGrpcServerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor.GrpcServerLoggingInterceptor
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
+import org.onap.ccsdk.cds.controllerblueprints.common.api.Status
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput
+
+
+val log = logger(MockTLSBluePrintProcessingServer::class)
+
+/** For Integration testing stat this server, Set the working path to run this method */
+fun main() {
+ try {
+ val tlsAuthGrpcServerProperties = TLSAuthGrpcServerProperties().apply {
+ port = 50052
+ type = GRPCLibConstants.TYPE_TLS_AUTH
+ certChain = "src/test/resources/tls-manual/py-executor-chain.pem"
+ privateKey = "src/test/resources/tls-manual/py-executor-key.pem"
+ }
+ val server = TLSAuthGrpcServerService(tlsAuthGrpcServerProperties).serverBuilder()
+ .intercept(GrpcServerLoggingInterceptor())
+ .addService(MockTLSBluePrintProcessingServer())
+ .build()
+ server.start()
+ log.info("GRPC Serve started(${server.isShutdown}) on port(${server.port})...")
+ server.awaitTermination()
+ } catch (e: Exception) {
+ log.error("Failed to start tls grpc integration server", e)
+ }
+
+}
+
+class MockTLSBluePrintProcessingServer : BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase() {
+ override fun process(responseObserver: StreamObserver<ExecutionServiceOutput>): StreamObserver<ExecutionServiceInput> {
+
+ return object : StreamObserver<ExecutionServiceInput> {
+ override fun onNext(executionServiceInput: ExecutionServiceInput) {
+ log.info("Received requestId(${executionServiceInput.commonHeader.requestId}) " +
+ "subRequestId(${executionServiceInput.commonHeader.subRequestId})")
+ responseObserver.onNext(buildResponse(executionServiceInput))
+ responseObserver.onCompleted()
+ }
+
+ override fun onError(error: Throwable) {
+ log.debug("Fail to process message", error)
+ responseObserver.onError(io.grpc.Status.INTERNAL
+ .withDescription(error.message)
+ .asException())
+ }
+
+ override fun onCompleted() {
+ log.info("Completed")
+ }
+ }
+ }
+
+ private fun buildResponse(input: ExecutionServiceInput): ExecutionServiceOutput {
+ val status = Status.newBuilder().setCode(200)
+ .setEventType(EventType.EVENT_COMPONENT_EXECUTED)
+ .build()
+ return ExecutionServiceOutput.newBuilder()
+ .setCommonHeader(input.commonHeader)
+ .setActionIdentifiers(input.actionIdentifiers)
+ .setStatus(status)
+ .build()
+
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/README b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/README
new file mode 100644
index 000000000..c4e91a2fc
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/README
@@ -0,0 +1,5 @@
+
+Generate Certifactes & Keys
+----------------------------
+
+openssl req -x509 -newkey rsa:4096 -keyout my-private-key.pem -out my-public-key-cert.pem -days 3650 -nodes -subj '/CN=localhost'
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/py-executor-chain.pem b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/py-executor-chain.pem
new file mode 100644
index 000000000..30f09dfea
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/py-executor-chain.pem
@@ -0,0 +1,27 @@
+-----BEGIN CERTIFICATE-----
+MIIEpDCCAowCCQDyhR+GR2RUiTANBgkqhkiG9w0BAQsFADAUMRIwEAYDVQQDDAls
+b2NhbGhvc3QwHhcNMTkxMDIzMDAwMTA0WhcNMjkxMDIwMDAwMTA0WjAUMRIwEAYD
+VQQDDAlsb2NhbGhvc3QwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQCs
+c4d6qfbW+GSMp+XURoLXtSAbbehoBXL2beSzqQNW6e+Q9IVtSPZst8VRjUXelFzM
+m7VpS9jhiXOPZ5KKUOD0GVuNQc54VpwtHt7t9L5wS9OvdnLijnMIkc0iUvC6+Rcq
+HSfbNC2Tb+a8jLwojmtRCeY/MyCnmqYpD+U3b6Eue89VpMOIfmDuTqSRRBYNVO72
+hq7FI3UD8+zREg7htfzjJjG14Ec5iVMDxJA1FlwtXFnZxDHgbLjEVjTTR/9Wm1eU
+aJ4oWRt3gG/vnJNa+GwN4w/My+j/5/n/YpNh6GeQrHxBl/SL/SAFBshlwozr4K4K
+av5MqRKyhCACV4SsdhKJUEDtvrtukJvh/ZDW8jdNbFJAljm8UucZGbJrZl6G7XB3
+WteI7rezo0mL0NMBZIT3nQSMEpefKUFZFiE5lYvIk3UuChqIM0xdgV4INwLRHZdc
+1TtiGaBJV05y3Klo5gaUgNGbHP26zfub5TydiMrOA5W2mUvMkG2oit9aqnbaZBLD
+t17cCKzpzcVF5uNUng3j6sQvpTt3S4L28TvKUMAfpecQqvxMoxG0/9HZuv2z+U+L
+LVVsS07yJPIGMLcq1LMM++8LwD1MupcoShjNOq/lUOL6hIMfLOIfxt8Kv8WykVzv
+6yjKEIurjkwMipq4kvr9J7FFi54kGr7uvXWQRHDFJwIDAQABMA0GCSqGSIb3DQEB
+CwUAA4ICAQB7gJzvaOIP3/S2jrObz67g0jiz1cfb4I9KQwpwb6JUWbYm1QjBcGm4
+IhNbdPMD6dpwBc/A4JctA5E+/fArvl14UtK1jkaaE/GCumL0VUSZeAM6CK/63brt
+LplqCunv8ePHmiwjJBnhu+ewe1+mDMVDMw0iot/q+pOM3vqNS1Fipja+xFK1JQZx
+JmkjW/Ug3NHk/SSTfO+VNmlI5bBBApMqKmd9picsyDZ7dTBtZvbqV5eQsPZvv14G
+oEvWnvvom+D5GojroSO+OMHNDR3bzK6p0Cu8AiTy9Ls6J2e4GXJz3Cg/kuF9tNlR
+3X62zDT+CUipuYyTvmjbSyNMGwU7BIZTKFPuTtjh7EwT2g6S8RV9PmT98CQW6kTT
+RJbL7nMIOF0WusysAT5wj1HJ0QKBQCXK+L6WTKTTovaEE7JSVrYe7wVF8Q9SyBIM
+4CPVZt+GMyQKJ9SRnVgTDEMb7sj9HPaoVeDc6LQTv8Q//wFeTdZIWXQhpVJCQCEG
+qkRk9r3isF60ISOXXIYhqE+hx3QXY9M2UyHDtKXPZ7X370vADi2ebBMF8MpIZYl5
+628dME9JhOhLhD5qPJeva2Nq4gLpK+rO6t7ML0Us4edoKyoScowXAh80q1GW3EO3
+IxTK123651C/S0kDqLqZ9rknEdpwSujrT2UW95jUlfo5OKDrPpdOBw==
+-----END CERTIFICATE-----
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/py-executor-key.pem b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/py-executor-key.pem
new file mode 100644
index 000000000..830a3ae21
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/py-executor-key.pem
@@ -0,0 +1,52 @@
+-----BEGIN PRIVATE KEY-----
+MIIJQgIBADANBgkqhkiG9w0BAQEFAASCCSwwggkoAgEAAoICAQCsc4d6qfbW+GSM
+p+XURoLXtSAbbehoBXL2beSzqQNW6e+Q9IVtSPZst8VRjUXelFzMm7VpS9jhiXOP
+Z5KKUOD0GVuNQc54VpwtHt7t9L5wS9OvdnLijnMIkc0iUvC6+RcqHSfbNC2Tb+a8
+jLwojmtRCeY/MyCnmqYpD+U3b6Eue89VpMOIfmDuTqSRRBYNVO72hq7FI3UD8+zR
+Eg7htfzjJjG14Ec5iVMDxJA1FlwtXFnZxDHgbLjEVjTTR/9Wm1eUaJ4oWRt3gG/v
+nJNa+GwN4w/My+j/5/n/YpNh6GeQrHxBl/SL/SAFBshlwozr4K4Kav5MqRKyhCAC
+V4SsdhKJUEDtvrtukJvh/ZDW8jdNbFJAljm8UucZGbJrZl6G7XB3WteI7rezo0mL
+0NMBZIT3nQSMEpefKUFZFiE5lYvIk3UuChqIM0xdgV4INwLRHZdc1TtiGaBJV05y
+3Klo5gaUgNGbHP26zfub5TydiMrOA5W2mUvMkG2oit9aqnbaZBLDt17cCKzpzcVF
+5uNUng3j6sQvpTt3S4L28TvKUMAfpecQqvxMoxG0/9HZuv2z+U+LLVVsS07yJPIG
+MLcq1LMM++8LwD1MupcoShjNOq/lUOL6hIMfLOIfxt8Kv8WykVzv6yjKEIurjkwM
+ipq4kvr9J7FFi54kGr7uvXWQRHDFJwIDAQABAoICADepPmRAMbTnDYU8t/jRHXBE
+PO29htL0V0vk4nl+pt5JuZJe6iYA89DZa+3LnG6gEmfUJjSrT4BUXiE+O9U7D7CZ
+8qvgPqUmx1fk6+2AHmuefd/XanNnqQduD/jxLlQbC/gC2xdsev1ok9/tyNmKRmcs
+u81QUkzmpJUCVWiUNkELozswaBBJQj4I0iM1B60b6dlWVVi5/g3dkGVW38jIdaxX
+apoansKaaVoA+s63vd7CPRoFsleOoAB3FqvPREIO97CmJ848HJpwsTB0qDcnkbDV
+xgbDFhxrIozko09ptOvEUILXag45EDmvG8WEivmjVml0aUoTFD7cWHyJBQCpR4fU
+5W9mYd4Rrzbmpb+LGYdNyrp3wo3C7dJ7/ffBMQxmXTdMZkcxorxj4BRG3oACRQ1u
+Ff1iUruZzIIDtEkrC9hc5QpLlDf9b1obm8L9sxf1QmTt59o5oFG40GPwPP19GXwE
+l2faHwho2jYLM9rhuSsK/5sSmUshPNQYmfMnbWzTtghMPE/g0Cfpt8qbspq+G1bk
+z3M97JlFMF83ccRotDElX9E/ttjU7Lehoz+1sOyHiVW1E4oqKer4t+nI2bp6VYZm
+W94qptW7kb4o0DsvPCaoTPBxLJ1ag2WBlqoFkVI0YaxZiZ8OTR55Ovi4z5xWBO1q
+NkCKgdAUQvQVzVtASVGBAoIBAQDSw2nvPFN4gGZ6OI+8j2gWtPcsrhSHS9ykxBeB
+mB/HExYIe8k3EvClf2rnfwzuKgKyVMp7Ev7nH2jS/PGZq37QyXrw0NBGRnvJY0Ez
+YB1KTgf9xaHMGMut5efNvv/cPwYriqosgJ0pdt0vvUAIQ6EBv+iDXXqJ1lQUSRYk
+wKjFABi6TeJY4t9vC474KoXTDaHlwn9+TwnuRBk85wrZzlhK90J0iVa9/Eqeddsc
+Z3CuTlc+NmcP3qvniYODq8nyVc0pKw+28AVYYEd3aJfgm+dpcB21L0oz7CaxH/Rz
+FNONuQRaOzJrcuJsde/KG2X+MHs6hVMXXXWciPrJ2l+Cq7dnAoIBAQDRdwZDcgem
+tJHLihCRzUl9PKip4ZA5757ZyTy6WMLR3wMS2cNTK8+bTrUa0SSC4WSI28pybFA7
+QdSR08c5Nd7jXcIrtqspgZKhb0E60i8VQHhh6ba/kyQjsEz9c/G1WquPK13j2vZ0
+79bomDwFJPsFzABU+sC0/F42ZVQzy9qXkjngjtmaGfrCc7X+pV28nEGtyxHci3L4
+XXfE2dOb+GBVZPLBVXwcthdRYsFuU9GMy2GH0zVtWPOcGRnlpx53Tqg7NIeR0Nm1
+K35EaK8PH92PsAr0Xza7vQHY4cPRz+RhDzjyGQtnhKf96U6gzzt4ZVbQ/UuzDBcL
+PQ2DvUH+sqxBAoIBAEW5kiUsDu0xhTVv2tVll+jTK2ZjnLT5ut/jY2djHTgtrz9V
+PEb1BBmsIoC9PljYGxZGCMpYiW2KrZIHTiIpYwXNcdeTLSPik3cXV+2YIXiAghJJ
+PHKZzWAVS+97/YcubmsfL5cTYWrjQN9XO4TAYtaCV3iGB1DsT9p6J1I3Tl4F3yhb
+NcN0IrjI2R5uauFchC/PfYAaw81ISBUm1iciJYF/dUO6X7DwcvsjQD6QVe3ESwZw
+1v2gC7zIeHKp9WAvVHUHIubBVvNavqnZN01+JjtydNGI+IJe4Jn+WU9tF2OuTqtP
+JCn50sBQ7+gr0j0aatn8W3XCXHNRua3niWtgRYcCggEAT7OzfWxhPuyMYV9qiKAN
+a4ruPp3mjDUCQ6pP4jQuBT+PYtfbe8U63MSpIsgb1XVAFNdVBA70xGd7I/XqY3l9
+ExS08n8yR7vW+Hhl4KTjZ3m9lLwiXmj1omLOGM7KVRBoITUGJ9JEXyB3rM9oXyjA
+H2eNZMh5FSTGEHqj/IV/6paoUSrp37os8VqoEHoJ3d+zGhcf98RT/e9KyGt+GmX6
++eNMf4YwkJg07THfmkRoguNMfCtAtBfZsjbW5MyfShRy7PxC7ZgDju06wXr3yZB9
+dNQuhufH4s27azQUl7w8ETaCm5QuA7i1V2c0FPpljZ052JHZAQsDpbIYd11HREvm
+QQKCAQEA0W7xNYoFvnyikdG0t266LLv1EkWDFdgkelGx/eGe/JZ+au3uTM94EssC
+ni64XX2P8vK/te+c3jItYO4MRgnDJ7GW+bRnJFu2kBE0W4chx7vga0XApVCP+Ugg
+owv5yf9cOAHFulvPefsU0snYStD3gNq77XDg0CwoyUkpeq+GiupoQ8tquMSsrEwp
+ve5DtDip3cLHz2oVLB3mR4kKVwVwmOgO5RKq6N/H6Jxtf/Zk1I260dKr+Dv2MnDh
+dysO4zH5YEt2ML3oY4zY8lu+I5bHCBR1updSny0B31WrXAJyfZpMx+HOwETFKa3B
+v9AGKz0Jc2GOIRKHrCQ/WkZePetaYQ==
+-----END PRIVATE KEY-----
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt
new file mode 100644
index 000000000..a817c0c74
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message
+
+import org.apache.kafka.common.header.Headers
+import org.apache.kafka.common.header.internals.RecordHeader
+import java.nio.charset.Charset
+
+
+fun <T : Headers> T?.toMap(): MutableMap<String, String> {
+ val map: MutableMap<String, String> = hashMapOf()
+ this?.forEach { map[it.key()] = String(it.value(), Charset.defaultCharset()) }
+ return map
+}
+
+fun Headers.addHeader(key: String, value: String) {
+ this.add(RecordHeader(key, value.toByteArray()))
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index 1cd8a2af7..184e85b70 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,8 +34,10 @@ open class MessageConsumerProperties
open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
lateinit var bootstrapServers: String
lateinit var groupId: String
- var clientId: String? = null
+ lateinit var clientId: String
var topic: String? = null
+ var autoCommit: Boolean = true
+ var autoOffsetReset: String = "latest"
var pollMillSec: Long = 1000
var pollRecords: Int = -1
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
index 25f0bf44d..8bcc7580a 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,16 +18,47 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import kotlinx.coroutines.channels.Channel
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecords
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+
+/** Consumer Function Interfaces */
+interface ConsumerFunction
interface BlueprintMessageConsumerService {
+ suspend fun subscribe(): Channel<String> {
+ return subscribe(null)
+ }
+
/** Subscribe to the Kafka channel with [additionalConfig] */
suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String>
/** Subscribe to the Kafka channel with [additionalConfig] for dynamic [topics]*/
suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<String>
+ /** Consume and execute dynamic function [consumerFunction] */
+ suspend fun consume(consumerFunction: ConsumerFunction) {
+ consume(null, consumerFunction)
+ }
+
+ /** Consume with [additionalConfig], so that we can execute dynamic function [consumerFunction] */
+ suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) {
+ throw BluePrintProcessorException("Not Implemented")
+ }
+
+ /** Consume the [topics] with [additionalConfig], so that we can execute dynamic function [consumerFunction] */
+ suspend fun consume(topics: List<String>, additionalConfig: Map<String, Any>?,
+ consumerFunction: ConsumerFunction) {
+ throw BluePrintProcessorException("Not Implemented")
+ }
+
/** close the channel, consumer and other resources */
suspend fun shutDown()
-
+}
+/** Consumer dynamic implementation interface */
+interface KafkaConsumerRecordsFunction : ConsumerFunction {
+ suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, consumer: Consumer<*, *>,
+ consumerRecords: ConsumerRecords<*, *>)
} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
index e33d41c09..7d8138639 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
@@ -20,15 +20,31 @@ import kotlinx.coroutines.runBlocking
interface BlueprintMessageProducerService {
- fun sendMessage(message: Any): Boolean = runBlocking {
- sendMessageNB(message)
+ fun sendMessage(message: Any): Boolean {
+ return sendMessage(message = message, headers = null)
}
- fun sendMessage(topic: String, message: Any): Boolean = runBlocking {
- sendMessageNB(topic, message)
+ fun sendMessage(topic: String, message: Any): Boolean {
+ return sendMessage(topic, message, null)
}
- suspend fun sendMessageNB(message: Any): Boolean
+ fun sendMessage(message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking {
+ sendMessageNB(message = message, headers = headers)
+ }
+
+ fun sendMessage(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking {
+ sendMessageNB(topic, message, headers)
+ }
+
+ suspend fun sendMessageNB(message: Any): Boolean {
+ return sendMessageNB(message = message, headers = null)
+ }
+
+ suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean
+
+ suspend fun sendMessageNB(topic: String, message: Any): Boolean {
+ return sendMessageNB(topic, message, null)
+ }
- suspend fun sendMessageNB(topic: String, message: Any): Boolean
+ suspend fun sendMessageNB(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean
} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
index b5d444a49..757846c81 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,33 +25,39 @@ import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import java.nio.charset.Charset
import java.time.Duration
import kotlin.concurrent.thread
-class KafkaBasicAuthMessageConsumerService(
+open class KafkaBasicAuthMessageConsumerService(
private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties)
: BlueprintMessageConsumerService {
- private val channel = Channel<String>()
- private var kafkaConsumer: Consumer<String, String>? = null
val log = logger(KafkaBasicAuthMessageConsumerService::class)
+ val channel = Channel<String>()
+ var kafkaConsumer: Consumer<String, ByteArray>? = null
@Volatile
var keepGoing = true
- fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, String> {
+ fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> {
val configProperties = hashMapOf<String, Any>()
configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers
configProperties[ConsumerConfig.GROUP_ID_CONFIG] = messageConsumerProperties.groupId
- configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
+ configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = messageConsumerProperties.autoCommit
+ /**
+ * earliest: automatically reset the offset to the earliest offset
+ * latest: automatically reset the offset to the latest offset
+ */
+ configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset
configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
- configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
- if (messageConsumerProperties.clientId != null) {
- configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!!
- }
+ configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
+ configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId
+
/** To handle Back pressure, Get only configured record for processing */
if (messageConsumerProperties.pollRecords > 0) {
configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords
@@ -70,7 +77,7 @@ class KafkaBasicAuthMessageConsumerService(
}
- override suspend fun subscribe(consumerTopic: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
+ override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
/** Create Kafka consumer */
kafkaConsumer = kafkaConsumer(additionalConfig)
@@ -80,22 +87,22 @@ class KafkaBasicAuthMessageConsumerService(
"topics(${messageConsumerProperties.bootstrapServers})"
}
- kafkaConsumer!!.subscribe(consumerTopic)
- log.info("Successfully consumed topic($consumerTopic)")
+ kafkaConsumer!!.subscribe(topics)
+ log.info("Successfully consumed topic($topics)")
- thread(start = true, name = "KafkaConsumer") {
+ thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") {
keepGoing = true
kafkaConsumer!!.use { kc ->
while (keepGoing) {
val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
- log.info("Consumed Records : ${consumerRecords.count()}")
+ log.trace("Consumed Records : ${consumerRecords.count()}")
runBlocking {
consumerRecords?.forEach { consumerRecord ->
/** execute the command block */
consumerRecord.value()?.let {
launch {
if (!channel.isClosedForSend) {
- channel.send(it)
+ channel.send(String(it, Charset.defaultCharset()))
} else {
log.error("Channel is closed to receive message")
}
@@ -110,6 +117,46 @@ class KafkaBasicAuthMessageConsumerService(
return channel
}
+ override suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) {
+ /** get to topic names */
+ val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() }
+ check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" }
+ return consume(topics = consumerTopic, additionalConfig = additionalConfig, consumerFunction = consumerFunction)
+ }
+
+ override suspend fun consume(topics: List<String>, additionalConfig: Map<String, Any>?,
+ consumerFunction: ConsumerFunction) {
+
+ val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction
+
+ /** Create Kafka consumer */
+ kafkaConsumer = kafkaConsumer(additionalConfig)
+
+ checkNotNull(kafkaConsumer) {
+ "failed to create kafka consumer for " +
+ "server(${messageConsumerProperties.bootstrapServers})'s " +
+ "topics(${messageConsumerProperties.bootstrapServers})"
+ }
+
+ kafkaConsumer!!.subscribe(topics)
+ log.info("Successfully consumed topic($topics)")
+
+ thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") {
+ keepGoing = true
+ kafkaConsumer!!.use { kc ->
+ while (keepGoing) {
+ val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
+ log.trace("Consumed Records : ${consumerRecords.count()}")
+ runBlocking {
+ /** Execute dynamic consumer Block substitution */
+ kafkaConsumerFunction.invoke(messageConsumerProperties, kc, consumerRecords)
+ }
+ }
+ log.info("message listener shutting down.....")
+ }
+ }
+ }
+
override suspend fun shutDown() {
/** stop the polling loop */
keepGoing = false
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
index 1c93bb0fc..42adcd712 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,16 +18,18 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import org.apache.commons.lang.builder.ToStringBuilder
+import org.apache.kafka.clients.producer.Callback
+import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig.*
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.header.internals.RecordHeader
+import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
-import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
import org.slf4j.LoggerFactory
-import org.springframework.kafka.core.DefaultKafkaProducerFactory
-import org.springframework.kafka.core.KafkaTemplate
-import org.springframework.kafka.core.ProducerFactory
-import org.springframework.kafka.support.SendResult
-import org.springframework.util.concurrent.ListenableFutureCallback
+import java.nio.charset.Charset
class KafkaBasicAuthMessageProducerService(
private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties)
@@ -34,42 +37,46 @@ class KafkaBasicAuthMessageProducerService(
private val log = LoggerFactory.getLogger(KafkaBasicAuthMessageProducerService::class.java)!!
- private var kafkaTemplate: KafkaTemplate<String, Any>? = null
+ private var kafkaProducer: KafkaProducer<String, ByteArray>? = null
+
+ private val messageLoggerService = MessageLoggerService()
override suspend fun sendMessageNB(message: Any): Boolean {
checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
- return sendMessage(messageProducerProperties.topic!!, message)
+ return sendMessageNB(messageProducerProperties.topic!!, message)
}
- override suspend fun sendMessageNB(topic: String, message: Any): Boolean {
- val serializedMessage = when (message) {
- is String -> {
- message
- }
- else -> {
- message.asJsonType().toString()
- }
- }
- val future = messageTemplate().send(topic, serializedMessage)
+ override suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean {
+ checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
+ return sendMessageNB(messageProducerProperties.topic!!, message, headers)
+ }
- future.addCallback(object : ListenableFutureCallback<SendResult<String, Any>> {
- override fun onSuccess(result: SendResult<String, Any>) {
- log.info("message sent successfully with offset=[${result.recordMetadata.offset()}]")
- }
+ override suspend fun sendMessageNB(topic: String, message: Any,
+ headers: MutableMap<String, String>?): Boolean {
+ val byteArrayMessage = when (message) {
+ is String -> message.toByteArray(Charset.defaultCharset())
+ else -> message.asJsonString().toByteArray(Charset.defaultCharset())
+ }
- override fun onFailure(ex: Throwable) {
- log.error("Unable to send message", ex)
- }
- })
+ val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage)
+ val recordHeaders = record.headers()
+ messageLoggerService.messageProducing(recordHeaders)
+ headers?.let {
+ headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) }
+ }
+ val callback = Callback { metadata, exception ->
+ log.info("message published offset(${metadata.offset()}, headers :$headers )")
+ }
+ messageTemplate().send(record, callback).get()
return true
}
- private fun producerFactory(additionalConfig: Map<String, Any>? = null): ProducerFactory<String, Any> {
- log.info("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
+ fun messageTemplate(additionalConfig: Map<String, ByteArray>? = null): KafkaProducer<String, ByteArray> {
+ log.trace("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
val configProps = hashMapOf<String, Any>()
configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers
configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
- configProps[VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
+ configProps[VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
if (messageProducerProperties.clientId != null) {
configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!!
}
@@ -79,14 +86,11 @@ class KafkaBasicAuthMessageProducerService(
if (additionalConfig != null) {
configProps.putAll(additionalConfig)
}
- return DefaultKafkaProducerFactory(configProps)
- }
- fun messageTemplate(additionalConfig: Map<String, Any>? = null): KafkaTemplate<String, Any> {
- if (kafkaTemplate == null) {
- kafkaTemplate = KafkaTemplate(producerFactory(additionalConfig))
+ if (kafkaProducer == null) {
+ kafkaProducer = KafkaProducer(configProps)
}
- return kafkaTemplate!!
+ return kafkaProducer!!
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
new file mode 100644
index 000000000..21bf1b76c
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt
@@ -0,0 +1,88 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.header.Headers
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.message.addHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.slf4j.MDC
+import java.net.InetAddress
+import java.time.Instant
+import java.time.ZoneOffset
+import java.time.ZonedDateTime
+import java.time.format.DateTimeFormatter
+import java.util.*
+
+class MessageLoggerService {
+
+ private val log = logger(MessageLoggerService::class)
+
+ fun messageConsuming(headers: CommonHeader, consumerRecord: ConsumerRecord<*, *>) {
+ messageConsuming(headers.requestId, headers.subRequestId,
+ headers.originatorId, consumerRecord)
+ }
+
+ fun messageConsuming(consumerRecord: ConsumerRecord<*, *>) {
+ val headers = consumerRecord.headers().toMap()
+ val requestID = headers[BluePrintConstants.ONAP_REQUEST_ID].defaultToUUID()
+ val invocationID = headers[BluePrintConstants.ONAP_INVOCATION_ID].defaultToUUID()
+ val partnerName = headers[BluePrintConstants.ONAP_PARTNER_NAME] ?: "UNKNOWN"
+ messageConsuming(requestID, invocationID, partnerName, consumerRecord)
+ }
+
+
+ fun messageConsuming(requestID: String, invocationID: String, partnerName: String,
+ consumerRecord: ConsumerRecord<*, *>) {
+ val headers = consumerRecord.headers().toMap()
+ val localhost = InetAddress.getLocalHost()
+ MDC.put("InvokeTimestamp", ZonedDateTime
+ .ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), ZoneOffset.UTC)
+ .format(DateTimeFormatter.ISO_INSTANT))
+ MDC.put("RequestID", requestID)
+ MDC.put("InvocationID", invocationID)
+ MDC.put("PartnerName", partnerName)
+ MDC.put("ClientIPAddress", headers["ClientIPAddress"].defaultToEmpty())
+ MDC.put("ServerFQDN", localhost.hostName.defaultToEmpty())
+ MDC.put("ServiceName", consumerRecord.topic())
+ // Custom MDC for Message Consumers
+ MDC.put("Offset", consumerRecord.offset().toString())
+ MDC.put("MessageKey", consumerRecord.key()?.toString().defaultToEmpty())
+ log.info("Consuming MDC Properties : ${MDC.getCopyOfContextMap()}")
+ }
+
+ /** Used before producing message request, Inbound Invocation ID is used as request Id
+ * for produced message Request, If invocation Id is missing then default Request Id will be generated.
+ */
+ fun messageProducing(requestHeader: Headers) {
+ val localhost = InetAddress.getLocalHost()
+ requestHeader.addHeader(BluePrintConstants.ONAP_REQUEST_ID, MDC.get("InvocationID").defaultToUUID())
+ requestHeader.addHeader(BluePrintConstants.ONAP_INVOCATION_ID, UUID.randomUUID().toString())
+ val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor"
+ requestHeader.addHeader(BluePrintConstants.ONAP_PARTNER_NAME, partnerName)
+ requestHeader.addHeader("ClientIPAddress", localhost.hostAddress)
+ }
+
+ fun messageConsumingExisting() {
+ MDC.clear()
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index f4e85a94b..bdceec7d3 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -1,5 +1,6 @@
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,15 +23,14 @@ import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.apache.kafka.clients.consumer.MockConsumer
-import org.apache.kafka.clients.consumer.OffsetResetStrategy
+import org.apache.kafka.clients.consumer.*
import org.apache.kafka.common.TopicPartition
import org.junit.Test
import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.annotation.DirtiesContext
@@ -87,14 +87,14 @@ open class BlueprintMessageConsumerServiceTest {
partitionsEndMap[partition] = records
topicsCollection.add(partition.topic())
}
- val mockKafkaConsumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST)
+ val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
mockKafkaConsumer.subscribe(topicsCollection)
mockKafkaConsumer.rebalance(partitions)
mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
for (i in 1..10) {
- val record = ConsumerRecord<String, String>(topic, 1, i.toLong(), "key_$i",
- "I am message $i")
+ val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i",
+ "I am message $i".toByteArray())
mockKafkaConsumer.addRecord(record)
}
@@ -110,6 +110,54 @@ open class BlueprintMessageConsumerServiceTest {
}
}
+ @Test
+ fun testKafkaBasicAuthConsumerWithDynamicFunction() {
+ runBlocking {
+ val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
+ .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
+
+ val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
+
+ val topic = "default-topic"
+ val partitions: MutableList<TopicPartition> = arrayListOf()
+ val topicsCollection: MutableList<String> = arrayListOf()
+ partitions.add(TopicPartition(topic, 1))
+ val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
+ val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
+
+ val records: Long = 10
+ partitions.forEach { partition ->
+ partitionsBeginningMap[partition] = 0L
+ partitionsEndMap[partition] = records
+ topicsCollection.add(partition.topic())
+ }
+ val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
+ mockKafkaConsumer.subscribe(topicsCollection)
+ mockKafkaConsumer.rebalance(partitions)
+ mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
+ mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
+ for (i in 1..10) {
+ val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i",
+ "I am message $i".toByteArray())
+ mockKafkaConsumer.addRecord(record)
+ }
+
+ every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
+ /** Test Consumer Function implementation */
+ val consumerFunction = object : KafkaConsumerRecordsFunction {
+ override suspend fun invoke(messageConsumerProperties: MessageConsumerProperties,
+ consumer: Consumer<*, *>, consumerRecords: ConsumerRecords<*, *>) {
+ val count = consumerRecords.count()
+ log.trace("Received Message count($count)")
+ }
+ }
+ spyBlueprintMessageConsumerService.consume(consumerFunction)
+ delay(10)
+ spyBlueprintMessageConsumerService.shutDown()
+ }
+ }
+
/** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
//@Test
fun testKafkaIntegration() {
@@ -131,7 +179,10 @@ open class BlueprintMessageConsumerServiceTest {
launch {
repeat(5) {
delay(100)
- blueprintMessageProducerService.sendMessage("this is my message($it)")
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.toString()
+ blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)",
+ headers = headers)
}
}
delay(5000)
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index 31bcc1517..f23624f7a 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -20,18 +20,18 @@ import io.mockk.every
import io.mockk.mockk
import io.mockk.spyk
import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.RecordMetadata
import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.kafka.core.KafkaTemplate
-import org.springframework.kafka.support.SendResult
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
-import org.springframework.util.concurrent.SettableListenableFuture
+import java.util.concurrent.Future
import kotlin.test.Test
import kotlin.test.assertTrue
@@ -57,12 +57,12 @@ open class BlueprintMessageProducerServiceTest {
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
.blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
- val mockKafkaTemplate = mockk<KafkaTemplate<String, Any>>()
+ val mockKafkaTemplate = mockk<KafkaProducer<String, ByteArray>>()
- val future = SettableListenableFuture<SendResult<String, Any>>()
- //future.setException(BluePrintException("failed sending"))
+ val responseMock = mockk<Future<RecordMetadata>>()
+ every { responseMock.get() } returns mockk()
- every { mockKafkaTemplate.send(any(), any()) } returns future
+ every { mockKafkaTemplate.send(any(), any()) } returns responseMock
val spyBluePrintMessageProducerService = spyk(blueprintMessageProducerService, recordPrivateCalls = true)
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt
new file mode 100644
index 000000000..82e40efd1
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import io.mockk.every
+import io.mockk.mockk
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.header.internals.RecordHeaders
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.slf4j.MDC
+import kotlin.test.assertEquals
+
+class MessageLoggerServiceTest {
+
+
+ @Test
+ fun testMessagingHeaders() {
+ val messageLoggerService = MessageLoggerService()
+ val commonHeader = CommonHeader().apply {
+ requestId = "1234"
+ subRequestId = "1234-12"
+ originatorId = "cds-test"
+ }
+
+ val consumerRecord = mockk<ConsumerRecord<*, *>>()
+ every { consumerRecord.headers() } returns null
+ every { consumerRecord.key() } returns "1234"
+ every { consumerRecord.offset() } returns 12345
+ every { consumerRecord.topic() } returns "sample-topic"
+ every { consumerRecord.timestamp() } returns System.currentTimeMillis()
+ messageLoggerService.messageConsuming(commonHeader, consumerRecord)
+ assertEquals(commonHeader.requestId, MDC.get("RequestID"))
+ assertEquals(commonHeader.subRequestId, MDC.get("InvocationID"))
+
+ val mockHeaders = RecordHeaders()
+ messageLoggerService.messageProducing(mockHeaders)
+ val map = mockHeaders.toMap()
+ assertEquals("1234-12", map[BluePrintConstants.ONAP_REQUEST_ID])
+
+ messageLoggerService.messageConsumingExisting()
+
+ }
+
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml
index 3868440c7..820041f74 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml
@@ -1,5 +1,6 @@
<!--
~ Copyright © 2019 IBM.
+ ~ Modifications Copyright © 2018-2019 AT&T Intellectual Property.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
@@ -15,11 +16,18 @@
-->
<configuration>
+
+ <property name="localPattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/>
+ <property name="defaultPattern"
+ value="%date{ISO8601,UTC}|%X{RequestID}|%X{InvocationID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n"/>
+ <property name="testing"
+ value="%X{RequestID}|%X{InvocationID}|%logger{50}| %msg%n"/>
+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
- <pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{50} - %msg%n</pattern>
+ <pattern>${localPattern}</pattern>
</encoder>
</appender>
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt
deleted file mode 100644
index cdf6ce195..000000000
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.core
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.reactor.ReactorContext
-import kotlinx.coroutines.reactor.asCoroutineContext
-import org.onap.ccsdk.cds.blueprintsprocessor.core.service.MonoMDCCoroutine
-import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
-import reactor.core.publisher.Mono
-import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.EmptyCoroutineContext
-
-/** Used in Rest controller API methods to populate MDC context to nested coroutines from reactor web filter context. */
-@UseExperimental(InternalCoroutinesApi::class)
-fun <T> monoMdc(context: CoroutineContext = EmptyCoroutineContext,
- block: suspend CoroutineScope.() -> T?): Mono<T> = Mono.create { sink ->
-
- val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext())
- ?: sink.currentContext()).asCoroutineContext()
- /** Populate MDC context only if present in Reactor Context */
- val newContext = if (!reactorContext.context.isEmpty
- && reactorContext.context.hasKey(MDCContext)) {
- val mdcContext = reactorContext.context.get<MDCContext>(MDCContext)
- GlobalScope.newCoroutineContext(context + reactorContext + mdcContext)
- } else GlobalScope.newCoroutineContext(context + reactorContext)
-
- val coroutine = MonoMDCCoroutine(newContext, sink)
- sink.onDispose(coroutine)
- coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
-} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintRemoteProcessorData.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintRemoteProcessorData.kt
index 2f9ea4a25..d63f34ced 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintRemoteProcessorData.kt
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintRemoteProcessorData.kt
@@ -39,7 +39,8 @@ data class RemoteScriptExecutionInput(var requestId: String,
data class RemoteScriptExecutionOutput(var requestId: String,
var response: List<String>,
var status: StatusType = StatusType.SUCCESS,
- var timestamp: Date = Date())
+ var timestamp: Date = Date(),
+ var payload: JsonNode)
data class PrepareRemoteEnvInput(var requestId: String,
var correlationId: String? = null,
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt b/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt
index 4da7dcd0e..cec11ae3c 100644
--- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt
+++ b/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt
@@ -14,43 +14,47 @@
* limitations under the License.
*/
-package org.onap.ccsdk.cds.blueprintsprocessor.core.service
+package org.onap.ccsdk.cds.blueprintsprocessor.rest.service
-import kotlinx.coroutines.AbstractCoroutine
-import kotlinx.coroutines.InternalCoroutinesApi
-import kotlinx.coroutines.handleCoroutineException
+import kotlinx.coroutines.*
+import kotlinx.coroutines.reactor.ReactorContext
+import kotlinx.coroutines.reactor.asCoroutineContext
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_INVOCATION_ID
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_PARTNER_NAME
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_REQUEST_ID
+import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.slf4j.MDC
import org.springframework.http.server.reactive.ServerHttpRequest
import org.springframework.http.server.reactive.ServerHttpResponse
import reactor.core.Disposable
+import reactor.core.publisher.Mono
import reactor.core.publisher.MonoSink
+import java.net.InetAddress
import java.time.ZoneOffset
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
-import java.util.*
import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.EmptyCoroutineContext
-class LoggingService {
- private val log = logger(LoggingService::class)
+class RestLoggerService {
+ private val log = logger(RestLoggerService::class)
- companion object {
- const val ONAP_REQUEST_ID = "X-ONAP-RequestID"
- const val ONAP_INVOCATION_ID = "X-ONAP-InvocationID"
- const val ONAP_PARTNER_NAME = "X-ONAP-PartnerName"
- }
fun entering(request: ServerHttpRequest) {
+ val localhost = InetAddress.getLocalHost()
val headers = request.headers
- val requestID = defaultToUUID(headers.getFirst(ONAP_REQUEST_ID))
- val invocationID = defaultToUUID(headers.getFirst(ONAP_INVOCATION_ID))
- val partnerName = defaultToEmpty(headers.getFirst(ONAP_PARTNER_NAME))
+ val requestID = headers.getFirst(ONAP_REQUEST_ID).defaultToUUID()
+ val invocationID = headers.getFirst(ONAP_INVOCATION_ID).defaultToUUID()
+ val partnerName = headers.getFirst(ONAP_PARTNER_NAME).defaultToEmpty()
MDC.put("InvokeTimestamp", ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT))
MDC.put("RequestID", requestID)
MDC.put("InvocationID", invocationID)
MDC.put("PartnerName", partnerName)
- MDC.put("ClientIPAddress", defaultToEmpty(request.remoteAddress?.address?.hostAddress))
- MDC.put("ServerFQDN", defaultToEmpty(request.remoteAddress?.hostString))
+ MDC.put("ClientIPAddress", request.remoteAddress?.address?.hostAddress.defaultToEmpty())
+ MDC.put("ServerFQDN",localhost.hostName.defaultToEmpty())
if (MDC.get("ServiceName") == null || MDC.get("ServiceName").equals("", ignoreCase = true)) {
MDC.put("ServiceName", request.uri.path)
}
@@ -62,22 +66,35 @@ class LoggingService {
val resHeaders = response.headers
resHeaders[ONAP_REQUEST_ID] = MDC.get("RequestID")
resHeaders[ONAP_INVOCATION_ID] = MDC.get("InvocationID")
+ val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor"
+ resHeaders[ONAP_PARTNER_NAME] = partnerName
} catch (e: Exception) {
log.warn("couldn't set response headers", e)
} finally {
MDC.clear()
}
}
+}
- private fun defaultToEmpty(input: Any?): String {
- return input?.toString() ?: ""
- }
- private fun defaultToUUID(input: String?): String {
- return input ?: UUID.randomUUID().toString()
- }
-}
+/** Used in Rest controller API methods to populate MDC context to nested coroutines from reactor web filter context. */
+@UseExperimental(InternalCoroutinesApi::class)
+fun <T> monoMdc(context: CoroutineContext = EmptyCoroutineContext,
+ block: suspend CoroutineScope.() -> T?): Mono<T> = Mono.create { sink ->
+
+ val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext())
+ ?: sink.currentContext()).asCoroutineContext()
+ /** Populate MDC context only if present in Reactor Context */
+ val newContext = if (!reactorContext.context.isEmpty
+ && reactorContext.context.hasKey(MDCContext)) {
+ val mdcContext = reactorContext.context.get<MDCContext>(MDCContext)
+ GlobalScope.newCoroutineContext(context + reactorContext + mdcContext)
+ } else GlobalScope.newCoroutineContext(context + reactorContext)
+ val coroutine = MonoMDCCoroutine(newContext, sink)
+ sink.onDispose(coroutine)
+ coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+}
@InternalCoroutinesApi
class MonoMDCCoroutine<in T>(