aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-ct/src
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-04-02 15:40:46 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-04-03 08:51:03 +0200
commit302d27926c76bb99eecc4f74d333d0e8ff240c6e (patch)
treec9b716c649deb8b14d9ace320b3f35ed22604d0e /sources/hv-collector-ct/src
parent6a00e38550fd1745c3377da2099bf5a615f69053 (diff)
Fix shutting down when new config received bug
When new configuration has been received and at least one client connection has been active the collector used to shut down. Also got rid of some more IO monad usage. Change-Id: I7981ff388ff1264a79d722727ef3005cf39e9f0d Issue-ID: DCAEGEN2-1382 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-ct/src')
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt3
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt4
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt10
3 files changed, 8 insertions, 9 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index f79c2e46..95b9159e 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -40,6 +40,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
import org.onap.dcae.collectors.veshv.utils.Closeable
import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
@@ -93,7 +94,7 @@ class DummySinkProvider(private val sink: Sink) : SinkProvider {
if (sinkInitialized.get()) {
sink.close()
} else {
- IO.unit
+ Mono.empty()
}
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 2430c74f..d845f7c4 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -67,7 +67,7 @@ object VesHvSpecification : Spek({
// just connecting should not create sink
sut.handleConnection()
- sut.close().unsafeRunSync()
+ sut.close().block()
// then
assertThat(sink.closed).isFalse()
@@ -80,7 +80,7 @@ object VesHvSpecification : Spek({
sut.handleConnection(vesWireFrameMessage(PERF3GPP))
// when
- sut.close().unsafeRunSync()
+ sut.close().block()
// then
assertThat(sink.closed).isTrue()
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index 160defdb..f1b1ba2d 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentLinkedDeque
@@ -50,12 +51,9 @@ class StoringSink : Sink {
return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage)
}
- /*
- * TOD0: if the code would look like:
- * ```IO { active.set(false) }```
- * the tests wouldn't pass even though `.unsafeRunSync()` is called (see HvVesSpec)
- */
- override fun close() = active.set(false).run { IO.unit }
+ override fun close(): Mono<Void> = Mono.fromRunnable {
+ active.set(false)
+ }
}
/**