From 30afcb56b0c6c4529fdaf68d7b061eee44d68d16 Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Wed, 13 Mar 2019 18:44:31 +0100 Subject: Remove environment variables and program arguments - Move all command line program arguments to json file. - Reorganize configuration classes and the way they are passed through application - Implement HV VES configuration stream - Create concrete configuration from partial one - Modify main HV-VES server starting pipeline Change-Id: I6cf874b6904ed768e4820b8132f5f760299c929e Signed-off-by: Jakub Dudycz Issue-ID: DCAEGEN2-1340 --- .../veshv/tests/component/MetricsSpecification.kt | 14 ++-- .../tests/component/PerformanceSpecification.kt | 6 +- .../dcae/collectors/veshv/tests/component/Sut.kt | 27 ++++--- .../veshv/tests/component/VesHvSpecification.kt | 29 +++---- .../collectors/veshv/tests/fakes/configuration.kt | 88 ++++++++++------------ 5 files changed, 83 insertions(+), 81 deletions(-) (limited to 'sources/hv-collector-ct') diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt index aaa3ee3b..bd056d4d 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt @@ -35,8 +35,8 @@ import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration -import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting +import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader @@ -92,7 +92,7 @@ object MetricsSpecification : Spek({ describe("Messages sent metrics") { it("should gather info for each topic separately") { - val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicConfiguration) + val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting) sut.handleConnection( vesWireFrameMessage(PERF3GPP), @@ -130,7 +130,7 @@ object MetricsSpecification : Spek({ describe("Messages dropped metrics") { it("should gather metrics for invalid messages") { - val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( messageWithInvalidWireFrameHeader(), @@ -146,7 +146,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for route not found") { - val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), @@ -160,7 +160,7 @@ object MetricsSpecification : Spek({ } it("should gather metrics for sing errors") { - val sut = vesHvWithAlwaysFailingSink(basicConfiguration) + val sut = vesHvWithAlwaysFailingSink(basicRouting) sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP)) @@ -171,7 +171,7 @@ object MetricsSpecification : Spek({ } it("should gather summed metrics for dropped messages") { - val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration) + val sut = vesHvWithAlwaysSuccessfulSink(basicRouting) sut.handleConnection( vesWireFrameMessage(domain = PERF3GPP), diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index dc5fe60b..ece42285 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink -import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType @@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({ it("should handle multiple clients in reasonable time") { val sink = CountingSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + sut.configurationProvider.updateConfiguration(basicRouting) val numMessages: Long = 300_000 val runs = 4 @@ -88,7 +88,7 @@ object PerformanceSpecification : Spek({ it("should disconnect on transmission errors") { val sink = CountingSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + sut.configurationProvider.updateConfiguration(basicRouting) val numMessages: Long = 100_000 val timeout = Duration.ofSeconds(30) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index d97541ba..e84e9486 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -27,11 +27,18 @@ import io.netty.buffer.UnpooledByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.domain.RoutedMessage -import org.onap.dcae.collectors.veshv.tests.fakes.* +import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink +import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink +import org.onap.dcae.collectors.veshv.tests.fakes.DelayingSink +import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider +import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState +import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics +import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import reactor.core.publisher.Flux import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean @@ -40,7 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean * @author Piotr Jaszczyk * @since May 2018 */ -class Sut(sink: Sink = StoringSink()): AutoCloseable { +class Sut(sink: Sink = StoringSink()) : AutoCloseable { val configurationProvider = FakeConfigurationProvider() val healthStateProvider = FakeHealthState() val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT @@ -94,17 +101,17 @@ fun Sut.handleConnection(vararg packets: ByteBuf) { collector.handleConnection(Flux.fromArray(packets)).block(timeout) } -fun vesHvWithAlwaysSuccessfulSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut = +fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut = Sut(AlwaysSuccessfulSink()).apply { - configurationProvider.updateConfiguration(collectorConfiguration) + configurationProvider.updateConfiguration(routing) } -fun vesHvWithAlwaysFailingSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut = +fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut = Sut(AlwaysFailingSink()).apply { - configurationProvider.updateConfiguration(collectorConfiguration) + configurationProvider.updateConfiguration(routing) } -fun vesHvWithDelayingSink(delay: Duration, collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut = +fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut = Sut(DelayingSink(delay)).apply { - configurationProvider.updateConfiguration(collectorConfiguration) + configurationProvider.updateConfiguration(routing) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index ed46b119..17f6ce32 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -24,21 +24,24 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink -import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration +import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting -import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting -import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration -import org.onap.dcae.collectors.veshv.tests.utils.* - +import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting +import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting +import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame +import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader +import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize +import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage +import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload import reactor.core.publisher.Flux import java.time.Duration @@ -149,7 +152,7 @@ object VesHvSpecification : Spek({ it("should be able to direct 2 messages from different domains to one topic") { val (sut, sink) = vesHvWithStoringSink() - sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration) + sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP), @@ -210,12 +213,12 @@ object VesHvSpecification : Spek({ it("should start routing messages") { - sut.configurationProvider.updateConfiguration(configurationWithoutRouting) + sut.configurationProvider.updateConfiguration(emptyRouting) val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messages).isEmpty() - sut.configurationProvider.updateConfiguration(basicConfiguration) + sut.configurationProvider.updateConfiguration(basicRouting) val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP)) assertThat(messagesAfterUpdate).hasSize(1) @@ -317,7 +320,7 @@ object VesHvSpecification : Spek({ given("failed configuration change") { val (sut, _) = vesHvWithStoringSink() sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true) - sut.configurationProvider.updateConfiguration(basicConfiguration) + sut.configurationProvider.updateConfiguration(basicRouting) it("should mark the application unhealthy ") { assertThat(sut.healthStateProvider.currentHealth) @@ -346,6 +349,6 @@ object VesHvSpecification : Spek({ private fun vesHvWithStoringSink(): Pair { val sink = StoringSink() val sut = Sut(sink) - sut.configurationProvider.updateConfiguration(basicConfiguration) + sut.configurationProvider.updateConfiguration(basicRouting) return Pair(sut, sink) } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index c7e12bbd..1ad2b0e3 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -20,12 +20,11 @@ package org.onap.dcae.collectors.veshv.tests.fakes import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.config.api.model.routing -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT - +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import reactor.core.publisher.FluxProcessor import reactor.core.publisher.UnicastProcessor import reactor.retry.RetryExhaustedException @@ -35,62 +34,55 @@ const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP" const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING" const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" -val basicConfiguration: CollectorConfiguration = CollectorConfiguration( - routing = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } - }.build() -) - -val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration( - routing = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } - defineRoute { - fromDomain(HEARTBEAT.domainName) - toTopic(PERF3GPP_TOPIC) - withFixedPartitioning() - } - defineRoute { - fromDomain(MEASUREMENT.domainName) - toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC) - withFixedPartitioning() - } - }.build() -) +val basicRouting = routing { + defineRoute { + fromDomain(PERF3GPP.domainName) + toTopic(PERF3GPP_TOPIC) + withFixedPartitioning() + } +}.build() -val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration( - routing = routing { - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic(ALTERNATE_PERF3GPP_TOPIC) - withFixedPartitioning() - } - }.build() -) +val twoDomainsToOneTopicRouting = routing { + defineRoute { + fromDomain(PERF3GPP.domainName) + toTopic(PERF3GPP_TOPIC) + withFixedPartitioning() + } + defineRoute { + fromDomain(HEARTBEAT.domainName) + toTopic(PERF3GPP_TOPIC) + withFixedPartitioning() + } + defineRoute { + fromDomain(MEASUREMENT.domainName) + toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC) + withFixedPartitioning() + } +}.build() + + +val configurationWithDifferentRouting = routing { + defineRoute { + fromDomain(PERF3GPP.domainName) + toTopic(ALTERNATE_PERF3GPP_TOPIC) + withFixedPartitioning() + } +}.build() + +val emptyRouting = routing { }.build() -val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration( - routing = routing { - }.build() -) class FakeConfigurationProvider : ConfigurationProvider { private var shouldThrowException = false - private val configStream: FluxProcessor = UnicastProcessor.create() + private val configStream: FluxProcessor = UnicastProcessor.create() - fun updateConfiguration(collectorConfiguration: CollectorConfiguration) = + fun updateConfiguration(routing: Routing) = if (shouldThrowException) { configStream.onError(RetryExhaustedException("I'm so tired")) } else { - configStream.onNext(collectorConfiguration) + configStream.onNext(routing) } -- cgit 1.2.3-korg