aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/test
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-01-22 11:43:18 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-01-22 14:30:32 +0100
commitd7532776b9d608632b91a6c658fcd72ca7c70d64 (patch)
tree0d90d7a75a4a1d83dd1cbd7c5af43e71bb6fea6c /sources/hv-collector-core/src/test
parent4c529a33439cc40bf192ea3f8dac57d189d60b9f (diff)
Close KafkaSender when handling SIGINT
Closing KafkaSender should result in flushing any pending messages. Change-Id: Ib251f5ca3527266831189df542784cc17173d8dc Issue-ID: DCAEGEN2-1065 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src/test')
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt18
1 files changed, 18 insertions, 0 deletions
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
index f23154a4..2db6a152 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
@@ -20,6 +20,8 @@
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import arrow.syntax.collections.tail
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
@@ -28,6 +30,9 @@ import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.ves.VesEventOuterClass
+import reactor.kafka.sender.KafkaSender
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -61,5 +66,18 @@ internal object KafkaSinkProviderTest : Spek({
}
}
}
+
+ given("dummy KafkaSender") {
+ val kafkaSender: KafkaSender<VesEventOuterClass.CommonEventHeader, VesMessage> = mock()
+ val cut = KafkaSinkProvider(kafkaSender)
+
+ on("close") {
+ cut.close().unsafeRunSync()
+
+ it("should close KafkaSender") {
+ verify(kafkaSender).close()
+ }
+ }
+ }
}
})