aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-ct
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2019-03-26 14:21:02 +0100
committerFilip Krzywka <filip.krzywka@nokia.com>2019-03-28 14:16:02 +0100
commit2174a045086e16611128b20a6d4357c04d9eac4a (patch)
tree6302837fc6ce5fac26a9da91e7353247c397bc0a /sources/hv-collector-ct
parent1b7ac38627977e8ef2209a3a98a8cd0c2da785dd (diff)
Redefine Routing
As all needed information to route messege is contained inside of KafkaSink message, we can simply put this object as part of single Route. Change-Id: I2e7df2e0193eb2af5283980d4d5c8df03ac94df9 Issue-ID: DCAEGEN2-1347 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-ct')
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt22
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt12
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt63
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt86
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt54
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt4
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt60
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt13
8 files changed, 180 insertions, 134 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index a6b32ed9..92719e94 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,10 +33,10 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
-import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
@@ -92,7 +92,7 @@ object MetricsSpecification : Spek({
describe("Messages sent metrics") {
it("should gather info for each topic separately") {
- val sut = vesHvWithAlwaysSuccessfulSink(configWithTwoDomainsToOneTopicRouting)
+ val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting)
sut.handleConnection(
vesWireFrameMessage(PERF3GPP),
@@ -107,8 +107,8 @@ object MetricsSpecification : Spek({
assertThat(metrics.messagesOnTopic(PERF3GPP_TOPIC))
.describedAs("messagesSentToTopic $PERF3GPP_TOPIC metric")
.isEqualTo(2)
- assertThat(metrics.messagesOnTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC))
- .describedAs("messagesSentToTopic $MEASUREMENTS_FOR_VF_SCALING_TOPIC metric")
+ assertThat(metrics.messagesOnTopic(ALTERNATE_PERF3GPP_TOPIC))
+ .describedAs("messagesSentToTopic $ALTERNATE_PERF3GPP_TOPIC metric")
.isEqualTo(1)
}
}
@@ -130,7 +130,7 @@ object MetricsSpecification : Spek({
describe("Messages dropped metrics") {
it("should gather metrics for invalid messages") {
- val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
sut.handleConnection(
messageWithInvalidWireFrameHeader(),
@@ -146,7 +146,7 @@ object MetricsSpecification : Spek({
}
it("should gather metrics for route not found") {
- val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
@@ -160,7 +160,7 @@ object MetricsSpecification : Spek({
}
it("should gather metrics for sing errors") {
- val sut = vesHvWithAlwaysFailingSink(configWithBasicRouting)
+ val sut = vesHvWithAlwaysFailingSink(basicRouting)
sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))
@@ -171,7 +171,7 @@ object MetricsSpecification : Spek({
}
it("should gather summed metrics for dropped messages") {
- val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index 50fe098c..61a9a356 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
@@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({
it("should handle multiple clients in reasonable time") {
val sink = CountingSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+ sut.configurationProvider.updateConfiguration(basicRouting)
val numMessages: Long = 300_000
val runs = 4
@@ -79,7 +79,7 @@ object PerformanceSpecification : Spek({
val durationSec = durationMs / 1000.0
val throughput = sink.count / durationSec
logger.info { "Processed $runs connections each containing $numMessages msgs." }
- logger.info { "Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s" }
+ logger.info { "Forwarded ${sink.count / ONE_MILLION}M msgs in $durationSec seconds, that is $throughput msgs/PERF3GPP_REGIONAL" }
assertThat(sink.count)
.describedAs("should send all events")
.isEqualTo(runs * numMessages)
@@ -88,7 +88,7 @@ object PerformanceSpecification : Spek({
it("should disconnect on transmission errors") {
val sink = CountingSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+ sut.configurationProvider.updateConfiguration(basicRouting)
val numMessages: Long = 100_000
val timeout = Duration.ofSeconds(30)
@@ -159,7 +159,7 @@ object PerformanceSpecification : Spek({
})
-private const val ONE_MILION = 1_000_000.0
+private const val ONE_MILLION = 1_000_000.0
private val rand = Random()
private val generatorsFactory = MessageGeneratorFactory(MAX_PAYLOAD_SIZE_BYTES)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 109915a1..ec540606 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -27,6 +27,7 @@ import io.netty.buffer.UnpooledByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -37,8 +38,9 @@ import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.utils.Closeable
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
import reactor.core.publisher.Flux
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
@@ -47,7 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class Sut(sink: Sink = StoringSink()) : AutoCloseable {
+class Sut(sink: Sink = StoringSink()) : Closeable {
val configurationProvider = FakeConfigurationProvider()
val healthStateProvider = FakeHealthState()
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
@@ -59,7 +61,9 @@ class Sut(sink: Sink = StoringSink()) : AutoCloseable {
sinkProvider,
metrics,
MAX_PAYLOAD_SIZE_BYTES,
- healthStateProvider)
+ healthStateProvider
+ )
+
private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
val collector: Collector
@@ -67,51 +71,52 @@ class Sut(sink: Sink = StoringSink()) : AutoCloseable {
throw IllegalStateException("Collector not available.")
}
- override fun close() {
- collectorProvider.close().unsafeRunSync()
+
+ fun handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
+ collector.handleConnection(Flux.fromArray(packets)).block(timeout)
+ return sink.sentMessages
+ }
+
+ fun handleConnection(vararg packets: ByteBuf) {
+ collector.handleConnection(Flux.fromArray(packets)).block(timeout)
}
+ override fun close() = collectorProvider.close()
+
companion object {
const val MAX_PAYLOAD_SIZE_BYTES = 1024
}
}
-
class DummySinkProvider(private val sink: Sink) : SinkProvider {
- private val active = AtomicBoolean(true)
+ private val sinkInitialized = AtomicBoolean(false)
- override fun invoke(ctx: ClientContext) = sink
-
- override fun close() = IO {
- active.set(false)
+ override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy {
+ sinkInitialized.set(true)
+ sink
}
- val closed get() = !active.get()
-
+ override fun close() =
+ if (sinkInitialized.get()) {
+ sink.close()
+ } else {
+ IO.unit
+ }
}
private val timeout = Duration.ofSeconds(10)
-fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
- collector.handleConnection(Flux.fromArray(packets)).block(timeout)
- return sink.sentMessages
-}
-
-fun Sut.handleConnection(vararg packets: ByteBuf) {
- collector.handleConnection(Flux.fromArray(packets)).block(timeout)
-}
-
-fun vesHvWithAlwaysSuccessfulSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
+fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut =
Sut(AlwaysSuccessfulSink()).apply {
- configurationProvider.updateConfiguration(kafkaSinks)
+ configurationProvider.updateConfiguration(routing)
}
-fun vesHvWithAlwaysFailingSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
+fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut =
Sut(AlwaysFailingSink()).apply {
- configurationProvider.updateConfiguration(kafkaSinks)
+ configurationProvider.updateConfiguration(routing)
}
-fun vesHvWithDelayingSink(delay: Duration, kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
+fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut =
Sut(DelayingSink(delay)).apply {
- configurationProvider.updateConfiguration(kafkaSinks)
+ configurationProvider.updateConfiguration(routing)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 21c5c189..5d215fc5 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.tests.component
+import arrow.core.None
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
@@ -30,13 +31,12 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithDifferentRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithEmptyRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.alternativeRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
@@ -65,12 +65,28 @@ object VesHvSpecification : Spek({
.hasSize(2)
}
+ it("should create sink lazily") {
+ val (sut, sink) = vesHvWithStoringSink()
+
+ // just connecting should not create sink
+ sut.handleConnection()
+ sut.close().unsafeRunSync()
+
+ // then
+ assertThat(sink.closed).isFalse()
+ }
+
it("should close sink when closing collector provider") {
- val (sut, _) = vesHvWithStoringSink()
+ val (sut, sink) = vesHvWithStoringSink()
+ // given Sink initialized
+ // Note: as StoringSink is (hopefully) created lazily, "valid" ves message needs to be sent
+ sut.handleConnection(vesWireFrameMessage(PERF3GPP))
- sut.close()
+ // when
+ sut.close().unsafeRunSync()
- assertThat(sut.sinkProvider.closed).isTrue()
+ // then
+ assertThat(sink.closed).isTrue()
}
}
@@ -145,14 +161,14 @@ object VesHvSpecification : Spek({
assertThat(messages).describedAs("number of routed messages").hasSize(1)
val msg = messages[0]
- assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
- assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
+ assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
+ assertThat(msg.partition).describedAs("routed message partition").isEqualTo(None)
}
it("should be able to direct 2 messages from different domains to one topic") {
val (sut, sink) = vesHvWithStoringSink()
- sut.configurationProvider.updateConfiguration(configWithTwoDomainsToOneTopicRouting)
+ sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting)
val messages = sut.handleConnection(sink,
vesWireFrameMessage(PERF3GPP),
@@ -161,14 +177,14 @@ object VesHvSpecification : Spek({
assertThat(messages).describedAs("number of routed messages").hasSize(3)
- assertThat(messages[0].topic).describedAs("first message topic")
+ assertThat(messages[0].targetTopic).describedAs("first message topic")
.isEqualTo(PERF3GPP_TOPIC)
- assertThat(messages[1].topic).describedAs("second message topic")
+ assertThat(messages[1].targetTopic).describedAs("second message topic")
.isEqualTo(PERF3GPP_TOPIC)
- assertThat(messages[2].topic).describedAs("last message topic")
- .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+ assertThat(messages[2].targetTopic).describedAs("last message topic")
+ .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
}
it("should drop message if route was not found") {
@@ -181,7 +197,7 @@ object VesHvSpecification : Spek({
assertThat(messages).describedAs("number of routed messages").hasSize(1)
val msg = messages[0]
- assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
+ assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
}
}
@@ -205,7 +221,7 @@ object VesHvSpecification : Spek({
it("should update collector") {
val firstCollector = sut.collector
- sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(alternativeRouting)
val collectorAfterUpdate = sut.collector
assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
@@ -213,21 +229,21 @@ object VesHvSpecification : Spek({
it("should start routing messages") {
- sut.configurationProvider.updateConfiguration(configWithEmptyRouting)
+ sut.configurationProvider.updateConfiguration(emptyRouting)
val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messages).isEmpty()
- sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+ sut.configurationProvider.updateConfiguration(basicRouting)
val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messagesAfterUpdate).hasSize(1)
val message = messagesAfterUpdate[0]
- assertThat(message.topic).describedAs("routed message topic after configuration's change")
+ assertThat(message.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
.isEqualTo(PERF3GPP_TOPIC)
assertThat(message.partition).describedAs("routed message partition")
- .isEqualTo(0)
+ .isEqualTo(None)
}
it("should change domain routing") {
@@ -236,22 +252,22 @@ object VesHvSpecification : Spek({
assertThat(messages).hasSize(1)
val firstMessage = messages[0]
- assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
+ assertThat(firstMessage.targetTopic).describedAs("routed message topic on initial configuration")
.isEqualTo(PERF3GPP_TOPIC)
assertThat(firstMessage.partition).describedAs("routed message partition")
- .isEqualTo(0)
+ .isEqualTo(None)
- sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(alternativeRouting)
val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messagesAfterUpdate).hasSize(2)
val secondMessage = messagesAfterUpdate[1]
- assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
+ assertThat(secondMessage.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
.isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
assertThat(secondMessage.partition).describedAs("routed message partition")
- .isEqualTo(0)
+ .isEqualTo(None)
}
it("should update routing for each client sending one message") {
@@ -261,7 +277,7 @@ object VesHvSpecification : Spek({
Flux.range(0, messagesAmount).doOnNext {
if (it == messagesForEachTopic) {
- sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(alternativeRouting)
}
}.doOnNext {
sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
@@ -269,8 +285,8 @@ object VesHvSpecification : Spek({
val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
+ val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
assertThat(messages.size).isEqualTo(messagesAmount)
assertThat(messagesForEachTopic)
@@ -287,7 +303,7 @@ object VesHvSpecification : Spek({
val incomingMessages = Flux.range(0, messageStreamSize)
.doOnNext {
if (it == pivot) {
- sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(alternativeRouting)
println("config changed")
}
}
@@ -297,8 +313,8 @@ object VesHvSpecification : Spek({
sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
+ val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
assertThat(messages.size).isEqualTo(messageStreamSize)
assertThat(firstTopicMessagesCount)
@@ -320,7 +336,7 @@ object VesHvSpecification : Spek({
given("failed configuration change") {
val (sut, _) = vesHvWithStoringSink()
sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
- sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+ sut.configurationProvider.updateConfiguration(basicRouting)
it("should mark the application unhealthy ") {
assertThat(sut.healthStateProvider.currentHealth)
@@ -349,6 +365,6 @@ object VesHvSpecification : Spek({
private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
val sink = StoringSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+ sut.configurationProvider.updateConfiguration(basicRouting)
return Pair(sut, sink)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index a398967d..c465fd91 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -20,67 +20,21 @@
package org.onap.dcae.collectors.veshv.tests.fakes
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import reactor.core.publisher.FluxProcessor
import reactor.core.publisher.UnicastProcessor
import reactor.retry.RetryExhaustedException
-const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
-const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING"
-const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
-const val SAMPLE_BOOTSTRAP_SERVERS = "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"
-
-val configWithBasicRouting = sequenceOf(
- ImmutableKafkaSink.builder()
- .name(PERF3GPP.domainName)
- .topicName(PERF3GPP_TOPIC)
- .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
- .build()
-)
-
-val configWithTwoDomainsToOneTopicRouting = sequenceOf(
- ImmutableKafkaSink.builder()
- .name(PERF3GPP.domainName)
- .topicName(PERF3GPP_TOPIC)
- .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
- .build(),
- ImmutableKafkaSink.builder()
- .name(HEARTBEAT.domainName)
- .topicName(PERF3GPP_TOPIC)
- .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
- .build(),
- ImmutableKafkaSink.builder()
- .name(MEASUREMENT.domainName)
- .topicName(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
- .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
- .build()
-)
-
-val configWithDifferentRouting = sequenceOf(
- ImmutableKafkaSink.builder()
- .name(PERF3GPP.domainName)
- .topicName(ALTERNATE_PERF3GPP_TOPIC)
- .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
- .build()
-)
-
-val configWithEmptyRouting = emptySequence<KafkaSink>()
-
-
class FakeConfigurationProvider : ConfigurationProvider {
private var shouldThrowException = false
- private val configStream: FluxProcessor<Sequence<KafkaSink>, Sequence<KafkaSink>> = UnicastProcessor.create()
+ private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create()
- fun updateConfiguration(kafkaSinkSequence: Sequence<KafkaSink>) =
+ fun updateConfiguration(routing: Routing) =
if (shouldThrowException) {
configStream.onError(RetryExhaustedException("I'm so tired"))
} else {
- configStream.onNext(kafkaSinkSequence)
+ configStream.onNext(routing)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
index b599a076..a450b794 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -54,7 +54,7 @@ class FakeMetrics : Metrics {
override fun notifyMessageSent(msg: RoutedMessage) {
messagesSentCount++
- messagesSentToTopic.compute(msg.topic) { k, _ ->
+ messagesSentToTopic.compute(msg.targetTopic) { k, _ ->
messagesSentToTopic[k]?.inc() ?: 1
}
lastProcessingTimeMicros = Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()).toNanos() / 1000.0
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt
new file mode 100644
index 00000000..e9914ef1
--- /dev/null
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt
@@ -0,0 +1,60 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.fakes
+
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink
+
+const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
+const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
+const val KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
+const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
+
+private val perf3gppKafkaSink = ImmutableKafkaSink.builder()
+ .name("PERF3GPP")
+ .bootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
+ .topicName(PERF3GPP_TOPIC)
+ .maxPayloadSizeBytes(MAX_PAYLOAD_SIZE_BYTES)
+ .build()
+private val alternativeKafkaSink = ImmutableKafkaSink.builder()
+ .name("ALTERNATE")
+ .bootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
+ .topicName(ALTERNATE_PERF3GPP_TOPIC)
+ .maxPayloadSizeBytes(MAX_PAYLOAD_SIZE_BYTES)
+ .build()
+
+
+val basicRouting: Routing = listOf(
+ Route(VesEventDomain.PERF3GPP.domainName, perf3gppKafkaSink)
+)
+
+val alternativeRouting: Routing = listOf(
+ Route(VesEventDomain.PERF3GPP.domainName, alternativeKafkaSink)
+)
+
+val twoDomainsToOneTopicRouting: Routing = listOf(
+ Route(VesEventDomain.PERF3GPP.domainName, perf3gppKafkaSink),
+ Route(VesEventDomain.HEARTBEAT.domainName, perf3gppKafkaSink),
+ Route(VesEventDomain.MEASUREMENT.domainName, alternativeKafkaSink)
+)
+
+val emptyRouting: Routing = emptyList()
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index 51f724e0..160defdb 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.tests.fakes
+import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
@@ -30,6 +31,7 @@ import reactor.core.publisher.Flux
import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentLinkedDeque
+import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
/**
@@ -38,6 +40,8 @@ import java.util.concurrent.atomic.AtomicLong
*/
class StoringSink : Sink {
private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque()
+ private val active = AtomicBoolean(true)
+ val closed get() = !active.get()
val sentMessages: List<RoutedMessage>
get() = sent.toList()
@@ -45,6 +49,13 @@ class StoringSink : Sink {
override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> {
return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage)
}
+
+ /*
+ * TOD0: if the code would look like:
+ * ```IO { active.set(false) }```
+ * the tests wouldn't pass even though `.unsafeRunSync()` is called (see HvVesSpec)
+ */
+ override fun close() = active.set(false).run { IO.unit }
}
/**