summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-ct/src
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-18 15:58:56 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-20 14:57:25 +0100
commit4128aa2c9368ed20fab92e8c0df83f14d6233b86 (patch)
treecff4cf2428a288b7b86830f282b81d41a41ad250 /sources/hv-collector-ct/src
parent4ab95420e42f6df59bd4851eee41be6579bdbbe1 (diff)
There should be one KafkaSender per configuration
We should keep only one instance of KafkaSender per instance. However, as the configuration might be changed (Consul update) it cannot be a strict singleton. Hence there should be 1to1 relationship beetween ConsulConfiguration and KafkaSender. Change-Id: Ie168028c4427741254b8c2fe316b82cca72d7668 Issue-ID: DCAEGEN2-1047 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-ct/src')
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt26
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt11
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt4
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt27
4 files changed, 46 insertions, 22 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index f457aeaf..aaa3ee3b 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -31,6 +31,7 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
@@ -50,7 +51,7 @@ object MetricsSpecification : Spek({
describe("Bytes received metrics") {
it("should sum up all bytes received") {
- val sut = vesHvWithNoOpSink()
+ val sut = vesHvWithAlwaysSuccessfulSink()
val vesWireFrameMessage = vesWireFrameMessage()
val invalidWireFrame = messageWithInvalidWireFrameHeader()
@@ -70,7 +71,7 @@ object MetricsSpecification : Spek({
describe("Messages received metrics") {
it("should sum up all received messages bytes") {
- val sut = vesHvWithNoOpSink()
+ val sut = vesHvWithAlwaysSuccessfulSink()
val firstVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(10)))
val secondVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(40)))
val firstVesMessage = vesWireFrameMessage(firstVesEvent)
@@ -91,7 +92,7 @@ object MetricsSpecification : Spek({
describe("Messages sent metrics") {
it("should gather info for each topic separately") {
- val sut = vesHvWithNoOpSink(twoDomainsToOneTopicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicConfiguration)
sut.handleConnection(
vesWireFrameMessage(PERF3GPP),
@@ -129,7 +130,7 @@ object MetricsSpecification : Spek({
describe("Messages dropped metrics") {
it("should gather metrics for invalid messages") {
- val sut = vesHvWithNoOpSink(basicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
sut.handleConnection(
messageWithInvalidWireFrameHeader(),
@@ -145,7 +146,7 @@ object MetricsSpecification : Spek({
}
it("should gather metrics for route not found") {
- val sut = vesHvWithNoOpSink(basicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
@@ -158,8 +159,19 @@ object MetricsSpecification : Spek({
.isEqualTo(1)
}
+ it("should gather metrics for sing errors") {
+ val sut = vesHvWithAlwaysFailingSink(basicConfiguration)
+
+ sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))
+
+ val metrics = sut.metrics
+ assertThat(metrics.messagesDropped(KAFKA_FAILURE))
+ .describedAs("messagesDroppedCause $KAFKA_FAILURE metric")
+ .isEqualTo(1)
+ }
+
it("should gather summed metrics for dropped messages") {
- val sut = vesHvWithNoOpSink(basicConfiguration)
+ val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
sut.handleConnection(
vesWireFrameMessage(domain = PERF3GPP),
@@ -183,7 +195,7 @@ object MetricsSpecification : Spek({
).forEach { cause, vesMessage ->
on("cause $cause") {
it("should notify correct metrics") {
- val sut = vesHvWithNoOpSink()
+ val sut = vesHvWithAlwaysSuccessfulSink()
sut.handleConnection(vesMessage)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 7ebbfba0..c3e4a581 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -73,12 +73,17 @@ fun Sut.handleConnection(vararg packets: ByteBuf) {
collector.handleConnection(Flux.fromArray(packets)).block(timeout)
}
-fun vesHvWithNoOpSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
- Sut(NoOpSink()).apply {
+fun vesHvWithAlwaysSuccessfulSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+ Sut(AlwaysSuccessfulSink()).apply {
+ configurationProvider.updateConfiguration(collectorConfiguration)
+ }
+
+fun vesHvWithAlwaysFailingSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+ Sut(AlwaysFailingSink()).apply {
configurationProvider.updateConfiguration(collectorConfiguration)
}
fun vesHvWithDelayingSink(delay: Duration, collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
- Sut(ProcessingSink { it.delayElements(delay) }).apply {
+ Sut(DelayingSink(delay)).apply {
configurationProvider.updateConfiguration(collectorConfiguration)
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index 3770913a..db56e88c 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -36,7 +36,6 @@ const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING"
const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
- kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
fromDomain(PERF3GPP.domainName)
@@ -47,7 +46,6 @@ val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
)
val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration(
- kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
fromDomain(PERF3GPP.domainName)
@@ -69,7 +67,6 @@ val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfigu
val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration(
- kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
fromDomain(PERF3GPP.domainName)
@@ -81,7 +78,6 @@ val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfigu
val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration(
- kafkaBootstrapServers = "localhost:9969",
routing = routing {
}.build()
)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index 2f731f53..b4ce6499 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -21,13 +21,17 @@ package org.onap.dcae.collectors.veshv.tests.fakes
import arrow.core.identity
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
+import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
+import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.atomic.AtomicLong
-import java.util.function.Function
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -39,8 +43,8 @@ class StoringSink : Sink {
val sentMessages: List<RoutedMessage>
get() = sent.toList()
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
- return messages.doOnNext(sent::addLast)
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> {
+ return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage)
}
}
@@ -54,16 +58,23 @@ class CountingSink : Sink {
val count: Long
get() = atomicCount.get()
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> {
return messages.doOnNext {
atomicCount.incrementAndGet()
- }
+ }.map(::SuccessfullyConsumedMessage)
}
}
-open class ProcessingSink(val transformer: (Flux<RoutedMessage>) -> Publisher<RoutedMessage>) : Sink {
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> = messages.transform(transformer)
+open class ProcessingSink(private val transformer: (Flux<RoutedMessage>) -> Publisher<ConsumedMessage>) : Sink {
+ override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
+ messages.transform(transformer)
}
-class NoOpSink : ProcessingSink(::identity)
+class AlwaysSuccessfulSink : ProcessingSink({ it.map(::SuccessfullyConsumedMessage) })
+
+class AlwaysFailingSink : ProcessingSink({ stream ->
+ stream.map { FailedToConsumeMessage(it, null, MessageDropCause.KAFKA_FAILURE) }
+})
+
+class DelayingSink(delay: Duration) : ProcessingSink({ it.delayElements(delay).map(::SuccessfullyConsumedMessage) })