aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/test
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2019-03-26 14:21:02 +0100
committerFilip Krzywka <filip.krzywka@nokia.com>2019-03-28 14:16:02 +0100
commit2174a045086e16611128b20a6d4357c04d9eac4a (patch)
tree6302837fc6ce5fac26a9da91e7353247c397bc0a /sources/hv-collector-core/src/test
parent1b7ac38627977e8ef2209a3a98a8cd0c2da785dd (diff)
Redefine Routing
As all needed information to route messege is contained inside of KafkaSink message, we can simply put this object as part of single Route. Change-Id: I2e7df2e0193eb2af5283980d4d5c8df03ac94df9 Issue-ID: DCAEGEN2-1347 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src/test')
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt123
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt (renamed from sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt)22
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt88
3 files changed, 93 insertions, 140 deletions
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
index b8b55865..6b9c6803 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -20,22 +20,30 @@
package org.onap.dcae.collectors.veshv.impl
import arrow.core.None
-import arrow.core.Some
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.whenever
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import reactor.core.publisher.Flux
+import reactor.test.StepVerifier
/**
@@ -43,62 +51,85 @@ import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
* @since May 2018
*/
object RouterTest : Spek({
- given("sample configuration") {
- val config = Routing(listOf(
- Route(PERF3GPP.domainName, "ves_rtpm", { 2 }),
- Route(SYSLOG.domainName, "ves_trace")
- ))
- val cut = Router(config, ClientContext())
-
- on("message with existing route (rtpm)") {
- val message = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame())
- val result = cut.findDestination(message)
-
- it("should have route available") {
- assertThat(result).isNotNull()
- }
- it("should be routed to proper partition") {
- assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2))
- }
+ describe("Router") {
- it("should be routed to proper topic") {
- assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm"))
- }
+ whenever(perf3gppSinkMock.topicName()).thenReturn(perf3gppTopic)
+ whenever(syslogSinkMock.topicName()).thenReturn(syslogTopic)
- it("should be routed with a given message") {
- assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
- }
- }
+ val messageSinkMap = mapOf(
+ Pair(perf3gppTopic, lazyOf(messageSinkMock)),
+ Pair(syslogTopic, lazyOf(messageSinkMock))
+ )
- on("message with existing route (trace)") {
- val message = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame())
- val result = cut.findDestination(message)
+ given("sample routing specification") {
+ val cut = router(defaultRouting, messageSinkMap)
- it("should have route available") {
- assertThat(result).isNotNull()
- }
+ on("message with existing route (rtpm)") {
+ whenever(messageSinkMock.send(routedPerf3GppMessage))
+ .thenReturn(Flux.just(successfullyConsumedPerf3gppMessage))
- it("should be routed to proper partition") {
- assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0))
- }
+ it("should be properly routed") {
+ val result = cut.route(perf3gppMessage)
- it("should be routed to proper topic") {
- assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace"))
+ assertThat(result).isNotNull()
+ StepVerifier.create(result)
+ .expectNext(successfullyConsumedPerf3gppMessage)
+ .verifyComplete()
+
+ verify(perf3gppSinkMock).topicName()
+ verify(messageSinkMock).send(routedPerf3GppMessage)
+ }
}
- it("should be routed with a given message") {
- assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
+ on("message with existing route (syslog)") {
+ whenever(messageSinkMock.send(routedSyslogMessage))
+ .thenReturn(Flux.just(successfullyConsumedSyslogMessage))
+ val result = cut.route(syslogMessage)
+
+ it("should be properly routed") {
+ StepVerifier.create(result)
+ .expectNext(successfullyConsumedSyslogMessage)
+ .verifyComplete()
+
+ verify(syslogSinkMock).topicName()
+ verify(messageSinkMock).send(routedSyslogMessage)
+ }
}
- }
- on("message with unknown route") {
- val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame())
- val result = cut.findDestination(message)
+ on("message with unknown route") {
+ val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame())
+ val result = cut.route(message)
- it("should not have route available") {
- assertThat(result).isEqualTo(None)
+ it("should not have route available") {
+ StepVerifier.create(result).verifyComplete()
+ }
}
}
}
-}) \ No newline at end of file
+
+})
+
+private fun router(routing: Routing, kafkaPublisherMap: Map<String, Lazy<Sink>>) =
+ Router(routing, kafkaPublisherMap, ClientContext(), mock())
+
+private val perf3gppTopic = "PERF_PERF"
+private val perf3gppSinkMock = mock<KafkaSink>()
+private val default3gppRoute = Route(PERF3GPP.domainName, perf3gppSinkMock)
+
+private val syslogTopic = "SYS_LOG"
+private val syslogSinkMock = mock<KafkaSink>()
+private val defaultSyslogRoute = Route(SYSLOG.domainName, syslogSinkMock)
+
+private val defaultRouting = listOf(default3gppRoute, defaultSyslogRoute)
+
+private val messageSinkMock = mock<Sink>()
+private val default_partition = None
+
+private val perf3gppMessage = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame())
+private val routedPerf3GppMessage = RoutedMessage(perf3gppMessage, perf3gppTopic, default_partition)
+private val successfullyConsumedPerf3gppMessage = SuccessfullyConsumedMessage(routedPerf3GppMessage)
+
+private val syslogMessage = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame())
+private val routedSyslogMessage = RoutedMessage(syslogMessage, syslogTopic, default_partition)
+private val successfullyConsumedSyslogMessage = SuccessfullyConsumedMessage(routedSyslogMessage) \ No newline at end of file
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt
index 571a6680..8616ce03 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt
@@ -36,6 +36,7 @@ import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
import reactor.core.publisher.Flux
+
import reactor.core.publisher.Mono
import reactor.retry.Retry
import reactor.test.StepVerifier
@@ -64,8 +65,8 @@ internal object ConfigurationProviderImplTest : Spek({
.expectNoEvent(waitTime)
}
}
-
}
+
given("valid configuration from cbs") {
val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
@@ -76,18 +77,23 @@ internal object ConfigurationProviderImplTest : Spek({
StepVerifier.create(configProvider().take(1))
.consumeNextWith {
- val receivedSink1 = it.elementAt(0)
- val receivedSink2 = it.elementAt(1)
+ val route1 = it.elementAt(0)
+ val route2 = it.elementAt(1)
+ val receivedSink1 = route1.sink
+ val receivedSink2 = route2.sink
+ assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL)
assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1)
assertThat(receivedSink1.bootstrapServers())
.isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
+ assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL)
assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2)
assertThat(receivedSink2.bootstrapServers())
.isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
+
}.verifyComplete()
}
}
@@ -120,6 +126,10 @@ internal object ConfigurationProviderImplTest : Spek({
})
+
+val PERF3GPP_REGIONAL = "perf3gpp_regional"
+val PERF3GPP_CENTRAL = "perf3gpp_central"
+
private val aafCredentials1 = ImmutableAafCredentials.builder()
.username("client")
.password("very secure password")
@@ -133,7 +143,7 @@ private val aafCredentials2 = ImmutableAafCredentials.builder()
private val validConfiguration = JsonParser().parse("""
{
"streams_publishes": {
- "perf3gpp_regional": {
+ "$PERF3GPP_REGIONAL": {
"type": "kafka",
"aaf_credentials": {
"username": "client",
@@ -144,7 +154,7 @@ private val validConfiguration = JsonParser().parse("""
"topic_name": "REG_HVVES_PERF3GPP"
}
},
- "perf3gpp_central": {
+ "$PERF3GPP_CENTRAL": {
"type": "kafka",
"aaf_credentials": {
"username": "other_client",
@@ -161,7 +171,7 @@ private val validConfiguration = JsonParser().parse("""
private val invalidConfiguration = JsonParser().parse("""
{
"streams_publishes": {
- "perf3gpp_regional": {
+ "$PERF3GPP_REGIONAL": {
"type": "kafka",
"aaf_credentials": {
"username": "client",
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
deleted file mode 100644
index eb0a3173..00000000
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-
-import arrow.syntax.collections.tail
-import com.nhaarman.mockitokotlin2.mock
-import com.nhaarman.mockitokotlin2.verify
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.domain.VesMessage
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.ves.VesEventOuterClass
-import reactor.kafka.sender.KafkaSender
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since December 2018
- */
-internal object KafkaSinkProviderTest : Spek({
- describe("non functional requirements") {
- given("sample configuration") {
- val config = CollectorConfiguration(
- maxRequestSizeBytes = 1024 * 1024,
- kafkaServers = "localhost:9090",
- routing = Routing(emptyList())
- )
-
- val cut = KafkaSinkProvider(config)
-
- on("sample clients") {
- val clients = listOf(
- ClientContext(),
- ClientContext(),
- ClientContext(),
- ClientContext())
-
- it("should create only one instance of KafkaSender") {
- val sinks = clients.map(cut::invoke)
- val firstSink = sinks[0]
- val restOfSinks = sinks.tail()
-
- assertThat(restOfSinks).isNotEmpty
- assertThat(restOfSinks).allSatisfy { sink ->
- assertThat(firstSink.usesSameSenderAs(sink))
- .describedAs("$sink.kafkaSender should be same as $firstSink.kafkaSender")
- .isTrue()
- }
- }
- }
- }
-
- given("dummy KafkaSender") {
- val kafkaSender: KafkaSender<VesEventOuterClass.CommonEventHeader, VesMessage> = mock()
- val cut = KafkaSinkProvider(kafkaSender)
-
- on("close") {
- cut.close().unsafeRunSync()
-
- it("should close KafkaSender") {
- verify(kafkaSender).close()
- }
- }
- }
- }
-})