From 49f43c856c8ca793bc6972d9d4b47c2d0d4c0816 Mon Sep 17 00:00:00 2001 From: kjaniak Date: Wed, 3 Apr 2019 15:48:28 +0200 Subject: Creation of server module Issue-ID: DCAEGEN2-1390 Change-Id: I07410b16ed6566b933d5f1efa35bddb965225794 Signed-off-by: kjaniak Signed-off-by: Filip Krzywka --- sources/hv-collector-core/pom.xml | 28 ++-- .../dcae/collectors/veshv/boundary/adapters.kt | 2 +- .../org/onap/dcae/collectors/veshv/boundary/api.kt | 6 +- .../veshv/factory/HvVesCollectorFactory.kt | 2 +- .../dcae/collectors/veshv/factory/ServerFactory.kt | 48 ------ .../dcae/collectors/veshv/impl/HvVesCollector.kt | 4 +- .../org/onap/dcae/collectors/veshv/impl/Router.kt | 2 +- .../veshv/impl/adapters/ClientContextLogging.kt | 47 ------ .../collectors/veshv/impl/adapters/HttpAdapter.kt | 2 +- .../veshv/impl/adapters/kafka/KafkaPublisher.kt | 8 +- .../veshv/impl/adapters/kafka/KafkaSinkFactory.kt | 4 +- .../collectors/veshv/impl/socket/NettyTcpServer.kt | 160 -------------------- .../collectors/veshv/impl/socket/networking.kt | 81 ---------- .../collectors/veshv/impl/wire/WireChunkDecoder.kt | 6 +- .../dcae/collectors/veshv/model/ClientContext.kt | 61 -------- .../dcae/collectors/veshv/model/ServiceContext.kt | 45 ------ .../onap/dcae/collectors/veshv/impl/RouterTest.kt | 2 +- .../veshv/impl/wire/WireChunkDecoderTest.kt | 2 +- .../collectors/veshv/model/ClientContextTest.kt | 98 ------------- .../collectors/veshv/model/ServiceContextTest.kt | 67 --------- .../dcae/collectors/veshv/tests/component/Sut.kt | 2 +- sources/hv-collector-domain/pom.xml | 14 +- .../veshv/domain/logging/ClientContext.kt | 60 ++++++++ .../veshv/domain/logging/ClientContextLogging.kt | 46 ++++++ .../dcae/collectors/veshv/domain/logging/Marker.kt | 46 ++++++ .../veshv/domain/logging/MarkerLogging.kt | 63 ++++++++ .../collectors/veshv/domain/logging/OnapMdc.kt | 35 +++++ .../veshv/domain/logging/ServiceContext.kt | 44 ++++++ .../veshv/domain/logging/ClientContextTest.kt | 94 ++++++++++++ .../veshv/domain/logging/ServiceContextTest.kt | 66 +++++++++ .../src/test/resources/logback-test.xml | 35 +++++ .../org.mockito.plugins.MockMaker | 1 + sources/hv-collector-main/pom.xml | 4 +- .../org/onap/dcae/collectors/veshv/main/main.kt | 13 +- .../veshv/main/servers/HealthCheckServer.kt | 2 +- .../collectors/veshv/main/servers/VesServer.kt | 69 --------- sources/hv-collector-server/pom.xml | 90 ++++++++++++ .../org/onap/dcae/collectors/veshv/api/Server.kt | 41 ++++++ .../onap/dcae/collectors/veshv/impl/HvVesServer.kt | 67 +++++++++ .../dcae/collectors/veshv/impl/NettyTcpServer.kt | 163 +++++++++++++++++++++ .../onap/dcae/collectors/veshv/impl/networking.kt | 81 ++++++++++ sources/hv-collector-utils/pom.xml | 10 +- .../dcae/collectors/veshv/utils/logging/Logger.kt | 95 +++--------- .../dcae/collectors/veshv/utils/logging/Marker.kt | 46 ------ .../dcae/collectors/veshv/utils/logging/OnapMdc.kt | 35 ----- sources/pom.xml | 1 + 46 files changed, 1019 insertions(+), 879 deletions(-) delete mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt delete mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt delete mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt delete mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt delete mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt delete mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt delete mode 100644 sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ClientContextTest.kt delete mode 100644 sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContextTest.kt create mode 100644 sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ClientContext.kt create mode 100644 sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ClientContextLogging.kt create mode 100644 sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/Marker.kt create mode 100644 sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/MarkerLogging.kt create mode 100644 sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/OnapMdc.kt create mode 100644 sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ServiceContext.kt create mode 100644 sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ClientContextTest.kt create mode 100644 sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ServiceContextTest.kt create mode 100644 sources/hv-collector-domain/src/test/resources/logback-test.xml create mode 100644 sources/hv-collector-domain/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker delete mode 100644 sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt create mode 100644 sources/hv-collector-server/pom.xml create mode 100644 sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/api/Server.kt create mode 100644 sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesServer.kt create mode 100644 sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt create mode 100644 sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/networking.kt delete mode 100644 sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt delete mode 100644 sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/OnapMdc.kt (limited to 'sources') diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml index e1e35d8b..122de173 100644 --- a/sources/hv-collector-core/pom.xml +++ b/sources/hv-collector-core/pom.xml @@ -59,31 +59,25 @@ - ${project.parent.groupId} + ${project.groupId} hv-collector-configuration - ${project.parent.version} + ${project.version} - ${project.parent.groupId} + ${project.groupId} hv-collector-domain - ${project.parent.version} + ${project.version} compile - ${project.parent.groupId} + ${project.groupId} hv-collector-utils - ${project.parent.version} + ${project.version} - ${project.parent.groupId} + ${project.groupId} hv-collector-ssl - ${project.parent.version} - - - ${project.parent.groupId} - hv-collector-test-utils - ${project.parent.version} - test + ${project.version} io.arrow-kt @@ -105,6 +99,12 @@ io.projectreactor.kafka reactor-kafka + + ${project.groupId} + hv-collector-test-utils + ${project.version} + test + diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 48f335a1..28b28203 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -21,7 +21,7 @@ package org.onap.dcae.collectors.veshv.boundary import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index 4c54d7d2..23a5d376 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -20,9 +20,8 @@ package org.onap.dcae.collectors.veshv.boundary import io.netty.buffer.ByteBuf -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.utils.Closeable -import org.onap.dcae.collectors.veshv.utils.ServerHandle import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -34,6 +33,3 @@ interface CollectorFactory : Closeable { operator fun invoke(ctx: ClientContext): Collector } -interface Server { - fun start(): Mono -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt index c3c5d733..1f221c60 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt @@ -29,7 +29,7 @@ import org.onap.dcae.collectors.veshv.impl.HvVesCollector import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext /** * @author Piotr Jaszczyk diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt deleted file mode 100644 index e0f611b6..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt +++ /dev/null @@ -1,48 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.factory - -import org.onap.dcae.collectors.veshv.boundary.CollectorFactory -import org.onap.dcae.collectors.veshv.boundary.Metrics -import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer -import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration -import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -object ServerFactory { - - private val sslFactory = SslContextFactory() - - fun createNettyTcpServer(serverConfig: ServerConfiguration, - securityConfig: SecurityConfiguration, - collectorFactory: CollectorFactory, - metrics: Metrics - ): Server = NettyTcpServer( - serverConfig, - sslFactory.createServerContext(securityConfig), - collectorFactory, - metrics - ) -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt index 7d8f0cb1..ac7c3917 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt @@ -23,9 +23,9 @@ import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError +import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index fec713ad..2190eba3 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -26,9 +26,9 @@ import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkFactory import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.utils.logging.Logger diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt deleted file mode 100644 index 954de978..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError -import reactor.core.publisher.Flux - -@Suppress("TooManyFunctions") -internal object ClientContextLogging { - fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::fullMdc, block) - fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::fullMdc, block) - fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::fullMdc, block) - fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::fullMdc, block) - fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::fullMdc, block) - - fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::fullMdc, message) - fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::fullMdc, message) - fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::fullMdc, message) - fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::fullMdc, message) - fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::fullMdc, message) - - fun Logger.handleReactiveStreamError(context: ClientContext, ex: Throwable, - returnFlux: Flux = Flux.empty()): Flux { - return this.handleReactiveStreamError({ context.fullMdc }, ex, returnFlux) - } -} - diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt index 8d154091..8f66de2b 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt @@ -21,7 +21,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters import io.netty.handler.codec.http.HttpStatusClass import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc +import org.onap.dcae.collectors.veshv.domain.logging.OnapMdc import reactor.core.publisher.Mono import reactor.netty.http.client.HttpClient import java.util.* diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt index 7b726ab4..91e6fde5 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt @@ -22,14 +22,16 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.withDebug +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.Marker +import org.onap.dcae.collectors.veshv.domain.logging.Marker +import org.onap.dcae.collectors.veshv.domain.logging.MarkerLogging.trace +import org.onap.dcae.collectors.veshv.domain.logging.MarkerLogging.warn import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.core.publisher.Flux import reactor.kafka.sender.KafkaSender diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt index 9df1af31..2973fa8d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt @@ -22,8 +22,8 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.onap.dcae.collectors.veshv.boundary.SinkFactory import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.impl.createKafkaSender -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ServiceContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import org.onap.ves.VesEventOuterClass.CommonEventHeader diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt deleted file mode 100644 index 7ce86f98..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ /dev/null @@ -1,160 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.socket - -import arrow.core.Option -import arrow.core.getOrElse -import io.netty.handler.ssl.SslContext -import org.onap.dcae.collectors.veshv.boundary.Collector -import org.onap.dcae.collectors.veshv.boundary.CollectorFactory -import org.onap.dcae.collectors.veshv.boundary.Metrics -import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.utils.NettyServerHandle -import org.onap.dcae.collectors.veshv.utils.ServerHandle -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.Marker -import reactor.core.publisher.Mono -import reactor.netty.Connection -import reactor.netty.NettyInbound -import reactor.netty.NettyOutbound -import reactor.netty.tcp.TcpServer -import java.net.InetAddress -import java.net.InetSocketAddress -import java.time.Duration - - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -internal class NettyTcpServer(private val serverConfiguration: ServerConfiguration, - private val sslContext: Option, - private val collectorFactory: CollectorFactory, - private val metrics: Metrics) : Server { - - override fun start(): Mono = - Mono.defer { - TcpServer.create() - .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) } - .configureSsl() - .handle(this::handleConnection) - .bind() - .map { - NettyServerHandle(it, closeAction()) - } - } - - private fun closeAction(): Mono = - collectorFactory.close().doOnSuccess { - logger.info(ServiceContext::mdc) { "Netty TCP Server closed" } - } - - - private fun TcpServer.configureSsl() = - sslContext - .map { serverContext -> - logger.info { "Collector configured with SSL enabled" } - this.secure { it.sslContext(serverContext) } - }.getOrElse { - logger.info { "Collector configured with SSL disabled" } - this - } - - private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono = - messageHandlingStream(nettyInbound, nettyOutbound).run { - subscribe() - nettyOutbound.neverComplete() - } - - private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono = - withNewClientContextFrom(nettyInbound, nettyOutbound) - { clientContext -> - logger.debug(clientContext::fullMdc) { "Client connection request received" } - - clientContext.clientAddress - .map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) } - .getOrElse { - logger.warn(clientContext::fullMdc) { - "Client address could not be resolved. Discarding connection" - } - nettyInbound.closeConnectionAndReturn(Mono.empty()) - } - } - - private fun acceptIfNotLocalConnection(address: InetAddress, - clientContext: ClientContext, - nettyInbound: NettyInbound): Mono = - if (address.isLocalClientAddress()) { - logger.debug(clientContext) { - "Client address resolved to localhost. Discarding connection as suspected healthcheck" - } - nettyInbound.closeConnectionAndReturn(Mono.empty()) - } else { - acceptClientConnection(clientContext, nettyInbound) - } - - private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono { - metrics.notifyClientConnected() - logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" } - val collector = collectorFactory(clientContext) - return collector.handleClient(clientContext, nettyInbound) - } - - private fun Collector.handleClient(clientContext: ClientContext, - nettyInbound: NettyInbound) = - withConnectionFrom(nettyInbound) { connection -> - connection - .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout) - .logConnectionClosed(clientContext) - }.run { - handleConnection(nettyInbound.createDataStream()) - } - - private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection = - onReadIdle(timeout.toMillis()) { - logger.info(ctx) { - "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..." - } - disconnectClient(ctx) - } - - private fun Connection.disconnectClient(ctx: ClientContext) = - closeChannelAndThen { - if (it.isSuccess) - logger.debug(ctx::fullMdc, Marker.Exit) { "Channel closed successfully." } - else - logger.warn(ctx::fullMdc, Marker.Exit, { "Channel close failed" }, it.cause()) - } - - private fun Connection.logConnectionClosed(ctx: ClientContext): Connection = - onDispose { - metrics.notifyClientDisconnected() - logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" } - } - - companion object { - private val logger = Logger(NettyTcpServer::class) - } -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt deleted file mode 100644 index a1e5b8fd..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt +++ /dev/null @@ -1,81 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.socket - -import arrow.core.Option -import arrow.core.Try -import arrow.syntax.collections.firstOption -import io.netty.handler.ssl.SslHandler -import io.netty.util.concurrent.Future -import org.onap.dcae.collectors.veshv.model.ClientContext -import reactor.core.publisher.Mono -import reactor.netty.ByteBufFlux -import reactor.netty.Connection -import reactor.netty.NettyInbound -import reactor.netty.NettyOutbound -import java.net.InetAddress -import java.security.cert.X509Certificate -import javax.net.ssl.SSLSession - -internal fun InetAddress.isLocalClientAddress() = hostAddress == "127.0.0.1" || hostName == "localhost" - -internal fun Connection.getSslSession(): Option = - Option.fromNullable( - channel() - .pipeline() - .get(SslHandler::class.java) - ?.engine() - ?.session - ) - -internal fun SSLSession.findClientCert(): Option = - peerCertificates - .firstOption() - .flatMap { Option.fromNullable(it as? X509Certificate) } - -internal fun withConnectionFrom(nettyInboud: NettyInbound, task: (Connection) -> Unit) = - nettyInboud.withConnection(task) - -internal fun Connection.closeChannel() = channel().close() - -internal fun Connection.closeChannelAndThen(task: (Future) -> Unit) = - closeChannel().addListener { task(it) } - -internal fun NettyInbound.closeConnectionAndReturn(returnValue: T): T = - withConnectionFrom(this) { it.closeChannel() }.let { returnValue } - -internal fun NettyInbound.createDataStream(): ByteBufFlux = receive().retain() - -// -// ClientContext related -// - -internal inline fun withNewClientContextFrom(nettyInbound: NettyInbound, - nettyOutbound: NettyOutbound, - reactiveTask: (ClientContext) -> Mono) = - ClientContext(nettyOutbound.alloc()) - .also { populateClientContextFromInbound(it, nettyInbound) } - .run(reactiveTask) - -internal fun populateClientContextFromInbound(clientContext: ClientContext, nettyInbound: NettyInbound) = - withConnectionFrom(nettyInbound) { connection -> - clientContext.clientAddress = Try { connection.address().address }.toOption() - clientContext.clientCert = connection.getSslSession().flatMap { it.findClientCert() } - } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index ca9d28ae..0d0f8ea7 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -25,9 +25,9 @@ import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.handleReactiveStreamError +import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.trace +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Flux.defer diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt deleted file mode 100644 index 7b082e64..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt +++ /dev/null @@ -1,61 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.model - -import arrow.core.None -import arrow.core.Option -import arrow.core.getOrElse -import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc -import java.net.InetAddress -import java.security.cert.X509Certificate -import java.util.* - -/** - * @author Piotr Jaszczyk - * @since December 2018 - */ -data class ClientContext( - val alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT, - var clientAddress: Option = None, - var clientCert: Option = None, - val requestId: String = UUID.randomUUID().toString(), // Should be somehow propagated to DMAAP - val invocationId: String = UUID.randomUUID().toString()) { - - val mdc: Map - get() = mapOf( - OnapMdc.REQUEST_ID to requestId, - OnapMdc.INVOCATION_ID to invocationId, - OnapMdc.STATUS_CODE to DEFAULT_STATUS_CODE, - OnapMdc.CLIENT_NAME to clientDn().getOrElse { DEFAULT_VALUE }, - OnapMdc.CLIENT_IP to clientIp().getOrElse { DEFAULT_VALUE } - ) - - val fullMdc: Map - get() = mdc + ServiceContext.mdc - - private fun clientDn(): Option = clientCert.map { it.subjectX500Principal.toString() } - private fun clientIp(): Option = clientAddress.map(InetAddress::getHostAddress) - - companion object { - const val DEFAULT_STATUS_CODE = "INPROGRESS" - const val DEFAULT_VALUE = "" - } -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt deleted file mode 100644 index a72ec034..00000000 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.model - -import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc -import java.net.InetAddress -import java.net.UnknownHostException -import java.util.* - -/** - * @author Piotr Jaszczyk - * @since December 2018 - */ -object ServiceContext { - val instanceId = UUID.randomUUID().toString() - val serverFqdn = getHost().hostName!! - - val mdc = mapOf( - OnapMdc.INSTANCE_ID to instanceId, - OnapMdc.SERVER_FQDN to serverFqdn - ) - - private fun getHost() = try { - InetAddress.getLocalHost() - } catch (ex: UnknownHostException) { - InetAddress.getLoopbackAddress() - } -} diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index 6b9c6803..533581d5 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -36,7 +36,7 @@ import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.tests.utils.commonHeader diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt index e0092cf9..10dea82d 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt @@ -30,7 +30,7 @@ import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import reactor.test.test /** diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ClientContextTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ClientContextTest.kt deleted file mode 100644 index a49428a7..00000000 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ClientContextTest.kt +++ /dev/null @@ -1,98 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.model - -import arrow.core.Some -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.whenever -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc -import java.net.Inet4Address -import java.net.InetAddress -import java.net.InetSocketAddress -import java.security.cert.X509Certificate -import java.util.* -import javax.security.auth.x500.X500Principal - -/** - * @author Piotr Jaszczyk - * @since December 2018 - */ -internal object ClientContextTest : Spek({ - describe("ClientContext") { - given("default instance") { - val cut = ClientContext() - - on("mapped diagnostic context") { - val mdc = cut.mdc - - it("should contain ${OnapMdc.REQUEST_ID}") { - assertThat(mdc[OnapMdc.REQUEST_ID]).isEqualTo(cut.requestId) - } - - it("should contain ${OnapMdc.INVOCATION_ID}") { - assertThat(mdc[OnapMdc.INVOCATION_ID]).isEqualTo(cut.invocationId) - } - - it("should contain ${OnapMdc.STATUS_CODE}") { - assertThat(mdc[OnapMdc.STATUS_CODE]).isEqualTo("INPROGRESS") - } - - it("should contain ${OnapMdc.CLIENT_NAME}") { - assertThat(mdc[OnapMdc.CLIENT_NAME]).isBlank() - } - - it("should contain ${OnapMdc.CLIENT_IP}") { - assertThat(mdc[OnapMdc.CLIENT_IP]).isBlank() - } - } - } - - given("instance with client data") { - val clientDn = "C=PL, O=Nokia, CN=NokiaBTS" - val clientIp = "192.168.52.34" - val cert: X509Certificate = mock() - val principal: X500Principal = mock() - val cut = ClientContext( - clientAddress = Some(InetAddress.getByName(clientIp)), - clientCert = Some(cert)) - - whenever(cert.subjectX500Principal).thenReturn(principal) - whenever(principal.toString()).thenReturn(clientDn) - - on("mapped diagnostic context") { - val mdc = cut.mdc - - it("should contain ${OnapMdc.CLIENT_NAME}") { - assertThat(mdc[OnapMdc.CLIENT_NAME]).isEqualTo(clientDn) - } - - it("should contain ${OnapMdc.CLIENT_IP}") { - assertThat(mdc[OnapMdc.CLIENT_IP]).isEqualTo(clientIp) - } - } - } - } -}) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContextTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContextTest.kt deleted file mode 100644 index 5b6e4526..00000000 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContextTest.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.model - -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc -import java.util.* - -/** - * @author Piotr Jaszczyk - * @since December 2018 - */ -internal object ServiceContextTest : Spek({ - describe("ServiceContext") { - given("singleton instance") { - val cut = ServiceContext - - on("instanceId") { - val instanceId = cut.instanceId - it("should be valid UUID") { - UUID.fromString(instanceId) // should not throw - } - } - - on("serverFqdn") { - val serverFqdn = cut.serverFqdn - it("should be non empty") { - assertThat(serverFqdn).isNotBlank() - } - } - - on("mapped diagnostic context") { - val mdc = cut.mdc - - it("should contain ${OnapMdc.INSTANCE_ID}") { - assertThat(mdc[OnapMdc.INSTANCE_ID]).isEqualTo(cut.instanceId) - } - - it("should contain ${OnapMdc.SERVER_FQDN}") { - assertThat(mdc[OnapMdc.SERVER_FQDN]).isEqualTo(cut.serverFqdn) - } - } - } - } -}) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 4e9b7ef4..93c71e5e 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -29,7 +29,7 @@ import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.factory.HvVesCollectorFactory -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink import org.onap.dcae.collectors.veshv.tests.fakes.DelayingSink diff --git a/sources/hv-collector-domain/pom.xml b/sources/hv-collector-domain/pom.xml index 40e7c936..d68fa65b 100644 --- a/sources/hv-collector-domain/pom.xml +++ b/sources/hv-collector-domain/pom.xml @@ -57,6 +57,11 @@ + + ${project.groupId} + hv-collector-utils + ${project.version} + org.onap.dcaegen2.services.sdk hvvesclient-protobuf @@ -65,6 +70,10 @@ org.jetbrains.kotlin kotlin-stdlib-jdk8 + + org.slf4j + slf4j-api + io.projectreactor.netty reactor-netty @@ -80,7 +89,6 @@ org.jetbrains.kotlin kotlin-test - test org.jetbrains.spek @@ -90,6 +98,10 @@ org.jetbrains.spek spek-junit-platform-engine + + com.nhaarman.mockitokotlin2 + mockito-kotlin + diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ClientContext.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ClientContext.kt new file mode 100644 index 00000000..6a47f44d --- /dev/null +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ClientContext.kt @@ -0,0 +1,60 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain.logging + +import arrow.core.None +import arrow.core.Option +import arrow.core.getOrElse +import io.netty.buffer.ByteBufAllocator +import java.net.InetAddress +import java.security.cert.X509Certificate +import java.util.* + +/** + * @author Piotr Jaszczyk + * @since December 2018 + */ +data class ClientContext( + val alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT, + var clientAddress: Option = None, + var clientCert: Option = None, + val requestId: String = UUID.randomUUID().toString(), // Should be somehow propagated to DMAAP + val invocationId: String = UUID.randomUUID().toString()) { + + val mdc: Map + get() = mapOf( + OnapMdc.REQUEST_ID to requestId, + OnapMdc.INVOCATION_ID to invocationId, + OnapMdc.STATUS_CODE to DEFAULT_STATUS_CODE, + OnapMdc.CLIENT_NAME to clientDn().getOrElse { DEFAULT_VALUE }, + OnapMdc.CLIENT_IP to clientIp().getOrElse { DEFAULT_VALUE } + ) + + val fullMdc: Map + get() = mdc + ServiceContext.mdc + + private fun clientDn(): Option = clientCert.map { it.subjectX500Principal.toString() } + private fun clientIp(): Option = clientAddress.map(InetAddress::getHostAddress) + + companion object { + const val DEFAULT_STATUS_CODE = "INPROGRESS" + const val DEFAULT_VALUE = "" + } +} diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ClientContextLogging.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ClientContextLogging.kt new file mode 100644 index 00000000..fc45ea9d --- /dev/null +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ClientContextLogging.kt @@ -0,0 +1,46 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain.logging + +import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError +import reactor.core.publisher.Flux + +@Suppress("TooManyFunctions") +object ClientContextLogging { + fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::fullMdc, block) + fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::fullMdc, block) + fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::fullMdc, block) + fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::fullMdc, block) + fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::fullMdc, block) + + fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::fullMdc, message) + fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::fullMdc, message) + fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::fullMdc, message) + fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::fullMdc, message) + fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::fullMdc, message) + + fun Logger.handleReactiveStreamError(context: ClientContext, ex: Throwable, + returnFlux: Flux = Flux.empty()): Flux { + return this.handleReactiveStreamError({ context.fullMdc }, ex, returnFlux) + } +} + diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/Marker.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/Marker.kt new file mode 100644 index 00000000..b9463c96 --- /dev/null +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/Marker.kt @@ -0,0 +1,46 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain.logging + +import org.slf4j.MarkerFactory +import java.time.Instant +import java.util.* + +sealed class Marker(internal val slf4jMarker: org.slf4j.Marker, val mdc: Map = emptyMap()) { + + object Entry : Marker(ENTRY) + object Exit : Marker(EXIT) + + class Invoke(id: UUID = UUID.randomUUID(), timestamp: Instant = Instant.now()) : + Marker(INVOKE, mdc(id, timestamp)) { + companion object { + private fun mdc(id: UUID, timestamp: Instant) = mapOf( + OnapMdc.INVOCATION_ID to id.toString(), + OnapMdc.INVOCATION_TIMESTAMP to timestamp.toString() + ) + } + } + + companion object { + private val ENTRY = MarkerFactory.getMarker("ENTRY") + private val EXIT = MarkerFactory.getMarker("EXIT") + private val INVOKE = MarkerFactory.getMarker("INVOKE") + } +} diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/MarkerLogging.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/MarkerLogging.kt new file mode 100644 index 00000000..2959f98c --- /dev/null +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/MarkerLogging.kt @@ -0,0 +1,63 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain.logging + +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext +import org.slf4j.MDC + + +@Suppress("TooManyFunctions") +object MarkerLogging { + fun Logger.error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = + withError(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message()) } } + + fun Logger.error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) = + withError(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message(), t) } } + + fun Logger.warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = + withWarn(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message()) } } + + fun Logger.warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) = + withWarn(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message(), t) } } + + fun Logger.info(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = + withInfo(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message()) } } + + fun Logger.debug(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = + withDebug(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message()) } } + + fun Logger.trace(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = + withTrace(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message()) } } + + + private inline fun withAdditionalMdc(mdc: Map, block: () -> Unit) { + if (mdc.isEmpty()) { + block() + } else { + try { + mdc.forEach(MDC::put) + block() + } finally { + mdc.keys.forEach(MDC::remove) + } + } + } +} \ No newline at end of file diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/OnapMdc.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/OnapMdc.kt new file mode 100644 index 00000000..8c7feced --- /dev/null +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/OnapMdc.kt @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain.logging + +/** + * @author Piotr Jaszczyk + * @since December 2018 + */ +object OnapMdc { + const val REQUEST_ID = "RequestID" + const val CLIENT_NAME = "PartnerName" + const val CLIENT_IP = "ClientIPAddress" + const val INVOCATION_ID = "InvocationID" + const val INVOCATION_TIMESTAMP = "InvokeTimestamp" + const val STATUS_CODE = "StatusCode" + const val INSTANCE_ID = "InstanceID" + const val SERVER_FQDN = "ServerFQDN" +} diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ServiceContext.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ServiceContext.kt new file mode 100644 index 00000000..c3c64d92 --- /dev/null +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ServiceContext.kt @@ -0,0 +1,44 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain.logging + +import java.net.InetAddress +import java.net.UnknownHostException +import java.util.* + +/** + * @author Piotr Jaszczyk + * @since December 2018 + */ +object ServiceContext { + val instanceId = UUID.randomUUID().toString() + val serverFqdn = getHost().hostName!! + + val mdc = mapOf( + OnapMdc.INSTANCE_ID to instanceId, + OnapMdc.SERVER_FQDN to serverFqdn + ) + + private fun getHost() = try { + InetAddress.getLocalHost() + } catch (ex: UnknownHostException) { + InetAddress.getLoopbackAddress() + } +} diff --git a/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ClientContextTest.kt b/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ClientContextTest.kt new file mode 100644 index 00000000..ea1a2e90 --- /dev/null +++ b/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ClientContextTest.kt @@ -0,0 +1,94 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain.logging + +import arrow.core.Some +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import java.net.InetAddress +import java.security.cert.X509Certificate +import javax.security.auth.x500.X500Principal + +/** + * @author Piotr Jaszczyk + * @since December 2018 + */ +internal object ClientContextTest : Spek({ + describe("ClientContext") { + given("default instance") { + val cut = ClientContext() + + on("mapped diagnostic context") { + val mdc = cut.mdc + + it("should contain ${OnapMdc.REQUEST_ID}") { + assertThat(mdc[OnapMdc.REQUEST_ID]).isEqualTo(cut.requestId) + } + + it("should contain ${OnapMdc.INVOCATION_ID}") { + assertThat(mdc[OnapMdc.INVOCATION_ID]).isEqualTo(cut.invocationId) + } + + it("should contain ${OnapMdc.STATUS_CODE}") { + assertThat(mdc[OnapMdc.STATUS_CODE]).isEqualTo("INPROGRESS") + } + + it("should contain ${OnapMdc.CLIENT_NAME}") { + assertThat(mdc[OnapMdc.CLIENT_NAME]).isBlank() + } + + it("should contain ${OnapMdc.CLIENT_IP}") { + assertThat(mdc[OnapMdc.CLIENT_IP]).isBlank() + } + } + } + + given("instance with client data") { + val clientDn = "C=PL, O=Nokia, CN=NokiaBTS" + val clientIp = "192.168.52.34" + val cert: X509Certificate = mock() + val principal: X500Principal = mock() + val cut = ClientContext( + clientAddress = Some(InetAddress.getByName(clientIp)), + clientCert = Some(cert)) + + whenever(cert.subjectX500Principal).thenReturn(principal) + whenever(principal.toString()).thenReturn(clientDn) + + on("mapped diagnostic context") { + val mdc = cut.mdc + + it("should contain ${OnapMdc.CLIENT_NAME}") { + assertThat(mdc[OnapMdc.CLIENT_NAME]).isEqualTo(clientDn) + } + + it("should contain ${OnapMdc.CLIENT_IP}") { + assertThat(mdc[OnapMdc.CLIENT_IP]).isEqualTo(clientIp) + } + } + } + } +}) diff --git a/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ServiceContextTest.kt b/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ServiceContextTest.kt new file mode 100644 index 00000000..85ced42a --- /dev/null +++ b/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ServiceContextTest.kt @@ -0,0 +1,66 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain.logging + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import java.util.* + +/** + * @author Piotr Jaszczyk + * @since December 2018 + */ +internal object ServiceContextTest : Spek({ + describe("ServiceContext") { + given("singleton instance") { + val cut = ServiceContext + + on("instanceId") { + val instanceId = cut.instanceId + it("should be valid UUID") { + UUID.fromString(instanceId) // should not throw + } + } + + on("serverFqdn") { + val serverFqdn = cut.serverFqdn + it("should be non empty") { + assertThat(serverFqdn).isNotBlank() + } + } + + on("mapped diagnostic context") { + val mdc = cut.mdc + + it("should contain ${OnapMdc.INSTANCE_ID}") { + assertThat(mdc[OnapMdc.INSTANCE_ID]).isEqualTo(cut.instanceId) + } + + it("should contain ${OnapMdc.SERVER_FQDN}") { + assertThat(mdc[OnapMdc.SERVER_FQDN]).isEqualTo(cut.serverFqdn) + } + } + } + } +}) diff --git a/sources/hv-collector-domain/src/test/resources/logback-test.xml b/sources/hv-collector-domain/src/test/resources/logback-test.xml new file mode 100644 index 00000000..9a4eacfe --- /dev/null +++ b/sources/hv-collector-domain/src/test/resources/logback-test.xml @@ -0,0 +1,35 @@ + + + + + + + + + %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n + + + + + + + ${FILE_LOG_PATTERN} + + ${LOG_FILE} + + ${LOG_FILE}.%d{yyyy-MM-dd}.log + 50MB + 30 + 10GB + + + + + + + + + + diff --git a/sources/hv-collector-domain/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/sources/hv-collector-domain/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 00000000..ca6ee9ce --- /dev/null +++ b/sources/hv-collector-domain/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file diff --git a/sources/hv-collector-main/pom.xml b/sources/hv-collector-main/pom.xml index 3fe8932f..d99bf855 100644 --- a/sources/hv-collector-main/pom.xml +++ b/sources/hv-collector-main/pom.xml @@ -83,12 +83,12 @@ ${project.groupId} - hv-collector-core + hv-collector-health-check ${project.version} ${project.groupId} - hv-collector-health-check + hv-collector-server ${project.version} diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index dfb388d8..123d2dc9 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -19,14 +19,16 @@ */ package org.onap.dcae.collectors.veshv.main +import org.onap.dcae.collectors.veshv.api.ServersFactory import org.onap.dcae.collectors.veshv.config.api.ConfigurationModule import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer -import org.onap.dcae.collectors.veshv.main.servers.VesServer -import org.onap.dcae.collectors.veshv.model.ServiceContext +import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory +import org.onap.dcae.collectors.veshv.domain.logging.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.registerShutdownHook @@ -41,6 +43,7 @@ private val logger = Logger("$VES_HV_PACKAGE.main") private val hvVesServer = AtomicReference() private val configurationModule = ConfigurationModule() +private val sslContextFactory = SslContextFactory() private val maxCloseTime = Duration.ofSeconds(10) fun main(args: Array) { @@ -81,7 +84,11 @@ private fun startServer(config: HvVesConfiguration): Mono = private fun deferredVesServer(config: HvVesConfiguration) = Mono.defer { Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel) logger.debug(ServiceContext::mdc) { "Configuration: $config" } - VesServer.start(config) + ServersFactory.createHvVesServer( + config, + sslContextFactory, + MicrometerMetrics.INSTANCE + ).start() } private fun stopRunningServer() = Mono.defer { diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt index c970e5c8..2ed6ea70 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt @@ -22,7 +22,7 @@ package org.onap.dcae.collectors.veshv.main.servers import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics -import org.onap.dcae.collectors.veshv.model.ServiceContext +import org.onap.dcae.collectors.veshv.domain.logging.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import java.net.InetSocketAddress diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt deleted file mode 100644 index 98a094b2..00000000 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ /dev/null @@ -1,69 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.main.servers - -import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration -import org.onap.dcae.collectors.veshv.factory.HvVesCollectorFactory -import org.onap.dcae.collectors.veshv.factory.ServerFactory -import org.onap.dcae.collectors.veshv.factory.AdapterFactory -import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics -import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.utils.ServerHandle -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.core.publisher.Mono - -/** - * @author Piotr Jaszczyk - * @since August 2018 - */ -object VesServer { - - private val logger = Logger(VesServer::class) - - fun start(config: HvVesConfiguration): Mono = - createVesServer(config) - .start() - .doOnNext(::logServerStarted) - - private fun createVesServer(config: HvVesConfiguration): Server = - createCollectorProvider(config) - .let { collectorProvider -> - ServerFactory.createNettyTcpServer( - config.server, - config.security, - collectorProvider, - MicrometerMetrics.INSTANCE - ) - } - - private fun createCollectorProvider(config: HvVesConfiguration): HvVesCollectorFactory = - HvVesCollectorFactory( - config.collector, - AdapterFactory.sinkCreatorFactory(), - MicrometerMetrics.INSTANCE - ) - - private fun logServerStarted(handle: ServerHandle) = - logger.info(ServiceContext::mdc) { - "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}" - } - -} diff --git a/sources/hv-collector-server/pom.xml b/sources/hv-collector-server/pom.xml new file mode 100644 index 00000000..b8743450 --- /dev/null +++ b/sources/hv-collector-server/pom.xml @@ -0,0 +1,90 @@ + + + + 4.0.0 + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + org.onap.dcaegen2.collectors.hv-ves + hv-collector-sources + 1.1.0-SNAPSHOT + .. + + + hv-collector-server + VES HighVolume Collector :: Server + + + + + kotlin-maven-plugin + org.jetbrains.kotlin + + + maven-surefire-plugin + org.apache.maven.plugins + + + org.jacoco + jacoco-maven-plugin + + + + + + + ${project.parent.groupId} + hv-collector-core + ${project.parent.version} + + + ${project.parent.groupId} + hv-collector-ssl + ${project.parent.version} + + + ${project.parent.groupId} + hv-collector-utils + ${project.parent.version} + + + io.projectreactor + reactor-core + + + io.projectreactor.addons + reactor-extra + + + io.projectreactor.netty + reactor-netty + + + + diff --git a/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/api/Server.kt b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/api/Server.kt new file mode 100644 index 00000000..2bfac8d8 --- /dev/null +++ b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/api/Server.kt @@ -0,0 +1,41 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.api + +import org.onap.dcae.collectors.veshv.boundary.Metrics +import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration +import org.onap.dcae.collectors.veshv.impl.HvVesServer +import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory +import org.onap.dcae.collectors.veshv.utils.ServerHandle +import reactor.core.publisher.Mono + +interface Server { + fun start(): Mono +} + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +object ServersFactory { + fun createHvVesServer(config: HvVesConfiguration, + sslContextFactory: SslContextFactory, + metrics: Metrics): Server = HvVesServer(config, sslContextFactory, metrics) +} diff --git a/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesServer.kt b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesServer.kt new file mode 100644 index 00000000..0e149ab7 --- /dev/null +++ b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesServer.kt @@ -0,0 +1,67 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import org.onap.dcae.collectors.veshv.api.Server +import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration +import org.onap.dcae.collectors.veshv.factory.HvVesCollectorFactory +import org.onap.dcae.collectors.veshv.factory.AdapterFactory +import org.onap.dcae.collectors.veshv.boundary.Metrics +import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory +import org.onap.dcae.collectors.veshv.domain.logging.ServiceContext +import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono + +/** + * @author Piotr Jaszczyk + * @since August 2018 + */ +internal class HvVesServer(private val config: HvVesConfiguration, + private val sslFactory: SslContextFactory, + private val metrics: Metrics) : Server { + + private val logger = Logger(HvVesServer::class) + + override fun start(): Mono = + createNettyTcpServer(config) + .start() + .doOnNext(::logServerStarted) + + private fun createNettyTcpServer(config: HvVesConfiguration): Server = + NettyTcpServer( + config.server, + sslFactory.createServerContext(config.security), + createCollectorProvider(config), + metrics + ) + + private fun createCollectorProvider(config: HvVesConfiguration): HvVesCollectorFactory = + HvVesCollectorFactory( + config.collector, + AdapterFactory.sinkCreatorFactory(), + metrics + ) + + private fun logServerStarted(handle: ServerHandle) = + logger.info(ServiceContext::mdc) { + "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}" + } +} diff --git a/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt new file mode 100644 index 00000000..d19b7f49 --- /dev/null +++ b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt @@ -0,0 +1,163 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import arrow.core.Option +import arrow.core.getOrElse +import io.netty.handler.ssl.SslContext +import org.onap.dcae.collectors.veshv.api.Server +import org.onap.dcae.collectors.veshv.boundary.Collector +import org.onap.dcae.collectors.veshv.boundary.CollectorFactory +import org.onap.dcae.collectors.veshv.boundary.Metrics +import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ServiceContext +import org.onap.dcae.collectors.veshv.utils.NettyServerHandle +import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.domain.logging.Marker +import org.onap.dcae.collectors.veshv.domain.logging.MarkerLogging.debug +import org.onap.dcae.collectors.veshv.domain.logging.MarkerLogging.info +import org.onap.dcae.collectors.veshv.domain.logging.MarkerLogging.warn +import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.debug +import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.info +import reactor.core.publisher.Mono +import reactor.netty.Connection +import reactor.netty.NettyInbound +import reactor.netty.NettyOutbound +import reactor.netty.tcp.TcpServer +import java.net.InetAddress +import java.net.InetSocketAddress +import java.time.Duration + + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +internal class NettyTcpServer(private val serverConfiguration: ServerConfiguration, + private val sslContext: Option, + private val collectorFactory: CollectorFactory, + private val metrics: Metrics) : Server { + + override fun start(): Mono = + Mono.defer { + TcpServer.create() + .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) } + .configureSsl() + .handle(this::handleConnection) + .bind() + .map { + NettyServerHandle(it, closeAction()) + } + } + + private fun closeAction(): Mono = + collectorFactory.close().doOnSuccess { + logger.info(ServiceContext::mdc) { "Netty TCP Server closed" } + } + + + private fun TcpServer.configureSsl() = + sslContext + .map { serverContext -> + logger.info { "Collector configured with SSL enabled" } + this.secure { it.sslContext(serverContext) } + }.getOrElse { + logger.info { "Collector configured with SSL disabled" } + this + } + + private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono = + messageHandlingStream(nettyInbound, nettyOutbound).run { + subscribe() + nettyOutbound.neverComplete() + } + + private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono = + withNewClientContextFrom(nettyInbound, nettyOutbound) + { clientContext -> + logger.debug(clientContext::fullMdc) { "Client connection request received" } + + clientContext.clientAddress + .map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) } + .getOrElse { + logger.warn(clientContext::fullMdc) { + "Client address could not be resolved. Discarding connection" + } + nettyInbound.closeConnectionAndReturn(Mono.empty()) + } + } + + private fun acceptIfNotLocalConnection(address: InetAddress, + clientContext: ClientContext, + nettyInbound: NettyInbound): Mono = + if (address.isLocalClientAddress()) { + logger.debug(clientContext) { + "Client address resolved to localhost. Discarding connection as suspected healthcheck" + } + nettyInbound.closeConnectionAndReturn(Mono.empty()) + } else { + acceptClientConnection(clientContext, nettyInbound) + } + + private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono { + metrics.notifyClientConnected() + logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" } + val collector = collectorFactory(clientContext) + return collector.handleClient(clientContext, nettyInbound) + } + + private fun Collector.handleClient(clientContext: ClientContext, + nettyInbound: NettyInbound) = + withConnectionFrom(nettyInbound) { connection -> + connection + .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout) + .logConnectionClosed(clientContext) + }.run { + handleConnection(nettyInbound.createDataStream()) + } + + private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection = + onReadIdle(timeout.toMillis()) { + logger.info(ctx) { + "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..." + } + disconnectClient(ctx) + } + + private fun Connection.disconnectClient(ctx: ClientContext) = + closeChannelAndThen { + if (it.isSuccess) + logger.debug(ctx::fullMdc, Marker.Exit) { "Channel closed successfully." } + else + logger.warn(ctx::fullMdc, Marker.Exit, { "Channel close failed" }, it.cause()) + } + + private fun Connection.logConnectionClosed(ctx: ClientContext): Connection = + onDispose { + metrics.notifyClientDisconnected() + logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" } + } + + companion object { + private val logger = Logger(NettyTcpServer::class) + } +} diff --git a/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/networking.kt b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/networking.kt new file mode 100644 index 00000000..eb51cf4b --- /dev/null +++ b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/networking.kt @@ -0,0 +1,81 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import arrow.core.Option +import arrow.core.Try +import arrow.syntax.collections.firstOption +import io.netty.handler.ssl.SslHandler +import io.netty.util.concurrent.Future +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext +import reactor.core.publisher.Mono +import reactor.netty.ByteBufFlux +import reactor.netty.Connection +import reactor.netty.NettyInbound +import reactor.netty.NettyOutbound +import java.net.InetAddress +import java.security.cert.X509Certificate +import javax.net.ssl.SSLSession + +internal fun InetAddress.isLocalClientAddress() = hostAddress == "127.0.0.1" || hostName == "localhost" + +internal fun Connection.getSslSession(): Option = + Option.fromNullable( + channel() + .pipeline() + .get(SslHandler::class.java) + ?.engine() + ?.session + ) + +internal fun SSLSession.findClientCert(): Option = + peerCertificates + .firstOption() + .flatMap { Option.fromNullable(it as? X509Certificate) } + +internal fun withConnectionFrom(nettyInboud: NettyInbound, task: (Connection) -> Unit) = + nettyInboud.withConnection(task) + +internal fun Connection.closeChannel() = channel().close() + +internal fun Connection.closeChannelAndThen(task: (Future) -> Unit) = + closeChannel().addListener { task(it) } + +internal fun NettyInbound.closeConnectionAndReturn(returnValue: T): T = + withConnectionFrom(this) { it.closeChannel() }.let { returnValue } + +internal fun NettyInbound.createDataStream(): ByteBufFlux = receive().retain() + +// +// ClientContext related +// + +internal inline fun withNewClientContextFrom(nettyInbound: NettyInbound, + nettyOutbound: NettyOutbound, + reactiveTask: (ClientContext) -> Mono) = + ClientContext(nettyOutbound.alloc()) + .also { populateClientContextFromInbound(it, nettyInbound) } + .run(reactiveTask) + +internal fun populateClientContextFromInbound(clientContext: ClientContext, nettyInbound: NettyInbound) = + withConnectionFrom(nettyInbound) { connection -> + clientContext.clientAddress = Try { connection.address().address }.toOption() + clientContext.clientCert = connection.getSslSession().flatMap { it.findClientCert() } + } diff --git a/sources/hv-collector-utils/pom.xml b/sources/hv-collector-utils/pom.xml index 9dc8c9af..2e13e0a7 100644 --- a/sources/hv-collector-utils/pom.xml +++ b/sources/hv-collector-utils/pom.xml @@ -55,11 +55,6 @@ - - ${project.parent.groupId} - hv-collector-domain - ${project.parent.version} - org.jetbrains.kotlin kotlin-reflect @@ -80,6 +75,11 @@ io.arrow-kt arrow-syntax + + io.projectreactor.netty + reactor-netty + true + com.google.guava guava diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt index 7fcc73a0..14bc3ec0 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt @@ -23,6 +23,7 @@ import ch.qos.logback.classic.LoggerContext import kotlin.reflect.KClass import org.slf4j.LoggerFactory import org.slf4j.MDC +import org.slf4j.Marker typealias MappedDiagnosticContext = () -> Map @@ -52,91 +53,70 @@ class Logger(logger: org.slf4j.Logger) { fun withError(block: AtLevelLogger.() -> Unit) = errorLogger.block() fun withError(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = - errorLogger.withMdc(mdc, block) + errorLogger.withMdc(mdc, block) fun error(message: () -> String) = errorLogger.run { log(message()) } fun error(mdc: MappedDiagnosticContext, message: () -> String) = - errorLogger.withMdc(mdc) { log(message()) } - - fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = - errorLogger.withMdc(mdc) { log(marker, message()) } - - fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) = - errorLogger.withMdc(mdc) { log(marker, message(), t) } + errorLogger.withMdc(mdc) { log(message()) } // WARN fun withWarn(block: AtLevelLogger.() -> Unit) = warnLogger.block() fun withWarn(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = - warnLogger.withMdc(mdc, block) + warnLogger.withMdc(mdc, block) fun warn(message: () -> String) = warnLogger.run { log(message()) } fun warn(mdc: MappedDiagnosticContext, message: () -> String) = - warnLogger.withMdc(mdc) { log(message()) } - - fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = - warnLogger.withMdc(mdc) { log(marker, message()) } - - fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) = - warnLogger.withMdc(mdc) { log(marker, message(), t) } + warnLogger.withMdc(mdc) { log(message()) } // INFO fun withInfo(block: AtLevelLogger.() -> Unit) = infoLogger.block() fun withInfo(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = - infoLogger.withMdc(mdc, block) + infoLogger.withMdc(mdc, block) fun info(message: () -> String) = infoLogger.run { log(message()) } fun info(mdc: MappedDiagnosticContext, message: () -> String) = - infoLogger.withMdc(mdc) { log(message()) } - - fun info(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = - infoLogger.withMdc(mdc) { log(marker, message()) } + infoLogger.withMdc(mdc) { log(message()) } // DEBUG fun withDebug(block: AtLevelLogger.() -> Unit) = debugLogger.block() fun withDebug(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = - debugLogger.withMdc(mdc, block) + debugLogger.withMdc(mdc, block) fun debug(message: () -> String) = debugLogger.run { log(message()) } fun debug(mdc: MappedDiagnosticContext, message: () -> String) = - debugLogger.withMdc(mdc) { log(message()) } - - fun debug(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = - debugLogger.withMdc(mdc) { log(marker, message()) } + debugLogger.withMdc(mdc) { log(message()) } // TRACE fun withTrace(block: AtLevelLogger.() -> Unit) = traceLogger.block() fun withTrace(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = - traceLogger.withMdc(mdc, block) + traceLogger.withMdc(mdc, block) fun trace(message: () -> String) = traceLogger.run { log(message()) } fun trace(mdc: MappedDiagnosticContext, message: () -> String) = - traceLogger.withMdc(mdc) { log(message()) } - - fun trace(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = - traceLogger.withMdc(mdc) { log(marker, message()) } + traceLogger.withMdc(mdc) { log(message()) } companion object { fun setLogLevel(packageName: String, level: LogLevel) { @@ -165,19 +145,6 @@ abstract class AtLevelLogger { } } } - - protected fun withAdditionalMdc(mdc: Map, block: () -> Unit) { - if (mdc.isEmpty()) { - block() - } else { - try { - mdc.forEach(MDC::put) - block() - } finally { - mdc.keys.forEach(MDC::remove) - } - } - } } object OffLevelLogger : AtLevelLogger() { @@ -211,14 +178,10 @@ class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { } override fun log(marker: Marker, message: String) = - withAdditionalMdc(marker.mdc) { - logger.error(marker.slf4jMarker, message) - } + logger.error(marker, message) override fun log(marker: Marker, message: String, t: Throwable) = - withAdditionalMdc(marker.mdc) { - logger.error(marker.slf4jMarker, message, t) - } + logger.error(marker, message, t) } @Suppress("SuboptimalLoggerUsage") @@ -232,14 +195,10 @@ class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { } override fun log(marker: Marker, message: String) = - withAdditionalMdc(marker.mdc) { - logger.warn(marker.slf4jMarker, message) - } + logger.warn(marker, message) override fun log(marker: Marker, message: String, t: Throwable) = - withAdditionalMdc(marker.mdc) { - logger.warn(marker.slf4jMarker, message, t) - } + logger.warn(marker, message, t) } @Suppress("SuboptimalLoggerUsage") @@ -253,14 +212,10 @@ class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { } override fun log(marker: Marker, message: String) = - withAdditionalMdc(marker.mdc) { - logger.info(marker.slf4jMarker, message) - } + logger.info(marker, message) override fun log(marker: Marker, message: String, t: Throwable) = - withAdditionalMdc(marker.mdc) { - logger.info(marker.slf4jMarker, message, t) - } + logger.info(marker, message, t) } @Suppress("SuboptimalLoggerUsage") @@ -274,14 +229,10 @@ class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { } override fun log(marker: Marker, message: String) = - withAdditionalMdc(marker.mdc) { - logger.debug(marker.slf4jMarker, message) - } + logger.debug(marker, message) override fun log(marker: Marker, message: String, t: Throwable) = - withAdditionalMdc(marker.mdc) { - logger.debug(marker.slf4jMarker, message, t) - } + logger.debug(marker, message, t) } @Suppress("SuboptimalLoggerUsage") @@ -295,12 +246,8 @@ class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { } override fun log(marker: Marker, message: String) = - withAdditionalMdc(marker.mdc) { - logger.trace(marker.slf4jMarker, message) - } + logger.trace(marker, message) override fun log(marker: Marker, message: String, t: Throwable) = - withAdditionalMdc(marker.mdc) { - logger.trace(marker.slf4jMarker, message, t) - } + logger.trace(marker, message, t) } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt deleted file mode 100644 index ac39100d..00000000 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.utils.logging - -import org.slf4j.MarkerFactory -import java.time.Instant -import java.util.* - -sealed class Marker(internal val slf4jMarker: org.slf4j.Marker, val mdc: Map = emptyMap()) { - - object Entry : Marker(ENTRY) - object Exit : Marker(EXIT) - - class Invoke(id: UUID = UUID.randomUUID(), timestamp: Instant = Instant.now()) : - Marker(INVOKE, mdc(id, timestamp)) { - companion object { - private fun mdc(id: UUID, timestamp: Instant) = mapOf( - OnapMdc.INVOCATION_ID to id.toString(), - OnapMdc.INVOCATION_TIMESTAMP to timestamp.toString() - ) - } - } - - companion object { - private val ENTRY = MarkerFactory.getMarker("ENTRY") - private val EXIT = MarkerFactory.getMarker("EXIT") - private val INVOKE = MarkerFactory.getMarker("INVOKE") - } -} diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/OnapMdc.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/OnapMdc.kt deleted file mode 100644 index 86584164..00000000 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/OnapMdc.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.utils.logging - -/** - * @author Piotr Jaszczyk - * @since December 2018 - */ -object OnapMdc { - const val REQUEST_ID = "RequestID" - const val CLIENT_NAME = "PartnerName" - const val CLIENT_IP = "ClientIPAddress" - const val INVOCATION_ID = "InvocationID" - const val INVOCATION_TIMESTAMP = "InvokeTimestamp" - const val STATUS_CODE = "StatusCode" - const val INSTANCE_ID = "InstanceID" - const val SERVER_FQDN = "ServerFQDN" -} diff --git a/sources/pom.xml b/sources/pom.xml index 7e877438..6f75ec7f 100644 --- a/sources/pom.xml +++ b/sources/pom.xml @@ -143,6 +143,7 @@ hv-collector-domain hv-collector-health-check hv-collector-main + hv-collector-server hv-collector-ssl hv-collector-test-utils hv-collector-utils -- cgit 1.2.3-korg