aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main
diff options
context:
space:
mode:
authorkjaniak <kornel.janiak@nokia.com>2019-04-03 15:48:28 +0200
committerFilip Krzywka <filip.krzywka@nokia.com>2019-04-12 09:56:50 +0200
commit49f43c856c8ca793bc6972d9d4b47c2d0d4c0816 (patch)
tree1e2c7d124898e053d5a3d42f9dcb329d24050d0e /sources/hv-collector-core/src/main
parent8b8385d323754903ade492a659548d54b56bd7ad (diff)
Creation of server module
Issue-ID: DCAEGEN2-1390 Change-Id: I07410b16ed6566b933d5f1efa35bddb965225794 Signed-off-by: kjaniak <kornel.janiak@nokia.com> Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src/main')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt6
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt48
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt4
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt47
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt8
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt4
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt160
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt81
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt6
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt61
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt45
15 files changed, 17 insertions, 461 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 48f335a1..28b28203 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -21,7 +21,7 @@ package org.onap.dcae.collectors.veshv.boundary
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index 4c54d7d2..23a5d376 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -20,9 +20,8 @@
package org.onap.dcae.collectors.veshv.boundary
import io.netty.buffer.ByteBuf
-import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.utils.Closeable
-import org.onap.dcae.collectors.veshv.utils.ServerHandle
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -34,6 +33,3 @@ interface CollectorFactory : Closeable {
operator fun invoke(ctx: ClientContext): Collector
}
-interface Server {
- fun start(): Mono<ServerHandle>
-}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt
index c3c5d733..1f221c60 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt
@@ -29,7 +29,7 @@ import org.onap.dcae.collectors.veshv.impl.HvVesCollector
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
-import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
deleted file mode 100644
index e0f611b6..00000000
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.factory
-
-import org.onap.dcae.collectors.veshv.boundary.CollectorFactory
-import org.onap.dcae.collectors.veshv.boundary.Metrics
-import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer
-import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-object ServerFactory {
-
- private val sslFactory = SslContextFactory()
-
- fun createNettyTcpServer(serverConfig: ServerConfiguration,
- securityConfig: SecurityConfiguration,
- collectorFactory: CollectorFactory,
- metrics: Metrics
- ): Server = NettyTcpServer(
- serverConfig,
- sslFactory.createServerContext(securityConfig),
- collectorFactory,
- metrics
- )
-}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt
index 7d8f0cb1..ac7c3917 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt
@@ -23,9 +23,9 @@ import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.handleReactiveStreamError
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
-import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index fec713ad..2190eba3 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -26,9 +26,9 @@ import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkFactory
import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
-import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.utils.logging.Logger
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt
deleted file mode 100644
index 954de978..00000000
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
-import reactor.core.publisher.Flux
-
-@Suppress("TooManyFunctions")
-internal object ClientContextLogging {
- fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::fullMdc, block)
- fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::fullMdc, block)
- fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::fullMdc, block)
- fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::fullMdc, block)
- fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::fullMdc, block)
-
- fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::fullMdc, message)
- fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::fullMdc, message)
- fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::fullMdc, message)
- fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::fullMdc, message)
- fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::fullMdc, message)
-
- fun <T> Logger.handleReactiveStreamError(context: ClientContext, ex: Throwable,
- returnFlux: Flux<T> = Flux.empty()): Flux<T> {
- return this.handleReactiveStreamError({ context.fullMdc }, ex, returnFlux)
- }
-}
-
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
index 8d154091..8f66de2b 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
@@ -21,7 +21,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import io.netty.handler.codec.http.HttpStatusClass
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
+import org.onap.dcae.collectors.veshv.domain.logging.OnapMdc
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import java.util.*
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
index 7b726ab4..91e6fde5 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
@@ -22,14 +22,16 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug
-import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.withDebug
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.Marker
+import org.onap.dcae.collectors.veshv.domain.logging.Marker
+import org.onap.dcae.collectors.veshv.domain.logging.MarkerLogging.trace
+import org.onap.dcae.collectors.veshv.domain.logging.MarkerLogging.warn
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.core.publisher.Flux
import reactor.kafka.sender.KafkaSender
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt
index 9df1af31..2973fa8d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt
@@ -22,8 +22,8 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import org.onap.dcae.collectors.veshv.boundary.SinkFactory
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.impl.createKafkaSender
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ServiceContext
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
+import org.onap.dcae.collectors.veshv.domain.logging.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
import org.onap.ves.VesEventOuterClass.CommonEventHeader
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
deleted file mode 100644
index 7ce86f98..00000000
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.socket
-
-import arrow.core.Option
-import arrow.core.getOrElse
-import io.netty.handler.ssl.SslContext
-import org.onap.dcae.collectors.veshv.boundary.Collector
-import org.onap.dcae.collectors.veshv.boundary.CollectorFactory
-import org.onap.dcae.collectors.veshv.boundary.Metrics
-import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
-import org.onap.dcae.collectors.veshv.utils.ServerHandle
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.Marker
-import reactor.core.publisher.Mono
-import reactor.netty.Connection
-import reactor.netty.NettyInbound
-import reactor.netty.NettyOutbound
-import reactor.netty.tcp.TcpServer
-import java.net.InetAddress
-import java.net.InetSocketAddress
-import java.time.Duration
-
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-internal class NettyTcpServer(private val serverConfiguration: ServerConfiguration,
- private val sslContext: Option<SslContext>,
- private val collectorFactory: CollectorFactory,
- private val metrics: Metrics) : Server {
-
- override fun start(): Mono<ServerHandle> =
- Mono.defer {
- TcpServer.create()
- .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) }
- .configureSsl()
- .handle(this::handleConnection)
- .bind()
- .map {
- NettyServerHandle(it, closeAction())
- }
- }
-
- private fun closeAction(): Mono<Void> =
- collectorFactory.close().doOnSuccess {
- logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
- }
-
-
- private fun TcpServer.configureSsl() =
- sslContext
- .map { serverContext ->
- logger.info { "Collector configured with SSL enabled" }
- this.secure { it.sslContext(serverContext) }
- }.getOrElse {
- logger.info { "Collector configured with SSL disabled" }
- this
- }
-
- private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
- messageHandlingStream(nettyInbound, nettyOutbound).run {
- subscribe()
- nettyOutbound.neverComplete()
- }
-
- private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
- withNewClientContextFrom(nettyInbound, nettyOutbound)
- { clientContext ->
- logger.debug(clientContext::fullMdc) { "Client connection request received" }
-
- clientContext.clientAddress
- .map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) }
- .getOrElse {
- logger.warn(clientContext::fullMdc) {
- "Client address could not be resolved. Discarding connection"
- }
- nettyInbound.closeConnectionAndReturn(Mono.empty())
- }
- }
-
- private fun acceptIfNotLocalConnection(address: InetAddress,
- clientContext: ClientContext,
- nettyInbound: NettyInbound): Mono<Void> =
- if (address.isLocalClientAddress()) {
- logger.debug(clientContext) {
- "Client address resolved to localhost. Discarding connection as suspected healthcheck"
- }
- nettyInbound.closeConnectionAndReturn(Mono.empty<Void>())
- } else {
- acceptClientConnection(clientContext, nettyInbound)
- }
-
- private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
- metrics.notifyClientConnected()
- logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" }
- val collector = collectorFactory(clientContext)
- return collector.handleClient(clientContext, nettyInbound)
- }
-
- private fun Collector.handleClient(clientContext: ClientContext,
- nettyInbound: NettyInbound) =
- withConnectionFrom(nettyInbound) { connection ->
- connection
- .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout)
- .logConnectionClosed(clientContext)
- }.run {
- handleConnection(nettyInbound.createDataStream())
- }
-
- private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
- onReadIdle(timeout.toMillis()) {
- logger.info(ctx) {
- "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
- }
- disconnectClient(ctx)
- }
-
- private fun Connection.disconnectClient(ctx: ClientContext) =
- closeChannelAndThen {
- if (it.isSuccess)
- logger.debug(ctx::fullMdc, Marker.Exit) { "Channel closed successfully." }
- else
- logger.warn(ctx::fullMdc, Marker.Exit, { "Channel close failed" }, it.cause())
- }
-
- private fun Connection.logConnectionClosed(ctx: ClientContext): Connection =
- onDispose {
- metrics.notifyClientDisconnected()
- logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" }
- }
-
- companion object {
- private val logger = Logger(NettyTcpServer::class)
- }
-}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt
deleted file mode 100644
index a1e5b8fd..00000000
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.socket
-
-import arrow.core.Option
-import arrow.core.Try
-import arrow.syntax.collections.firstOption
-import io.netty.handler.ssl.SslHandler
-import io.netty.util.concurrent.Future
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import reactor.core.publisher.Mono
-import reactor.netty.ByteBufFlux
-import reactor.netty.Connection
-import reactor.netty.NettyInbound
-import reactor.netty.NettyOutbound
-import java.net.InetAddress
-import java.security.cert.X509Certificate
-import javax.net.ssl.SSLSession
-
-internal fun InetAddress.isLocalClientAddress() = hostAddress == "127.0.0.1" || hostName == "localhost"
-
-internal fun Connection.getSslSession(): Option<SSLSession> =
- Option.fromNullable(
- channel()
- .pipeline()
- .get(SslHandler::class.java)
- ?.engine()
- ?.session
- )
-
-internal fun SSLSession.findClientCert(): Option<X509Certificate> =
- peerCertificates
- .firstOption()
- .flatMap { Option.fromNullable(it as? X509Certificate) }
-
-internal fun withConnectionFrom(nettyInboud: NettyInbound, task: (Connection) -> Unit) =
- nettyInboud.withConnection(task)
-
-internal fun Connection.closeChannel() = channel().close()
-
-internal fun Connection.closeChannelAndThen(task: (Future<in Void>) -> Unit) =
- closeChannel().addListener { task(it) }
-
-internal fun <T> NettyInbound.closeConnectionAndReturn(returnValue: T): T =
- withConnectionFrom(this) { it.closeChannel() }.let { returnValue }
-
-internal fun NettyInbound.createDataStream(): ByteBufFlux = receive().retain()
-
-//
-// ClientContext related
-//
-
-internal inline fun withNewClientContextFrom(nettyInbound: NettyInbound,
- nettyOutbound: NettyOutbound,
- reactiveTask: (ClientContext) -> Mono<Void>) =
- ClientContext(nettyOutbound.alloc())
- .also { populateClientContextFromInbound(it, nettyInbound) }
- .run(reactiveTask)
-
-internal fun populateClientContextFromInbound(clientContext: ClientContext, nettyInbound: NettyInbound) =
- withConnectionFrom(nettyInbound) { connection ->
- clientContext.clientAddress = Try { connection.address().address }.toOption()
- clientContext.clientCert = connection.getSslSession().flatMap { it.findClientCert() }
- }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index ca9d28ae..0d0f8ea7 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -25,9 +25,9 @@ import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
-import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.handleReactiveStreamError
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.trace
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Flux.defer
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt
deleted file mode 100644
index 7b082e64..00000000
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.model
-
-import arrow.core.None
-import arrow.core.Option
-import arrow.core.getOrElse
-import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
-import java.net.InetAddress
-import java.security.cert.X509Certificate
-import java.util.*
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since December 2018
- */
-data class ClientContext(
- val alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT,
- var clientAddress: Option<InetAddress> = None,
- var clientCert: Option<X509Certificate> = None,
- val requestId: String = UUID.randomUUID().toString(), // Should be somehow propagated to DMAAP
- val invocationId: String = UUID.randomUUID().toString()) {
-
- val mdc: Map<String, String>
- get() = mapOf(
- OnapMdc.REQUEST_ID to requestId,
- OnapMdc.INVOCATION_ID to invocationId,
- OnapMdc.STATUS_CODE to DEFAULT_STATUS_CODE,
- OnapMdc.CLIENT_NAME to clientDn().getOrElse { DEFAULT_VALUE },
- OnapMdc.CLIENT_IP to clientIp().getOrElse { DEFAULT_VALUE }
- )
-
- val fullMdc: Map<String, String>
- get() = mdc + ServiceContext.mdc
-
- private fun clientDn(): Option<String> = clientCert.map { it.subjectX500Principal.toString() }
- private fun clientIp(): Option<String> = clientAddress.map(InetAddress::getHostAddress)
-
- companion object {
- const val DEFAULT_STATUS_CODE = "INPROGRESS"
- const val DEFAULT_VALUE = ""
- }
-}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt
deleted file mode 100644
index a72ec034..00000000
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.model
-
-import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
-import java.net.InetAddress
-import java.net.UnknownHostException
-import java.util.*
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since December 2018
- */
-object ServiceContext {
- val instanceId = UUID.randomUUID().toString()
- val serverFqdn = getHost().hostName!!
-
- val mdc = mapOf(
- OnapMdc.INSTANCE_ID to instanceId,
- OnapMdc.SERVER_FQDN to serverFqdn
- )
-
- private fun getHost() = try {
- InetAddress.getLocalHost()
- } catch (ex: UnknownHostException) {
- InetAddress.getLoopbackAddress()
- }
-}