aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/test
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-18 15:58:56 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-20 14:57:25 +0100
commit4128aa2c9368ed20fab92e8c0df83f14d6233b86 (patch)
treecff4cf2428a288b7b86830f282b81d41a41ad250 /sources/hv-collector-core/src/test
parent4ab95420e42f6df59bd4851eee41be6579bdbbe1 (diff)
There should be one KafkaSender per configuration
We should keep only one instance of KafkaSender per instance. However, as the configuration might be changed (Consul update) it cannot be a strict singleton. Hence there should be 1to1 relationship beetween ConsulConfiguration and KafkaSender. Change-Id: Ie168028c4427741254b8c2fe316b82cca72d7668 Issue-ID: DCAEGEN2-1047 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src/test')
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt7
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt64
2 files changed, 65 insertions, 6 deletions
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index 9ce0c3db..a92d3763 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -66,8 +66,6 @@ internal object ConsulConfigurationProviderTest : Spek({
StepVerifier.create(consulConfigProvider().take(1))
.consumeNextWith {
- assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers)
-
val route1 = it.routing.routes[0]
assertThat(FAULT.domainName)
.describedAs("routed domain 1")
@@ -139,12 +137,9 @@ private fun constructConsulConfigProvider(url: String,
)
}
-
-const val kafkaAddress = "message-router-kafka"
-
fun constructConsulResponse(): String =
"""{
- "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093",
+ "whatever": "garbage",
"collector.routing": [
{
"fromDomain": "fault",
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
new file mode 100644
index 00000000..3a924e48
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import arrow.syntax.collections.tail
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+internal object KafkaSinkProviderTest : Spek({
+ describe("non functional requirements") {
+ given("sample configuration") {
+ val config = KafkaConfiguration("localhost:9090")
+ val cut = KafkaSinkProvider(config)
+
+ on("sample clients") {
+ val clients = listOf(
+ ClientContext(),
+ ClientContext(),
+ ClientContext(),
+ ClientContext())
+
+ it("should create only one instance of KafkaSender") {
+ val sinks = clients.map(cut::invoke)
+ val firstSink = sinks[0]
+ val restOfSinks = sinks.tail()
+
+ assertThat(restOfSinks).isNotEmpty
+ assertThat(restOfSinks).allSatisfy { sink ->
+ assertThat(firstSink.usesSameSenderAs(sink))
+ .describedAs("$sink.kafkaSender should be same as $firstSink.kafkaSender")
+ .isTrue()
+ }
+ }
+ }
+ }
+ }
+})