diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons')
32 files changed, 1298 insertions, 155 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/pom.xml b/ms/blueprintsprocessor/modules/commons/grpc-lib/pom.xml index 5945e29fd..15cabb260 100644 --- a/ms/blueprintsprocessor/modules/commons/grpc-lib/pom.xml +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/pom.xml @@ -1,6 +1,7 @@ <?xml version="1.0" encoding="UTF-8"?> <!-- ~ Copyright © 2019 IBM. + ~ Modifications Copyright © 2018-2019 AT&T Intellectual Property. ~ ~ Licensed under the Apache License, Version 2.0 (the "License"); ~ you may not use this file except in compliance with the License. @@ -42,5 +43,9 @@ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId> <artifactId>processor-core</artifactId> </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-testing</artifactId> + </dependency> </dependencies> </project> diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcExtensions.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcExtensions.kt new file mode 100644 index 000000000..55cf0941d --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcExtensions.kt @@ -0,0 +1,27 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.grpc + +import io.grpc.Metadata + +fun Metadata.getStringKey(key: String): String? { + return this.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)) +} + +fun Metadata.putStringKeyValue(key: String, value: String) { + this.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value) +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibConfiguration.kt index 1bef3a0f2..0ec049a3c 100644 --- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibConfiguration.kt +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibConfiguration.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +17,10 @@ package org.onap.ccsdk.cds.blueprintsprocessor.grpc +import com.fasterxml.jackson.databind.JsonNode +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService import org.springframework.context.annotation.ComponentScan import org.springframework.context.annotation.Configuration @@ -23,10 +28,25 @@ import org.springframework.context.annotation.Configuration @ComponentScan open class BluePrintGrpcLibConfiguration +/** + * Exposed Dependency Service by this GRPC Lib Module + */ +fun BluePrintDependencyService.grpcLibPropertyService(): BluePrintGrpcLibPropertyService = + instance(GRPCLibConstants.SERVICE_BLUEPRINT_GRPC_LIB_PROPERTY) + +fun BluePrintDependencyService.grpcClientService(selector: String): BluePrintGrpcClientService { + return grpcLibPropertyService().blueprintGrpcClientService(selector) +} + +fun BluePrintDependencyService.grpcClientService(jsonNode: JsonNode): BluePrintGrpcClientService { + return grpcLibPropertyService().blueprintGrpcClientService(jsonNode) +} + class GRPCLibConstants { companion object { const val SERVICE_BLUEPRINT_GRPC_LIB_PROPERTY = "blueprint-grpc-lib-property-service" const val TYPE_TOKEN_AUTH = "token-auth" const val TYPE_BASIC_AUTH = "basic-auth" + const val TYPE_TLS_AUTH = "tls-auth" } }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibData.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibData.kt index 76e60bd0d..47d16fbc7 100644 --- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/BluePrintGrpcLibData.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +17,24 @@ package org.onap.ccsdk.cds.blueprintsprocessor.grpc +/** GRPC Server Properties */ +open class GrpcServerProperties { + lateinit var type: String + var port: Int = -1 +} + +open class TokenAuthGrpcServerProperties : GrpcServerProperties() { + lateinit var token: String +} + +open class TLSAuthGrpcServerProperties : GrpcServerProperties() { + lateinit var certChain: String + lateinit var privateKey: String + /** Below Used only for Mutual TLS */ + var trustCertCollection: String? = null +} + +/** GRPC Client Properties */ open class GrpcClientProperties { lateinit var type: String lateinit var host: String @@ -26,6 +45,13 @@ open class TokenAuthGrpcClientProperties : GrpcClientProperties() { lateinit var token: String } +open class TLSAuthGrpcClientProperties : GrpcClientProperties() { + var trustCertCollection: String? = null + /** Below Used only for Mutual TLS */ + var clientCertChain: String? = null + var clientPrivateKey: String? = null +} + open class BasicAuthGrpcClientProperties : GrpcClientProperties() { lateinit var username: String lateinit var password: String diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcClientLoggingInterceptor.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcClientLoggingInterceptor.kt new file mode 100644 index 000000000..f3b14b59f --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcClientLoggingInterceptor.kt @@ -0,0 +1,47 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor + +import io.grpc.* +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.GrpcLoggerService +import org.onap.ccsdk.cds.controllerblueprints.core.logger + + +class GrpcClientLoggingInterceptor : ClientInterceptor { + val log = logger(GrpcClientLoggingInterceptor::class) + + val loggingService = GrpcLoggerService() + + override fun <ReqT, RespT> interceptCall(method: MethodDescriptor<ReqT, RespT>, + callOptions: CallOptions, channel: Channel): ClientCall<ReqT, RespT> { + + return object : ForwardingClientCall + .SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(method, callOptions)) { + + override fun start(responseListener: Listener<RespT>, headers: Metadata) { + val listener = object : ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) { + override fun onMessage(message: RespT) { + loggingService.grpcInvoking(headers) + super.onMessage(message) + } + } + super.start(listener, headers) + } + } + + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcServerLoggingInterceptor.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcServerLoggingInterceptor.kt new file mode 100644 index 000000000..e21d5d3ce --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/interceptor/GrpcServerLoggingInterceptor.kt @@ -0,0 +1,92 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor + +import io.grpc.* +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.GrpcLoggerService +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.management.api.BluePrintDownloadInput +import org.onap.ccsdk.cds.controllerblueprints.management.api.BluePrintRemoveInput +import org.onap.ccsdk.cds.controllerblueprints.management.api.BluePrintUploadInput +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput +import org.slf4j.MDC + +class GrpcServerLoggingInterceptor : ServerInterceptor { + val log = logger(GrpcServerLoggingInterceptor::class) + val loggingService = GrpcLoggerService() + + override fun <ReqT : Any, RespT : Any> interceptCall(call: ServerCall<ReqT, RespT>, + requestHeaders: Metadata, next: ServerCallHandler<ReqT, RespT>) + : ServerCall.Listener<ReqT> { + + val forwardingServerCall = object : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) { + override fun sendHeaders(responseHeaders: Metadata) { + loggingService.grpResponding(requestHeaders, responseHeaders) + super.sendHeaders(responseHeaders) + } + } + + return object + : ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>( + next.startCall(forwardingServerCall, requestHeaders)) { + + override fun onMessage(message: ReqT) { + /** Get the requestId, SubRequestId and Originator Id and set in MDS context + * If you are using other GRPC services, Implement own Logging Interceptors to get tracing. + * */ + when (message) { + is ExecutionServiceInput -> { + val commonHeader = message.commonHeader + ?: throw BluePrintProcessorException("missing common header in request") + loggingService.grpcRequesting(call, commonHeader, next) + } + is BluePrintUploadInput -> { + val commonHeader = message.commonHeader + ?: throw BluePrintProcessorException("missing common header in request") + loggingService.grpcRequesting(call, commonHeader, next) + } + is BluePrintDownloadInput -> { + val commonHeader = message.commonHeader + ?: throw BluePrintProcessorException("missing common header in request") + loggingService.grpcRequesting(call, commonHeader, next) + } + is BluePrintRemoveInput -> { + val commonHeader = message.commonHeader + ?: throw BluePrintProcessorException("missing common header in request") + loggingService.grpcRequesting(call, commonHeader, next) + } + else -> { + loggingService.grpcRequesting(call, requestHeaders, next) + } + } + super.onMessage(message) + } + + override fun onComplete() { + MDC.clear() + super.onComplete() + } + + override fun onCancel() { + MDC.clear() + super.onCancel() + } + + } + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt index a1d2188ab..f4933a3ad 100644 --- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,17 +19,56 @@ package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service import com.fasterxml.jackson.databind.JsonNode import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties -import org.onap.ccsdk.cds.blueprintsprocessor.grpc.BasicAuthGrpcClientProperties -import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants -import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties -import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.* import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.returnNullIfMissing import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils import org.springframework.stereotype.Service @Service(GRPCLibConstants.SERVICE_BLUEPRINT_GRPC_LIB_PROPERTY) open class BluePrintGrpcLibPropertyService(private var bluePrintProperties: BluePrintProperties) { + /** GRPC Server Lib Property Service */ + fun grpcServerProperties(jsonNode: JsonNode): GrpcServerProperties { + return when (val type = jsonNode.get("type").textValue()) { + GRPCLibConstants.TYPE_TOKEN_AUTH -> { + JacksonUtils.readValue(jsonNode, TokenAuthGrpcServerProperties::class.java)!! + } + GRPCLibConstants.TYPE_TLS_AUTH -> { + JacksonUtils.readValue(jsonNode, TLSAuthGrpcServerProperties::class.java)!! + } + else -> { + throw BluePrintProcessorException("Grpc type($type) not supported") + } + } + } + + fun grpcServerProperties(prefix: String): GrpcServerProperties { + val type = bluePrintProperties.propertyBeanType( + "$prefix.type", String::class.java) + return when (type) { + GRPCLibConstants.TYPE_TOKEN_AUTH -> { + tokenAuthGrpcServerProperties(prefix) + } + GRPCLibConstants.TYPE_TLS_AUTH -> { + tlsAuthGrpcServerProperties(prefix) + } + else -> { + throw BluePrintProcessorException("Grpc type($type) not supported") + } + } + } + + private fun tokenAuthGrpcServerProperties(prefix: String): TokenAuthGrpcServerProperties { + return bluePrintProperties.propertyBeanType(prefix, TokenAuthGrpcServerProperties::class.java) + } + + private fun tlsAuthGrpcServerProperties(prefix: String): TLSAuthGrpcServerProperties { + return bluePrintProperties.propertyBeanType(prefix, TLSAuthGrpcServerProperties::class.java) + } + + /** GRPC Client Lib Property Service */ + fun blueprintGrpcClientService(jsonNode: JsonNode): BluePrintGrpcClientService { val restClientProperties = grpcClientProperties(jsonNode) return blueprintGrpcClientService(restClientProperties) @@ -42,11 +82,15 @@ open class BluePrintGrpcLibPropertyService(private var bluePrintProperties: Blue fun grpcClientProperties(jsonNode: JsonNode): GrpcClientProperties { - val type = jsonNode.get("type").textValue() + val type = jsonNode.get("type").returnNullIfMissing()?.textValue() + ?: BluePrintProcessorException("missing type property") return when (type) { GRPCLibConstants.TYPE_TOKEN_AUTH -> { JacksonUtils.readValue(jsonNode, TokenAuthGrpcClientProperties::class.java)!! } + GRPCLibConstants.TYPE_TLS_AUTH -> { + JacksonUtils.readValue(jsonNode, TLSAuthGrpcClientProperties::class.java)!! + } GRPCLibConstants.TYPE_BASIC_AUTH -> { JacksonUtils.readValue(jsonNode, BasicAuthGrpcClientProperties::class.java)!! } @@ -63,6 +107,9 @@ open class BluePrintGrpcLibPropertyService(private var bluePrintProperties: Blue GRPCLibConstants.TYPE_TOKEN_AUTH -> { tokenAuthGrpcClientProperties(prefix) } + GRPCLibConstants.TYPE_TLS_AUTH -> { + tlsAuthGrpcClientProperties(prefix) + } GRPCLibConstants.TYPE_BASIC_AUTH -> { basicAuthGrpcClientProperties(prefix) } @@ -75,12 +122,15 @@ open class BluePrintGrpcLibPropertyService(private var bluePrintProperties: Blue fun blueprintGrpcClientService(grpcClientProperties: GrpcClientProperties): BluePrintGrpcClientService { - when (grpcClientProperties) { + return when (grpcClientProperties) { is TokenAuthGrpcClientProperties -> { - return TokenAuthGrpcClientService(grpcClientProperties) + TokenAuthGrpcClientService(grpcClientProperties) + } + is TLSAuthGrpcClientProperties -> { + TLSAuthGrpcClientService(grpcClientProperties) } is BasicAuthGrpcClientProperties -> { - return BasicAuthGrpcClientService(grpcClientProperties) + BasicAuthGrpcClientService(grpcClientProperties) } else -> { throw BluePrintProcessorException("couldn't get grpc service for type(${grpcClientProperties.type})") @@ -92,6 +142,10 @@ open class BluePrintGrpcLibPropertyService(private var bluePrintProperties: Blue return bluePrintProperties.propertyBeanType(prefix, TokenAuthGrpcClientProperties::class.java) } + private fun tlsAuthGrpcClientProperties(prefix: String): TLSAuthGrpcClientProperties { + return bluePrintProperties.propertyBeanType(prefix, TLSAuthGrpcClientProperties::class.java) + } + private fun basicAuthGrpcClientProperties(prefix: String): BasicAuthGrpcClientProperties { return bluePrintProperties.propertyBeanType(prefix, BasicAuthGrpcClientProperties::class.java) } diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcClientService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcService.kt index 016c05035..0d9291615 100644 --- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcClientService.kt +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +18,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service import io.grpc.ManagedChannel +import io.grpc.netty.NettyServerBuilder + +interface BluePrintGrpcServerService { + fun serverBuilder(): NettyServerBuilder +} interface BluePrintGrpcClientService { suspend fun channel(): ManagedChannel diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt new file mode 100644 index 000000000..6d2ba43cc --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/GrpcLoggerService.kt @@ -0,0 +1,101 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service + +import io.grpc.Grpc +import io.grpc.Metadata +import io.grpc.ServerCall +import io.grpc.ServerCallHandler +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.getStringKey +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.putStringKeyValue +import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_INVOCATION_ID +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_PARTNER_NAME +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_REQUEST_ID +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.slf4j.MDC +import java.net.InetAddress +import java.net.InetSocketAddress +import java.time.ZoneOffset +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.util.* + +class GrpcLoggerService { + + private val log = logger(GrpcLoggerService::class) + + /** Used when server receives request */ + fun <ReqT : Any, RespT : Any> grpcRequesting(call: ServerCall<ReqT, RespT>, + headers: Metadata, next: ServerCallHandler<ReqT, RespT>) { + val requestID = headers.getStringKey(ONAP_REQUEST_ID).defaultToUUID() + val invocationID = headers.getStringKey(ONAP_INVOCATION_ID).defaultToUUID() + val partnerName = headers.getStringKey(ONAP_PARTNER_NAME) ?: "UNKNOWN" + grpcRequesting(requestID, invocationID, partnerName, call) + } + + fun <ReqT : Any, RespT : Any> grpcRequesting(call: ServerCall<ReqT, RespT>, + headers: CommonHeader, next: ServerCallHandler<ReqT, RespT>) { + val requestID = headers.requestId.defaultToUUID() + val invocationID = headers.subRequestId.defaultToUUID() + val partnerName = headers.originatorId ?: "UNKNOWN" + grpcRequesting(requestID, invocationID, partnerName, call) + } + + fun <ReqT : Any, RespT : Any> grpcRequesting(requestID: String, invocationID: String, partnerName: String, + call: ServerCall<ReqT, RespT>) { + val localhost = InetAddress.getLocalHost() + + val clientSocketAddress = call.attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR) as? InetSocketAddress + ?: throw BluePrintProcessorException("failed to get client address") + val serviceName = call.methodDescriptor.fullMethodName + + MDC.put("InvokeTimestamp", ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT)) + MDC.put("RequestID", requestID) + MDC.put("InvocationID", invocationID) + MDC.put("PartnerName", partnerName) + MDC.put("ClientIPAddress", clientSocketAddress.address.defaultToEmpty()) + MDC.put("ServerFQDN", localhost.hostName.defaultToEmpty()) + MDC.put("ServiceName", serviceName) + log.trace("MDC Properties : ${MDC.getCopyOfContextMap()}") + } + + + /** Used before invoking any GRPC outbound request, Inbound Invocation ID is used as request Id + * for outbound Request, If invocation Id is missing then default Request Id will be generated. + */ + fun grpcInvoking(requestHeader: Metadata) { + requestHeader.putStringKeyValue(ONAP_REQUEST_ID, MDC.get("InvocationID").defaultToUUID()) + requestHeader.putStringKeyValue(ONAP_INVOCATION_ID, UUID.randomUUID().toString()) + val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor" + requestHeader.putStringKeyValue(ONAP_PARTNER_NAME, partnerName) + } + + /** Used when server returns response */ + fun grpResponding(requestHeaders: Metadata, responseHeaders: Metadata) { + try { + responseHeaders.putStringKeyValue(ONAP_REQUEST_ID, MDC.get("RequestID").defaultToEmpty()) + responseHeaders.putStringKeyValue(ONAP_INVOCATION_ID, MDC.get("InvocationID").defaultToEmpty()) + responseHeaders.putStringKeyValue(ONAP_PARTNER_NAME, MDC.get("PartnerName").defaultToEmpty()) + } catch (e: Exception) { + log.warn("couldn't set grpc response headers", e) + } + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TLSAuthGrpcClientService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TLSAuthGrpcClientService.kt new file mode 100644 index 000000000..a70cbbce0 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TLSAuthGrpcClientService.kt @@ -0,0 +1,54 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service + +import io.grpc.ManagedChannel +import io.grpc.internal.DnsNameResolverProvider +import io.grpc.internal.PickFirstLoadBalancerProvider +import io.grpc.netty.GrpcSslContexts +import io.grpc.netty.NettyChannelBuilder +import io.netty.handler.ssl.SslContext +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TLSAuthGrpcClientProperties +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor.GrpcClientLoggingInterceptor +import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile + +class TLSAuthGrpcClientService(private val tlsAuthGrpcClientProperties: TLSAuthGrpcClientProperties) + : BluePrintGrpcClientService { + + override suspend fun channel(): ManagedChannel { + return NettyChannelBuilder + .forAddress(tlsAuthGrpcClientProperties.host, tlsAuthGrpcClientProperties.port) + .nameResolverFactory(DnsNameResolverProvider()) + .loadBalancerFactory(PickFirstLoadBalancerProvider()) + .intercept(GrpcClientLoggingInterceptor()) + .sslContext(sslContext()) + .build() + } + + fun sslContext(): SslContext { + val builder = GrpcSslContexts.forClient() + if (tlsAuthGrpcClientProperties.trustCertCollection != null) { + builder.trustManager(normalizedFile(tlsAuthGrpcClientProperties.trustCertCollection!!)) + } + if (tlsAuthGrpcClientProperties.clientCertChain != null + && tlsAuthGrpcClientProperties.clientPrivateKey != null) { + builder.keyManager(normalizedFile(tlsAuthGrpcClientProperties.clientCertChain!!), + normalizedFile(tlsAuthGrpcClientProperties.clientPrivateKey!!)) + } + return builder.build() + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TLSAuthGrpcServerService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TLSAuthGrpcServerService.kt new file mode 100644 index 000000000..fc73d43f9 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TLSAuthGrpcServerService.kt @@ -0,0 +1,49 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service + +import io.grpc.netty.GrpcSslContexts +import io.grpc.netty.NettyServerBuilder +import io.netty.handler.ssl.ClientAuth +import io.netty.handler.ssl.SslContext +import io.netty.handler.ssl.SslContextBuilder +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TLSAuthGrpcServerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile + + +class TLSAuthGrpcServerService(private val tlsAuthGrpcServerProperties: TLSAuthGrpcServerProperties) + : BluePrintGrpcServerService { + + override fun serverBuilder(): NettyServerBuilder { + return NettyServerBuilder + .forPort(tlsAuthGrpcServerProperties.port) + .sslContext(sslContext()) + } + + fun sslContext(): SslContext { + val sslClientContextBuilder = SslContextBuilder + .forServer(normalizedFile(tlsAuthGrpcServerProperties.certChain), + normalizedFile(tlsAuthGrpcServerProperties.privateKey)) + + tlsAuthGrpcServerProperties.trustCertCollection?.let { trustCertFile -> + sslClientContextBuilder.trustManager(normalizedFile(trustCertFile)) + sslClientContextBuilder.clientAuth(ClientAuth.REQUIRE) + } + return GrpcSslContexts.configure(sslClientContextBuilder).build() + } + +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt index dbff84211..601dc0e33 100644 --- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/TokenAuthGrpcClientService.kt @@ -21,6 +21,7 @@ import io.grpc.internal.DnsNameResolverProvider import io.grpc.internal.PickFirstLoadBalancerProvider import io.grpc.netty.NettyChannelBuilder import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor.GrpcClientLoggingInterceptor class TokenAuthGrpcClientService(private val tokenAuthGrpcClientProperties: TokenAuthGrpcClientProperties) : BluePrintGrpcClientService { @@ -30,6 +31,7 @@ class TokenAuthGrpcClientService(private val tokenAuthGrpcClientProperties: Toke .forAddress(tokenAuthGrpcClientProperties.host, tokenAuthGrpcClientProperties.port) .nameResolverFactory(DnsNameResolverProvider()) .loadBalancerFactory(PickFirstLoadBalancerProvider()) + .intercept(GrpcClientLoggingInterceptor()) .intercept(TokenAuthClientInterceptor(tokenAuthGrpcClientProperties)).usePlaintext().build() return managedChannel } diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyServiceTest.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyServiceTest.kt index 8df218fe9..b7ddc1569 100644 --- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyServiceTest.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,9 +23,8 @@ import org.junit.Test import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.grpc.BasicAuthGrpcClientProperties -import org.onap.ccsdk.cds.blueprintsprocessor.grpc.BluePrintGrpcLibConfiguration -import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.* +import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource @@ -42,11 +42,25 @@ import kotlin.test.assertTrue "blueprintsprocessor.grpcclient.sample.port=50505", "blueprintsprocessor.grpcclient.sample.username=sampleuser", "blueprintsprocessor.grpcclient.sample.password=sampleuser", + "blueprintsprocessor.grpcclient.token.type=token-auth", "blueprintsprocessor.grpcclient.token.host=127.0.0.1", "blueprintsprocessor.grpcclient.token.port=50505", "blueprintsprocessor.grpcclient.token.username=sampleuser", - "blueprintsprocessor.grpcclient.token.password=sampleuser" + "blueprintsprocessor.grpcclient.token.password=sampleuser", + + "blueprintsprocessor.grpcserver.tls-sample.type=tls-auth", + "blueprintsprocessor.grpcserver.tls-sample.port=50505", + "blueprintsprocessor.grpcserver.tls-sample.certChain=server1.pem", + "blueprintsprocessor.grpcserver.tls-sample.privateKey=server1.key", + "blueprintsprocessor.grpcserver.tls-sample.trustCertCollection=ca.pem", + + "blueprintsprocessor.grpcclient.tls-sample.type=tls-auth", + "blueprintsprocessor.grpcclient.tls-sample.host=127.0.0.1", + "blueprintsprocessor.grpcclient.tls-sample.port=50505", + "blueprintsprocessor.grpcclient.tls-sample.trustCertCollection=ca.pem", + "blueprintsprocessor.grpcclient.tls-sample.clientCertChain=client.pem", + "blueprintsprocessor.grpcclient.tls-sample.clientPrivateKey=client.key" ]) class BluePrintGrpcLibPropertyServiceTest { @@ -129,4 +143,52 @@ class BluePrintGrpcLibPropertyServiceTest { .blueprintGrpcClientService(actualObj) assertTrue(svc is BasicAuthGrpcClientService) } + + @Test + fun testGrpcClientTLSProperties() { + val properties = bluePrintGrpcLibPropertyService + .grpcClientProperties("blueprintsprocessor.grpcclient.tls-sample") as TLSAuthGrpcClientProperties + assertNotNull(properties, "failed to create property bean") + assertNotNull(properties.host, "failed to get host property in property bean") + assertNotNull(properties.port, "failed to get host property in property bean") + assertNotNull(properties.trustCertCollection, "failed to get trustCertCollection property in property bean") + assertNotNull(properties.clientCertChain, "failed to get clientCertChain property in property bean") + assertNotNull(properties.clientPrivateKey, "failed to get clientPrivateKey property in property bean") + + val configDsl = """{ + "type" : "tls-auth", + "host" : "localhost", + "port" : "50505", + "trustCertCollection" : "server1.pem", + "clientCertChain" : "server1.key", + "clientPrivateKey" : "ca.pem" + } + """.trimIndent() + val jsonProperties = bluePrintGrpcLibPropertyService + .grpcClientProperties(configDsl.jsonAsJsonType()) as TLSAuthGrpcClientProperties + assertNotNull(jsonProperties, "failed to create property bean from json") + } + + @Test + fun testGrpcServerTLSProperties() { + val properties = bluePrintGrpcLibPropertyService + .grpcServerProperties("blueprintsprocessor.grpcserver.tls-sample") as TLSAuthGrpcServerProperties + assertNotNull(properties, "failed to create property bean") + assertNotNull(properties.port, "failed to get host property in property bean") + assertNotNull(properties.trustCertCollection, "failed to get trustCertCollection property in property bean") + assertNotNull(properties.certChain, "failed to get certChain property in property bean") + assertNotNull(properties.privateKey, "failed to get privateKey property in property bean") + + val configDsl = """{ + "type" : "tls-auth", + "port" : "50505", + "certChain" : "server1.pem", + "privateKey" : "server1.key", + "trustCertCollection" : "ca.pem" + } + """.trimIndent() + val jsonProperties = bluePrintGrpcLibPropertyService + .grpcServerProperties(configDsl.jsonAsJsonType()) as TLSAuthGrpcServerProperties + assertNotNull(jsonProperties, "failed to create property bean from json") + } }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcServerTest.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcServerTest.kt new file mode 100644 index 000000000..8154d3747 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcServerTest.kt @@ -0,0 +1,109 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service + +import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming +import com.google.protobuf.util.JsonFormat +import kotlinx.coroutines.channels.consumeEach +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TLSAuthGrpcClientProperties +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TLSAuthGrpcServerProperties +import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers +import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader +import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput +import java.util.* +import kotlin.test.Test +import kotlin.test.assertNotNull + +class BluePrintGrpcServerTest { + + private val tlsAuthGrpcServerProperties = TLSAuthGrpcServerProperties().apply { + port = 50052 + type = GRPCLibConstants.TYPE_TLS_AUTH + certChain = "src/test/resources/tls-manual/py-executor-chain.pem" + privateKey = "src/test/resources/tls-manual/py-executor-key.pem" + } + + private val tlsAuthGrpcClientProperties = TLSAuthGrpcClientProperties().apply { + host = "localhost" + port = 50052 + type = GRPCLibConstants.TYPE_TLS_AUTH + trustCertCollection = "src/test/resources/tls-manual/py-executor-chain.pem" + } + + @Test + fun testGrpcTLSContext() { + val tlsAuthGrpcServerService = TLSAuthGrpcServerService(tlsAuthGrpcServerProperties) + val sslContext = tlsAuthGrpcServerService.sslContext() + assertNotNull(sslContext, "failed to create grpc server ssl context") + + val tlsAuthGrpcClientService = TLSAuthGrpcClientService(tlsAuthGrpcClientProperties) + val clientSslContext = tlsAuthGrpcClientService.sslContext() + assertNotNull(clientSslContext, "failed to create grpc client ssl context") + } + + /** TLS Client Integration testing, GRPC TLS Junit testing is not supported. */ + //@Test + fun testGrpcTLSServerIntegration() { + runBlocking { + val tlsAuthGrpcClientService = TLSAuthGrpcClientService(tlsAuthGrpcClientProperties) + val grpcChannel = tlsAuthGrpcClientService.channel() + /** Get Send and Receive Channel for bidirectional process method*/ + val (reqChannel, resChannel) = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), + grpcChannel) + launch { + resChannel.consumeEach { + log.info("Received Response") + if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) { + resChannel.cancel() + } + } + } + val request = getRequest("12345") + reqChannel.send(request) + } + } + + private fun getRequest(requestId: String): ExecutionServiceInput { + val commonHeader = CommonHeader.newBuilder() + .setTimestamp("2012-04-23T18:25:43.511Z") + .setOriginatorId("System") + .setRequestId(requestId) + .setSubRequestId("$requestId-" + UUID.randomUUID().toString()).build() + val actionIdentifier = ActionIdentifiers.newBuilder() + .setActionName("SampleScript") + .setBlueprintName("sample-cba") + .setBlueprintVersion("1.0.0") + .setMode(ACTION_MODE_SYNC) + .build() + val jsonContent = """{ "key1" : "value1" }""" + val payloadBuilder = ExecutionServiceInput.newBuilder().payloadBuilder + JsonFormat.parser().merge(jsonContent, payloadBuilder) + + return ExecutionServiceInput.newBuilder() + .setCommonHeader(commonHeader) + .setActionIdentifiers(actionIdentifier) + .setPayload(payloadBuilder.build()) + .build() + } + +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/MockTLSBluePrintProcessingServer.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/MockTLSBluePrintProcessingServer.kt new file mode 100644 index 000000000..d5bc70c48 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/MockTLSBluePrintProcessingServer.kt @@ -0,0 +1,90 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.grpc.service + +import io.grpc.stub.StreamObserver +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TLSAuthGrpcServerProperties +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.interceptor.GrpcServerLoggingInterceptor +import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType +import org.onap.ccsdk.cds.controllerblueprints.common.api.Status +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput + + +val log = logger(MockTLSBluePrintProcessingServer::class) + +/** For Integration testing stat this server, Set the working path to run this method */ +fun main() { + try { + val tlsAuthGrpcServerProperties = TLSAuthGrpcServerProperties().apply { + port = 50052 + type = GRPCLibConstants.TYPE_TLS_AUTH + certChain = "src/test/resources/tls-manual/py-executor-chain.pem" + privateKey = "src/test/resources/tls-manual/py-executor-key.pem" + } + val server = TLSAuthGrpcServerService(tlsAuthGrpcServerProperties).serverBuilder() + .intercept(GrpcServerLoggingInterceptor()) + .addService(MockTLSBluePrintProcessingServer()) + .build() + server.start() + log.info("GRPC Serve started(${server.isShutdown}) on port(${server.port})...") + server.awaitTermination() + } catch (e: Exception) { + log.error("Failed to start tls grpc integration server", e) + } + +} + +class MockTLSBluePrintProcessingServer : BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase() { + override fun process(responseObserver: StreamObserver<ExecutionServiceOutput>): StreamObserver<ExecutionServiceInput> { + + return object : StreamObserver<ExecutionServiceInput> { + override fun onNext(executionServiceInput: ExecutionServiceInput) { + log.info("Received requestId(${executionServiceInput.commonHeader.requestId}) " + + "subRequestId(${executionServiceInput.commonHeader.subRequestId})") + responseObserver.onNext(buildResponse(executionServiceInput)) + responseObserver.onCompleted() + } + + override fun onError(error: Throwable) { + log.debug("Fail to process message", error) + responseObserver.onError(io.grpc.Status.INTERNAL + .withDescription(error.message) + .asException()) + } + + override fun onCompleted() { + log.info("Completed") + } + } + } + + private fun buildResponse(input: ExecutionServiceInput): ExecutionServiceOutput { + val status = Status.newBuilder().setCode(200) + .setEventType(EventType.EVENT_COMPONENT_EXECUTED) + .build() + return ExecutionServiceOutput.newBuilder() + .setCommonHeader(input.commonHeader) + .setActionIdentifiers(input.actionIdentifiers) + .setStatus(status) + .build() + + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/README b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/README new file mode 100644 index 000000000..c4e91a2fc --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/README @@ -0,0 +1,5 @@ + +Generate Certifactes & Keys +---------------------------- + +openssl req -x509 -newkey rsa:4096 -keyout my-private-key.pem -out my-public-key-cert.pem -days 3650 -nodes -subj '/CN=localhost' diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/py-executor-chain.pem b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/py-executor-chain.pem new file mode 100644 index 000000000..30f09dfea --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/py-executor-chain.pem @@ -0,0 +1,27 @@ +-----BEGIN CERTIFICATE----- +MIIEpDCCAowCCQDyhR+GR2RUiTANBgkqhkiG9w0BAQsFADAUMRIwEAYDVQQDDAls +b2NhbGhvc3QwHhcNMTkxMDIzMDAwMTA0WhcNMjkxMDIwMDAwMTA0WjAUMRIwEAYD +VQQDDAlsb2NhbGhvc3QwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQCs +c4d6qfbW+GSMp+XURoLXtSAbbehoBXL2beSzqQNW6e+Q9IVtSPZst8VRjUXelFzM +m7VpS9jhiXOPZ5KKUOD0GVuNQc54VpwtHt7t9L5wS9OvdnLijnMIkc0iUvC6+Rcq +HSfbNC2Tb+a8jLwojmtRCeY/MyCnmqYpD+U3b6Eue89VpMOIfmDuTqSRRBYNVO72 +hq7FI3UD8+zREg7htfzjJjG14Ec5iVMDxJA1FlwtXFnZxDHgbLjEVjTTR/9Wm1eU +aJ4oWRt3gG/vnJNa+GwN4w/My+j/5/n/YpNh6GeQrHxBl/SL/SAFBshlwozr4K4K +av5MqRKyhCACV4SsdhKJUEDtvrtukJvh/ZDW8jdNbFJAljm8UucZGbJrZl6G7XB3 +WteI7rezo0mL0NMBZIT3nQSMEpefKUFZFiE5lYvIk3UuChqIM0xdgV4INwLRHZdc +1TtiGaBJV05y3Klo5gaUgNGbHP26zfub5TydiMrOA5W2mUvMkG2oit9aqnbaZBLD +t17cCKzpzcVF5uNUng3j6sQvpTt3S4L28TvKUMAfpecQqvxMoxG0/9HZuv2z+U+L +LVVsS07yJPIGMLcq1LMM++8LwD1MupcoShjNOq/lUOL6hIMfLOIfxt8Kv8WykVzv +6yjKEIurjkwMipq4kvr9J7FFi54kGr7uvXWQRHDFJwIDAQABMA0GCSqGSIb3DQEB +CwUAA4ICAQB7gJzvaOIP3/S2jrObz67g0jiz1cfb4I9KQwpwb6JUWbYm1QjBcGm4 +IhNbdPMD6dpwBc/A4JctA5E+/fArvl14UtK1jkaaE/GCumL0VUSZeAM6CK/63brt +LplqCunv8ePHmiwjJBnhu+ewe1+mDMVDMw0iot/q+pOM3vqNS1Fipja+xFK1JQZx +JmkjW/Ug3NHk/SSTfO+VNmlI5bBBApMqKmd9picsyDZ7dTBtZvbqV5eQsPZvv14G +oEvWnvvom+D5GojroSO+OMHNDR3bzK6p0Cu8AiTy9Ls6J2e4GXJz3Cg/kuF9tNlR +3X62zDT+CUipuYyTvmjbSyNMGwU7BIZTKFPuTtjh7EwT2g6S8RV9PmT98CQW6kTT +RJbL7nMIOF0WusysAT5wj1HJ0QKBQCXK+L6WTKTTovaEE7JSVrYe7wVF8Q9SyBIM +4CPVZt+GMyQKJ9SRnVgTDEMb7sj9HPaoVeDc6LQTv8Q//wFeTdZIWXQhpVJCQCEG +qkRk9r3isF60ISOXXIYhqE+hx3QXY9M2UyHDtKXPZ7X370vADi2ebBMF8MpIZYl5 +628dME9JhOhLhD5qPJeva2Nq4gLpK+rO6t7ML0Us4edoKyoScowXAh80q1GW3EO3 +IxTK123651C/S0kDqLqZ9rknEdpwSujrT2UW95jUlfo5OKDrPpdOBw== +-----END CERTIFICATE----- diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/py-executor-key.pem b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/py-executor-key.pem new file mode 100644 index 000000000..830a3ae21 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/test/resources/tls-manual/py-executor-key.pem @@ -0,0 +1,52 @@ +-----BEGIN PRIVATE KEY----- +MIIJQgIBADANBgkqhkiG9w0BAQEFAASCCSwwggkoAgEAAoICAQCsc4d6qfbW+GSM +p+XURoLXtSAbbehoBXL2beSzqQNW6e+Q9IVtSPZst8VRjUXelFzMm7VpS9jhiXOP +Z5KKUOD0GVuNQc54VpwtHt7t9L5wS9OvdnLijnMIkc0iUvC6+RcqHSfbNC2Tb+a8 +jLwojmtRCeY/MyCnmqYpD+U3b6Eue89VpMOIfmDuTqSRRBYNVO72hq7FI3UD8+zR +Eg7htfzjJjG14Ec5iVMDxJA1FlwtXFnZxDHgbLjEVjTTR/9Wm1eUaJ4oWRt3gG/v +nJNa+GwN4w/My+j/5/n/YpNh6GeQrHxBl/SL/SAFBshlwozr4K4Kav5MqRKyhCAC +V4SsdhKJUEDtvrtukJvh/ZDW8jdNbFJAljm8UucZGbJrZl6G7XB3WteI7rezo0mL +0NMBZIT3nQSMEpefKUFZFiE5lYvIk3UuChqIM0xdgV4INwLRHZdc1TtiGaBJV05y +3Klo5gaUgNGbHP26zfub5TydiMrOA5W2mUvMkG2oit9aqnbaZBLDt17cCKzpzcVF +5uNUng3j6sQvpTt3S4L28TvKUMAfpecQqvxMoxG0/9HZuv2z+U+LLVVsS07yJPIG +MLcq1LMM++8LwD1MupcoShjNOq/lUOL6hIMfLOIfxt8Kv8WykVzv6yjKEIurjkwM +ipq4kvr9J7FFi54kGr7uvXWQRHDFJwIDAQABAoICADepPmRAMbTnDYU8t/jRHXBE +PO29htL0V0vk4nl+pt5JuZJe6iYA89DZa+3LnG6gEmfUJjSrT4BUXiE+O9U7D7CZ +8qvgPqUmx1fk6+2AHmuefd/XanNnqQduD/jxLlQbC/gC2xdsev1ok9/tyNmKRmcs +u81QUkzmpJUCVWiUNkELozswaBBJQj4I0iM1B60b6dlWVVi5/g3dkGVW38jIdaxX +apoansKaaVoA+s63vd7CPRoFsleOoAB3FqvPREIO97CmJ848HJpwsTB0qDcnkbDV +xgbDFhxrIozko09ptOvEUILXag45EDmvG8WEivmjVml0aUoTFD7cWHyJBQCpR4fU +5W9mYd4Rrzbmpb+LGYdNyrp3wo3C7dJ7/ffBMQxmXTdMZkcxorxj4BRG3oACRQ1u +Ff1iUruZzIIDtEkrC9hc5QpLlDf9b1obm8L9sxf1QmTt59o5oFG40GPwPP19GXwE +l2faHwho2jYLM9rhuSsK/5sSmUshPNQYmfMnbWzTtghMPE/g0Cfpt8qbspq+G1bk +z3M97JlFMF83ccRotDElX9E/ttjU7Lehoz+1sOyHiVW1E4oqKer4t+nI2bp6VYZm +W94qptW7kb4o0DsvPCaoTPBxLJ1ag2WBlqoFkVI0YaxZiZ8OTR55Ovi4z5xWBO1q +NkCKgdAUQvQVzVtASVGBAoIBAQDSw2nvPFN4gGZ6OI+8j2gWtPcsrhSHS9ykxBeB +mB/HExYIe8k3EvClf2rnfwzuKgKyVMp7Ev7nH2jS/PGZq37QyXrw0NBGRnvJY0Ez +YB1KTgf9xaHMGMut5efNvv/cPwYriqosgJ0pdt0vvUAIQ6EBv+iDXXqJ1lQUSRYk +wKjFABi6TeJY4t9vC474KoXTDaHlwn9+TwnuRBk85wrZzlhK90J0iVa9/Eqeddsc +Z3CuTlc+NmcP3qvniYODq8nyVc0pKw+28AVYYEd3aJfgm+dpcB21L0oz7CaxH/Rz +FNONuQRaOzJrcuJsde/KG2X+MHs6hVMXXXWciPrJ2l+Cq7dnAoIBAQDRdwZDcgem +tJHLihCRzUl9PKip4ZA5757ZyTy6WMLR3wMS2cNTK8+bTrUa0SSC4WSI28pybFA7 +QdSR08c5Nd7jXcIrtqspgZKhb0E60i8VQHhh6ba/kyQjsEz9c/G1WquPK13j2vZ0 +79bomDwFJPsFzABU+sC0/F42ZVQzy9qXkjngjtmaGfrCc7X+pV28nEGtyxHci3L4 +XXfE2dOb+GBVZPLBVXwcthdRYsFuU9GMy2GH0zVtWPOcGRnlpx53Tqg7NIeR0Nm1 +K35EaK8PH92PsAr0Xza7vQHY4cPRz+RhDzjyGQtnhKf96U6gzzt4ZVbQ/UuzDBcL +PQ2DvUH+sqxBAoIBAEW5kiUsDu0xhTVv2tVll+jTK2ZjnLT5ut/jY2djHTgtrz9V +PEb1BBmsIoC9PljYGxZGCMpYiW2KrZIHTiIpYwXNcdeTLSPik3cXV+2YIXiAghJJ +PHKZzWAVS+97/YcubmsfL5cTYWrjQN9XO4TAYtaCV3iGB1DsT9p6J1I3Tl4F3yhb +NcN0IrjI2R5uauFchC/PfYAaw81ISBUm1iciJYF/dUO6X7DwcvsjQD6QVe3ESwZw +1v2gC7zIeHKp9WAvVHUHIubBVvNavqnZN01+JjtydNGI+IJe4Jn+WU9tF2OuTqtP +JCn50sBQ7+gr0j0aatn8W3XCXHNRua3niWtgRYcCggEAT7OzfWxhPuyMYV9qiKAN +a4ruPp3mjDUCQ6pP4jQuBT+PYtfbe8U63MSpIsgb1XVAFNdVBA70xGd7I/XqY3l9 +ExS08n8yR7vW+Hhl4KTjZ3m9lLwiXmj1omLOGM7KVRBoITUGJ9JEXyB3rM9oXyjA +H2eNZMh5FSTGEHqj/IV/6paoUSrp37os8VqoEHoJ3d+zGhcf98RT/e9KyGt+GmX6 ++eNMf4YwkJg07THfmkRoguNMfCtAtBfZsjbW5MyfShRy7PxC7ZgDju06wXr3yZB9 +dNQuhufH4s27azQUl7w8ETaCm5QuA7i1V2c0FPpljZ052JHZAQsDpbIYd11HREvm +QQKCAQEA0W7xNYoFvnyikdG0t266LLv1EkWDFdgkelGx/eGe/JZ+au3uTM94EssC +ni64XX2P8vK/te+c3jItYO4MRgnDJ7GW+bRnJFu2kBE0W4chx7vga0XApVCP+Ugg +owv5yf9cOAHFulvPefsU0snYStD3gNq77XDg0CwoyUkpeq+GiupoQ8tquMSsrEwp +ve5DtDip3cLHz2oVLB3mR4kKVwVwmOgO5RKq6N/H6Jxtf/Zk1I260dKr+Dv2MnDh +dysO4zH5YEt2ML3oY4zY8lu+I5bHCBR1updSny0B31WrXAJyfZpMx+HOwETFKa3B +v9AGKz0Jc2GOIRKHrCQ/WkZePetaYQ== +-----END PRIVATE KEY----- diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt new file mode 100644 index 000000000..a817c0c74 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageExtensions.kt @@ -0,0 +1,32 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message + +import org.apache.kafka.common.header.Headers +import org.apache.kafka.common.header.internals.RecordHeader +import java.nio.charset.Charset + + +fun <T : Headers> T?.toMap(): MutableMap<String, String> { + val map: MutableMap<String, String> = hashMapOf() + this?.forEach { map[it.key()] = String(it.value(), Charset.defaultCharset()) } + return map +} + +fun Headers.addHeader(key: String, value: String) { + this.add(RecordHeader(key, value.toByteArray())) +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index 1cd8a2af7..184e85b70 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,8 +34,10 @@ open class MessageConsumerProperties open class KafkaMessageConsumerProperties : MessageConsumerProperties() { lateinit var bootstrapServers: String lateinit var groupId: String - var clientId: String? = null + lateinit var clientId: String var topic: String? = null + var autoCommit: Boolean = true + var autoOffsetReset: String = "latest" var pollMillSec: Long = 1000 var pollRecords: Int = -1 } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt index 25f0bf44d..8bcc7580a 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,16 +18,47 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.channels.Channel +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecords +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException + +/** Consumer Function Interfaces */ +interface ConsumerFunction interface BlueprintMessageConsumerService { + suspend fun subscribe(): Channel<String> { + return subscribe(null) + } + /** Subscribe to the Kafka channel with [additionalConfig] */ suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> /** Subscribe to the Kafka channel with [additionalConfig] for dynamic [topics]*/ suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<String> + /** Consume and execute dynamic function [consumerFunction] */ + suspend fun consume(consumerFunction: ConsumerFunction) { + consume(null, consumerFunction) + } + + /** Consume with [additionalConfig], so that we can execute dynamic function [consumerFunction] */ + suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) { + throw BluePrintProcessorException("Not Implemented") + } + + /** Consume the [topics] with [additionalConfig], so that we can execute dynamic function [consumerFunction] */ + suspend fun consume(topics: List<String>, additionalConfig: Map<String, Any>?, + consumerFunction: ConsumerFunction) { + throw BluePrintProcessorException("Not Implemented") + } + /** close the channel, consumer and other resources */ suspend fun shutDown() - +} +/** Consumer dynamic implementation interface */ +interface KafkaConsumerRecordsFunction : ConsumerFunction { + suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, consumer: Consumer<*, *>, + consumerRecords: ConsumerRecords<*, *>) }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt index e33d41c09..7d8138639 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt @@ -20,15 +20,31 @@ import kotlinx.coroutines.runBlocking interface BlueprintMessageProducerService { - fun sendMessage(message: Any): Boolean = runBlocking { - sendMessageNB(message) + fun sendMessage(message: Any): Boolean { + return sendMessage(message = message, headers = null) } - fun sendMessage(topic: String, message: Any): Boolean = runBlocking { - sendMessageNB(topic, message) + fun sendMessage(topic: String, message: Any): Boolean { + return sendMessage(topic, message, null) } - suspend fun sendMessageNB(message: Any): Boolean + fun sendMessage(message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking { + sendMessageNB(message = message, headers = headers) + } + + fun sendMessage(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking { + sendMessageNB(topic, message, headers) + } + + suspend fun sendMessageNB(message: Any): Boolean { + return sendMessageNB(message = message, headers = null) + } + + suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean + + suspend fun sendMessageNB(topic: String, message: Any): Boolean { + return sendMessageNB(topic, message, null) + } - suspend fun sendMessageNB(topic: String, message: Any): Boolean + suspend fun sendMessageNB(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt index b5d444a49..757846c81 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,33 +25,39 @@ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.serialization.StringDeserializer import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties import org.onap.ccsdk.cds.controllerblueprints.core.logger +import java.nio.charset.Charset import java.time.Duration import kotlin.concurrent.thread -class KafkaBasicAuthMessageConsumerService( +open class KafkaBasicAuthMessageConsumerService( private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties) : BlueprintMessageConsumerService { - private val channel = Channel<String>() - private var kafkaConsumer: Consumer<String, String>? = null val log = logger(KafkaBasicAuthMessageConsumerService::class) + val channel = Channel<String>() + var kafkaConsumer: Consumer<String, ByteArray>? = null @Volatile var keepGoing = true - fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, String> { + fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> { val configProperties = hashMapOf<String, Any>() configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers configProperties[ConsumerConfig.GROUP_ID_CONFIG] = messageConsumerProperties.groupId - configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest" + configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = messageConsumerProperties.autoCommit + /** + * earliest: automatically reset the offset to the earliest offset + * latest: automatically reset the offset to the latest offset + */ + configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java - configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java - if (messageConsumerProperties.clientId != null) { - configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!! - } + configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java + configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId + /** To handle Back pressure, Get only configured record for processing */ if (messageConsumerProperties.pollRecords > 0) { configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords @@ -70,7 +77,7 @@ class KafkaBasicAuthMessageConsumerService( } - override suspend fun subscribe(consumerTopic: List<String>, additionalConfig: Map<String, Any>?): Channel<String> { + override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> { /** Create Kafka consumer */ kafkaConsumer = kafkaConsumer(additionalConfig) @@ -80,22 +87,22 @@ class KafkaBasicAuthMessageConsumerService( "topics(${messageConsumerProperties.bootstrapServers})" } - kafkaConsumer!!.subscribe(consumerTopic) - log.info("Successfully consumed topic($consumerTopic)") + kafkaConsumer!!.subscribe(topics) + log.info("Successfully consumed topic($topics)") - thread(start = true, name = "KafkaConsumer") { + thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") { keepGoing = true kafkaConsumer!!.use { kc -> while (keepGoing) { val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) - log.info("Consumed Records : ${consumerRecords.count()}") + log.trace("Consumed Records : ${consumerRecords.count()}") runBlocking { consumerRecords?.forEach { consumerRecord -> /** execute the command block */ consumerRecord.value()?.let { launch { if (!channel.isClosedForSend) { - channel.send(it) + channel.send(String(it, Charset.defaultCharset())) } else { log.error("Channel is closed to receive message") } @@ -110,6 +117,46 @@ class KafkaBasicAuthMessageConsumerService( return channel } + override suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) { + /** get to topic names */ + val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() } + check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" } + return consume(topics = consumerTopic, additionalConfig = additionalConfig, consumerFunction = consumerFunction) + } + + override suspend fun consume(topics: List<String>, additionalConfig: Map<String, Any>?, + consumerFunction: ConsumerFunction) { + + val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction + + /** Create Kafka consumer */ + kafkaConsumer = kafkaConsumer(additionalConfig) + + checkNotNull(kafkaConsumer) { + "failed to create kafka consumer for " + + "server(${messageConsumerProperties.bootstrapServers})'s " + + "topics(${messageConsumerProperties.bootstrapServers})" + } + + kafkaConsumer!!.subscribe(topics) + log.info("Successfully consumed topic($topics)") + + thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") { + keepGoing = true + kafkaConsumer!!.use { kc -> + while (keepGoing) { + val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) + log.trace("Consumed Records : ${consumerRecords.count()}") + runBlocking { + /** Execute dynamic consumer Block substitution */ + kafkaConsumerFunction.invoke(messageConsumerProperties, kc, consumerRecords) + } + } + log.info("message listener shutting down.....") + } + } + } + override suspend fun shutDown() { /** stop the polling loop */ keepGoing = false diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt index 1c93bb0fc..42adcd712 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,16 +18,18 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import org.apache.commons.lang.builder.ToStringBuilder +import org.apache.kafka.clients.producer.Callback +import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerConfig.* +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.header.internals.RecordHeader +import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.kafka.common.serialization.StringSerializer import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties -import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID import org.slf4j.LoggerFactory -import org.springframework.kafka.core.DefaultKafkaProducerFactory -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.core.ProducerFactory -import org.springframework.kafka.support.SendResult -import org.springframework.util.concurrent.ListenableFutureCallback +import java.nio.charset.Charset class KafkaBasicAuthMessageProducerService( private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties) @@ -34,42 +37,46 @@ class KafkaBasicAuthMessageProducerService( private val log = LoggerFactory.getLogger(KafkaBasicAuthMessageProducerService::class.java)!! - private var kafkaTemplate: KafkaTemplate<String, Any>? = null + private var kafkaProducer: KafkaProducer<String, ByteArray>? = null + + private val messageLoggerService = MessageLoggerService() override suspend fun sendMessageNB(message: Any): Boolean { checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } - return sendMessage(messageProducerProperties.topic!!, message) + return sendMessageNB(messageProducerProperties.topic!!, message) } - override suspend fun sendMessageNB(topic: String, message: Any): Boolean { - val serializedMessage = when (message) { - is String -> { - message - } - else -> { - message.asJsonType().toString() - } - } - val future = messageTemplate().send(topic, serializedMessage) + override suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean { + checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } + return sendMessageNB(messageProducerProperties.topic!!, message, headers) + } - future.addCallback(object : ListenableFutureCallback<SendResult<String, Any>> { - override fun onSuccess(result: SendResult<String, Any>) { - log.info("message sent successfully with offset=[${result.recordMetadata.offset()}]") - } + override suspend fun sendMessageNB(topic: String, message: Any, + headers: MutableMap<String, String>?): Boolean { + val byteArrayMessage = when (message) { + is String -> message.toByteArray(Charset.defaultCharset()) + else -> message.asJsonString().toByteArray(Charset.defaultCharset()) + } - override fun onFailure(ex: Throwable) { - log.error("Unable to send message", ex) - } - }) + val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage) + val recordHeaders = record.headers() + messageLoggerService.messageProducing(recordHeaders) + headers?.let { + headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) } + } + val callback = Callback { metadata, exception -> + log.info("message published offset(${metadata.offset()}, headers :$headers )") + } + messageTemplate().send(record, callback).get() return true } - private fun producerFactory(additionalConfig: Map<String, Any>? = null): ProducerFactory<String, Any> { - log.info("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}") + fun messageTemplate(additionalConfig: Map<String, ByteArray>? = null): KafkaProducer<String, ByteArray> { + log.trace("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}") val configProps = hashMapOf<String, Any>() configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java - configProps[VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java + configProps[VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java if (messageProducerProperties.clientId != null) { configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!! } @@ -79,14 +86,11 @@ class KafkaBasicAuthMessageProducerService( if (additionalConfig != null) { configProps.putAll(additionalConfig) } - return DefaultKafkaProducerFactory(configProps) - } - fun messageTemplate(additionalConfig: Map<String, Any>? = null): KafkaTemplate<String, Any> { - if (kafkaTemplate == null) { - kafkaTemplate = KafkaTemplate(producerFactory(additionalConfig)) + if (kafkaProducer == null) { + kafkaProducer = KafkaProducer(configProps) } - return kafkaTemplate!! + return kafkaProducer!! } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt new file mode 100644 index 000000000..21bf1b76c --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerService.kt @@ -0,0 +1,88 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.Headers +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader +import org.onap.ccsdk.cds.blueprintsprocessor.message.addHeader +import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.slf4j.MDC +import java.net.InetAddress +import java.time.Instant +import java.time.ZoneOffset +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.util.* + +class MessageLoggerService { + + private val log = logger(MessageLoggerService::class) + + fun messageConsuming(headers: CommonHeader, consumerRecord: ConsumerRecord<*, *>) { + messageConsuming(headers.requestId, headers.subRequestId, + headers.originatorId, consumerRecord) + } + + fun messageConsuming(consumerRecord: ConsumerRecord<*, *>) { + val headers = consumerRecord.headers().toMap() + val requestID = headers[BluePrintConstants.ONAP_REQUEST_ID].defaultToUUID() + val invocationID = headers[BluePrintConstants.ONAP_INVOCATION_ID].defaultToUUID() + val partnerName = headers[BluePrintConstants.ONAP_PARTNER_NAME] ?: "UNKNOWN" + messageConsuming(requestID, invocationID, partnerName, consumerRecord) + } + + + fun messageConsuming(requestID: String, invocationID: String, partnerName: String, + consumerRecord: ConsumerRecord<*, *>) { + val headers = consumerRecord.headers().toMap() + val localhost = InetAddress.getLocalHost() + MDC.put("InvokeTimestamp", ZonedDateTime + .ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), ZoneOffset.UTC) + .format(DateTimeFormatter.ISO_INSTANT)) + MDC.put("RequestID", requestID) + MDC.put("InvocationID", invocationID) + MDC.put("PartnerName", partnerName) + MDC.put("ClientIPAddress", headers["ClientIPAddress"].defaultToEmpty()) + MDC.put("ServerFQDN", localhost.hostName.defaultToEmpty()) + MDC.put("ServiceName", consumerRecord.topic()) + // Custom MDC for Message Consumers + MDC.put("Offset", consumerRecord.offset().toString()) + MDC.put("MessageKey", consumerRecord.key()?.toString().defaultToEmpty()) + log.info("Consuming MDC Properties : ${MDC.getCopyOfContextMap()}") + } + + /** Used before producing message request, Inbound Invocation ID is used as request Id + * for produced message Request, If invocation Id is missing then default Request Id will be generated. + */ + fun messageProducing(requestHeader: Headers) { + val localhost = InetAddress.getLocalHost() + requestHeader.addHeader(BluePrintConstants.ONAP_REQUEST_ID, MDC.get("InvocationID").defaultToUUID()) + requestHeader.addHeader(BluePrintConstants.ONAP_INVOCATION_ID, UUID.randomUUID().toString()) + val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor" + requestHeader.addHeader(BluePrintConstants.ONAP_PARTNER_NAME, partnerName) + requestHeader.addHeader("ClientIPAddress", localhost.hostAddress) + } + + fun messageConsumingExisting() { + MDC.clear() + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index f4e85a94b..bdceec7d3 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -1,5 +1,6 @@ /* * Copyright © 2019 IBM. + * Modifications Copyright © 2018-2019 AT&T Intellectual Property. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,15 +23,14 @@ import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.clients.consumer.MockConsumer -import org.apache.kafka.clients.consumer.OffsetResetStrategy +import org.apache.kafka.clients.consumer.* import org.apache.kafka.common.TopicPartition import org.junit.Test import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration +import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.annotation.DirtiesContext @@ -87,14 +87,14 @@ open class BlueprintMessageConsumerServiceTest { partitionsEndMap[partition] = records topicsCollection.add(partition.topic()) } - val mockKafkaConsumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST) + val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST) mockKafkaConsumer.subscribe(topicsCollection) mockKafkaConsumer.rebalance(partitions) mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap) mockKafkaConsumer.updateEndOffsets(partitionsEndMap) for (i in 1..10) { - val record = ConsumerRecord<String, String>(topic, 1, i.toLong(), "key_$i", - "I am message $i") + val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i", + "I am message $i".toByteArray()) mockKafkaConsumer.addRecord(record) } @@ -110,6 +110,54 @@ open class BlueprintMessageConsumerServiceTest { } } + @Test + fun testKafkaBasicAuthConsumerWithDynamicFunction() { + runBlocking { + val blueprintMessageConsumerService = bluePrintMessageLibPropertyService + .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService + assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService") + + val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true) + + val topic = "default-topic" + val partitions: MutableList<TopicPartition> = arrayListOf() + val topicsCollection: MutableList<String> = arrayListOf() + partitions.add(TopicPartition(topic, 1)) + val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf() + val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf() + + val records: Long = 10 + partitions.forEach { partition -> + partitionsBeginningMap[partition] = 0L + partitionsEndMap[partition] = records + topicsCollection.add(partition.topic()) + } + val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST) + mockKafkaConsumer.subscribe(topicsCollection) + mockKafkaConsumer.rebalance(partitions) + mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap) + mockKafkaConsumer.updateEndOffsets(partitionsEndMap) + for (i in 1..10) { + val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i", + "I am message $i".toByteArray()) + mockKafkaConsumer.addRecord(record) + } + + every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer + /** Test Consumer Function implementation */ + val consumerFunction = object : KafkaConsumerRecordsFunction { + override suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, + consumer: Consumer<*, *>, consumerRecords: ConsumerRecords<*, *>) { + val count = consumerRecords.count() + log.trace("Received Message count($count)") + } + } + spyBlueprintMessageConsumerService.consume(consumerFunction) + delay(10) + spyBlueprintMessageConsumerService.shutDown() + } + } + /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */ //@Test fun testKafkaIntegration() { @@ -131,7 +179,10 @@ open class BlueprintMessageConsumerServiceTest { launch { repeat(5) { delay(100) - blueprintMessageProducerService.sendMessage("this is my message($it)") + val headers: MutableMap<String, String> = hashMapOf() + headers["id"] = it.toString() + blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)", + headers = headers) } } delay(5000) diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index 31bcc1517..f23624f7a 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -20,18 +20,18 @@ import io.mockk.every import io.mockk.mockk import io.mockk.spyk import kotlinx.coroutines.runBlocking +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.RecordMetadata import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration import org.springframework.beans.factory.annotation.Autowired -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.support.SendResult import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner -import org.springframework.util.concurrent.SettableListenableFuture +import java.util.concurrent.Future import kotlin.test.Test import kotlin.test.assertTrue @@ -57,12 +57,12 @@ open class BlueprintMessageProducerServiceTest { val blueprintMessageProducerService = bluePrintMessageLibPropertyService .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService - val mockKafkaTemplate = mockk<KafkaTemplate<String, Any>>() + val mockKafkaTemplate = mockk<KafkaProducer<String, ByteArray>>() - val future = SettableListenableFuture<SendResult<String, Any>>() - //future.setException(BluePrintException("failed sending")) + val responseMock = mockk<Future<RecordMetadata>>() + every { responseMock.get() } returns mockk() - every { mockKafkaTemplate.send(any(), any()) } returns future + every { mockKafkaTemplate.send(any(), any()) } returns responseMock val spyBluePrintMessageProducerService = spyk(blueprintMessageProducerService, recordPrivateCalls = true) diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt new file mode 100644 index 000000000..82e40efd1 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MessageLoggerServiceTest.kt @@ -0,0 +1,61 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.message.service + +import io.mockk.every +import io.mockk.mockk +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.internals.RecordHeaders +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader +import org.onap.ccsdk.cds.blueprintsprocessor.message.toMap +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.slf4j.MDC +import kotlin.test.assertEquals + +class MessageLoggerServiceTest { + + + @Test + fun testMessagingHeaders() { + val messageLoggerService = MessageLoggerService() + val commonHeader = CommonHeader().apply { + requestId = "1234" + subRequestId = "1234-12" + originatorId = "cds-test" + } + + val consumerRecord = mockk<ConsumerRecord<*, *>>() + every { consumerRecord.headers() } returns null + every { consumerRecord.key() } returns "1234" + every { consumerRecord.offset() } returns 12345 + every { consumerRecord.topic() } returns "sample-topic" + every { consumerRecord.timestamp() } returns System.currentTimeMillis() + messageLoggerService.messageConsuming(commonHeader, consumerRecord) + assertEquals(commonHeader.requestId, MDC.get("RequestID")) + assertEquals(commonHeader.subRequestId, MDC.get("InvocationID")) + + val mockHeaders = RecordHeaders() + messageLoggerService.messageProducing(mockHeaders) + val map = mockHeaders.toMap() + assertEquals("1234-12", map[BluePrintConstants.ONAP_REQUEST_ID]) + + messageLoggerService.messageConsumingExisting() + + } + +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml index 3868440c7..820041f74 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml @@ -1,5 +1,6 @@ <!-- ~ Copyright © 2019 IBM. + ~ Modifications Copyright © 2018-2019 AT&T Intellectual Property. ~ ~ Licensed under the Apache License, Version 2.0 (the "License"); ~ you may not use this file except in compliance with the License. @@ -15,11 +16,18 @@ --> <configuration> + + <property name="localPattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/> + <property name="defaultPattern" + value="%date{ISO8601,UTC}|%X{RequestID}|%X{InvocationID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n"/> + <property name="testing" + value="%X{RequestID}|%X{InvocationID}|%logger{50}| %msg%n"/> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> <encoder> - <pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{50} - %msg%n</pattern> + <pattern>${localPattern}</pattern> </encoder> </appender> diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt deleted file mode 100644 index cdf6ce195..000000000 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright © 2018-2019 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.ccsdk.cds.blueprintsprocessor.core - -import kotlinx.coroutines.* -import kotlinx.coroutines.reactor.ReactorContext -import kotlinx.coroutines.reactor.asCoroutineContext -import org.onap.ccsdk.cds.blueprintsprocessor.core.service.MonoMDCCoroutine -import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext -import reactor.core.publisher.Mono -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.EmptyCoroutineContext - -/** Used in Rest controller API methods to populate MDC context to nested coroutines from reactor web filter context. */ -@UseExperimental(InternalCoroutinesApi::class) -fun <T> monoMdc(context: CoroutineContext = EmptyCoroutineContext, - block: suspend CoroutineScope.() -> T?): Mono<T> = Mono.create { sink -> - - val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) - ?: sink.currentContext()).asCoroutineContext() - /** Populate MDC context only if present in Reactor Context */ - val newContext = if (!reactorContext.context.isEmpty - && reactorContext.context.hasKey(MDCContext)) { - val mdcContext = reactorContext.context.get<MDCContext>(MDCContext) - GlobalScope.newCoroutineContext(context + reactorContext + mdcContext) - } else GlobalScope.newCoroutineContext(context + reactorContext) - - val coroutine = MonoMDCCoroutine(newContext, sink) - sink.onDispose(coroutine) - coroutine.start(CoroutineStart.DEFAULT, coroutine, block) -}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintRemoteProcessorData.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintRemoteProcessorData.kt index 2f9ea4a25..d63f34ced 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintRemoteProcessorData.kt +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintRemoteProcessorData.kt @@ -39,7 +39,8 @@ data class RemoteScriptExecutionInput(var requestId: String, data class RemoteScriptExecutionOutput(var requestId: String, var response: List<String>, var status: StatusType = StatusType.SUCCESS, - var timestamp: Date = Date()) + var timestamp: Date = Date(), + var payload: JsonNode) data class PrepareRemoteEnvInput(var requestId: String, var correlationId: String? = null, diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt b/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt index 4da7dcd0e..cec11ae3c 100644 --- a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt +++ b/ms/blueprintsprocessor/modules/commons/rest-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/rest/service/RestLoggerService.kt @@ -14,43 +14,47 @@ * limitations under the License. */ -package org.onap.ccsdk.cds.blueprintsprocessor.core.service +package org.onap.ccsdk.cds.blueprintsprocessor.rest.service -import kotlinx.coroutines.AbstractCoroutine -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.handleCoroutineException +import kotlinx.coroutines.* +import kotlinx.coroutines.reactor.ReactorContext +import kotlinx.coroutines.reactor.asCoroutineContext +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_INVOCATION_ID +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_PARTNER_NAME +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants.ONAP_REQUEST_ID +import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToEmpty +import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.slf4j.MDC import org.springframework.http.server.reactive.ServerHttpRequest import org.springframework.http.server.reactive.ServerHttpResponse import reactor.core.Disposable +import reactor.core.publisher.Mono import reactor.core.publisher.MonoSink +import java.net.InetAddress import java.time.ZoneOffset import java.time.ZonedDateTime import java.time.format.DateTimeFormatter -import java.util.* import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext -class LoggingService { - private val log = logger(LoggingService::class) +class RestLoggerService { + private val log = logger(RestLoggerService::class) - companion object { - const val ONAP_REQUEST_ID = "X-ONAP-RequestID" - const val ONAP_INVOCATION_ID = "X-ONAP-InvocationID" - const val ONAP_PARTNER_NAME = "X-ONAP-PartnerName" - } fun entering(request: ServerHttpRequest) { + val localhost = InetAddress.getLocalHost() val headers = request.headers - val requestID = defaultToUUID(headers.getFirst(ONAP_REQUEST_ID)) - val invocationID = defaultToUUID(headers.getFirst(ONAP_INVOCATION_ID)) - val partnerName = defaultToEmpty(headers.getFirst(ONAP_PARTNER_NAME)) + val requestID = headers.getFirst(ONAP_REQUEST_ID).defaultToUUID() + val invocationID = headers.getFirst(ONAP_INVOCATION_ID).defaultToUUID() + val partnerName = headers.getFirst(ONAP_PARTNER_NAME).defaultToEmpty() MDC.put("InvokeTimestamp", ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT)) MDC.put("RequestID", requestID) MDC.put("InvocationID", invocationID) MDC.put("PartnerName", partnerName) - MDC.put("ClientIPAddress", defaultToEmpty(request.remoteAddress?.address?.hostAddress)) - MDC.put("ServerFQDN", defaultToEmpty(request.remoteAddress?.hostString)) + MDC.put("ClientIPAddress", request.remoteAddress?.address?.hostAddress.defaultToEmpty()) + MDC.put("ServerFQDN",localhost.hostName.defaultToEmpty()) if (MDC.get("ServiceName") == null || MDC.get("ServiceName").equals("", ignoreCase = true)) { MDC.put("ServiceName", request.uri.path) } @@ -62,22 +66,35 @@ class LoggingService { val resHeaders = response.headers resHeaders[ONAP_REQUEST_ID] = MDC.get("RequestID") resHeaders[ONAP_INVOCATION_ID] = MDC.get("InvocationID") + val partnerName = System.getProperty("APPNAME") ?: "BlueprintsProcessor" + resHeaders[ONAP_PARTNER_NAME] = partnerName } catch (e: Exception) { log.warn("couldn't set response headers", e) } finally { MDC.clear() } } +} - private fun defaultToEmpty(input: Any?): String { - return input?.toString() ?: "" - } - private fun defaultToUUID(input: String?): String { - return input ?: UUID.randomUUID().toString() - } -} +/** Used in Rest controller API methods to populate MDC context to nested coroutines from reactor web filter context. */ +@UseExperimental(InternalCoroutinesApi::class) +fun <T> monoMdc(context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.() -> T?): Mono<T> = Mono.create { sink -> + + val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) + ?: sink.currentContext()).asCoroutineContext() + /** Populate MDC context only if present in Reactor Context */ + val newContext = if (!reactorContext.context.isEmpty + && reactorContext.context.hasKey(MDCContext)) { + val mdcContext = reactorContext.context.get<MDCContext>(MDCContext) + GlobalScope.newCoroutineContext(context + reactorContext + mdcContext) + } else GlobalScope.newCoroutineContext(context + reactorContext) + val coroutine = MonoMDCCoroutine(newContext, sink) + sink.onDispose(coroutine) + coroutine.start(CoroutineStart.DEFAULT, coroutine, block) +} @InternalCoroutinesApi class MonoMDCCoroutine<in T>( |