diff options
author | Filip Krzywka <filip.krzywka@nokia.com> | 2019-03-26 14:21:02 +0100 |
---|---|---|
committer | Filip Krzywka <filip.krzywka@nokia.com> | 2019-03-28 14:16:02 +0100 |
commit | 2174a045086e16611128b20a6d4357c04d9eac4a (patch) | |
tree | 6302837fc6ce5fac26a9da91e7353247c397bc0a /sources/hv-collector-ct/src | |
parent | 1b7ac38627977e8ef2209a3a98a8cd0c2da785dd (diff) |
Redefine Routing
As all needed information to route messege is contained inside of
KafkaSink message, we can simply put this object as part of single Route.
Change-Id: I2e7df2e0193eb2af5283980d4d5c8df03ac94df9
Issue-ID: DCAEGEN2-1347
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-ct/src')
8 files changed, 180 insertions, 134 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt index a6b32ed9..92719e94 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,10 +33,10 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND -import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader @@ -92,7 +92,7 @@ object MetricsSpecification : Spek({ describe("Messages sent metrics") { it("should gather info for each topic separately") { - val sut = vesHvWithAlwaysSuccessfulSink(configWithTwoDomainsToOneTopicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting) sut.handleConnection( vesWireFrameMessage(PERF3GPP), @@ -107,8 +107,8 @@ object MetricsSpecification : Spek({ assertThat(metrics.messagesOnTopic(PERF3GPP_TOPIC)) .describedAs("messagesSentToTopic $PERF3GPP_TOPIC metric") .isEqualTo(2) - assertThat(metrics.messagesOnTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)) - .describedAs("messagesSentToTopic $MEASUREMENTS_FOR_VF_SCALING_TOPIC metric") + assertThat(metrics.messagesOnTopic(ALTERNATE_PERF3GPP_TOPIC)) + .describedAs("messagesSentToTopic $ALTERNATE_PERF3GPP_TOPIC metric") .isEqualTo(1) } } @@ -130,7 +130,7 @@ object MetricsSpecification : Spek({ describe("Messages dropped metrics") { it("should gather metrics for invalid messages") { - val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( messageWithInvalidWireFrameHeader(), @@ -146,7 +146,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for route not found") { - val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), @@ -160,7 +160,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for sing errors") { - val sut = vesHvWithAlwaysFailingSink(configWithBasicRouting) + val sut = vesHvWithAlwaysFailingSink(basicRouting) sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP)) @@ -171,7 +171,7 @@ object MetricsSpecification : Spek({ } it("should gather summed metrics for dropped messages") { - val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index 50fe098c..61a9a356 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink -import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType @@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({ it("should handle multiple clients in reasonable time") { val sink = CountingSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(configWithBasicRouting) + sut.configurationProvider.updateConfiguration(basicRouting) val numMessages: Long = 300_000 val runs = 4 @@ -79,7 +79,7 @@ object PerformanceSpecification : Spek({ val durationSec = durationMs / 1000.0 val throughput = sink.count / durationSec logger.info { "Processed $runs connections each containing $numMessages msgs." } - logger.info { "Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s" } + logger.info { "Forwarded ${sink.count / ONE_MILLION}M msgs in $durationSec seconds, that is $throughput msgs/PERF3GPP_REGIONAL" } assertThat(sink.count) .describedAs("should send all events") .isEqualTo(runs * numMessages) @@ -88,7 +88,7 @@ object PerformanceSpecification : Spek({ it("should disconnect on transmission errors") { val sink = CountingSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(configWithBasicRouting) + sut.configurationProvider.updateConfiguration(basicRouting) val numMessages: Long = 100_000 val timeout = Duration.ofSeconds(30) @@ -159,7 +159,7 @@ object PerformanceSpecification : Spek({ }) -private const val ONE_MILION = 1_000_000.0 +private const val ONE_MILLION = 1_000_000.0 private val rand = Random() private val generatorsFactory = MessageGeneratorFactory(MAX_PAYLOAD_SIZE_BYTES) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 109915a1..ec540606 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -27,6 +27,7 @@ import io.netty.buffer.UnpooledByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.model.ClientContext @@ -37,8 +38,9 @@ import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.utils.Closeable +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import reactor.core.publisher.Flux import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean @@ -47,7 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class Sut(sink: Sink = StoringSink()) : AutoCloseable { +class Sut(sink: Sink = StoringSink()) : Closeable { val configurationProvider = FakeConfigurationProvider() val healthStateProvider = FakeHealthState() val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT @@ -59,7 +61,9 @@ class Sut(sink: Sink = StoringSink()) : AutoCloseable { sinkProvider, metrics, MAX_PAYLOAD_SIZE_BYTES, - healthStateProvider) + healthStateProvider + ) + private val collectorProvider = collectorFactory.createVesHvCollectorProvider() val collector: Collector @@ -67,51 +71,52 @@ class Sut(sink: Sink = StoringSink()) : AutoCloseable { throw IllegalStateException("Collector not available.") } - override fun close() { - collectorProvider.close().unsafeRunSync() + + fun handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { + collector.handleConnection(Flux.fromArray(packets)).block(timeout) + return sink.sentMessages + } + + fun handleConnection(vararg packets: ByteBuf) { + collector.handleConnection(Flux.fromArray(packets)).block(timeout) } + override fun close() = collectorProvider.close() + companion object { const val MAX_PAYLOAD_SIZE_BYTES = 1024 } } - class DummySinkProvider(private val sink: Sink) : SinkProvider { - private val active = AtomicBoolean(true) + private val sinkInitialized = AtomicBoolean(false) - override fun invoke(ctx: ClientContext) = sink - - override fun close() = IO { - active.set(false) + override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy { + sinkInitialized.set(true) + sink } - val closed get() = !active.get() - + override fun close() = + if (sinkInitialized.get()) { + sink.close() + } else { + IO.unit + } } private val timeout = Duration.ofSeconds(10) -fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { - collector.handleConnection(Flux.fromArray(packets)).block(timeout) - return sink.sentMessages -} - -fun Sut.handleConnection(vararg packets: ByteBuf) { - collector.handleConnection(Flux.fromArray(packets)).block(timeout) -} - -fun vesHvWithAlwaysSuccessfulSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut = +fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut = Sut(AlwaysSuccessfulSink()).apply { - configurationProvider.updateConfiguration(kafkaSinks) + configurationProvider.updateConfiguration(routing) } -fun vesHvWithAlwaysFailingSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut = +fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut = Sut(AlwaysFailingSink()).apply { - configurationProvider.updateConfiguration(kafkaSinks) + configurationProvider.updateConfiguration(routing) } -fun vesHvWithDelayingSink(delay: Duration, kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut = +fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut = Sut(DelayingSink(delay)).apply { - configurationProvider.updateConfiguration(kafkaSinks) + configurationProvider.updateConfiguration(routing) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 21c5c189..5d215fc5 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.tests.component +import arrow.core.None import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe @@ -30,13 +31,12 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configWithDifferentRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configWithEmptyRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.alternativeRouting +import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting +import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize @@ -65,12 +65,28 @@ object VesHvSpecification : Spek({ .hasSize(2) } + it("should create sink lazily") { + val (sut, sink) = vesHvWithStoringSink() + + // just connecting should not create sink + sut.handleConnection() + sut.close().unsafeRunSync() + + // then + assertThat(sink.closed).isFalse() + } + it("should close sink when closing collector provider") { - val (sut, _) = vesHvWithStoringSink() + val (sut, sink) = vesHvWithStoringSink() + // given Sink initialized + // Note: as StoringSink is (hopefully) created lazily, "valid" ves message needs to be sent + sut.handleConnection(vesWireFrameMessage(PERF3GPP)) - sut.close() + // when + sut.close().unsafeRunSync() - assertThat(sut.sinkProvider.closed).isTrue() + // then + assertThat(sink.closed).isTrue() } } @@ -145,14 +161,14 @@ object VesHvSpecification : Spek({ assertThat(messages).describedAs("number of routed messages").hasSize(1) val msg = messages[0] - assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) - assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0) + assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) + assertThat(msg.partition).describedAs("routed message partition").isEqualTo(None) } it("should be able to direct 2 messages from different domains to one topic") { val (sut, sink) = vesHvWithStoringSink() - sut.configurationProvider.updateConfiguration(configWithTwoDomainsToOneTopicRouting) + sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP), @@ -161,14 +177,14 @@ object VesHvSpecification : Spek({ assertThat(messages).describedAs("number of routed messages").hasSize(3) - assertThat(messages[0].topic).describedAs("first message topic") + assertThat(messages[0].targetTopic).describedAs("first message topic") .isEqualTo(PERF3GPP_TOPIC) - assertThat(messages[1].topic).describedAs("second message topic") + assertThat(messages[1].targetTopic).describedAs("second message topic") .isEqualTo(PERF3GPP_TOPIC) - assertThat(messages[2].topic).describedAs("last message topic") - .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC) + assertThat(messages[2].targetTopic).describedAs("last message topic") + .isEqualTo(ALTERNATE_PERF3GPP_TOPIC) } it("should drop message if route was not found") { @@ -181,7 +197,7 @@ object VesHvSpecification : Spek({ assertThat(messages).describedAs("number of routed messages").hasSize(1) val msg = messages[0] - assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) + assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC) assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second") } } @@ -205,7 +221,7 @@ object VesHvSpecification : Spek({ it("should update collector") { val firstCollector = sut.collector - sut.configurationProvider.updateConfiguration(configWithDifferentRouting) + sut.configurationProvider.updateConfiguration(alternativeRouting) val collectorAfterUpdate = sut.collector assertThat(collectorAfterUpdate).isNotSameAs(firstCollector) @@ -213,21 +229,21 @@ object VesHvSpecification : Spek({ it("should start routing messages") { - sut.configurationProvider.updateConfiguration(configWithEmptyRouting) + sut.configurationProvider.updateConfiguration(emptyRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messages).isEmpty() - sut.configurationProvider.updateConfiguration(configWithBasicRouting) + sut.configurationProvider.updateConfiguration(basicRouting) val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messagesAfterUpdate).hasSize(1) val message = messagesAfterUpdate[0] - assertThat(message.topic).describedAs("routed message topic after configuration's change") + assertThat(message.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change") .isEqualTo(PERF3GPP_TOPIC) assertThat(message.partition).describedAs("routed message partition") - .isEqualTo(0) + .isEqualTo(None) } it("should change domain routing") { @@ -236,22 +252,22 @@ object VesHvSpecification : Spek({ assertThat(messages).hasSize(1) val firstMessage = messages[0] - assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration") + assertThat(firstMessage.targetTopic).describedAs("routed message topic on initial configuration") .isEqualTo(PERF3GPP_TOPIC) assertThat(firstMessage.partition).describedAs("routed message partition") - .isEqualTo(0) + .isEqualTo(None) - sut.configurationProvider.updateConfiguration(configWithDifferentRouting) + sut.configurationProvider.updateConfiguration(alternativeRouting) val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messagesAfterUpdate).hasSize(2) val secondMessage = messagesAfterUpdate[1] - assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change") + assertThat(secondMessage.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change") .isEqualTo(ALTERNATE_PERF3GPP_TOPIC) assertThat(secondMessage.partition).describedAs("routed message partition") - .isEqualTo(0) + .isEqualTo(None) } it("should update routing for each client sending one message") { @@ -261,7 +277,7 @@ object VesHvSpecification : Spek({ Flux.range(0, messagesAmount).doOnNext { if (it == messagesForEachTopic) { - sut.configurationProvider.updateConfiguration(configWithDifferentRouting) + sut.configurationProvider.updateConfiguration(alternativeRouting) } }.doOnNext { sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) @@ -269,8 +285,8 @@ object VesHvSpecification : Spek({ val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC } - val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC } + val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC } + val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC } assertThat(messages.size).isEqualTo(messagesAmount) assertThat(messagesForEachTopic) @@ -287,7 +303,7 @@ object VesHvSpecification : Spek({ val incomingMessages = Flux.range(0, messageStreamSize) .doOnNext { if (it == pivot) { - sut.configurationProvider.updateConfiguration(configWithDifferentRouting) + sut.configurationProvider.updateConfiguration(alternativeRouting) println("config changed") } } @@ -297,8 +313,8 @@ object VesHvSpecification : Spek({ sut.collector.handleConnection(incomingMessages).block(defaultTimeout) val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC } - val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC } + val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC } + val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC } assertThat(messages.size).isEqualTo(messageStreamSize) assertThat(firstTopicMessagesCount) @@ -320,7 +336,7 @@ object VesHvSpecification : Spek({ given("failed configuration change") { val (sut, _) = vesHvWithStoringSink() sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true) - sut.configurationProvider.updateConfiguration(configWithBasicRouting) + sut.configurationProvider.updateConfiguration(basicRouting) it("should mark the application unhealthy ") { assertThat(sut.healthStateProvider.currentHealth) @@ -349,6 +365,6 @@ object VesHvSpecification : Spek({ private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> { val sink = StoringSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(configWithBasicRouting) + sut.configurationProvider.updateConfiguration(basicRouting) return Pair(sut, sink) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index a398967d..c465fd91 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -20,67 +20,21 @@ package org.onap.dcae.collectors.veshv.tests.fakes import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink +import org.onap.dcae.collectors.veshv.config.api.model.Routing import reactor.core.publisher.FluxProcessor import reactor.core.publisher.UnicastProcessor import reactor.retry.RetryExhaustedException -const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP" -const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING" -const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" -const val SAMPLE_BOOTSTRAP_SERVERS = "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060" - -val configWithBasicRouting = sequenceOf( - ImmutableKafkaSink.builder() - .name(PERF3GPP.domainName) - .topicName(PERF3GPP_TOPIC) - .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) - .build() -) - -val configWithTwoDomainsToOneTopicRouting = sequenceOf( - ImmutableKafkaSink.builder() - .name(PERF3GPP.domainName) - .topicName(PERF3GPP_TOPIC) - .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) - .build(), - ImmutableKafkaSink.builder() - .name(HEARTBEAT.domainName) - .topicName(PERF3GPP_TOPIC) - .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) - .build(), - ImmutableKafkaSink.builder() - .name(MEASUREMENT.domainName) - .topicName(MEASUREMENTS_FOR_VF_SCALING_TOPIC) - .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) - .build() -) - -val configWithDifferentRouting = sequenceOf( - ImmutableKafkaSink.builder() - .name(PERF3GPP.domainName) - .topicName(ALTERNATE_PERF3GPP_TOPIC) - .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS) - .build() -) - -val configWithEmptyRouting = emptySequence<KafkaSink>() - - class FakeConfigurationProvider : ConfigurationProvider { private var shouldThrowException = false - private val configStream: FluxProcessor<Sequence<KafkaSink>, Sequence<KafkaSink>> = UnicastProcessor.create() + private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create() - fun updateConfiguration(kafkaSinkSequence: Sequence<KafkaSink>) = + fun updateConfiguration(routing: Routing) = if (shouldThrowException) { configStream.onError(RetryExhaustedException("I'm so tired")) } else { - configStream.onNext(kafkaSinkSequence) + configStream.onNext(routing) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt index b599a076..a450b794 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -54,7 +54,7 @@ class FakeMetrics : Metrics { override fun notifyMessageSent(msg: RoutedMessage) { messagesSentCount++ - messagesSentToTopic.compute(msg.topic) { k, _ -> + messagesSentToTopic.compute(msg.targetTopic) { k, _ -> messagesSentToTopic[k]?.inc() ?: 1 } lastProcessingTimeMicros = Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()).toNanos() / 1000.0 diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt new file mode 100644 index 00000000..e9914ef1 --- /dev/null +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt @@ -0,0 +1,60 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.fakes + +import org.onap.dcae.collectors.veshv.config.api.model.Route +import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.domain.VesEventDomain +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink + +const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP" +const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" +const val KAFKA_BOOTSTRAP_SERVERS = "kafka:9092" +const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024 + +private val perf3gppKafkaSink = ImmutableKafkaSink.builder() + .name("PERF3GPP") + .bootstrapServers(KAFKA_BOOTSTRAP_SERVERS) + .topicName(PERF3GPP_TOPIC) + .maxPayloadSizeBytes(MAX_PAYLOAD_SIZE_BYTES) + .build() +private val alternativeKafkaSink = ImmutableKafkaSink.builder() + .name("ALTERNATE") + .bootstrapServers(KAFKA_BOOTSTRAP_SERVERS) + .topicName(ALTERNATE_PERF3GPP_TOPIC) + .maxPayloadSizeBytes(MAX_PAYLOAD_SIZE_BYTES) + .build() + + +val basicRouting: Routing = listOf( + Route(VesEventDomain.PERF3GPP.domainName, perf3gppKafkaSink) +) + +val alternativeRouting: Routing = listOf( + Route(VesEventDomain.PERF3GPP.domainName, alternativeKafkaSink) +) + +val twoDomainsToOneTopicRouting: Routing = listOf( + Route(VesEventDomain.PERF3GPP.domainName, perf3gppKafkaSink), + Route(VesEventDomain.HEARTBEAT.domainName, perf3gppKafkaSink), + Route(VesEventDomain.MEASUREMENT.domainName, alternativeKafkaSink) +) + +val emptyRouting: Routing = emptyList() diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt index 51f724e0..160defdb 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.tests.fakes +import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage @@ -30,6 +31,7 @@ import reactor.core.publisher.Flux import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentLinkedDeque +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicLong /** @@ -38,6 +40,8 @@ import java.util.concurrent.atomic.AtomicLong */ class StoringSink : Sink { private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque() + private val active = AtomicBoolean(true) + val closed get() = !active.get() val sentMessages: List<RoutedMessage> get() = sent.toList() @@ -45,6 +49,13 @@ class StoringSink : Sink { override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> { return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage) } + + /* + * TOD0: if the code would look like: + * ```IO { active.set(false) }``` + * the tests wouldn't pass even though `.unsafeRunSync()` is called (see HvVesSpec) + */ + override fun close() = active.set(false).run { IO.unit } } /** |