diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-01-22 11:43:18 +0100 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2019-01-22 14:30:32 +0100 |
commit | d7532776b9d608632b91a6c658fcd72ca7c70d64 (patch) | |
tree | 0d90d7a75a4a1d83dd1cbd7c5af43e71bb6fea6c /sources/hv-collector-ct/src | |
parent | 4c529a33439cc40bf192ea3f8dac57d189d60b9f (diff) |
Close KafkaSender when handling SIGINT
Closing KafkaSender should result in flushing any pending messages.
Change-Id: Ib251f5ca3527266831189df542784cc17173d8dc
Issue-ID: DCAEGEN2-1065
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-ct/src')
3 files changed, 30 insertions, 3 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index c3e4a581..30661e84 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.tests.component import arrow.core.getOrElse +import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import io.netty.buffer.UnpooledByteBufAllocator @@ -33,20 +34,22 @@ import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.tests.fakes.* import reactor.core.publisher.Flux import java.time.Duration +import java.util.concurrent.atomic.AtomicBoolean /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class Sut(sink: Sink = StoringSink()) { +class Sut(sink: Sink = StoringSink()): AutoCloseable { val configurationProvider = FakeConfigurationProvider() val healthStateProvider = FakeHealthState() val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT val metrics = FakeMetrics() + val sinkProvider = DummySinkProvider(sink) private val collectorFactory = CollectorFactory( configurationProvider, - SinkProvider.just(sink), + sinkProvider, metrics, MAX_PAYLOAD_SIZE_BYTES, healthStateProvider) @@ -57,11 +60,28 @@ class Sut(sink: Sink = StoringSink()) { throw IllegalStateException("Collector not available.") } + override fun close() { + collectorProvider.close().unsafeRunSync() + } + companion object { const val MAX_PAYLOAD_SIZE_BYTES = 1024 } } +class DummySinkProvider(private val sink: Sink) : SinkProvider { + private val active = AtomicBoolean(true) + + override fun invoke(ctx: ClientContext) = sink + + override fun close() = IO { + active.set(false) + } + + val closed get() = !active.get() + +} + private val timeout = Duration.ofSeconds(10) fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 75e7cf0e..ed46b119 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -61,6 +61,14 @@ object VesHvSpecification : Spek({ .describedAs("should send all events") .hasSize(2) } + + it("should close sink when closing collector provider") { + val (sut, _) = vesHvWithStoringSink() + + sut.close() + + assertThat(sut.sinkProvider.closed).isTrue() + } } describe("Memory management") { diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt index b4ce6499..2e7065b2 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.tests.fakes -import arrow.core.identity import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage |