diff options
Diffstat (limited to 'hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl')
4 files changed, 51 insertions, 47 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 07b5c82e..78afe9fd 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -23,7 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams -import reactor.ipc.netty.http.client.HttpClient +import reactor.netty.http.client.HttpClient /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index d08ad9e9..af4bbaa1 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -125,7 +125,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, } companion object { - private const val MAX_RETRIES = 5 + private const val MAX_RETRIES = 5L private const val BACKOFF_INTERVAL_FACTOR = 30L private val logger = Logger(ConsulConfigurationProvider::class) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt index 4503955f..1672158e 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt @@ -19,10 +19,11 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters +import io.netty.handler.codec.http.HttpStatusClass import org.slf4j.LoggerFactory import reactor.core.publisher.Mono -import reactor.core.publisher.toMono -import reactor.ipc.netty.http.client.HttpClient +import reactor.netty.http.client.HttpClient +import java.lang.IllegalStateException import java.nio.charset.Charset /** @@ -34,14 +35,18 @@ open class HttpAdapter(private val httpClient: HttpClient) { private val logger = LoggerFactory.getLogger(HttpAdapter::class.java) open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient - .get(url + createQueryString(queryParams)) + .get() + .uri(url + createQueryString(queryParams)) + .responseSingle { response, content -> + if (response.status().codeClass() == HttpStatusClass.SUCCESS) + content.asString() + else + Mono.error(IllegalStateException("$url ${response.status().code()} ${response.status().reasonPhrase()}")) + } .doOnError { logger.error("Failed to get resource on path: $url (${it.localizedMessage})") logger.debug("Nested exception:", it) } - .flatMap { it.receiveContent().toMono() } - .map { it.content().toString(Charset.defaultCharset()) } - private fun createQueryString(params: Map<String, Any>): String { if (params.isEmpty()) @@ -57,7 +62,7 @@ open class HttpAdapter(private val httpClient: HttpClient) { } - return builder.removeSuffix("&").toString() + return builder.removeSuffix("&").toString() } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 7a47cfc3..e535300a 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -19,9 +19,8 @@ */ package org.onap.dcae.collectors.veshv.impl.socket -import arrow.core.Option +import arrow.core.getOrElse import arrow.effects.IO -import io.netty.handler.ssl.SslContext import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.model.ServerConfiguration @@ -29,15 +28,13 @@ import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory import org.onap.dcae.collectors.veshv.utils.NettyServerHandle import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.reactivestreams.Publisher import reactor.core.publisher.Mono -import reactor.ipc.netty.ByteBufFlux -import reactor.ipc.netty.NettyInbound -import reactor.ipc.netty.NettyOutbound -import reactor.ipc.netty.options.ServerOptions -import reactor.ipc.netty.tcp.TcpServer +import reactor.netty.ByteBufFlux +import reactor.netty.Connection +import reactor.netty.NettyInbound +import reactor.netty.NettyOutbound +import reactor.netty.tcp.TcpServer import java.time.Duration -import java.util.function.BiFunction /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -48,63 +45,65 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private val collectorProvider: CollectorProvider) : Server { override fun start(): IO<ServerHandle> = IO { - val ctx = TcpServer.builder() - .options(this::configureServer) - .build() - .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { input, _ -> - handleConnection(input) - }) - NettyServerHandle(ctx) - } + val tcpServer = TcpServer.create() + .addressSupplier { serverConfig.serverListenAddress } + .configureSsl() + .handle(this::handleConnection) - private fun configureServer(opts: ServerOptions.Builder<*>) { - val sslContext: Option<SslContext> = sslContextFactory.createSslContext(serverConfig.securityConfiguration) - if (sslContext.isDefined()) opts.sslContext(sslContext.orNull()) - opts.port(serverConfig.listenPort) + NettyServerHandle(tcpServer.bindNow()) } - private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> = + private fun TcpServer.configureSsl() = + sslContextFactory + .createSslContext(serverConfig.securityConfiguration) + .map { sslContext -> + this.secure { b -> b.sslContext(sslContext) } + }.getOrElse { this } + + private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> = collectorProvider().fold( { - logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." } + nettyInbound.withConnection { conn -> + logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." } + } Mono.empty() }, { - logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" } - val allocator = nettyInbound.context().channel().alloc() - it.handleConnection(allocator, createDataStream(nettyInbound)) + nettyInbound.withConnection { conn -> + logger.info { "Handling connection from ${conn.address()}" } + conn.configureIdleTimeout(serverConfig.idleTimeout) + .logConnectionClosed() + } + it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound)) } ) - - fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound - .configureIdleTimeout(serverConfig.idleTimeout) - .logConnectionClosed() + private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound .receive() .retain() - private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound { + private fun Connection.configureIdleTimeout(timeout: Duration): Connection { onReadIdle(timeout.toMillis()) { logger.info { - "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${remoteAddress()}..." + "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..." } disconnectClient() } return this } - private fun NettyInbound.disconnectClient() { - context().channel().close().addListener { + private fun Connection.disconnectClient() { + channel().close().addListener { if (it.isSuccess) - logger.debug { "Channel (${remoteAddress()}) closed successfully." } + logger.debug { "Channel (${address()}) closed successfully." } else logger.warn("Channel close failed", it.cause()) } } - private fun NettyInbound.logConnectionClosed(): NettyInbound { - context().onClose { - logger.info("Connection from ${remoteAddress()} has been closed") + private fun Connection.logConnectionClosed(): Connection { + onTerminate().subscribe { + logger.info("Connection from ${address()} has been closed") } return this } |