summaryrefslogtreecommitdiffstats
path: root/sources
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-14 12:05:47 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-12-17 15:06:29 +0100
commitd55f5c0c3df4b2ea136100e61424810ede749778 (patch)
treeb4d60ede755af2a2204b8303d75b4d74489f6802 /sources
parentfb040c0df8ab2b74d02b67feda4e2a161a1311d2 (diff)
Metric: Processing time
Add processing time metric measured as difference between "sent to DMaaP" and "WTP decoded" events. Change-Id: I73bb665145019fcca5ae36e2199ed0e1cc088fdf Issue-ID: DCAEGEN2-1036 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt7
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt22
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt4
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt4
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt23
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt9
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt12
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt22
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt5
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt18
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt10
-rw-r--r--sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt5
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt16
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt76
-rw-r--r--sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt37
17 files changed, 203 insertions, 74 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 3f69c088..1334738a 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.boundary
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.MessageDropCause
@@ -31,8 +32,8 @@ interface Sink {
interface Metrics {
fun notifyBytesReceived(size: Int)
- fun notifyMessageReceived(size: Int)
- fun notifyMessageSent(topic: String)
+ fun notifyMessageReceived(msg: WireFrameMessage)
+ fun notifyMessageSent(msg: RoutedMessage)
fun notifyMessageDropped(cause: MessageDropCause)
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
index c670e1d8..ee499e19 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl
import arrow.core.Try
import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.ves.VesEventOuterClass.VesEvent
@@ -30,9 +31,9 @@ import org.onap.ves.VesEventOuterClass.VesEvent
*/
internal class VesDecoder {
- fun decode(bytes: ByteData): Try<VesMessage> =
+ fun decode(frame: WireFrameMessage): Try<VesMessage> =
Try {
- val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
- VesMessage(decodedHeader, bytes)
+ val decodedHeader = VesEvent.parseFrom(frame.payload.unsafeAsArray()).commonEventHeader
+ VesMessage(decodedHeader, frame)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index b29432f0..51f894d3 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
@@ -68,7 +67,7 @@ internal class VesHvCollector(
private fun decodeWireFrame(flux: Flux<ByteBuf>): Flux<WireFrameMessage> = flux
.doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
.concatMap(wireChunkDecoder::decode)
- .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
+ .doOnNext(metrics::notifyMessageReceived)
private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux
.filterFailedWithLog {
@@ -78,15 +77,14 @@ internal class VesHvCollector(
}
private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
- .map(WireFrameMessage::payload)
- .flatMap(::decodePayload)
-
- private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
- .decode(rawPayload)
- .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) }
- .filterFailedWithLog(logger, clientContext::fullMdc,
- { "Ves event header decoded successfully" },
- { "Failed to decode ves event header, reason: ${it.message}" })
+ .flatMap { frame ->
+ protobufDecoder
+ .decode(frame)
+ .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) }
+ .filterFailedWithLog(logger, clientContext::fullMdc,
+ { "Ves event header decoded successfully" },
+ { "Failed to decode ves event header, reason: ${it.message}" })
+ }
private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux
.filterFailedWithLog {
@@ -98,7 +96,7 @@ internal class VesHvCollector(
private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
.flatMap(this::findRoute)
.compose(sink::send)
- .doOnNext { metrics.notifyMessageSent(it.topic) }
+ .doOnNext(metrics::notifyMessageSent)
private fun findRoute(msg: VesMessage) = router
.findDestination(msg)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
index ec8593af..14966d9b 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
@@ -47,7 +47,7 @@ internal class LoggingSinkProvider : SinkProvider {
private fun logMessage(msg: RoutedMessage) {
val msgs = totalMessages.addAndGet(1)
- val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong())
+ val bytes = totalBytes.addAndGet(msg.message.wtpFrame.payloadSize.toLong())
val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" }
if (msgs % INFO_LOGGING_FREQ == 0L)
logger.info(ctx, logMessageSupplier)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
index 7a6ac7c8..c92518a5 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
@@ -28,10 +28,12 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
*/
class VesMessageSerializer : Serializer<VesMessage> {
override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ // not needed
}
- override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.rawMessage?.unsafeAsArray()
+ override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.wtpFrame?.payload?.unsafeAsArray()
override fun close() {
+ // not needed
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
index 1965d78c..d3640193 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
@@ -19,11 +19,11 @@
*/
package org.onap.dcae.collectors.veshv.model
-import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.ves.VesEventOuterClass.CommonEventHeader
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData)
+data class VesMessage(val header: CommonEventHeader, val wtpFrame: WireFrameMessage)
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
index f784daa4..7d136ef1 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
@@ -29,7 +29,8 @@ import org.jetbrains.spek.api.dsl.*
import org.onap.dcae.collectors.veshv.domain.*
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
-import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
+import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
+import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
import org.onap.ves.VesEventOuterClass.CommonEventHeader.*
import kotlin.test.assertTrue
import kotlin.test.fail
@@ -43,7 +44,7 @@ internal object MessageValidatorTest : Spek({
val commonHeader = commonHeader()
it("should accept message with fully initialized message header") {
- val vesMessage = VesMessage(commonHeader, vesEventBytes(commonHeader))
+ val vesMessage = VesMessage(commonHeader, wireProtocolFrame(commonHeader))
with(cut) {
assertThat(validateProtobufMessage(vesMessage).isRight())
.describedAs("message validation result").isTrue()
@@ -53,7 +54,7 @@ internal object MessageValidatorTest : Spek({
VesEventDomain.values().forEach { domain ->
it("should accept message with $domain domain") {
val header = commonHeader(domain)
- val vesMessage = VesMessage(header, vesEventBytes(header))
+ val vesMessage = VesMessage(header, wireProtocolFrame(header))
with(cut) {
assertThat(validateProtobufMessage(vesMessage).isRight())
.describedAs("message validation result").isTrue()
@@ -63,7 +64,7 @@ internal object MessageValidatorTest : Spek({
}
on("ves hv message bytes") {
- val vesMessage = VesMessage(getDefaultInstance(), ByteData.EMPTY)
+ val vesMessage = VesMessage(getDefaultInstance(), emptyWireProtocolFrame())
it("should not accept message with default header") {
with(cut) {
@@ -100,7 +101,7 @@ internal object MessageValidatorTest : Spek({
).forEach { value, expectedResult ->
on("ves hv message including header with priority $value") {
val commonEventHeader = commonHeader(priority = value)
- val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader))
+ val vesMessage = VesMessage(commonEventHeader, wireProtocolFrame(commonEventHeader))
it("should resolve validation result") {
with(cut) {
@@ -121,7 +122,7 @@ internal object MessageValidatorTest : Spek({
.setEventId("Sample event Id")
.setSourceName("Sample Source")
.build()
- val rawMessageBytes = vesEventBytes(commonHeader)
+ val rawMessageBytes = wireProtocolFrame(commonHeader)
it("should not accept not fully initialized message header") {
val vesMessage = VesMessage(commonHeader, rawMessageBytes)
@@ -148,7 +149,7 @@ internal object MessageValidatorTest : Spek({
on("ves hv message including header.vesEventListenerVersion with non-string major part") {
val commonHeader = commonHeader(vesEventListenerVersion = "sample-version")
- val rawMessageBytes = vesEventBytes(commonHeader)
+ val rawMessageBytes = wireProtocolFrame(commonHeader)
it("should not accept message header") {
@@ -169,7 +170,7 @@ internal object MessageValidatorTest : Spek({
on("ves hv message including header.vesEventListenerVersion with major part != 7") {
val commonHeader = commonHeader(vesEventListenerVersion = "1.2.3")
- val rawMessageBytes = vesEventBytes(commonHeader)
+ val rawMessageBytes = wireProtocolFrame(commonHeader)
it("should not accept message header") {
val vesMessage = VesMessage(commonHeader, rawMessageBytes)
@@ -190,7 +191,7 @@ internal object MessageValidatorTest : Spek({
on("ves hv message including header.vesEventListenerVersion with minor part not starting with a digit") {
val commonHeader = commonHeader(vesEventListenerVersion = "7.test")
- val rawMessageBytes = vesEventBytes(commonHeader)
+ val rawMessageBytes = wireProtocolFrame(commonHeader)
it("should not accept message header") {
val vesMessage = VesMessage(commonHeader, rawMessageBytes)
@@ -237,7 +238,7 @@ internal object MessageValidatorTest : Spek({
with(cut) {
on("valid message as input") {
val commonHeader = commonHeader()
- val rawMessageBytes = vesEventBytes(commonHeader)
+ val rawMessageBytes = wireProtocolFrame(commonHeader)
val vesMessage = VesMessage(commonHeader, rawMessageBytes)
it("should be right") {
@@ -247,7 +248,7 @@ internal object MessageValidatorTest : Spek({
}
on("invalid message as input") {
val commonHeader = newBuilder().build()
- val rawMessageBytes = vesEventBytes(commonHeader)
+ val rawMessageBytes = wireProtocolFrame(commonHeader)
val vesMessage = VesMessage(commonHeader, rawMessageBytes)
it("should be left") {
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
index e4190163..90b850c0 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -21,13 +21,11 @@ package org.onap.dcae.collectors.veshv.impl
import arrow.core.None
import arrow.core.Some
-import io.netty.buffer.ByteBufAllocator
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
@@ -36,6 +34,7 @@ import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.model.routing
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
+import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
/**
@@ -61,7 +60,7 @@ object RouterTest : Spek({
val cut = Router(config, ClientContext())
on("message with existing route (rtpm)") {
- val message = VesMessage(commonHeader(PERF3GPP), ByteData.EMPTY)
+ val message = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame())
val result = cut.findDestination(message)
it("should have route available") {
@@ -82,7 +81,7 @@ object RouterTest : Spek({
}
on("message with existing route (trace)") {
- val message = VesMessage(commonHeader(SYSLOG), ByteData.EMPTY)
+ val message = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame())
val result = cut.findDestination(message)
it("should have route available") {
@@ -103,7 +102,7 @@ object RouterTest : Spek({
}
on("message with unknown route") {
- val message = VesMessage(commonHeader(HEARTBEAT), ByteData.EMPTY)
+ val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame())
val result = cut.findDestination(message)
it("should not have route available") {
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
index 605e7a6e..74f33a78 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
@@ -29,7 +29,8 @@ import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
-import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
+import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
+import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
import java.nio.charset.Charset
import kotlin.test.assertTrue
import kotlin.test.fail
@@ -42,16 +43,16 @@ internal object VesDecoderTest : Spek({
on("ves hv message bytes") {
val commonHeader = commonHeader(HEARTBEAT)
- val rawMessageBytes = vesEventBytes(commonHeader, ByteString.copyFromUtf8("highvolume measurements"))
+ val wtpFrame = wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements"))
it("should decode only header and pass it on along with raw message") {
val expectedMessage = VesMessage(
commonHeader,
- rawMessageBytes
+ wtpFrame
)
assertTrue {
- cut.decode(rawMessageBytes).exists {
+ cut.decode(wtpFrame).exists {
it == expectedMessage
}
}
@@ -60,9 +61,10 @@ internal object VesDecoderTest : Spek({
on("invalid ves hv message bytes") {
val rawMessageBytes = ByteData("ala ma kota".toByteArray(Charset.defaultCharset()))
+ val wtpFrame = emptyWireProtocolFrame().copy(payload = rawMessageBytes, payloadSize = rawMessageBytes.size())
it("should throw error") {
- assertFailedWithError(cut.decode(rawMessageBytes))
+ assertFailedWithError(cut.decode(wtpFrame))
}
}
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index dd8acf77..9f5c37e1 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -33,7 +33,12 @@ import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TO
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
-import org.onap.dcae.collectors.veshv.tests.utils.*
+import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion
+import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
+import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
+import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
+import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
+import java.time.Duration
object MetricsSpecification : Spek({
debugRx(false)
@@ -102,6 +107,21 @@ object MetricsSpecification : Spek({
}
}
+ describe("Processing time") {
+ it("should gather processing time metric") {
+ val delay = Duration.ofMillis(10)
+ val sut = vesHvWithDelayingSink(delay)
+
+ sut.handleConnection(vesWireFrameMessage(PERF3GPP))
+
+
+ val metrics = sut.metrics
+ assertThat(metrics.lastProcessingTimeMicros)
+ .describedAs("processingTime metric")
+ .isGreaterThanOrEqualTo(delay.toNanos().toDouble() / 1000.0)
+ }
+ }
+
describe("Messages dropped metrics") {
it("should gather metrics for invalid messages") {
val sut = vesHvWithNoOpSink(basicConfiguration)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 0c1b589b..7ebbfba0 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -77,3 +77,8 @@ fun vesHvWithNoOpSink(collectorConfiguration: CollectorConfiguration = basicConf
Sut(NoOpSink()).apply {
configurationProvider.updateConfiguration(collectorConfiguration)
}
+
+fun vesHvWithDelayingSink(delay: Duration, collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+ Sut(ProcessingSink { it.delayElements(delay) }).apply {
+ configurationProvider.updateConfiguration(collectorConfiguration)
+ }
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
index 9ddb7115..660ce498 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
@@ -20,7 +20,11 @@
package org.onap.dcae.collectors.veshv.tests.fakes
import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import java.time.Duration
+import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import kotlin.test.fail
@@ -31,6 +35,7 @@ import kotlin.test.fail
class FakeMetrics : Metrics {
var bytesReceived: Int = 0
var messageBytesReceived: Int = 0
+ var lastProcessingTimeMicros: Double = -1.0
var messagesSentCount: Int = 0
var messagesDroppedCount: Int = 0
@@ -41,13 +46,16 @@ class FakeMetrics : Metrics {
bytesReceived += size
}
- override fun notifyMessageReceived(size: Int) {
- messageBytesReceived += size
+ override fun notifyMessageReceived(msg: WireFrameMessage) {
+ messageBytesReceived += msg.payloadSize
}
- override fun notifyMessageSent(topic: String) {
+ override fun notifyMessageSent(msg: RoutedMessage) {
messagesSentCount++
- messagesSentToTopic.compute(topic) { k, _ -> messagesSentToTopic[k]?.inc() ?: 1 }
+ messagesSentToTopic.compute(msg.topic) { k, _ ->
+ messagesSentToTopic[k]?.inc() ?: 1
+ }
+ lastProcessingTimeMicros = Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()).toNanos() / 1000.0
}
override fun notifyMessageDropped(cause: MessageDropCause) {
@@ -61,4 +69,4 @@ class FakeMetrics : Metrics {
fun messagesDropped(cause: MessageDropCause) =
messagesDroppedCause[cause]
?: fail("No messages were dropped due to cause: ${cause.name}")
-} \ No newline at end of file
+}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index 865dd510..2f731f53 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -19,12 +19,15 @@
*/
package org.onap.dcae.collectors.veshv.tests.fakes
+import arrow.core.identity
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import java.util.*
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.atomic.AtomicLong
+import java.util.function.Function
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -58,6 +61,9 @@ class CountingSink : Sink {
}
}
-class NoOpSink : Sink {
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> = messages
+
+open class ProcessingSink(val transformer: (Flux<RoutedMessage>) -> Publisher<RoutedMessage>) : Sink {
+ override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> = messages.transform(transformer)
}
+
+class NoOpSink : ProcessingSink(::identity)
diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
index 1257c6bb..d1fdb10c 100644
--- a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
+++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
@@ -22,6 +22,8 @@ package org.onap.dcae.collectors.veshv.domain
import arrow.core.Either
import arrow.core.Either.Companion.left
import arrow.core.Either.Companion.right
+import java.time.Instant
+import java.time.temporal.Temporal
/**
@@ -58,7 +60,8 @@ data class WireFrameMessage(val payload: ByteData,
val versionMajor: Short,
val versionMinor: Short,
val payloadType: Int,
- val payloadSize: Int
+ val payloadSize: Int,
+ val receivedAt: Temporal = Instant.now()
) {
constructor(payload: ByteArray) : this(
ByteData(payload),
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index 18678ff3..259fa037 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -29,7 +29,11 @@ import io.micrometer.core.instrument.binder.system.ProcessorMetrics
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import java.time.Duration
+import java.time.Instant
/**
@@ -53,6 +57,7 @@ class MicrometerMetrics internal constructor(
private val droppedCount = { cause: String ->
registry.counter(name(MESSAGES, DROPPED, COUNT, CAUSE), CAUSE, cause)
}.memoize<String, Counter>()
+ private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME))
init {
registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) {
@@ -71,14 +76,15 @@ class MicrometerMetrics internal constructor(
receivedBytes.increment(size.toDouble())
}
- override fun notifyMessageReceived(size: Int) {
+ override fun notifyMessageReceived(msg: WireFrameMessage) {
receivedMsgCount.increment()
- receivedMsgBytes.increment(size.toDouble())
+ receivedMsgBytes.increment(msg.payloadSize.toDouble())
}
- override fun notifyMessageSent(topic: String) {
+ override fun notifyMessageSent(msg: RoutedMessage) {
sentCountTotal.increment()
- sentCount(topic).increment()
+ sentCount(msg.topic).increment()
+ processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()))
}
override fun notifyMessageDropped(cause: MessageDropCause) {
@@ -100,7 +106,7 @@ class MicrometerMetrics internal constructor(
internal const val DROPPED = "dropped"
internal const val CAUSE = "cause"
internal const val TOTAL = "total"
-
+ internal const val TIME = "time"
fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
}
}
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
index e2dc2f82..cb5cfc70 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.main
import arrow.core.Try
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.Gauge
+import io.micrometer.core.instrument.Timer
import io.micrometer.core.instrument.search.RequiredSearch
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
@@ -35,6 +36,15 @@ import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
+import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
+import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
+import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSize
+import java.time.Instant
+import java.time.temporal.Temporal
+import java.util.concurrent.TimeUnit
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -63,6 +73,9 @@ object MicrometerMetricsTest : Spek({
fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
verifyMeter(registrySearch().name(name), RequiredSearch::gauge, verifier)
+ fun <T> verifyTimer(name: String, verifier: (Timer) -> T) =
+ verifyMeter(registrySearch().name(name), RequiredSearch::timer, verifier)
+
fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
verifyMeter(search, RequiredSearch::counter, verifier)
@@ -71,6 +84,7 @@ object MicrometerMetricsTest : Spek({
fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) {
registry.meters
+ .filter { it.id.name.startsWith(PREFIX) }
.filter { it is Counter }
.map { it as Counter }
.filterNot { it.id.name in changedCounters }
@@ -105,7 +119,7 @@ object MicrometerMetricsTest : Spek({
val counterName = "$PREFIX.messages.received.count"
it("should increment counter") {
- cut.notifyMessageReceived(777)
+ cut.notifyMessageReceived(emptyWireProtocolFrame())
verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
@@ -118,7 +132,7 @@ object MicrometerMetricsTest : Spek({
it("should increment counter") {
val bytes = 888
- cut.notifyMessageReceived(bytes)
+ cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes))
verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision)
@@ -127,7 +141,7 @@ object MicrometerMetricsTest : Spek({
}
it("should leave all other counters unchanged") {
- cut.notifyMessageReceived(128)
+ cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = 128))
verifyAllCountersAreUnchangedBut(
"$PREFIX.messages.received.count",
"$PREFIX.messages.received.bytes"
@@ -143,7 +157,7 @@ object MicrometerMetricsTest : Spek({
val counterName = "$PREFIX.messages.sent.count.total"
it("should increment counter") {
- cut.notifyMessageSent(topicName1)
+ cut.notifyMessageSent(routedMessage(topicName1))
verifyCounter(counterName) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
@@ -155,9 +169,9 @@ object MicrometerMetricsTest : Spek({
on("$PREFIX.messages.sent.topic.count counter") {
val counterName = "$PREFIX.messages.sent.count.topic"
it("should handle counters for different topics") {
- cut.notifyMessageSent(topicName1)
- cut.notifyMessageSent(topicName2)
- cut.notifyMessageSent(topicName2)
+ cut.notifyMessageSent(routedMessage(topicName1))
+ cut.notifyMessageSent(routedMessage(topicName2))
+ cut.notifyMessageSent(routedMessage(topicName2))
verifyCounter(registrySearch().name(counterName).tag("topic", topicName1)) {
assertThat(it.count()).isCloseTo(1.0, doublePrecision)
@@ -168,6 +182,24 @@ object MicrometerMetricsTest : Spek({
}
}
}
+
+ on("$PREFIX.messages.processing.time") {
+ val counterName = "$PREFIX.messages.processing.time"
+ val processingTimeMs = 100L
+
+ it("should update timer") {
+
+ cut.notifyMessageSent(routedMessage(topicName1, Instant.now().minusMillis(processingTimeMs)))
+
+ verifyTimer(counterName) { timer ->
+ assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
+ }
+ verifyAllCountersAreUnchangedBut(
+ counterName,
+ "$PREFIX.messages.sent.count.topic",
+ "$PREFIX.messages.sent.count.total")
+ }
+ }
}
describe("notifyMessageDropped") {
@@ -207,27 +239,27 @@ object MicrometerMetricsTest : Spek({
it("should show difference between sent and received messages") {
on("positive difference") {
- cut.notifyMessageReceived(128)
- cut.notifyMessageReceived(256)
- cut.notifyMessageReceived(256)
- cut.notifyMessageSent("perf3gpp")
+ cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
+ cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
+ cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
+ cut.notifyMessageSent(routedMessage("perf3gpp"))
verifyGauge("messages.processing.count") {
assertThat(it.value()).isCloseTo(2.0, doublePrecision)
}
}
on("zero difference") {
- cut.notifyMessageReceived(128)
- cut.notifyMessageSent("perf3gpp")
+ cut.notifyMessageReceived(emptyWireProtocolFrame())
+ cut.notifyMessageSent(routedMessage("perf3gpp"))
verifyGauge("messages.processing.count") {
assertThat(it.value()).isCloseTo(0.0, doublePrecision)
}
}
on("negative difference") {
- cut.notifyMessageReceived(128)
- cut.notifyMessageSent("fault")
- cut.notifyMessageSent("perf3gpp")
+ cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
+ cut.notifyMessageSent(routedMessage("fault"))
+ cut.notifyMessageSent(routedMessage("perf3gpp"))
verifyGauge("messages.processing.count") {
assertThat(it.value()).isCloseTo(0.0, doublePrecision)
}
@@ -236,3 +268,15 @@ object MicrometerMetricsTest : Spek({
}
})
+
+fun routedMessage(topic: String, partition: Int = 0) =
+ vesEvent().let {evt ->
+ RoutedMessage(topic, partition,
+ VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
+ }
+
+fun routedMessage(topic: String, receivedAt: Temporal, partition: Int = 0) =
+ vesEvent().let {evt ->
+ RoutedMessage(topic, partition,
+ VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt)))
+ } \ No newline at end of file
diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
index cf30d2ce..a8456890 100644
--- a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
+++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
@@ -23,8 +23,10 @@ package org.onap.dcae.collectors.veshv.tests.utils
import com.google.protobuf.ByteString
import com.google.protobuf.MessageLite
import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.PayloadContentType
import org.onap.dcae.collectors.veshv.domain.VesEventDomain
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.ves.VesEventOuterClass
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import org.onap.ves.VesEventOuterClass.CommonEventHeader.Priority
@@ -72,7 +74,38 @@ fun commonHeader(domain: VesEventDomain = PERF3GPP,
.setVesEventListenerVersion(vesEventListenerVersion)
.build()
-fun vesEventBytes(commonHeader: CommonEventHeader, byteString: ByteString = ByteString.EMPTY): ByteData =
- vesEvent(commonHeader, byteString).toByteData()
+fun emptyWireProtocolFrame(): WireFrameMessage = wireProtocolFrameWithPayloadSize(0)
+
+
+fun wireProtocolFrameWithPayloadSize(size: Int): WireFrameMessage = WireFrameMessage(
+ payload = ByteData(ByteArray(size)),
+ versionMajor = WireFrameMessage.SUPPORTED_VERSION_MAJOR,
+ versionMinor = WireFrameMessage.SUPPORTED_VERSION_MINOR,
+ payloadSize = size,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue
+)
+
+fun wireProtocolFrame(commonHeader: CommonEventHeader, eventFields: ByteString = ByteString.EMPTY): WireFrameMessage =
+ vesEventBytes(commonHeader, eventFields).let { payload ->
+ WireFrameMessage(
+ payload = payload,
+ versionMajor = WireFrameMessage.SUPPORTED_VERSION_MAJOR,
+ versionMinor = WireFrameMessage.SUPPORTED_VERSION_MINOR,
+ payloadSize = payload.size(),
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue
+ )
+ }
+
+fun wireProtocolFrame(evt: VesEventOuterClass.VesEvent) =
+ WireFrameMessage(
+ payload = ByteData(evt.toByteArray()),
+ payloadSize = evt.serializedSize,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ versionMajor = WireFrameMessage.SUPPORTED_VERSION_MAJOR,
+ versionMinor = WireFrameMessage.SUPPORTED_VERSION_MINOR
+ )
+
+fun vesEventBytes(commonHeader: CommonEventHeader, eventFields: ByteString = ByteString.EMPTY): ByteData =
+ vesEvent(commonHeader, eventFields).toByteData()
fun MessageLite.toByteData(): ByteData = ByteData(toByteArray()) \ No newline at end of file