summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-ct
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-01-22 11:43:18 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-01-22 14:30:32 +0100
commitd7532776b9d608632b91a6c658fcd72ca7c70d64 (patch)
tree0d90d7a75a4a1d83dd1cbd7c5af43e71bb6fea6c /sources/hv-collector-ct
parent4c529a33439cc40bf192ea3f8dac57d189d60b9f (diff)
Close KafkaSender when handling SIGINT
Closing KafkaSender should result in flushing any pending messages. Change-Id: Ib251f5ca3527266831189df542784cc17173d8dc Issue-ID: DCAEGEN2-1065 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-ct')
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt24
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt8
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt1
3 files changed, 30 insertions, 3 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index c3e4a581..30661e84 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -20,6 +20,7 @@
package org.onap.dcae.collectors.veshv.tests.component
import arrow.core.getOrElse
+import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.UnpooledByteBufAllocator
@@ -33,20 +34,22 @@ import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.tests.fakes.*
import reactor.core.publisher.Flux
import java.time.Duration
+import java.util.concurrent.atomic.AtomicBoolean
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class Sut(sink: Sink = StoringSink()) {
+class Sut(sink: Sink = StoringSink()): AutoCloseable {
val configurationProvider = FakeConfigurationProvider()
val healthStateProvider = FakeHealthState()
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
val metrics = FakeMetrics()
+ val sinkProvider = DummySinkProvider(sink)
private val collectorFactory = CollectorFactory(
configurationProvider,
- SinkProvider.just(sink),
+ sinkProvider,
metrics,
MAX_PAYLOAD_SIZE_BYTES,
healthStateProvider)
@@ -57,11 +60,28 @@ class Sut(sink: Sink = StoringSink()) {
throw IllegalStateException("Collector not available.")
}
+ override fun close() {
+ collectorProvider.close().unsafeRunSync()
+ }
+
companion object {
const val MAX_PAYLOAD_SIZE_BYTES = 1024
}
}
+class DummySinkProvider(private val sink: Sink) : SinkProvider {
+ private val active = AtomicBoolean(true)
+
+ override fun invoke(ctx: ClientContext) = sink
+
+ override fun close() = IO {
+ active.set(false)
+ }
+
+ val closed get() = !active.get()
+
+}
+
private val timeout = Duration.ofSeconds(10)
fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 75e7cf0e..ed46b119 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -61,6 +61,14 @@ object VesHvSpecification : Spek({
.describedAs("should send all events")
.hasSize(2)
}
+
+ it("should close sink when closing collector provider") {
+ val (sut, _) = vesHvWithStoringSink()
+
+ sut.close()
+
+ assertThat(sut.sinkProvider.closed).isTrue()
+ }
}
describe("Memory management") {
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index b4ce6499..2e7065b2 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.tests.fakes
-import arrow.core.identity
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage