diff options
4 files changed, 25 insertions, 11 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index 6c256b72..3c85a9b1 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.boundary +import arrow.core.Option import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator @@ -30,7 +31,7 @@ interface Collector { fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> } -typealias CollectorProvider = () -> Collector +typealias CollectorProvider = () -> Option<Collector> interface Server { fun start(): IO<ServerHandle> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index a400ff32..d807a9e7 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -32,6 +32,7 @@ import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger import java.util.concurrent.atomic.AtomicReference @@ -57,7 +58,7 @@ class CollectorFactory(val configuration: ConfigurationProvider, healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) } .subscribe(collector::set) - return collector::get + return collector::getOption } private fun createVesHvCollector(config: CollectorConfiguration): Collector { diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index f858d959..a34be7cd 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -70,23 +70,34 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, .receive() .retain() - return collectorProvider() - .handleConnection(nettyInbound.context().channel().alloc(), dataStream) + return collectorProvider().fold( + { + logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." } + Mono.empty() + }, + { it.handleConnection(nettyInbound.context().channel().alloc(), dataStream) }) + } private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound { onReadIdle(timeout.toMillis()) { - logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." } - context().channel().close().addListener { - if (it.isSuccess) - logger.debug { "Client disconnected because of idle timeout" } - else - logger.warn("Channel close failed", it.cause()) + logger.info { + "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${remoteAddress()}..." } + disconnectClient() } return this } + private fun NettyInbound.disconnectClient() { + context().channel().close().addListener { + if (it.isSuccess) + logger.debug { "Channel (${remoteAddress()}) closed successfully." } + else + logger.warn("Channel close failed", it.cause()) + } + } + private fun NettyInbound.logConnectionClosed(): NettyInbound { context().onClose { logger.info("Connection from ${remoteAddress()} has been closed") diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index e9b70578..942e6edf 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.tests.component +import arrow.core.getOrElse import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import io.netty.buffer.UnpooledByteBufAllocator @@ -48,7 +49,7 @@ class Sut(sink: Sink = StoringSink()) { private val collectorProvider = collectorFactory.createVesHvCollectorProvider() val collector: Collector - get() = collectorProvider() + get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") } } fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { |