From d7532776b9d608632b91a6c658fcd72ca7c70d64 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Tue, 22 Jan 2019 11:43:18 +0100 Subject: Close KafkaSender when handling SIGINT Closing KafkaSender should result in flushing any pending messages. Change-Id: Ib251f5ca3527266831189df542784cc17173d8dc Issue-ID: DCAEGEN2-1065 Signed-off-by: Piotr Jaszczyk --- .../dcae/collectors/veshv/tests/component/Sut.kt | 24 ++++++++++++++++++++-- .../veshv/tests/component/VesHvSpecification.kt | 8 ++++++++ .../onap/dcae/collectors/veshv/tests/fakes/sink.kt | 1 - 3 files changed, 30 insertions(+), 3 deletions(-) (limited to 'sources/hv-collector-ct') diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index c3e4a581..30661e84 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.tests.component import arrow.core.getOrElse +import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import io.netty.buffer.UnpooledByteBufAllocator @@ -33,20 +34,22 @@ import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.tests.fakes.* import reactor.core.publisher.Flux import java.time.Duration +import java.util.concurrent.atomic.AtomicBoolean /** * @author Piotr Jaszczyk * @since May 2018 */ -class Sut(sink: Sink = StoringSink()) { +class Sut(sink: Sink = StoringSink()): AutoCloseable { val configurationProvider = FakeConfigurationProvider() val healthStateProvider = FakeHealthState() val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT val metrics = FakeMetrics() + val sinkProvider = DummySinkProvider(sink) private val collectorFactory = CollectorFactory( configurationProvider, - SinkProvider.just(sink), + sinkProvider, metrics, MAX_PAYLOAD_SIZE_BYTES, healthStateProvider) @@ -57,11 +60,28 @@ class Sut(sink: Sink = StoringSink()) { throw IllegalStateException("Collector not available.") } + override fun close() { + collectorProvider.close().unsafeRunSync() + } + companion object { const val MAX_PAYLOAD_SIZE_BYTES = 1024 } } +class DummySinkProvider(private val sink: Sink) : SinkProvider { + private val active = AtomicBoolean(true) + + override fun invoke(ctx: ClientContext) = sink + + override fun close() = IO { + active.set(false) + } + + val closed get() = !active.get() + +} + private val timeout = Duration.ofSeconds(10) fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List { diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 75e7cf0e..ed46b119 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -61,6 +61,14 @@ object VesHvSpecification : Spek({ .describedAs("should send all events") .hasSize(2) } + + it("should close sink when closing collector provider") { + val (sut, _) = vesHvWithStoringSink() + + sut.close() + + assertThat(sut.sinkProvider.closed).isTrue() + } } describe("Memory management") { diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt index b4ce6499..2e7065b2 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.tests.fakes -import arrow.core.identity import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage -- cgit 1.2.3-korg