diff options
Diffstat (limited to 'sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt')
-rw-r--r-- | sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt | 24 |
1 files changed, 22 insertions, 2 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index c3e4a581..30661e84 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.tests.component import arrow.core.getOrElse +import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import io.netty.buffer.UnpooledByteBufAllocator @@ -33,20 +34,22 @@ import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.tests.fakes.* import reactor.core.publisher.Flux import java.time.Duration +import java.util.concurrent.atomic.AtomicBoolean /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class Sut(sink: Sink = StoringSink()) { +class Sut(sink: Sink = StoringSink()): AutoCloseable { val configurationProvider = FakeConfigurationProvider() val healthStateProvider = FakeHealthState() val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT val metrics = FakeMetrics() + val sinkProvider = DummySinkProvider(sink) private val collectorFactory = CollectorFactory( configurationProvider, - SinkProvider.just(sink), + sinkProvider, metrics, MAX_PAYLOAD_SIZE_BYTES, healthStateProvider) @@ -57,11 +60,28 @@ class Sut(sink: Sink = StoringSink()) { throw IllegalStateException("Collector not available.") } + override fun close() { + collectorProvider.close().unsafeRunSync() + } + companion object { const val MAX_PAYLOAD_SIZE_BYTES = 1024 } } +class DummySinkProvider(private val sink: Sink) : SinkProvider { + private val active = AtomicBoolean(true) + + override fun invoke(ctx: ClientContext) = sink + + override fun close() = IO { + active.set(false) + } + + val closed get() = !active.get() + +} + private val timeout = Duration.ofSeconds(10) fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { |