diff options
author | Filip Krzywka <filip.krzywka@nokia.com> | 2019-03-26 14:21:02 +0100 |
---|---|---|
committer | Filip Krzywka <filip.krzywka@nokia.com> | 2019-03-28 14:16:02 +0100 |
commit | 2174a045086e16611128b20a6d4357c04d9eac4a (patch) | |
tree | 6302837fc6ce5fac26a9da91e7353247c397bc0a /sources/hv-collector-core/src/test/kotlin/org/onap | |
parent | 1b7ac38627977e8ef2209a3a98a8cd0c2da785dd (diff) |
Redefine Routing
As all needed information to route messege is contained inside of
KafkaSink message, we can simply put this object as part of single Route.
Change-Id: I2e7df2e0193eb2af5283980d4d5c8df03ac94df9
Issue-ID: DCAEGEN2-1347
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src/test/kotlin/org/onap')
-rw-r--r-- | sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt | 123 | ||||
-rw-r--r-- | sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt (renamed from sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt) | 22 | ||||
-rw-r--r-- | sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt | 88 |
3 files changed, 93 insertions, 140 deletions
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index b8b55865..6b9c6803 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -20,22 +20,30 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.None -import arrow.core.Some +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.whenever import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import reactor.core.publisher.Flux +import reactor.test.StepVerifier /** @@ -43,62 +51,85 @@ import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame * @since May 2018 */ object RouterTest : Spek({ - given("sample configuration") { - val config = Routing(listOf( - Route(PERF3GPP.domainName, "ves_rtpm", { 2 }), - Route(SYSLOG.domainName, "ves_trace") - )) - val cut = Router(config, ClientContext()) - - on("message with existing route (rtpm)") { - val message = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame()) - val result = cut.findDestination(message) - - it("should have route available") { - assertThat(result).isNotNull() - } - it("should be routed to proper partition") { - assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2)) - } + describe("Router") { - it("should be routed to proper topic") { - assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm")) - } + whenever(perf3gppSinkMock.topicName()).thenReturn(perf3gppTopic) + whenever(syslogSinkMock.topicName()).thenReturn(syslogTopic) - it("should be routed with a given message") { - assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message)) - } - } + val messageSinkMap = mapOf( + Pair(perf3gppTopic, lazyOf(messageSinkMock)), + Pair(syslogTopic, lazyOf(messageSinkMock)) + ) - on("message with existing route (trace)") { - val message = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame()) - val result = cut.findDestination(message) + given("sample routing specification") { + val cut = router(defaultRouting, messageSinkMap) - it("should have route available") { - assertThat(result).isNotNull() - } + on("message with existing route (rtpm)") { + whenever(messageSinkMock.send(routedPerf3GppMessage)) + .thenReturn(Flux.just(successfullyConsumedPerf3gppMessage)) - it("should be routed to proper partition") { - assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0)) - } + it("should be properly routed") { + val result = cut.route(perf3gppMessage) - it("should be routed to proper topic") { - assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace")) + assertThat(result).isNotNull() + StepVerifier.create(result) + .expectNext(successfullyConsumedPerf3gppMessage) + .verifyComplete() + + verify(perf3gppSinkMock).topicName() + verify(messageSinkMock).send(routedPerf3GppMessage) + } } - it("should be routed with a given message") { - assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message)) + on("message with existing route (syslog)") { + whenever(messageSinkMock.send(routedSyslogMessage)) + .thenReturn(Flux.just(successfullyConsumedSyslogMessage)) + val result = cut.route(syslogMessage) + + it("should be properly routed") { + StepVerifier.create(result) + .expectNext(successfullyConsumedSyslogMessage) + .verifyComplete() + + verify(syslogSinkMock).topicName() + verify(messageSinkMock).send(routedSyslogMessage) + } } - } - on("message with unknown route") { - val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame()) - val result = cut.findDestination(message) + on("message with unknown route") { + val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame()) + val result = cut.route(message) - it("should not have route available") { - assertThat(result).isEqualTo(None) + it("should not have route available") { + StepVerifier.create(result).verifyComplete() + } } } } -})
\ No newline at end of file + +}) + +private fun router(routing: Routing, kafkaPublisherMap: Map<String, Lazy<Sink>>) = + Router(routing, kafkaPublisherMap, ClientContext(), mock()) + +private val perf3gppTopic = "PERF_PERF" +private val perf3gppSinkMock = mock<KafkaSink>() +private val default3gppRoute = Route(PERF3GPP.domainName, perf3gppSinkMock) + +private val syslogTopic = "SYS_LOG" +private val syslogSinkMock = mock<KafkaSink>() +private val defaultSyslogRoute = Route(SYSLOG.domainName, syslogSinkMock) + +private val defaultRouting = listOf(default3gppRoute, defaultSyslogRoute) + +private val messageSinkMock = mock<Sink>() +private val default_partition = None + +private val perf3gppMessage = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame()) +private val routedPerf3GppMessage = RoutedMessage(perf3gppMessage, perf3gppTopic, default_partition) +private val successfullyConsumedPerf3gppMessage = SuccessfullyConsumedMessage(routedPerf3GppMessage) + +private val syslogMessage = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame()) +private val routedSyslogMessage = RoutedMessage(syslogMessage, syslogTopic, default_partition) +private val successfullyConsumedSyslogMessage = SuccessfullyConsumedMessage(routedSyslogMessage)
\ No newline at end of file diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt index 571a6680..8616ce03 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt @@ -36,6 +36,7 @@ import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers import reactor.core.publisher.Flux + import reactor.core.publisher.Mono import reactor.retry.Retry import reactor.test.StepVerifier @@ -64,8 +65,8 @@ internal object ConfigurationProviderImplTest : Spek({ .expectNoEvent(waitTime) } } - } + given("valid configuration from cbs") { val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider) @@ -76,18 +77,23 @@ internal object ConfigurationProviderImplTest : Spek({ StepVerifier.create(configProvider().take(1)) .consumeNextWith { - val receivedSink1 = it.elementAt(0) - val receivedSink2 = it.elementAt(1) + val route1 = it.elementAt(0) + val route2 = it.elementAt(1) + val receivedSink1 = route1.sink + val receivedSink2 = route2.sink + assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL) assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1) assertThat(receivedSink1.bootstrapServers()) .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060") assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP") + assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL) assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2) assertThat(receivedSink2.bootstrapServers()) .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060") assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP") + }.verifyComplete() } } @@ -120,6 +126,10 @@ internal object ConfigurationProviderImplTest : Spek({ }) + +val PERF3GPP_REGIONAL = "perf3gpp_regional" +val PERF3GPP_CENTRAL = "perf3gpp_central" + private val aafCredentials1 = ImmutableAafCredentials.builder() .username("client") .password("very secure password") @@ -133,7 +143,7 @@ private val aafCredentials2 = ImmutableAafCredentials.builder() private val validConfiguration = JsonParser().parse(""" { "streams_publishes": { - "perf3gpp_regional": { + "$PERF3GPP_REGIONAL": { "type": "kafka", "aaf_credentials": { "username": "client", @@ -144,7 +154,7 @@ private val validConfiguration = JsonParser().parse(""" "topic_name": "REG_HVVES_PERF3GPP" } }, - "perf3gpp_central": { + "$PERF3GPP_CENTRAL": { "type": "kafka", "aaf_credentials": { "username": "other_client", @@ -161,7 +171,7 @@ private val validConfiguration = JsonParser().parse(""" private val invalidConfiguration = JsonParser().parse(""" { "streams_publishes": { - "perf3gpp_regional": { + "$PERF3GPP_REGIONAL": { "type": "kafka", "aaf_credentials": { "username": "client", diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt deleted file mode 100644 index eb0a3173..00000000 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt +++ /dev/null @@ -1,88 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters.kafka - -import arrow.syntax.collections.tail -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.verify -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.Routing -import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.ves.VesEventOuterClass -import reactor.kafka.sender.KafkaSender - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since December 2018 - */ -internal object KafkaSinkProviderTest : Spek({ - describe("non functional requirements") { - given("sample configuration") { - val config = CollectorConfiguration( - maxRequestSizeBytes = 1024 * 1024, - kafkaServers = "localhost:9090", - routing = Routing(emptyList()) - ) - - val cut = KafkaSinkProvider(config) - - on("sample clients") { - val clients = listOf( - ClientContext(), - ClientContext(), - ClientContext(), - ClientContext()) - - it("should create only one instance of KafkaSender") { - val sinks = clients.map(cut::invoke) - val firstSink = sinks[0] - val restOfSinks = sinks.tail() - - assertThat(restOfSinks).isNotEmpty - assertThat(restOfSinks).allSatisfy { sink -> - assertThat(firstSink.usesSameSenderAs(sink)) - .describedAs("$sink.kafkaSender should be same as $firstSink.kafkaSender") - .isTrue() - } - } - } - } - - given("dummy KafkaSender") { - val kafkaSender: KafkaSender<VesEventOuterClass.CommonEventHeader, VesMessage> = mock() - val cut = KafkaSinkProvider(kafkaSender) - - on("close") { - cut.close().unsafeRunSync() - - it("should close KafkaSender") { - verify(kafkaSender).close() - } - } - } - } -}) |