aboutsummaryrefslogtreecommitdiffstats
path: root/sources
diff options
context:
space:
mode:
Diffstat (limited to 'sources')
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt110
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt28
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt13
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt4
-rw-r--r--sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt24
5 files changed, 157 insertions, 22 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
new file mode 100644
index 00000000..2feca410
--- /dev/null
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -0,0 +1,110 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.component
+
+import com.google.protobuf.ByteString
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.tests.fakes.NoOpSink
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
+import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
+import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
+import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
+import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
+import kotlin.test.fail
+
+object MetricsSpecification : Spek({
+ debugRx(false)
+
+ describe("Bytes received metrics") {
+ it("should sum up all bytes received") {
+ val sut = vesHvWithNoOpSink()
+ val vesWireFrameMessage = vesWireFrameMessage()
+ val invalidWireFrame = invalidWireFrame()
+
+ val bytesSent = invalidWireFrame.readableBytes() +
+ vesWireFrameMessage.readableBytes()
+ sut.handleConnection(
+ vesWireFrameMessage,
+ invalidWireFrame
+ )
+
+ val metrics = sut.metrics
+ assertThat(metrics.bytesReceived)
+ .describedAs("bytesReceived metric")
+ .isEqualTo(bytesSent)
+ }
+ }
+
+ describe("Messages received metrics") {
+ it("should sum up all received messages bytes") {
+ val sut = vesHvWithNoOpSink()
+ val firstVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(10)))
+ val secondVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(40)))
+ val firstVesMessage = vesWireFrameMessage(firstVesEvent)
+ val secondVesMessage = vesWireFrameMessage(secondVesEvent)
+
+ val serializedMessagesSize = firstVesEvent.serializedSize + secondVesEvent.serializedSize
+ sut.handleConnection(
+ firstVesMessage,
+ secondVesMessage
+ )
+
+ val metrics = sut.metrics
+ assertThat(metrics.messageBytesReceived)
+ .describedAs("messageBytesReceived metric")
+ .isEqualTo(serializedMessagesSize)
+ }
+ }
+
+ describe("Messages sent metrics") {
+ it("should gather info for each topic separately") {
+ val sut = vesHvWithNoOpSink(twoDomainsToOneTopicConfiguration)
+
+ sut.handleConnection(
+ vesWireFrameMessage(PERF3GPP),
+ vesWireFrameMessage(PERF3GPP),
+ vesWireFrameMessage(VesEventDomain.MEASUREMENT)
+ )
+
+ val metrics = sut.metrics
+ assertThat(metrics.messageSentCount)
+ .describedAs("messageSentCount metric")
+ .isEqualTo(3)
+ assertThat(messagesOnTopic(metrics, PERF3GPP_TOPIC))
+ .describedAs("messagesSentToTopic $PERF3GPP_TOPIC metric")
+ .isEqualTo(2)
+ assertThat(messagesOnTopic(metrics, MEASUREMENTS_FOR_VF_SCALING_TOPIC))
+ .describedAs("messagesSentToTopic $MEASUREMENTS_FOR_VF_SCALING_TOPIC metric")
+ .isEqualTo(1)
+ }
+ }
+})
+
+private fun messagesOnTopic(metrics: FakeMetrics, topic: String) =
+ metrics.messagesSentToTopic.get(topic) ?: fail("No messages were sent to topic $topic") \ No newline at end of file
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index ce242e0b..0c1b589b 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -28,11 +28,9 @@ import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
-import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
-import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
-import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
+import org.onap.dcae.collectors.veshv.tests.fakes.*
import reactor.core.publisher.Flux
import java.time.Duration
@@ -43,9 +41,9 @@ import java.time.Duration
class Sut(sink: Sink = StoringSink()) {
val configurationProvider = FakeConfigurationProvider()
val healthStateProvider = FakeHealthState()
-
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
- private val metrics = FakeMetrics()
+ val metrics = FakeMetrics()
+
private val collectorFactory = CollectorFactory(
configurationProvider,
SinkProvider.just(sink),
@@ -55,15 +53,27 @@ class Sut(sink: Sink = StoringSink()) {
private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
val collector: Collector
- get() = collectorProvider(ClientContext(alloc)).getOrElse{ throw IllegalStateException("Collector not available.") }
+ get() = collectorProvider(ClientContext(alloc)).getOrElse {
+ throw IllegalStateException("Collector not available.")
+ }
companion object {
const val MAX_PAYLOAD_SIZE_BYTES = 1024
}
-
}
+private val timeout = Duration.ofSeconds(10)
+
fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
- collector.handleConnection(Flux.fromArray(packets)).block(Duration.ofSeconds(10))
+ collector.handleConnection(Flux.fromArray(packets)).block(timeout)
return sink.sentMessages
}
+
+fun Sut.handleConnection(vararg packets: ByteBuf) {
+ collector.handleConnection(Flux.fromArray(packets)).block(timeout)
+}
+
+fun vesHvWithNoOpSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+ Sut(NoOpSink()).apply {
+ configurationProvider.updateConfiguration(collectorConfiguration)
+ }
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
index f3fc2381..dd098052 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
@@ -20,18 +20,31 @@
package org.onap.dcae.collectors.veshv.tests.fakes
import org.onap.dcae.collectors.veshv.boundary.Metrics
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
class FakeMetrics : Metrics {
+ var bytesReceived: Int = 0
+
+ var messageBytesReceived: Int = 0
+
+ var messageSentCount: Int = 0
+ val messagesSentToTopic: MutableMap<String, Int> = ConcurrentHashMap()
+
override fun notifyBytesReceived(size: Int) {
+ bytesReceived += size
}
override fun notifyMessageReceived(size: Int) {
+ messageBytesReceived += size
}
override fun notifyMessageSent(topic: String) {
+ messageSentCount++
+ messagesSentToTopic.compute(topic, { k, v -> messagesSentToTopic.get(k)?.inc() ?: 1 })
}
} \ No newline at end of file
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index a5fd546a..865dd510 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -57,3 +57,7 @@ class CountingSink : Sink {
}
}
}
+
+class NoOpSink : Sink {
+ override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> = messages
+}
diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
index db7777c2..01e11665 100644
--- a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
+++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
@@ -27,6 +27,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED
import org.onap.dcae.collectors.veshv.domain.VesEventDomain
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
+import org.onap.ves.VesEventOuterClass.VesEvent
import java.util.UUID.randomUUID
@@ -42,11 +43,15 @@ private fun ByteBuf.writeValidWireFrameHeaders() {
}
fun vesWireFrameMessage(domain: VesEventDomain = OTHER,
- id: String = randomUUID().toString()): ByteBuf =
+ id: String = randomUUID().toString(),
+ eventFields: ByteString = ByteString.EMPTY): ByteBuf =
+ vesWireFrameMessage(vesEvent(domain, id, eventFields))
+
+fun vesWireFrameMessage(vesEvent: VesEvent) =
allocator.buffer().run {
writeValidWireFrameHeaders()
- val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer()
+ val gpb = vesEvent.toByteString().asReadOnlyByteBuffer()
writeInt(gpb.limit()) // ves event size in bytes
writeBytes(gpb) // ves event as GPB bytes
}
@@ -70,16 +75,9 @@ fun invalidWireFrame(): ByteBuf = allocator.buffer().run {
}
fun vesMessageWithPayloadOfSize(payloadSizeBytes: Int, domain: VesEventDomain = PERF3GPP): ByteBuf =
- allocator.buffer().run {
- writeValidWireFrameHeaders()
-
- val gpb = vesEvent(
- domain = domain,
- eventFields = ByteString.copyFrom(ByteArray(payloadSizeBytes))
- ).toByteString().asReadOnlyByteBuffer()
-
- writeInt(gpb.limit()) // ves event size in bytes
- writeBytes(gpb) // ves event as GPB bytes
- }
+ vesWireFrameMessage(
+ domain = domain,
+ eventFields = ByteString.copyFrom(ByteArray(payloadSizeBytes))
+ )