summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-ct
diff options
context:
space:
mode:
authorPiotr Wielebski <piotr.wielebski@nokia.com>2019-04-01 13:11:13 +0000
committerGerrit Code Review <gerrit@onap.org>2019-04-01 13:11:13 +0000
commit087a6ef92e53452faaaee0872ad5183b08268c30 (patch)
tree8558648e9c2cb75c5d346560d03ddce9fd1a6e91 /sources/hv-collector-ct
parentbf318abbdccd73ff8df65eba777d94be66082e54 (diff)
parent6725abbaa6249e107126ffd5ec58f2a96ce60eee (diff)
Merge "Move ConfigurationProvider to config module"
Diffstat (limited to 'sources/hv-collector-ct')
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt7
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt29
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt155
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt37
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt46
5 files changed, 16 insertions, 258 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index 61a9a356..35dfba8b 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -29,6 +29,7 @@ import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
@@ -56,8 +57,7 @@ object PerformanceSpecification : Spek({
describe("VES High Volume Collector performance") {
it("should handle multiple clients in reasonable time") {
val sink = CountingSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicRouting)
+ val sut = Sut(CollectorConfiguration(basicRouting), sink)
val numMessages: Long = 300_000
val runs = 4
@@ -87,8 +87,7 @@ object PerformanceSpecification : Spek({
it("should disconnect on transmission errors") {
val sink = CountingSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicRouting)
+ val sut = Sut(CollectorConfiguration(basicRouting), sink)
val numMessages: Long = 100_000
val timeout = Duration.ofSeconds(30)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index ec540606..1217c471 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.tests.component
-import arrow.core.getOrElse
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
@@ -27,6 +26,7 @@ import io.netty.buffer.UnpooledByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
@@ -34,8 +34,6 @@ import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink
import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink
import org.onap.dcae.collectors.veshv.tests.fakes.DelayingSink
-import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
-import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
@@ -49,27 +47,22 @@ import java.util.concurrent.atomic.AtomicBoolean
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class Sut(sink: Sink = StoringSink()) : Closeable {
- val configurationProvider = FakeConfigurationProvider()
- val healthStateProvider = FakeHealthState()
+class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : Closeable {
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
val metrics = FakeMetrics()
val sinkProvider = DummySinkProvider(sink)
private val collectorFactory = CollectorFactory(
- configurationProvider,
+ configuration,
sinkProvider,
metrics,
- MAX_PAYLOAD_SIZE_BYTES,
- healthStateProvider
+ MAX_PAYLOAD_SIZE_BYTES
)
private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
val collector: Collector
- get() = collectorProvider(ClientContext(alloc)).getOrElse {
- throw IllegalStateException("Collector not available.")
- }
+ get() = collectorProvider(ClientContext(alloc))
fun handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
@@ -107,16 +100,10 @@ class DummySinkProvider(private val sink: Sink) : SinkProvider {
private val timeout = Duration.ofSeconds(10)
fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut =
- Sut(AlwaysSuccessfulSink()).apply {
- configurationProvider.updateConfiguration(routing)
- }
+ Sut(CollectorConfiguration(routing), AlwaysSuccessfulSink())
fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut =
- Sut(AlwaysFailingSink()).apply {
- configurationProvider.updateConfiguration(routing)
- }
+ Sut(CollectorConfiguration(routing), AlwaysFailingSink())
fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut =
- Sut(DelayingSink(delay)).apply {
- configurationProvider.updateConfiguration(routing)
- }
+ Sut(CollectorConfiguration(routing), DelayingSink(delay))
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 5d215fc5..6a718eea 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -25,6 +25,8 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
@@ -166,9 +168,7 @@ object VesHvSpecification : Spek({
}
it("should be able to direct 2 messages from different domains to one topic") {
- val (sut, sink) = vesHvWithStoringSink()
-
- sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting)
+ val (sut, sink) = vesHvWithStoringSink(twoDomainsToOneTopicRouting)
val messages = sut.handleConnection(sink,
vesWireFrameMessage(PERF3GPP),
@@ -202,150 +202,6 @@ object VesHvSpecification : Spek({
}
}
- describe("configuration update") {
-
- val defaultTimeout = Duration.ofSeconds(10)
-
- given("successful configuration change") {
-
- lateinit var sut: Sut
- lateinit var sink: StoringSink
-
- beforeEachTest {
- vesHvWithStoringSink().run {
- sut = first
- sink = second
- }
- }
-
- it("should update collector") {
- val firstCollector = sut.collector
-
- sut.configurationProvider.updateConfiguration(alternativeRouting)
- val collectorAfterUpdate = sut.collector
-
- assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
- }
-
- it("should start routing messages") {
-
- sut.configurationProvider.updateConfiguration(emptyRouting)
-
- val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- assertThat(messages).isEmpty()
-
- sut.configurationProvider.updateConfiguration(basicRouting)
-
- val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- assertThat(messagesAfterUpdate).hasSize(1)
- val message = messagesAfterUpdate[0]
-
- assertThat(message.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
- .isEqualTo(PERF3GPP_TOPIC)
- assertThat(message.partition).describedAs("routed message partition")
- .isEqualTo(None)
- }
-
- it("should change domain routing") {
-
- val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- assertThat(messages).hasSize(1)
- val firstMessage = messages[0]
-
- assertThat(firstMessage.targetTopic).describedAs("routed message topic on initial configuration")
- .isEqualTo(PERF3GPP_TOPIC)
- assertThat(firstMessage.partition).describedAs("routed message partition")
- .isEqualTo(None)
-
-
- sut.configurationProvider.updateConfiguration(alternativeRouting)
-
- val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- assertThat(messagesAfterUpdate).hasSize(2)
- val secondMessage = messagesAfterUpdate[1]
-
- assertThat(secondMessage.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
- .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
- assertThat(secondMessage.partition).describedAs("routed message partition")
- .isEqualTo(None)
- }
-
- it("should update routing for each client sending one message") {
-
- val messagesAmount = 10
- val messagesForEachTopic = 5
-
- Flux.range(0, messagesAmount).doOnNext {
- if (it == messagesForEachTopic) {
- sut.configurationProvider.updateConfiguration(alternativeRouting)
- }
- }.doOnNext {
- sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
- }.then().block(defaultTimeout)
-
-
- val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
- val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
-
- assertThat(messages.size).isEqualTo(messagesAmount)
- assertThat(messagesForEachTopic)
- .describedAs("amount of messages routed to each topic")
- .isEqualTo(firstTopicMessagesCount)
- .isEqualTo(secondTopicMessagesCount)
- }
-
- it("should not update routing for client sending continuous stream of messages") {
-
- val messageStreamSize = 10
- val pivot = 5
-
- val incomingMessages = Flux.range(0, messageStreamSize)
- .doOnNext {
- if (it == pivot) {
- sut.configurationProvider.updateConfiguration(alternativeRouting)
- println("config changed")
- }
- }
- .map { vesWireFrameMessage(PERF3GPP) }
-
-
- sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
-
- val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
- val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
-
- assertThat(messages.size).isEqualTo(messageStreamSize)
- assertThat(firstTopicMessagesCount)
- .describedAs("amount of messages routed to first topic")
- .isEqualTo(messageStreamSize)
-
- assertThat(secondTopicMessagesCount)
- .describedAs("amount of messages routed to second topic")
- .isEqualTo(0)
- }
-
- it("should mark the application healthy") {
- assertThat(sut.healthStateProvider.currentHealth)
- .describedAs("application health state")
- .isEqualTo(HealthDescription.HEALTHY)
- }
- }
-
- given("failed configuration change") {
- val (sut, _) = vesHvWithStoringSink()
- sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
- sut.configurationProvider.updateConfiguration(basicRouting)
-
- it("should mark the application unhealthy ") {
- assertThat(sut.healthStateProvider.currentHealth)
- .describedAs("application health state")
- .isEqualTo(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
- }
- }
- }
-
describe("request validation") {
it("should reject message with payload greater than 1 MiB and all subsequent messages") {
val (sut, sink) = vesHvWithStoringSink()
@@ -362,9 +218,8 @@ object VesHvSpecification : Spek({
})
-private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
+private fun vesHvWithStoringSink(routing: Routing = basicRouting): Pair<Sut, StoringSink> {
val sink = StoringSink()
- val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicRouting)
+ val sut = Sut(CollectorConfiguration(routing), sink)
return Pair(sut, sink)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt
deleted file mode 100644
index c25771b7..00000000
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.tests.fakes
-
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import reactor.core.publisher.Flux
-
-class FakeHealthState : HealthState {
-
- lateinit var currentHealth: HealthDescription
-
- override fun changeState(healthDescription: HealthDescription) {
- currentHealth = healthDescription
- }
-
- override fun invoke(): Flux<HealthDescription> {
- throw NotImplementedError()
- }
-}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
deleted file mode 100644
index c465fd91..00000000
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.tests.fakes
-
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import reactor.core.publisher.FluxProcessor
-import reactor.core.publisher.UnicastProcessor
-import reactor.retry.RetryExhaustedException
-
-
-class FakeConfigurationProvider : ConfigurationProvider {
- private var shouldThrowException = false
- private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create()
-
- fun updateConfiguration(routing: Routing) =
- if (shouldThrowException) {
- configStream.onError(RetryExhaustedException("I'm so tired"))
- } else {
- configStream.onNext(routing)
- }
-
-
- fun shouldThrowExceptionOnConfigUpdate(shouldThrowException: Boolean) {
- this.shouldThrowException = shouldThrowException
- }
-
- override fun invoke() = configStream
-}