aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt')
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt24
1 files changed, 22 insertions, 2 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index c3e4a581..30661e84 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -20,6 +20,7 @@
package org.onap.dcae.collectors.veshv.tests.component
import arrow.core.getOrElse
+import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.UnpooledByteBufAllocator
@@ -33,20 +34,22 @@ import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.tests.fakes.*
import reactor.core.publisher.Flux
import java.time.Duration
+import java.util.concurrent.atomic.AtomicBoolean
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class Sut(sink: Sink = StoringSink()) {
+class Sut(sink: Sink = StoringSink()): AutoCloseable {
val configurationProvider = FakeConfigurationProvider()
val healthStateProvider = FakeHealthState()
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
val metrics = FakeMetrics()
+ val sinkProvider = DummySinkProvider(sink)
private val collectorFactory = CollectorFactory(
configurationProvider,
- SinkProvider.just(sink),
+ sinkProvider,
metrics,
MAX_PAYLOAD_SIZE_BYTES,
healthStateProvider)
@@ -57,11 +60,28 @@ class Sut(sink: Sink = StoringSink()) {
throw IllegalStateException("Collector not available.")
}
+ override fun close() {
+ collectorProvider.close().unsafeRunSync()
+ }
+
companion object {
const val MAX_PAYLOAD_SIZE_BYTES = 1024
}
}
+class DummySinkProvider(private val sink: Sink) : SinkProvider {
+ private val active = AtomicBoolean(true)
+
+ override fun invoke(ctx: ClientContext) = sink
+
+ override fun close() = IO {
+ active.set(false)
+ }
+
+ val closed get() = !active.get()
+
+}
+
private val timeout = Duration.ofSeconds(10)
fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {