summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-ct/src
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-ct/src')
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt26
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt11
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt4
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt27
4 files changed, 46 insertions, 22 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index f457aeaf..aaa3ee3b 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -31,6 +31,7 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
@@ -50,7 +51,7 @@ object MetricsSpecification : Spek({
describe("Bytes received metrics") {
it("should sum up all bytes received") {
- val sut = vesHvWithNoOpSink()
+ val sut = vesHvWithAlwaysSuccessfulSink()
val vesWireFrameMessage = vesWireFrameMessage()
val invalidWireFrame = messageWithInvalidWireFrameHeader()
@@ -70,7 +71,7 @@ object MetricsSpecification : Spek({
describe("Messages received metrics") {
it("should sum up all received messages bytes") {
- val sut = vesHvWithNoOpSink()
+ val sut = vesHvWithAlwaysSuccessfulSink()
val firstVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(10)))
val secondVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(40)))
val firstVesMessage = vesWireFrameMessage(firstVesEvent)
@@ -91,7 +92,7 @@ object MetricsSpecification : Spek({
describe("Messages sent metrics") {
it("should gather info for each topic separately") {
- val sut = vesHvWithNoOpSink(twoDomainsToOneTopicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicConfiguration)
sut.handleConnection(
vesWireFrameMessage(PERF3GPP),
@@ -129,7 +130,7 @@ object MetricsSpecification : Spek({
describe("Messages dropped metrics") {
it("should gather metrics for invalid messages") {
- val sut = vesHvWithNoOpSink(basicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
sut.handleConnection(
messageWithInvalidWireFrameHeader(),
@@ -145,7 +146,7 @@ object MetricsSpecification : Spek({
}
it("should gather metrics for route not found") {
- val sut = vesHvWithNoOpSink(basicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
@@ -158,8 +159,19 @@ object MetricsSpecification : Spek({
.isEqualTo(1)
}
+ it("should gather metrics for sing errors") {
+ val sut = vesHvWithAlwaysFailingSink(basicConfiguration)
+
+ sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))
+
+ val metrics = sut.metrics
+ assertThat(metrics.messagesDropped(KAFKA_FAILURE))
+ .describedAs("messagesDroppedCause $KAFKA_FAILURE metric")
+ .isEqualTo(1)
+ }
+
it("should gather summed metrics for dropped messages") {
- val sut = vesHvWithNoOpSink(basicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
@@ -183,7 +195,7 @@ object MetricsSpecification : Spek({
).forEach { cause, vesMessage ->
on("cause $cause") {
it("should notify correct metrics") {
- val sut = vesHvWithNoOpSink()
+ val sut = vesHvWithAlwaysSuccessfulSink()
sut.handleConnection(vesMessage)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 7ebbfba0..c3e4a581 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -73,12 +73,17 @@ fun Sut.handleConnection(vararg packets: ByteBuf) {
collector.handleConnection(Flux.fromArray(packets)).block(timeout)
}
-fun vesHvWithNoOpSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
- Sut(NoOpSink()).apply {
+fun vesHvWithAlwaysSuccessfulSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+ Sut(AlwaysSuccessfulSink()).apply {
+ configurationProvider.updateConfiguration(collectorConfiguration)
+ }
+
+fun vesHvWithAlwaysFailingSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+ Sut(AlwaysFailingSink()).apply {
configurationProvider.updateConfiguration(collectorConfiguration)
}
fun vesHvWithDelayingSink(delay: Duration, collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
- Sut(ProcessingSink { it.delayElements(delay) }).apply {
+ Sut(DelayingSink(delay)).apply {
configurationProvider.updateConfiguration(collectorConfiguration)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index 3770913a..db56e88c 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -36,7 +36,6 @@ const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING"
const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
- kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
fromDomain(PERF3GPP.domainName)
@@ -47,7 +46,6 @@ val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
)
val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration(
- kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
fromDomain(PERF3GPP.domainName)
@@ -69,7 +67,6 @@ val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfigu
val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration(
- kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
fromDomain(PERF3GPP.domainName)
@@ -81,7 +78,6 @@ val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfigu
val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration(
- kafkaBootstrapServers = "localhost:9969",
routing = routing {
}.build()
)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index 2f731f53..b4ce6499 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -21,13 +21,17 @@ package org.onap.dcae.collectors.veshv.tests.fakes
import arrow.core.identity
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
+import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
+import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.atomic.AtomicLong
-import java.util.function.Function
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -39,8 +43,8 @@ class StoringSink : Sink {
val sentMessages: List<RoutedMessage>
get() = sent.toList()
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
- return messages.doOnNext(sent::addLast)
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> {
+ return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage)
}
}
@@ -54,16 +58,23 @@ class CountingSink : Sink {
val count: Long
get() = atomicCount.get()
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> {
return messages.doOnNext {
atomicCount.incrementAndGet()
- }
+ }.map(::SuccessfullyConsumedMessage)
}
}
-open class ProcessingSink(val transformer: (Flux<RoutedMessage>) -> Publisher<RoutedMessage>) : Sink {
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> = messages.transform(transformer)
+open class ProcessingSink(private val transformer: (Flux<RoutedMessage>) -> Publisher<ConsumedMessage>) : Sink {
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
+ messages.transform(transformer)
}
-class NoOpSink : ProcessingSink(::identity)
+class AlwaysSuccessfulSink : ProcessingSink({ it.map(::SuccessfullyConsumedMessage) })
+
+class AlwaysFailingSink : ProcessingSink({ stream ->
+ stream.map { FailedToConsumeMessage(it, null, MessageDropCause.KAFKA_FAILURE) }
+})
+
+class DelayingSink(delay: Duration) : ProcessingSink({ it.delayElements(delay).map(::SuccessfullyConsumedMessage) })