aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-ct
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-ct')
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt3
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt4
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt10
3 files changed, 8 insertions, 9 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index f79c2e46..95b9159e 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -40,6 +40,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
import org.onap.dcae.collectors.veshv.utils.Closeable
import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
@@ -93,7 +94,7 @@ class DummySinkProvider(private val sink: Sink) : SinkProvider {
if (sinkInitialized.get()) {
sink.close()
} else {
- IO.unit
+ Mono.empty()
}
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 2430c74f..d845f7c4 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -67,7 +67,7 @@ object VesHvSpecification : Spek({
// just connecting should not create sink
sut.handleConnection()
- sut.close().unsafeRunSync()
+ sut.close().block()
// then
assertThat(sink.closed).isFalse()
@@ -80,7 +80,7 @@ object VesHvSpecification : Spek({
sut.handleConnection(vesWireFrameMessage(PERF3GPP))
// when
- sut.close().unsafeRunSync()
+ sut.close().block()
// then
assertThat(sink.closed).isTrue()
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index 160defdb..f1b1ba2d 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentLinkedDeque
@@ -50,12 +51,9 @@ class StoringSink : Sink {
return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage)
}
- /*
- * TOD0: if the code would look like:
- * ```IO { active.set(false) }```
- * the tests wouldn't pass even though `.unsafeRunSync()` is called (see HvVesSpec)
- */
- override fun close() = active.set(false).run { IO.unit }
+ override fun close(): Mono<Void> = Mono.fromRunnable {
+ active.set(false)
+ }
}
/**