aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt75
1 files changed, 37 insertions, 38 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 7a47cfc3..e535300a 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -19,9 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.impl.socket
-import arrow.core.Option
+import arrow.core.getOrElse
import arrow.effects.IO
-import io.netty.handler.ssl.SslContext
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
@@ -29,15 +28,13 @@ import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
-import reactor.ipc.netty.ByteBufFlux
-import reactor.ipc.netty.NettyInbound
-import reactor.ipc.netty.NettyOutbound
-import reactor.ipc.netty.options.ServerOptions
-import reactor.ipc.netty.tcp.TcpServer
+import reactor.netty.ByteBufFlux
+import reactor.netty.Connection
+import reactor.netty.NettyInbound
+import reactor.netty.NettyOutbound
+import reactor.netty.tcp.TcpServer
import java.time.Duration
-import java.util.function.BiFunction
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -48,63 +45,65 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
private val collectorProvider: CollectorProvider) : Server {
override fun start(): IO<ServerHandle> = IO {
- val ctx = TcpServer.builder()
- .options(this::configureServer)
- .build()
- .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { input, _ ->
- handleConnection(input)
- })
- NettyServerHandle(ctx)
- }
+ val tcpServer = TcpServer.create()
+ .addressSupplier { serverConfig.serverListenAddress }
+ .configureSsl()
+ .handle(this::handleConnection)
- private fun configureServer(opts: ServerOptions.Builder<*>) {
- val sslContext: Option<SslContext> = sslContextFactory.createSslContext(serverConfig.securityConfiguration)
- if (sslContext.isDefined()) opts.sslContext(sslContext.orNull())
- opts.port(serverConfig.listenPort)
+ NettyServerHandle(tcpServer.bindNow())
}
- private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> =
+ private fun TcpServer.configureSsl() =
+ sslContextFactory
+ .createSslContext(serverConfig.securityConfiguration)
+ .map { sslContext ->
+ this.secure { b -> b.sslContext(sslContext) }
+ }.getOrElse { this }
+
+ private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
collectorProvider().fold(
{
- logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." }
+ nettyInbound.withConnection { conn ->
+ logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." }
+ }
Mono.empty()
},
{
- logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" }
- val allocator = nettyInbound.context().channel().alloc()
- it.handleConnection(allocator, createDataStream(nettyInbound))
+ nettyInbound.withConnection { conn ->
+ logger.info { "Handling connection from ${conn.address()}" }
+ conn.configureIdleTimeout(serverConfig.idleTimeout)
+ .logConnectionClosed()
+ }
+ it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound))
}
)
-
- fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
- .configureIdleTimeout(serverConfig.idleTimeout)
- .logConnectionClosed()
+ private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
.receive()
.retain()
- private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound {
+ private fun Connection.configureIdleTimeout(timeout: Duration): Connection {
onReadIdle(timeout.toMillis()) {
logger.info {
- "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${remoteAddress()}..."
+ "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
}
disconnectClient()
}
return this
}
- private fun NettyInbound.disconnectClient() {
- context().channel().close().addListener {
+ private fun Connection.disconnectClient() {
+ channel().close().addListener {
if (it.isSuccess)
- logger.debug { "Channel (${remoteAddress()}) closed successfully." }
+ logger.debug { "Channel (${address()}) closed successfully." }
else
logger.warn("Channel close failed", it.cause())
}
}
- private fun NettyInbound.logConnectionClosed(): NettyInbound {
- context().onClose {
- logger.info("Connection from ${remoteAddress()} has been closed")
+ private fun Connection.logConnectionClosed(): Connection {
+ onTerminate().subscribe {
+ logger.info("Connection from ${address()} has been closed")
}
return this
}