diff options
Diffstat (limited to 'sources/hv-collector-ct')
5 files changed, 16 insertions, 258 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index 61a9a356..35dfba8b 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -29,6 +29,7 @@ import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage @@ -56,8 +57,7 @@ object PerformanceSpecification : Spek({ describe("VES High Volume Collector performance") { it("should handle multiple clients in reasonable time") { val sink = CountingSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicRouting) + val sut = Sut(CollectorConfiguration(basicRouting), sink) val numMessages: Long = 300_000 val runs = 4 @@ -87,8 +87,7 @@ object PerformanceSpecification : Spek({ it("should disconnect on transmission errors") { val sink = CountingSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicRouting) + val sut = Sut(CollectorConfiguration(basicRouting), sink) val numMessages: Long = 100_000 val timeout = Duration.ofSeconds(30) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index ec540606..1217c471 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.tests.component -import arrow.core.getOrElse import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator @@ -27,6 +26,7 @@ import io.netty.buffer.UnpooledByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.factory.CollectorFactory @@ -34,8 +34,6 @@ import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink import org.onap.dcae.collectors.veshv.tests.fakes.DelayingSink -import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider -import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting @@ -49,27 +47,22 @@ import java.util.concurrent.atomic.AtomicBoolean * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class Sut(sink: Sink = StoringSink()) : Closeable { - val configurationProvider = FakeConfigurationProvider() - val healthStateProvider = FakeHealthState() +class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : Closeable { val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT val metrics = FakeMetrics() val sinkProvider = DummySinkProvider(sink) private val collectorFactory = CollectorFactory( - configurationProvider, + configuration, sinkProvider, metrics, - MAX_PAYLOAD_SIZE_BYTES, - healthStateProvider + MAX_PAYLOAD_SIZE_BYTES ) private val collectorProvider = collectorFactory.createVesHvCollectorProvider() val collector: Collector - get() = collectorProvider(ClientContext(alloc)).getOrElse { - throw IllegalStateException("Collector not available.") - } + get() = collectorProvider(ClientContext(alloc)) fun handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { @@ -107,16 +100,10 @@ class DummySinkProvider(private val sink: Sink) : SinkProvider { private val timeout = Duration.ofSeconds(10) fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut = - Sut(AlwaysSuccessfulSink()).apply { - configurationProvider.updateConfiguration(routing) - } + Sut(CollectorConfiguration(routing), AlwaysSuccessfulSink()) fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut = - Sut(AlwaysFailingSink()).apply { - configurationProvider.updateConfiguration(routing) - } + Sut(CollectorConfiguration(routing), AlwaysFailingSink()) fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut = - Sut(DelayingSink(delay)).apply { - configurationProvider.updateConfiguration(routing) - } + Sut(CollectorConfiguration(routing), DelayingSink(delay)) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 5d215fc5..6a718eea 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -25,6 +25,8 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER @@ -166,9 +168,7 @@ object VesHvSpecification : Spek({ } it("should be able to direct 2 messages from different domains to one topic") { - val (sut, sink) = vesHvWithStoringSink() - - sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting) + val (sut, sink) = vesHvWithStoringSink(twoDomainsToOneTopicRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP), @@ -202,150 +202,6 @@ object VesHvSpecification : Spek({ } } - describe("configuration update") { - - val defaultTimeout = Duration.ofSeconds(10) - - given("successful configuration change") { - - lateinit var sut: Sut - lateinit var sink: StoringSink - - beforeEachTest { - vesHvWithStoringSink().run { - sut = first - sink = second - } - } - - it("should update collector") { - val firstCollector = sut.collector - - sut.configurationProvider.updateConfiguration(alternativeRouting) - val collectorAfterUpdate = sut.collector - - assertThat(collectorAfterUpdate).isNotSameAs(firstCollector) - } - - it("should start routing messages") { - - sut.configurationProvider.updateConfiguration(emptyRouting) - - val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messages).isEmpty() - - sut.configurationProvider.updateConfiguration(basicRouting) - - val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messagesAfterUpdate).hasSize(1) - val message = messagesAfterUpdate[0] - - assertThat(message.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change") - .isEqualTo(PERF3GPP_TOPIC) - assertThat(message.partition).describedAs("routed message partition") - .isEqualTo(None) - } - - it("should change domain routing") { - - val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messages).hasSize(1) - val firstMessage = messages[0] - - assertThat(firstMessage.targetTopic).describedAs("routed message topic on initial configuration") - .isEqualTo(PERF3GPP_TOPIC) - assertThat(firstMessage.partition).describedAs("routed message partition") - .isEqualTo(None) - - - sut.configurationProvider.updateConfiguration(alternativeRouting) - - val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - assertThat(messagesAfterUpdate).hasSize(2) - val secondMessage = messagesAfterUpdate[1] - - assertThat(secondMessage.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change") - .isEqualTo(ALTERNATE_PERF3GPP_TOPIC) - assertThat(secondMessage.partition).describedAs("routed message partition") - .isEqualTo(None) - } - - it("should update routing for each client sending one message") { - - val messagesAmount = 10 - val messagesForEachTopic = 5 - - Flux.range(0, messagesAmount).doOnNext { - if (it == messagesForEachTopic) { - sut.configurationProvider.updateConfiguration(alternativeRouting) - } - }.doOnNext { - sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) - }.then().block(defaultTimeout) - - - val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC } - val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC } - - assertThat(messages.size).isEqualTo(messagesAmount) - assertThat(messagesForEachTopic) - .describedAs("amount of messages routed to each topic") - .isEqualTo(firstTopicMessagesCount) - .isEqualTo(secondTopicMessagesCount) - } - - it("should not update routing for client sending continuous stream of messages") { - - val messageStreamSize = 10 - val pivot = 5 - - val incomingMessages = Flux.range(0, messageStreamSize) - .doOnNext { - if (it == pivot) { - sut.configurationProvider.updateConfiguration(alternativeRouting) - println("config changed") - } - } - .map { vesWireFrameMessage(PERF3GPP) } - - - sut.collector.handleConnection(incomingMessages).block(defaultTimeout) - - val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC } - val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC } - - assertThat(messages.size).isEqualTo(messageStreamSize) - assertThat(firstTopicMessagesCount) - .describedAs("amount of messages routed to first topic") - .isEqualTo(messageStreamSize) - - assertThat(secondTopicMessagesCount) - .describedAs("amount of messages routed to second topic") - .isEqualTo(0) - } - - it("should mark the application healthy") { - assertThat(sut.healthStateProvider.currentHealth) - .describedAs("application health state") - .isEqualTo(HealthDescription.HEALTHY) - } - } - - given("failed configuration change") { - val (sut, _) = vesHvWithStoringSink() - sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true) - sut.configurationProvider.updateConfiguration(basicRouting) - - it("should mark the application unhealthy ") { - assertThat(sut.healthStateProvider.currentHealth) - .describedAs("application health state") - .isEqualTo(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) - } - } - } - describe("request validation") { it("should reject message with payload greater than 1 MiB and all subsequent messages") { val (sut, sink) = vesHvWithStoringSink() @@ -362,9 +218,8 @@ object VesHvSpecification : Spek({ }) -private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> { +private fun vesHvWithStoringSink(routing: Routing = basicRouting): Pair<Sut, StoringSink> { val sink = StoringSink() - val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicRouting) + val sut = Sut(CollectorConfiguration(routing), sink) return Pair(sut, sink) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt deleted file mode 100644 index c25771b7..00000000 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.tests.fakes - -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import reactor.core.publisher.Flux - -class FakeHealthState : HealthState { - - lateinit var currentHealth: HealthDescription - - override fun changeState(healthDescription: HealthDescription) { - currentHealth = healthDescription - } - - override fun invoke(): Flux<HealthDescription> { - throw NotImplementedError() - } -} diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt deleted file mode 100644 index c465fd91..00000000 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.tests.fakes - -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.config.api.model.Routing -import reactor.core.publisher.FluxProcessor -import reactor.core.publisher.UnicastProcessor -import reactor.retry.RetryExhaustedException - - -class FakeConfigurationProvider : ConfigurationProvider { - private var shouldThrowException = false - private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create() - - fun updateConfiguration(routing: Routing) = - if (shouldThrowException) { - configStream.onError(RetryExhaustedException("I'm so tired")) - } else { - configStream.onNext(routing) - } - - - fun shouldThrowExceptionOnConfigUpdate(shouldThrowException: Boolean) { - this.shouldThrowException = shouldThrowException - } - - override fun invoke() = configStream -} |