From 4128aa2c9368ed20fab92e8c0df83f14d6233b86 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Tue, 18 Dec 2018 15:58:56 +0100 Subject: There should be one KafkaSender per configuration We should keep only one instance of KafkaSender per instance. However, as the configuration might be changed (Consul update) it cannot be a strict singleton. Hence there should be 1to1 relationship beetween ConsulConfiguration and KafkaSender. Change-Id: Ie168028c4427741254b8c2fe316b82cca72d7668 Issue-ID: DCAEGEN2-1047 Signed-off-by: Piotr Jaszczyk --- .../adapters/ConsulConfigurationProviderTest.kt | 7 +-- .../impl/adapters/kafka/KafkaSinkProviderTest.kt | 64 ++++++++++++++++++++++ 2 files changed, 65 insertions(+), 6 deletions(-) create mode 100644 sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt (limited to 'sources/hv-collector-core/src/test/kotlin/org') diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index 9ce0c3db..a92d3763 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -66,8 +66,6 @@ internal object ConsulConfigurationProviderTest : Spek({ StepVerifier.create(consulConfigProvider().take(1)) .consumeNextWith { - assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers) - val route1 = it.routing.routes[0] assertThat(FAULT.domainName) .describedAs("routed domain 1") @@ -139,12 +137,9 @@ private fun constructConsulConfigProvider(url: String, ) } - -const val kafkaAddress = "message-router-kafka" - fun constructConsulResponse(): String = """{ - "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093", + "whatever": "garbage", "collector.routing": [ { "fromDomain": "fault", diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt new file mode 100644 index 00000000..3a924e48 --- /dev/null +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt @@ -0,0 +1,64 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters.kafka + +import arrow.syntax.collections.tail +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.model.KafkaConfiguration + +/** + * @author Piotr Jaszczyk + * @since December 2018 + */ +internal object KafkaSinkProviderTest : Spek({ + describe("non functional requirements") { + given("sample configuration") { + val config = KafkaConfiguration("localhost:9090") + val cut = KafkaSinkProvider(config) + + on("sample clients") { + val clients = listOf( + ClientContext(), + ClientContext(), + ClientContext(), + ClientContext()) + + it("should create only one instance of KafkaSender") { + val sinks = clients.map(cut::invoke) + val firstSink = sinks[0] + val restOfSinks = sinks.tail() + + assertThat(restOfSinks).isNotEmpty + assertThat(restOfSinks).allSatisfy { sink -> + assertThat(firstSink.usesSameSenderAs(sink)) + .describedAs("$sink.kafkaSender should be same as $firstSink.kafkaSender") + .isTrue() + } + } + } + } + } +}) -- cgit 1.2.3-korg