summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-ct/src
diff options
context:
space:
mode:
authorkjaniak <kornel.janiak@nokia.com>2019-03-21 14:03:53 +0100
committerkjaniak <kornel.janiak@nokia.com>2019-03-26 13:05:51 +0100
commit7e77162022371860d13939be1848982a735cdab9 (patch)
tree6603c70af9029701294fd3783c9df1a1016b7c8f /sources/hv-collector-ct/src
parentc979ea3bd6059cb067a84ba8e6f8d1cf96d61ba2 (diff)
Use DataStream API from CBS client
Change-Id: Ief92f793282288938c6663616e9613c6df2d8ddb Issue-ID: DCAEGEN2-1346 Signed-off-by: kjaniak <kornel.janiak@nokia.com>
Diffstat (limited to 'sources/hv-collector-ct/src')
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt14
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt6
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt16
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt26
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt85
5 files changed, 74 insertions, 73 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index bd056d4d..a6b32ed9 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -35,8 +35,8 @@ import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting
import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
@@ -92,7 +92,7 @@ object MetricsSpecification : Spek({
describe("Messages sent metrics") {
it("should gather info for each topic separately") {
- val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting)
+ val sut = vesHvWithAlwaysSuccessfulSink(configWithTwoDomainsToOneTopicRouting)
sut.handleConnection(
vesWireFrameMessage(PERF3GPP),
@@ -130,7 +130,7 @@ object MetricsSpecification : Spek({
describe("Messages dropped metrics") {
it("should gather metrics for invalid messages") {
- val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
+ val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
sut.handleConnection(
messageWithInvalidWireFrameHeader(),
@@ -146,7 +146,7 @@ object MetricsSpecification : Spek({
}
it("should gather metrics for route not found") {
- val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
+ val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
@@ -160,7 +160,7 @@ object MetricsSpecification : Spek({
}
it("should gather metrics for sing errors") {
- val sut = vesHvWithAlwaysFailingSink(basicRouting)
+ val sut = vesHvWithAlwaysFailingSink(configWithBasicRouting)
sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))
@@ -171,7 +171,7 @@ object MetricsSpecification : Spek({
}
it("should gather summed metrics for dropped messages") {
- val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
+ val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index ece42285..50fe098c 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
@@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({
it("should handle multiple clients in reasonable time") {
val sink = CountingSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicRouting)
+ sut.configurationProvider.updateConfiguration(configWithBasicRouting)
val numMessages: Long = 300_000
val runs = 4
@@ -88,7 +88,7 @@ object PerformanceSpecification : Spek({
it("should disconnect on transmission errors") {
val sink = CountingSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicRouting)
+ sut.configurationProvider.updateConfiguration(configWithBasicRouting)
val numMessages: Long = 100_000
val timeout = Duration.ofSeconds(30)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index e84e9486..da9290d3 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -38,7 +38,9 @@ import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.Kafka
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
import reactor.core.publisher.Flux
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
@@ -101,17 +103,17 @@ fun Sut.handleConnection(vararg packets: ByteBuf) {
collector.handleConnection(Flux.fromArray(packets)).block(timeout)
}
-fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut =
+fun vesHvWithAlwaysSuccessfulSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
Sut(AlwaysSuccessfulSink()).apply {
- configurationProvider.updateConfiguration(routing)
+ configurationProvider.updateConfiguration(kafkaSinks)
}
-fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut =
+fun vesHvWithAlwaysFailingSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
Sut(AlwaysFailingSink()).apply {
- configurationProvider.updateConfiguration(routing)
+ configurationProvider.updateConfiguration(kafkaSinks)
}
-fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut =
+fun vesHvWithDelayingSink(delay: Duration, kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
Sut(DelayingSink(delay)).apply {
- configurationProvider.updateConfiguration(routing)
+ configurationProvider.updateConfiguration(kafkaSinks)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 17f6ce32..21c5c189 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -33,10 +33,10 @@ import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithDifferentRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithEmptyRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting
import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
@@ -152,7 +152,7 @@ object VesHvSpecification : Spek({
it("should be able to direct 2 messages from different domains to one topic") {
val (sut, sink) = vesHvWithStoringSink()
- sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting)
+ sut.configurationProvider.updateConfiguration(configWithTwoDomainsToOneTopicRouting)
val messages = sut.handleConnection(sink,
vesWireFrameMessage(PERF3GPP),
@@ -205,7 +205,7 @@ object VesHvSpecification : Spek({
it("should update collector") {
val firstCollector = sut.collector
- sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
val collectorAfterUpdate = sut.collector
assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
@@ -213,12 +213,12 @@ object VesHvSpecification : Spek({
it("should start routing messages") {
- sut.configurationProvider.updateConfiguration(emptyRouting)
+ sut.configurationProvider.updateConfiguration(configWithEmptyRouting)
val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messages).isEmpty()
- sut.configurationProvider.updateConfiguration(basicRouting)
+ sut.configurationProvider.updateConfiguration(configWithBasicRouting)
val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messagesAfterUpdate).hasSize(1)
@@ -242,7 +242,7 @@ object VesHvSpecification : Spek({
.isEqualTo(0)
- sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messagesAfterUpdate).hasSize(2)
@@ -261,7 +261,7 @@ object VesHvSpecification : Spek({
Flux.range(0, messagesAmount).doOnNext {
if (it == messagesForEachTopic) {
- sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
}
}.doOnNext {
sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
@@ -287,7 +287,7 @@ object VesHvSpecification : Spek({
val incomingMessages = Flux.range(0, messageStreamSize)
.doOnNext {
if (it == pivot) {
- sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
println("config changed")
}
}
@@ -320,7 +320,7 @@ object VesHvSpecification : Spek({
given("failed configuration change") {
val (sut, _) = vesHvWithStoringSink()
sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
- sut.configurationProvider.updateConfiguration(basicRouting)
+ sut.configurationProvider.updateConfiguration(configWithBasicRouting)
it("should mark the application unhealthy ") {
assertThat(sut.healthStateProvider.currentHealth)
@@ -349,6 +349,6 @@ object VesHvSpecification : Spek({
private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
val sink = StoringSink()
val sut = Sut(sink)
- sut.configurationProvider.updateConfiguration(basicRouting)
+ sut.configurationProvider.updateConfiguration(configWithBasicRouting)
return Pair(sut, sink)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index 1ad2b0e3..213eff27 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -20,11 +20,12 @@
package org.onap.dcae.collectors.veshv.tests.fakes
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.config.api.model.routing
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableKafkaSink
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
import reactor.core.publisher.FluxProcessor
import reactor.core.publisher.UnicastProcessor
import reactor.retry.RetryExhaustedException
@@ -33,56 +34,54 @@ import reactor.retry.RetryExhaustedException
const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING"
const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
-
-val basicRouting = routing {
- defineRoute {
- fromDomain(PERF3GPP.domainName)
- toTopic(PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
-}.build()
-
-
-val twoDomainsToOneTopicRouting = routing {
- defineRoute {
- fromDomain(PERF3GPP.domainName)
- toTopic(PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
- defineRoute {
- fromDomain(HEARTBEAT.domainName)
- toTopic(PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
- defineRoute {
- fromDomain(MEASUREMENT.domainName)
- toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
- withFixedPartitioning()
- }
-}.build()
-
-
-val configurationWithDifferentRouting = routing {
- defineRoute {
- fromDomain(PERF3GPP.domainName)
- toTopic(ALTERNATE_PERF3GPP_TOPIC)
- withFixedPartitioning()
- }
-}.build()
-
-
-val emptyRouting = routing { }.build()
+const val SAMPLE_BOOTSTRAP_SERVERS = "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"
+
+val configWithBasicRouting = sequenceOf(
+ ImmutableKafkaSink.builder()
+ .name(PERF3GPP.domainName)
+ .topicName(PERF3GPP_TOPIC)
+ .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+ .build()
+)
+
+val configWithTwoDomainsToOneTopicRouting = sequenceOf(
+ ImmutableKafkaSink.builder()
+ .name(PERF3GPP.domainName)
+ .topicName(PERF3GPP_TOPIC)
+ .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+ .build(),
+ ImmutableKafkaSink.builder()
+ .name(HEARTBEAT.domainName)
+ .topicName(PERF3GPP_TOPIC)
+ .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+ .build(),
+ ImmutableKafkaSink.builder()
+ .name(MEASUREMENT.domainName)
+ .topicName(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+ .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+ .build()
+)
+
+val configWithDifferentRouting = sequenceOf(
+ ImmutableKafkaSink.builder()
+ .name(PERF3GPP.domainName)
+ .topicName(ALTERNATE_PERF3GPP_TOPIC)
+ .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+ .build()
+ )
+
+val configWithEmptyRouting = emptySequence<KafkaSink>()
class FakeConfigurationProvider : ConfigurationProvider {
private var shouldThrowException = false
- private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create()
+ private val configStream: FluxProcessor<Sequence<KafkaSink>, Sequence<KafkaSink>> = UnicastProcessor.create()
- fun updateConfiguration(routing: Routing) =
+ fun updateConfiguration(kafkaSinkSequence: Sequence<KafkaSink>) =
if (shouldThrowException) {
configStream.onError(RetryExhaustedException("I'm so tired"))
} else {
- configStream.onNext(routing)
+ configStream.onNext(kafkaSinkSequence)
}