summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-ct/src
diff options
context:
space:
mode:
authorJakub Dudycz <jakub.dudycz@nokia.com>2019-03-13 18:44:31 +0100
committerJakub Dudycz <jakub.dudycz@nokia.com>2019-03-20 14:20:03 +0100
commit30afcb56b0c6c4529fdaf68d7b061eee44d68d16 (patch)
tree34ce26e44546033fa3572738e8ae59362783147a /sources/hv-collector-ct/src
parent189c70a48c24274fb7dd6cb910397a9a93233401 (diff)
Remove environment variables and program arguments
- Move all command line program arguments to json file. - Reorganize configuration classes and the way they are passed through application - Implement HV VES configuration stream - Create concrete configuration from partial one - Modify main HV-VES server starting pipeline Change-Id: I6cf874b6904ed768e4820b8132f5f760299c929e Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com> Issue-ID: DCAEGEN2-1340
Diffstat (limited to 'sources/hv-collector-ct/src')
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt14
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt6
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt27
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt29
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt88
5 files changed, 83 insertions, 81 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index aaa3ee3b..bd056d4d 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -35,8 +35,8 @@ import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
-import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
@@ -92,7 +92,7 @@ object MetricsSpecification : Spek({
describe("Messages sent metrics") {
it("should gather info for each topic separately") {
- val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting)
sut.handleConnection(
vesWireFrameMessage(PERF3GPP),
@@ -130,7 +130,7 @@ object MetricsSpecification : Spek({
describe("Messages dropped metrics") {
it("should gather metrics for invalid messages") {
- val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
sut.handleConnection(
messageWithInvalidWireFrameHeader(),
@@ -146,7 +146,7 @@ object MetricsSpecification : Spek({
}
it("should gather metrics for route not found") {
- val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
@@ -160,7 +160,7 @@ object MetricsSpecification : Spek({
}
it("should gather metrics for sing errors") {
- val sut = vesHvWithAlwaysFailingSink(basicConfiguration)
+ val sut = vesHvWithAlwaysFailingSink(basicRouting)
sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))
@@ -171,7 +171,7 @@ object MetricsSpecification : Spek({
}
it("should gather summed metrics for dropped messages") {
- val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index dc5fe60b..ece42285 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
@@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({
it("should handle multiple clients in reasonable time") {
val sink = CountingSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ sut.configurationProvider.updateConfiguration(basicRouting)
val numMessages: Long = 300_000
val runs = 4
@@ -88,7 +88,7 @@ object PerformanceSpecification : Spek({
it("should disconnect on transmission errors") {
val sink = CountingSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ sut.configurationProvider.updateConfiguration(basicRouting)
val numMessages: Long = 100_000
val timeout = Duration.ofSeconds(30)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index d97541ba..e84e9486 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -27,11 +27,18 @@ import io.netty.buffer.UnpooledByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
-import org.onap.dcae.collectors.veshv.tests.fakes.*
+import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink
+import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink
+import org.onap.dcae.collectors.veshv.tests.fakes.DelayingSink
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
+import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
import reactor.core.publisher.Flux
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
@@ -40,7 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class Sut(sink: Sink = StoringSink()): AutoCloseable {
+class Sut(sink: Sink = StoringSink()) : AutoCloseable {
val configurationProvider = FakeConfigurationProvider()
val healthStateProvider = FakeHealthState()
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
@@ -94,17 +101,17 @@ fun Sut.handleConnection(vararg packets: ByteBuf) {
collector.handleConnection(Flux.fromArray(packets)).block(timeout)
}
-fun vesHvWithAlwaysSuccessfulSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut =
Sut(AlwaysSuccessfulSink()).apply {
- configurationProvider.updateConfiguration(collectorConfiguration)
+ configurationProvider.updateConfiguration(routing)
}
-fun vesHvWithAlwaysFailingSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut =
Sut(AlwaysFailingSink()).apply {
- configurationProvider.updateConfiguration(collectorConfiguration)
+ configurationProvider.updateConfiguration(routing)
}
-fun vesHvWithDelayingSink(delay: Duration, collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut =
Sut(DelayingSink(delay)).apply {
- configurationProvider.updateConfiguration(collectorConfiguration)
+ configurationProvider.updateConfiguration(routing)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index ed46b119..17f6ce32 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -24,21 +24,24 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
-import org.onap.dcae.collectors.veshv.tests.utils.*
-
+import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
+import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
+import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
+import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
+import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
+import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
import reactor.core.publisher.Flux
import java.time.Duration
@@ -149,7 +152,7 @@ object VesHvSpecification : Spek({
it("should be able to direct 2 messages from different domains to one topic") {
val (sut, sink) = vesHvWithStoringSink()
- sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
+ sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting)
val messages = sut.handleConnection(sink,
vesWireFrameMessage(PERF3GPP),
@@ -210,12 +213,12 @@ object VesHvSpecification : Spek({
it("should start routing messages") {
- sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
+ sut.configurationProvider.updateConfiguration(emptyRouting)
val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messages).isEmpty()
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ sut.configurationProvider.updateConfiguration(basicRouting)
val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messagesAfterUpdate).hasSize(1)
@@ -317,7 +320,7 @@ object VesHvSpecification : Spek({
given("failed configuration change") {
val (sut, _) = vesHvWithStoringSink()
sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ sut.configurationProvider.updateConfiguration(basicRouting)
it("should mark the application unhealthy ") {
assertThat(sut.healthStateProvider.currentHealth)
@@ -346,6 +349,6 @@ object VesHvSpecification : Spek({
private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
val sink = StoringSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ sut.configurationProvider.updateConfiguration(basicRouting)
return Pair(sut, sink)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index c7e12bbd..1ad2b0e3 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -20,12 +20,11 @@
package org.onap.dcae.collectors.veshv.tests.fakes
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.config.api.model.routing
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
-
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import reactor.core.publisher.FluxProcessor
import reactor.core.publisher.UnicastProcessor
import reactor.retry.RetryExhaustedException
@@ -35,62 +34,55 @@ const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING"
const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
-val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
- routing = routing {
- defineRoute {
- fromDomain(PERF3GPP.domainName)
- toTopic(PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
- }.build()
-)
-
-val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration(
- routing = routing {
- defineRoute {
- fromDomain(PERF3GPP.domainName)
- toTopic(PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
- defineRoute {
- fromDomain(HEARTBEAT.domainName)
- toTopic(PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
- defineRoute {
- fromDomain(MEASUREMENT.domainName)
- toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
- withFixedPartitioning()
- }
- }.build()
-)
+val basicRouting = routing {
+ defineRoute {
+ fromDomain(PERF3GPP.domainName)
+ toTopic(PERF3GPP_TOPIC)
+ withFixedPartitioning()
+ }
+}.build()
-val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration(
- routing = routing {
- defineRoute {
- fromDomain(PERF3GPP.domainName)
- toTopic(ALTERNATE_PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
- }.build()
-)
+val twoDomainsToOneTopicRouting = routing {
+ defineRoute {
+ fromDomain(PERF3GPP.domainName)
+ toTopic(PERF3GPP_TOPIC)
+ withFixedPartitioning()
+ }
+ defineRoute {
+ fromDomain(HEARTBEAT.domainName)
+ toTopic(PERF3GPP_TOPIC)
+ withFixedPartitioning()
+ }
+ defineRoute {
+ fromDomain(MEASUREMENT.domainName)
+ toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+ withFixedPartitioning()
+ }
+}.build()
+
+
+val configurationWithDifferentRouting = routing {
+ defineRoute {
+ fromDomain(PERF3GPP.domainName)
+ toTopic(ALTERNATE_PERF3GPP_TOPIC)
+ withFixedPartitioning()
+ }
+}.build()
+
+val emptyRouting = routing { }.build()
-val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration(
- routing = routing {
- }.build()
-)
class FakeConfigurationProvider : ConfigurationProvider {
private var shouldThrowException = false
- private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create()
+ private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create()
- fun updateConfiguration(collectorConfiguration: CollectorConfiguration) =
+ fun updateConfiguration(routing: Routing) =
if (shouldThrowException) {
configStream.onError(RetryExhaustedException("I'm so tired"))
} else {
- configStream.onNext(collectorConfiguration)
+ configStream.onNext(routing)
}