aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-server/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-server/src/main')
-rw-r--r--sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/api/Server.kt41
-rw-r--r--sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesServer.kt67
-rw-r--r--sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt163
-rw-r--r--sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/networking.kt81
4 files changed, 352 insertions, 0 deletions
diff --git a/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/api/Server.kt b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/api/Server.kt
new file mode 100644
index 00000000..2bfac8d8
--- /dev/null
+++ b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/api/Server.kt
@@ -0,0 +1,41 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.api
+
+import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
+import org.onap.dcae.collectors.veshv.impl.HvVesServer
+import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import reactor.core.publisher.Mono
+
+interface Server {
+ fun start(): Mono<ServerHandle>
+}
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object ServersFactory {
+ fun createHvVesServer(config: HvVesConfiguration,
+ sslContextFactory: SslContextFactory,
+ metrics: Metrics): Server = HvVesServer(config, sslContextFactory, metrics)
+}
diff --git a/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesServer.kt b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesServer.kt
new file mode 100644
index 00000000..0e149ab7
--- /dev/null
+++ b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesServer.kt
@@ -0,0 +1,67 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import org.onap.dcae.collectors.veshv.api.Server
+import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
+import org.onap.dcae.collectors.veshv.factory.HvVesCollectorFactory
+import org.onap.dcae.collectors.veshv.factory.AdapterFactory
+import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory
+import org.onap.dcae.collectors.veshv.domain.logging.ServiceContext
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+internal class HvVesServer(private val config: HvVesConfiguration,
+ private val sslFactory: SslContextFactory,
+ private val metrics: Metrics) : Server {
+
+ private val logger = Logger(HvVesServer::class)
+
+ override fun start(): Mono<ServerHandle> =
+ createNettyTcpServer(config)
+ .start()
+ .doOnNext(::logServerStarted)
+
+ private fun createNettyTcpServer(config: HvVesConfiguration): Server =
+ NettyTcpServer(
+ config.server,
+ sslFactory.createServerContext(config.security),
+ createCollectorProvider(config),
+ metrics
+ )
+
+ private fun createCollectorProvider(config: HvVesConfiguration): HvVesCollectorFactory =
+ HvVesCollectorFactory(
+ config.collector,
+ AdapterFactory.sinkCreatorFactory(),
+ metrics
+ )
+
+ private fun logServerStarted(handle: ServerHandle) =
+ logger.info(ServiceContext::mdc) {
+ "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}"
+ }
+}
diff --git a/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt
new file mode 100644
index 00000000..d19b7f49
--- /dev/null
+++ b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt
@@ -0,0 +1,163 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import arrow.core.Option
+import arrow.core.getOrElse
+import io.netty.handler.ssl.SslContext
+import org.onap.dcae.collectors.veshv.api.Server
+import org.onap.dcae.collectors.veshv.boundary.Collector
+import org.onap.dcae.collectors.veshv.boundary.CollectorFactory
+import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
+import org.onap.dcae.collectors.veshv.domain.logging.ServiceContext
+import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.domain.logging.Marker
+import org.onap.dcae.collectors.veshv.domain.logging.MarkerLogging.debug
+import org.onap.dcae.collectors.veshv.domain.logging.MarkerLogging.info
+import org.onap.dcae.collectors.veshv.domain.logging.MarkerLogging.warn
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.debug
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.info
+import reactor.core.publisher.Mono
+import reactor.netty.Connection
+import reactor.netty.NettyInbound
+import reactor.netty.NettyOutbound
+import reactor.netty.tcp.TcpServer
+import java.net.InetAddress
+import java.net.InetSocketAddress
+import java.time.Duration
+
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class NettyTcpServer(private val serverConfiguration: ServerConfiguration,
+ private val sslContext: Option<SslContext>,
+ private val collectorFactory: CollectorFactory,
+ private val metrics: Metrics) : Server {
+
+ override fun start(): Mono<ServerHandle> =
+ Mono.defer {
+ TcpServer.create()
+ .addressSupplier { InetSocketAddress(serverConfiguration.listenPort) }
+ .configureSsl()
+ .handle(this::handleConnection)
+ .bind()
+ .map {
+ NettyServerHandle(it, closeAction())
+ }
+ }
+
+ private fun closeAction(): Mono<Void> =
+ collectorFactory.close().doOnSuccess {
+ logger.info(ServiceContext::mdc) { "Netty TCP Server closed" }
+ }
+
+
+ private fun TcpServer.configureSsl() =
+ sslContext
+ .map { serverContext ->
+ logger.info { "Collector configured with SSL enabled" }
+ this.secure { it.sslContext(serverContext) }
+ }.getOrElse {
+ logger.info { "Collector configured with SSL disabled" }
+ this
+ }
+
+ private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
+ messageHandlingStream(nettyInbound, nettyOutbound).run {
+ subscribe()
+ nettyOutbound.neverComplete()
+ }
+
+ private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
+ withNewClientContextFrom(nettyInbound, nettyOutbound)
+ { clientContext ->
+ logger.debug(clientContext::fullMdc) { "Client connection request received" }
+
+ clientContext.clientAddress
+ .map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) }
+ .getOrElse {
+ logger.warn(clientContext::fullMdc) {
+ "Client address could not be resolved. Discarding connection"
+ }
+ nettyInbound.closeConnectionAndReturn(Mono.empty())
+ }
+ }
+
+ private fun acceptIfNotLocalConnection(address: InetAddress,
+ clientContext: ClientContext,
+ nettyInbound: NettyInbound): Mono<Void> =
+ if (address.isLocalClientAddress()) {
+ logger.debug(clientContext) {
+ "Client address resolved to localhost. Discarding connection as suspected healthcheck"
+ }
+ nettyInbound.closeConnectionAndReturn(Mono.empty<Void>())
+ } else {
+ acceptClientConnection(clientContext, nettyInbound)
+ }
+
+ private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
+ metrics.notifyClientConnected()
+ logger.info(clientContext::fullMdc, Marker.Entry) { "Handling new client connection" }
+ val collector = collectorFactory(clientContext)
+ return collector.handleClient(clientContext, nettyInbound)
+ }
+
+ private fun Collector.handleClient(clientContext: ClientContext,
+ nettyInbound: NettyInbound) =
+ withConnectionFrom(nettyInbound) { connection ->
+ connection
+ .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout)
+ .logConnectionClosed(clientContext)
+ }.run {
+ handleConnection(nettyInbound.createDataStream())
+ }
+
+ private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
+ onReadIdle(timeout.toMillis()) {
+ logger.info(ctx) {
+ "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
+ }
+ disconnectClient(ctx)
+ }
+
+ private fun Connection.disconnectClient(ctx: ClientContext) =
+ closeChannelAndThen {
+ if (it.isSuccess)
+ logger.debug(ctx::fullMdc, Marker.Exit) { "Channel closed successfully." }
+ else
+ logger.warn(ctx::fullMdc, Marker.Exit, { "Channel close failed" }, it.cause())
+ }
+
+ private fun Connection.logConnectionClosed(ctx: ClientContext): Connection =
+ onDispose {
+ metrics.notifyClientDisconnected()
+ logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" }
+ }
+
+ companion object {
+ private val logger = Logger(NettyTcpServer::class)
+ }
+}
diff --git a/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/networking.kt b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/networking.kt
new file mode 100644
index 00000000..eb51cf4b
--- /dev/null
+++ b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/networking.kt
@@ -0,0 +1,81 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import arrow.core.Option
+import arrow.core.Try
+import arrow.syntax.collections.firstOption
+import io.netty.handler.ssl.SslHandler
+import io.netty.util.concurrent.Future
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
+import reactor.core.publisher.Mono
+import reactor.netty.ByteBufFlux
+import reactor.netty.Connection
+import reactor.netty.NettyInbound
+import reactor.netty.NettyOutbound
+import java.net.InetAddress
+import java.security.cert.X509Certificate
+import javax.net.ssl.SSLSession
+
+internal fun InetAddress.isLocalClientAddress() = hostAddress == "127.0.0.1" || hostName == "localhost"
+
+internal fun Connection.getSslSession(): Option<SSLSession> =
+ Option.fromNullable(
+ channel()
+ .pipeline()
+ .get(SslHandler::class.java)
+ ?.engine()
+ ?.session
+ )
+
+internal fun SSLSession.findClientCert(): Option<X509Certificate> =
+ peerCertificates
+ .firstOption()
+ .flatMap { Option.fromNullable(it as? X509Certificate) }
+
+internal fun withConnectionFrom(nettyInboud: NettyInbound, task: (Connection) -> Unit) =
+ nettyInboud.withConnection(task)
+
+internal fun Connection.closeChannel() = channel().close()
+
+internal fun Connection.closeChannelAndThen(task: (Future<in Void>) -> Unit) =
+ closeChannel().addListener { task(it) }
+
+internal fun <T> NettyInbound.closeConnectionAndReturn(returnValue: T): T =
+ withConnectionFrom(this) { it.closeChannel() }.let { returnValue }
+
+internal fun NettyInbound.createDataStream(): ByteBufFlux = receive().retain()
+
+//
+// ClientContext related
+//
+
+internal inline fun withNewClientContextFrom(nettyInbound: NettyInbound,
+ nettyOutbound: NettyOutbound,
+ reactiveTask: (ClientContext) -> Mono<Void>) =
+ ClientContext(nettyOutbound.alloc())
+ .also { populateClientContextFromInbound(it, nettyInbound) }
+ .run(reactiveTask)
+
+internal fun populateClientContextFromInbound(clientContext: ClientContext, nettyInbound: NettyInbound) =
+ withConnectionFrom(nettyInbound) { connection ->
+ clientContext.clientAddress = Try { connection.address().address }.toOption()
+ clientContext.clientCert = connection.getSslSession().flatMap { it.findClientCert() }
+ }