aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt160
1 files changed, 0 insertions, 160 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
deleted file mode 100644
index 7ce86f98..00000000
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.socket
-
-import arrow.core.Option
-import arrow.core.getOrElse
-import io.netty.handler.ssl.SslContext
-import org.onap.dcae.collectors.veshv.boundary.Collector
-import org.onap.dcae.collectors.veshv.boundary.CollectorFactory
-import org.onap.dcae.collectors.veshv.boundary.Metrics
-import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
-import org.onap.dcae.collectors.veshv.utils.ServerHandle
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.Marker
-import reactor.core.publisher.Mono
-import reactor.netty.Connection
-import reactor.netty.NettyInbound
-import reactor.netty.NettyOutbound
-import reactor.netty.tcp.TcpServer
-import java.net.InetAddress
-import java.net.InetSocketAddress
-import java.time.Duration
-
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-internal class NettyTcpServer(private val serverConfiguration: ServerConfiguration,
- private val sslContext: Option<SslContext>,
- private val collectorFactory: CollectorFactory,
- private val metrics: Metrics) : Server {
-
- override fun start(): Mono<ServerHandle> =
- Mono.defer {
- TcpServer.create()
- .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) }
- .configureSsl()
- .handle(this::handleConnection)
- .bind()
- .map {
- NettyServerHandle(it, closeAction())
- }
- }
-
- private fun closeAction(): Mono<Void> =
- collectorFactory.close().doOnSuccess {
- logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
- }
-
-
- private fun TcpServer.configureSsl() =
- sslContext
- .map { serverContext ->
- logger.info { "Collector configured with SSL enabled" }
- this.secure { it.sslContext(serverContext) }
- }.getOrElse {
- logger.info { "Collector configured with SSL disabled" }
- this
- }
-
- private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
- messageHandlingStream(nettyInbound, nettyOutbound).run {
- subscribe()
- nettyOutbound.neverComplete()
- }
-
- private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
- withNewClientContextFrom(nettyInbound, nettyOutbound)
- { clientContext ->
- logger.debug(clientContext::fullMdc) { "Client connection request received" }
-
- clientContext.clientAddress
- .map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) }
- .getOrElse {
- logger.warn(clientContext::fullMdc) {
- "Client address could not be resolved. Discarding connection"
- }
- nettyInbound.closeConnectionAndReturn(Mono.empty())
- }
- }
-
- private fun acceptIfNotLocalConnection(address: InetAddress,
- clientContext: ClientContext,
- nettyInbound: NettyInbound): Mono<Void> =
- if (address.isLocalClientAddress()) {
- logger.debug(clientContext) {
- "Client address resolved to localhost. Discarding connection as suspected healthcheck"
- }
- nettyInbound.closeConnectionAndReturn(Mono.empty<Void>())
- } else {
- acceptClientConnection(clientContext, nettyInbound)
- }
-
- private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
- metrics.notifyClientConnected()
- logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" }
- val collector = collectorFactory(clientContext)
- return collector.handleClient(clientContext, nettyInbound)
- }
-
- private fun Collector.handleClient(clientContext: ClientContext,
- nettyInbound: NettyInbound) =
- withConnectionFrom(nettyInbound) { connection ->
- connection
- .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout)
- .logConnectionClosed(clientContext)
- }.run {
- handleConnection(nettyInbound.createDataStream())
- }
-
- private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
- onReadIdle(timeout.toMillis()) {
- logger.info(ctx) {
- "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
- }
- disconnectClient(ctx)
- }
-
- private fun Connection.disconnectClient(ctx: ClientContext) =
- closeChannelAndThen {
- if (it.isSuccess)
- logger.debug(ctx::fullMdc, Marker.Exit) { "Channel closed successfully." }
- else
- logger.warn(ctx::fullMdc, Marker.Exit, { "Channel close failed" }, it.cause())
- }
-
- private fun Connection.logConnectionClosed(ctx: ClientContext): Connection =
- onDispose {
- metrics.notifyClientDisconnected()
- logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" }
- }
-
- companion object {
- private val logger = Logger(NettyTcpServer::class)
- }
-}