From 4d15e5a578dc2c94af2b7f1c7ad02fb44d384501 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Wed, 7 Nov 2018 15:08:43 +0100 Subject: Update project and dependencies * Changed version from 4.0.0-SNAPSHOT to 1.1.0-SNAPSHOT as per Vijay suggestion * Updated Reactor to BOM Californium-SR2 * Updated mockito-kotlin to 2.0.0 * Introduced some fixes to support OpenJDK 11 compilation Change-Id: Ib25979ef50c7241a019bf98efd9759e0b8792d58 Issue-ID: DCAEGEN2-961 Signed-off-by: Piotr Jaszczyk --- .../veshv/impl/adapters/AdapterFactory.kt | 2 +- .../impl/adapters/ConsulConfigurationProvider.kt | 2 +- .../collectors/veshv/impl/adapters/HttpAdapter.kt | 19 ++++-- .../collectors/veshv/impl/socket/NettyTcpServer.kt | 75 +++++++++++----------- .../collectors/veshv/model/ServerConfiguration.kt | 5 +- 5 files changed, 54 insertions(+), 49 deletions(-) (limited to 'hv-collector-core/src/main/kotlin') diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 07b5c82e..78afe9fd 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -23,7 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams -import reactor.ipc.netty.http.client.HttpClient +import reactor.netty.http.client.HttpClient /** * @author Piotr Jaszczyk diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index d08ad9e9..af4bbaa1 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -125,7 +125,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, } companion object { - private const val MAX_RETRIES = 5 + private const val MAX_RETRIES = 5L private const val BACKOFF_INTERVAL_FACTOR = 30L private val logger = Logger(ConsulConfigurationProvider::class) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt index 4503955f..1672158e 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt @@ -19,10 +19,11 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters +import io.netty.handler.codec.http.HttpStatusClass import org.slf4j.LoggerFactory import reactor.core.publisher.Mono -import reactor.core.publisher.toMono -import reactor.ipc.netty.http.client.HttpClient +import reactor.netty.http.client.HttpClient +import java.lang.IllegalStateException import java.nio.charset.Charset /** @@ -34,14 +35,18 @@ open class HttpAdapter(private val httpClient: HttpClient) { private val logger = LoggerFactory.getLogger(HttpAdapter::class.java) open fun get(url: String, queryParams: Map = emptyMap()): Mono = httpClient - .get(url + createQueryString(queryParams)) + .get() + .uri(url + createQueryString(queryParams)) + .responseSingle { response, content -> + if (response.status().codeClass() == HttpStatusClass.SUCCESS) + content.asString() + else + Mono.error(IllegalStateException("$url ${response.status().code()} ${response.status().reasonPhrase()}")) + } .doOnError { logger.error("Failed to get resource on path: $url (${it.localizedMessage})") logger.debug("Nested exception:", it) } - .flatMap { it.receiveContent().toMono() } - .map { it.content().toString(Charset.defaultCharset()) } - private fun createQueryString(params: Map): String { if (params.isEmpty()) @@ -57,7 +62,7 @@ open class HttpAdapter(private val httpClient: HttpClient) { } - return builder.removeSuffix("&").toString() + return builder.removeSuffix("&").toString() } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 7a47cfc3..e535300a 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -19,9 +19,8 @@ */ package org.onap.dcae.collectors.veshv.impl.socket -import arrow.core.Option +import arrow.core.getOrElse import arrow.effects.IO -import io.netty.handler.ssl.SslContext import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.model.ServerConfiguration @@ -29,15 +28,13 @@ import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory import org.onap.dcae.collectors.veshv.utils.NettyServerHandle import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.reactivestreams.Publisher import reactor.core.publisher.Mono -import reactor.ipc.netty.ByteBufFlux -import reactor.ipc.netty.NettyInbound -import reactor.ipc.netty.NettyOutbound -import reactor.ipc.netty.options.ServerOptions -import reactor.ipc.netty.tcp.TcpServer +import reactor.netty.ByteBufFlux +import reactor.netty.Connection +import reactor.netty.NettyInbound +import reactor.netty.NettyOutbound +import reactor.netty.tcp.TcpServer import java.time.Duration -import java.util.function.BiFunction /** * @author Piotr Jaszczyk @@ -48,63 +45,65 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private val collectorProvider: CollectorProvider) : Server { override fun start(): IO = IO { - val ctx = TcpServer.builder() - .options(this::configureServer) - .build() - .start(BiFunction> { input, _ -> - handleConnection(input) - }) - NettyServerHandle(ctx) - } + val tcpServer = TcpServer.create() + .addressSupplier { serverConfig.serverListenAddress } + .configureSsl() + .handle(this::handleConnection) - private fun configureServer(opts: ServerOptions.Builder<*>) { - val sslContext: Option = sslContextFactory.createSslContext(serverConfig.securityConfiguration) - if (sslContext.isDefined()) opts.sslContext(sslContext.orNull()) - opts.port(serverConfig.listenPort) + NettyServerHandle(tcpServer.bindNow()) } - private fun handleConnection(nettyInbound: NettyInbound): Mono = + private fun TcpServer.configureSsl() = + sslContextFactory + .createSslContext(serverConfig.securityConfiguration) + .map { sslContext -> + this.secure { b -> b.sslContext(sslContext) } + }.getOrElse { this } + + private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono = collectorProvider().fold( { - logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." } + nettyInbound.withConnection { conn -> + logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." } + } Mono.empty() }, { - logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" } - val allocator = nettyInbound.context().channel().alloc() - it.handleConnection(allocator, createDataStream(nettyInbound)) + nettyInbound.withConnection { conn -> + logger.info { "Handling connection from ${conn.address()}" } + conn.configureIdleTimeout(serverConfig.idleTimeout) + .logConnectionClosed() + } + it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound)) } ) - - fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound - .configureIdleTimeout(serverConfig.idleTimeout) - .logConnectionClosed() + private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound .receive() .retain() - private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound { + private fun Connection.configureIdleTimeout(timeout: Duration): Connection { onReadIdle(timeout.toMillis()) { logger.info { - "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${remoteAddress()}..." + "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..." } disconnectClient() } return this } - private fun NettyInbound.disconnectClient() { - context().channel().close().addListener { + private fun Connection.disconnectClient() { + channel().close().addListener { if (it.isSuccess) - logger.debug { "Channel (${remoteAddress()}) closed successfully." } + logger.debug { "Channel (${address()}) closed successfully." } else logger.warn("Channel close failed", it.cause()) } } - private fun NettyInbound.logConnectionClosed(): NettyInbound { - context().onClose { - logger.info("Connection from ${remoteAddress()} has been closed") + private fun Connection.logConnectionClosed(): Connection { + onTerminate().subscribe { + logger.info("Connection from ${address()} has been closed") } return this } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt index 7a7d9342..85117684 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.model import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import java.net.InetSocketAddress import java.time.Duration /** @@ -27,10 +28,10 @@ import java.time.Duration * @since May 2018 */ data class ServerConfiguration( - val listenPort: Int, + val serverListenAddress: InetSocketAddress, val configurationProviderParams: ConfigurationProviderParams, val securityConfiguration: SecurityConfiguration, val idleTimeout: Duration, - val healthCheckApiPort: Int, + val healthCheckApiListenAddress: InetSocketAddress, val maximumPayloadSizeBytes: Int, val dummyMode: Boolean = false) -- cgit 1.2.3-korg