diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-12-14 12:05:47 +0100 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-12-17 15:06:29 +0100 |
commit | d55f5c0c3df4b2ea136100e61424810ede749778 (patch) | |
tree | b4d60ede755af2a2204b8303d75b4d74489f6802 /sources/hv-collector-core/src | |
parent | fb040c0df8ab2b74d02b67feda4e2a161a1311d2 (diff) |
Metric: Processing time
Add processing time metric measured as difference between "sent to DMaaP" and "WTP decoded" events.
Change-Id: I73bb665145019fcca5ae36e2199ed0e1cc088fdf
Issue-ID: DCAEGEN2-1036
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src')
9 files changed, 46 insertions, 42 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 3f69c088..1334738a 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.boundary +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.MessageDropCause @@ -31,8 +32,8 @@ interface Sink { interface Metrics { fun notifyBytesReceived(size: Int) - fun notifyMessageReceived(size: Int) - fun notifyMessageSent(topic: String) + fun notifyMessageReceived(msg: WireFrameMessage) + fun notifyMessageSent(msg: RoutedMessage) fun notifyMessageDropped(cause: MessageDropCause) } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt index c670e1d8..ee499e19 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.Try import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventOuterClass.VesEvent @@ -30,9 +31,9 @@ import org.onap.ves.VesEventOuterClass.VesEvent */ internal class VesDecoder { - fun decode(bytes: ByteData): Try<VesMessage> = + fun decode(frame: WireFrameMessage): Try<VesMessage> = Try { - val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader - VesMessage(decodedHeader, bytes) + val decodedHeader = VesEvent.parseFrom(frame.payload.unsafeAsArray()).commonEventHeader + VesMessage(decodedHeader, frame) } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index b29432f0..51f894d3 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder @@ -68,7 +67,7 @@ internal class VesHvCollector( private fun decodeWireFrame(flux: Flux<ByteBuf>): Flux<WireFrameMessage> = flux .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } .concatMap(wireChunkDecoder::decode) - .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } + .doOnNext(metrics::notifyMessageReceived) private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux .filterFailedWithLog { @@ -78,15 +77,14 @@ internal class VesHvCollector( } private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux - .map(WireFrameMessage::payload) - .flatMap(::decodePayload) - - private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder - .decode(rawPayload) - .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) } - .filterFailedWithLog(logger, clientContext::fullMdc, - { "Ves event header decoded successfully" }, - { "Failed to decode ves event header, reason: ${it.message}" }) + .flatMap { frame -> + protobufDecoder + .decode(frame) + .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) } + .filterFailedWithLog(logger, clientContext::fullMdc, + { "Ves event header decoded successfully" }, + { "Failed to decode ves event header, reason: ${it.message}" }) + } private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux .filterFailedWithLog { @@ -98,7 +96,7 @@ internal class VesHvCollector( private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux .flatMap(this::findRoute) .compose(sink::send) - .doOnNext { metrics.notifyMessageSent(it.topic) } + .doOnNext(metrics::notifyMessageSent) private fun findRoute(msg: VesMessage) = router .findDestination(msg) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index ec8593af..14966d9b 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -47,7 +47,7 @@ internal class LoggingSinkProvider : SinkProvider { private fun logMessage(msg: RoutedMessage) { val msgs = totalMessages.addAndGet(1) - val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong()) + val bytes = totalBytes.addAndGet(msg.message.wtpFrame.payloadSize.toLong()) val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" } if (msgs % INFO_LOGGING_FREQ == 0L) logger.info(ctx, logMessageSupplier) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt index 7a6ac7c8..c92518a5 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt @@ -28,10 +28,12 @@ import org.onap.dcae.collectors.veshv.model.VesMessage */ class VesMessageSerializer : Serializer<VesMessage> { override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { + // not needed } - override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.rawMessage?.unsafeAsArray() + override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.wtpFrame?.payload?.unsafeAsArray() override fun close() { + // not needed } } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt index 1965d78c..d3640193 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt @@ -19,11 +19,11 @@ */ package org.onap.dcae.collectors.veshv.model -import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.ves.VesEventOuterClass.CommonEventHeader /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) +data class VesMessage(val header: CommonEventHeader, val wtpFrame: WireFrameMessage) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt index f784daa4..7d136ef1 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt @@ -29,7 +29,8 @@ import org.jetbrains.spek.api.dsl.* import org.onap.dcae.collectors.veshv.domain.* import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.commonHeader -import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes +import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame +import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame import org.onap.ves.VesEventOuterClass.CommonEventHeader.* import kotlin.test.assertTrue import kotlin.test.fail @@ -43,7 +44,7 @@ internal object MessageValidatorTest : Spek({ val commonHeader = commonHeader() it("should accept message with fully initialized message header") { - val vesMessage = VesMessage(commonHeader, vesEventBytes(commonHeader)) + val vesMessage = VesMessage(commonHeader, wireProtocolFrame(commonHeader)) with(cut) { assertThat(validateProtobufMessage(vesMessage).isRight()) .describedAs("message validation result").isTrue() @@ -53,7 +54,7 @@ internal object MessageValidatorTest : Spek({ VesEventDomain.values().forEach { domain -> it("should accept message with $domain domain") { val header = commonHeader(domain) - val vesMessage = VesMessage(header, vesEventBytes(header)) + val vesMessage = VesMessage(header, wireProtocolFrame(header)) with(cut) { assertThat(validateProtobufMessage(vesMessage).isRight()) .describedAs("message validation result").isTrue() @@ -63,7 +64,7 @@ internal object MessageValidatorTest : Spek({ } on("ves hv message bytes") { - val vesMessage = VesMessage(getDefaultInstance(), ByteData.EMPTY) + val vesMessage = VesMessage(getDefaultInstance(), emptyWireProtocolFrame()) it("should not accept message with default header") { with(cut) { @@ -100,7 +101,7 @@ internal object MessageValidatorTest : Spek({ ).forEach { value, expectedResult -> on("ves hv message including header with priority $value") { val commonEventHeader = commonHeader(priority = value) - val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader)) + val vesMessage = VesMessage(commonEventHeader, wireProtocolFrame(commonEventHeader)) it("should resolve validation result") { with(cut) { @@ -121,7 +122,7 @@ internal object MessageValidatorTest : Spek({ .setEventId("Sample event Id") .setSourceName("Sample Source") .build() - val rawMessageBytes = vesEventBytes(commonHeader) + val rawMessageBytes = wireProtocolFrame(commonHeader) it("should not accept not fully initialized message header") { val vesMessage = VesMessage(commonHeader, rawMessageBytes) @@ -148,7 +149,7 @@ internal object MessageValidatorTest : Spek({ on("ves hv message including header.vesEventListenerVersion with non-string major part") { val commonHeader = commonHeader(vesEventListenerVersion = "sample-version") - val rawMessageBytes = vesEventBytes(commonHeader) + val rawMessageBytes = wireProtocolFrame(commonHeader) it("should not accept message header") { @@ -169,7 +170,7 @@ internal object MessageValidatorTest : Spek({ on("ves hv message including header.vesEventListenerVersion with major part != 7") { val commonHeader = commonHeader(vesEventListenerVersion = "1.2.3") - val rawMessageBytes = vesEventBytes(commonHeader) + val rawMessageBytes = wireProtocolFrame(commonHeader) it("should not accept message header") { val vesMessage = VesMessage(commonHeader, rawMessageBytes) @@ -190,7 +191,7 @@ internal object MessageValidatorTest : Spek({ on("ves hv message including header.vesEventListenerVersion with minor part not starting with a digit") { val commonHeader = commonHeader(vesEventListenerVersion = "7.test") - val rawMessageBytes = vesEventBytes(commonHeader) + val rawMessageBytes = wireProtocolFrame(commonHeader) it("should not accept message header") { val vesMessage = VesMessage(commonHeader, rawMessageBytes) @@ -237,7 +238,7 @@ internal object MessageValidatorTest : Spek({ with(cut) { on("valid message as input") { val commonHeader = commonHeader() - val rawMessageBytes = vesEventBytes(commonHeader) + val rawMessageBytes = wireProtocolFrame(commonHeader) val vesMessage = VesMessage(commonHeader, rawMessageBytes) it("should be right") { @@ -247,7 +248,7 @@ internal object MessageValidatorTest : Spek({ } on("invalid message as input") { val commonHeader = newBuilder().build() - val rawMessageBytes = vesEventBytes(commonHeader) + val rawMessageBytes = wireProtocolFrame(commonHeader) val vesMessage = VesMessage(commonHeader, rawMessageBytes) it("should be left") { diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index e4190163..90b850c0 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -21,13 +21,11 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.None import arrow.core.Some -import io.netty.buffer.ByteBufAllocator import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG @@ -36,6 +34,7 @@ import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.model.routing import org.onap.dcae.collectors.veshv.tests.utils.commonHeader +import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame /** @@ -61,7 +60,7 @@ object RouterTest : Spek({ val cut = Router(config, ClientContext()) on("message with existing route (rtpm)") { - val message = VesMessage(commonHeader(PERF3GPP), ByteData.EMPTY) + val message = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame()) val result = cut.findDestination(message) it("should have route available") { @@ -82,7 +81,7 @@ object RouterTest : Spek({ } on("message with existing route (trace)") { - val message = VesMessage(commonHeader(SYSLOG), ByteData.EMPTY) + val message = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame()) val result = cut.findDestination(message) it("should have route available") { @@ -103,7 +102,7 @@ object RouterTest : Spek({ } on("message with unknown route") { - val message = VesMessage(commonHeader(HEARTBEAT), ByteData.EMPTY) + val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame()) val result = cut.findDestination(message) it("should not have route available") { diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt index 605e7a6e..74f33a78 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt @@ -29,7 +29,8 @@ import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.commonHeader -import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes +import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame +import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame import java.nio.charset.Charset import kotlin.test.assertTrue import kotlin.test.fail @@ -42,16 +43,16 @@ internal object VesDecoderTest : Spek({ on("ves hv message bytes") { val commonHeader = commonHeader(HEARTBEAT) - val rawMessageBytes = vesEventBytes(commonHeader, ByteString.copyFromUtf8("highvolume measurements")) + val wtpFrame = wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements")) it("should decode only header and pass it on along with raw message") { val expectedMessage = VesMessage( commonHeader, - rawMessageBytes + wtpFrame ) assertTrue { - cut.decode(rawMessageBytes).exists { + cut.decode(wtpFrame).exists { it == expectedMessage } } @@ -60,9 +61,10 @@ internal object VesDecoderTest : Spek({ on("invalid ves hv message bytes") { val rawMessageBytes = ByteData("ala ma kota".toByteArray(Charset.defaultCharset())) + val wtpFrame = emptyWireProtocolFrame().copy(payload = rawMessageBytes, payloadSize = rawMessageBytes.size()) it("should throw error") { - assertFailedWithError(cut.decode(rawMessageBytes)) + assertFailedWithError(cut.decode(wtpFrame)) } } } |