summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2018-09-05 10:37:51 +0200
committerFilip Krzywka <filip.krzywka@nokia.com>2018-09-06 07:59:25 +0000
commita40ef852a33e1fb335b6bf8b36515a0ef7546f2c (patch)
treec485908389db15bdf50aabf3f4576eece0a2df89
parentb20f963b6bafa0a0a5acfac3f2c802e5539b5068 (diff)
Handle non-existing Collector instance
Change-Id: I0b6cd5023b2bca0f0bee6958c107fc560fc95b52 Issue-ID: DCAEGEN2-751 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt27
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt3
4 files changed, 25 insertions, 11 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index 6c256b72..3c85a9b1 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.boundary
+import arrow.core.Option
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
@@ -30,7 +31,7 @@ interface Collector {
fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void>
}
-typealias CollectorProvider = () -> Collector
+typealias CollectorProvider = () -> Option<Collector>
interface Server {
fun start(): IO<ServerHandle>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index a400ff32..d807a9e7 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -32,6 +32,7 @@ import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.utils.arrow.getOption
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import java.util.concurrent.atomic.AtomicReference
@@ -57,7 +58,7 @@ class CollectorFactory(val configuration: ConfigurationProvider,
healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
}
.subscribe(collector::set)
- return collector::get
+ return collector::getOption
}
private fun createVesHvCollector(config: CollectorConfiguration): Collector {
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index f858d959..a34be7cd 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -70,23 +70,34 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
.receive()
.retain()
- return collectorProvider()
- .handleConnection(nettyInbound.context().channel().alloc(), dataStream)
+ return collectorProvider().fold(
+ {
+ logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." }
+ Mono.empty()
+ },
+ { it.handleConnection(nettyInbound.context().channel().alloc(), dataStream) })
+
}
private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound {
onReadIdle(timeout.toMillis()) {
- logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." }
- context().channel().close().addListener {
- if (it.isSuccess)
- logger.debug { "Client disconnected because of idle timeout" }
- else
- logger.warn("Channel close failed", it.cause())
+ logger.info {
+ "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${remoteAddress()}..."
}
+ disconnectClient()
}
return this
}
+ private fun NettyInbound.disconnectClient() {
+ context().channel().close().addListener {
+ if (it.isSuccess)
+ logger.debug { "Channel (${remoteAddress()}) closed successfully." }
+ else
+ logger.warn("Channel close failed", it.cause())
+ }
+ }
+
private fun NettyInbound.logConnectionClosed(): NettyInbound {
context().onClose {
logger.info("Connection from ${remoteAddress()} has been closed")
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index e9b70578..942e6edf 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.tests.component
+import arrow.core.getOrElse
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.UnpooledByteBufAllocator
@@ -48,7 +49,7 @@ class Sut(sink: Sink = StoringSink()) {
private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
val collector: Collector
- get() = collectorProvider()
+ get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") }
}
fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {