diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-04-02 15:40:46 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-04-03 08:51:03 +0200 |
commit | 302d27926c76bb99eecc4f74d333d0e8ff240c6e (patch) | |
tree | c9b716c649deb8b14d9ace320b3f35ed22604d0e /sources/hv-collector-ct | |
parent | 6a00e38550fd1745c3377da2099bf5a615f69053 (diff) |
Fix shutting down when new config received bug
When new configuration has been received and at least one client
connection has been active the collector used to shut down.
Also got rid of some more IO monad usage.
Change-Id: I7981ff388ff1264a79d722727ef3005cf39e9f0d
Issue-ID: DCAEGEN2-1382
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-ct')
3 files changed, 8 insertions, 9 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index f79c2e46..95b9159e 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -40,6 +40,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import org.onap.dcae.collectors.veshv.utils.Closeable import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean @@ -93,7 +94,7 @@ class DummySinkProvider(private val sink: Sink) : SinkProvider { if (sinkInitialized.get()) { sink.close() } else { - IO.unit + Mono.empty() } } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 2430c74f..d845f7c4 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -67,7 +67,7 @@ object VesHvSpecification : Spek({ // just connecting should not create sink sut.handleConnection() - sut.close().unsafeRunSync() + sut.close().block() // then assertThat(sink.closed).isFalse() @@ -80,7 +80,7 @@ object VesHvSpecification : Spek({ sut.handleConnection(vesWireFrameMessage(PERF3GPP)) // when - sut.close().unsafeRunSync() + sut.close().block() // then assertThat(sink.closed).isTrue() diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt index 160defdb..f1b1ba2d 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.reactivestreams.Publisher import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentLinkedDeque @@ -50,12 +51,9 @@ class StoringSink : Sink { return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage) } - /* - * TOD0: if the code would look like: - * ```IO { active.set(false) }``` - * the tests wouldn't pass even though `.unsafeRunSync()` is called (see HvVesSpec) - */ - override fun close() = active.set(false).run { IO.unit } + override fun close(): Mono<Void> = Mono.fromRunnable { + active.set(false) + } } /** |