aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-ct
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-ct')
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt22
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt5
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt18
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt10
4 files changed, 47 insertions, 8 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index dd8acf77..9f5c37e1 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -33,7 +33,12 @@ import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TO
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
-import org.onap.dcae.collectors.veshv.tests.utils.*
+import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion
+import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
+import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
+import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
+import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
+import java.time.Duration
object MetricsSpecification : Spek({
debugRx(false)
@@ -102,6 +107,21 @@ object MetricsSpecification : Spek({
}
}
+ describe("Processing time") {
+ it("should gather processing time metric") {
+ val delay = Duration.ofMillis(10)
+ val sut = vesHvWithDelayingSink(delay)
+
+ sut.handleConnection(vesWireFrameMessage(PERF3GPP))
+
+
+ val metrics = sut.metrics
+ assertThat(metrics.lastProcessingTimeMicros)
+ .describedAs("processingTime metric")
+ .isGreaterThanOrEqualTo(delay.toNanos().toDouble() / 1000.0)
+ }
+ }
+
describe("Messages dropped metrics") {
it("should gather metrics for invalid messages") {
val sut = vesHvWithNoOpSink(basicConfiguration)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 0c1b589b..7ebbfba0 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -77,3 +77,8 @@ fun vesHvWithNoOpSink(collectorConfiguration: CollectorConfiguration = basicConf
Sut(NoOpSink()).apply {
configurationProvider.updateConfiguration(collectorConfiguration)
}
+
+fun vesHvWithDelayingSink(delay: Duration, collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+ Sut(ProcessingSink { it.delayElements(delay) }).apply {
+ configurationProvider.updateConfiguration(collectorConfiguration)
+ }
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
index 9ddb7115..660ce498 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
@@ -20,7 +20,11 @@
package org.onap.dcae.collectors.veshv.tests.fakes
import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import java.time.Duration
+import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import kotlin.test.fail
@@ -31,6 +35,7 @@ import kotlin.test.fail
class FakeMetrics : Metrics {
var bytesReceived: Int = 0
var messageBytesReceived: Int = 0
+ var lastProcessingTimeMicros: Double = -1.0
var messagesSentCount: Int = 0
var messagesDroppedCount: Int = 0
@@ -41,13 +46,16 @@ class FakeMetrics : Metrics {
bytesReceived += size
}
- override fun notifyMessageReceived(size: Int) {
- messageBytesReceived += size
+ override fun notifyMessageReceived(msg: WireFrameMessage) {
+ messageBytesReceived += msg.payloadSize
}
- override fun notifyMessageSent(topic: String) {
+ override fun notifyMessageSent(msg: RoutedMessage) {
messagesSentCount++
- messagesSentToTopic.compute(topic) { k, _ -> messagesSentToTopic[k]?.inc() ?: 1 }
+ messagesSentToTopic.compute(msg.topic) { k, _ ->
+ messagesSentToTopic[k]?.inc() ?: 1
+ }
+ lastProcessingTimeMicros = Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()).toNanos() / 1000.0
}
override fun notifyMessageDropped(cause: MessageDropCause) {
@@ -61,4 +69,4 @@ class FakeMetrics : Metrics {
fun messagesDropped(cause: MessageDropCause) =
messagesDroppedCause[cause]
?: fail("No messages were dropped due to cause: ${cause.name}")
-} \ No newline at end of file
+}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index 865dd510..2f731f53 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -19,12 +19,15 @@
*/
package org.onap.dcae.collectors.veshv.tests.fakes
+import arrow.core.identity
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import java.util.*
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.atomic.AtomicLong
+import java.util.function.Function
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -58,6 +61,9 @@ class CountingSink : Sink {
}
}
-class NoOpSink : Sink {
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> = messages
+
+open class ProcessingSink(val transformer: (Flux<RoutedMessage>) -> Publisher<RoutedMessage>) : Sink {
+ override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> = messages.transform(transformer)
}
+
+class NoOpSink : ProcessingSink(::identity)