From 0ba97c7eac5a821c813bfa8ac31b1063956d3824 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Wed, 13 Jun 2018 15:45:00 +0200 Subject: Add monitoring support by means of micrometer.io Closes ONAP-345 Change-Id: I58c145b1d37a6b32fbe5b157723c152eb571a2dd Signed-off-by: Piotr Jaszczyk Issue-ID: DCAEGEN2-601 --- .../org/onap/dcae/collectors/veshv/boundary/adapters.kt | 8 +++++++- .../dcae/collectors/veshv/factory/CollectorFactory.kt | 16 ++++++++++------ .../onap/dcae/collectors/veshv/impl/VesHvCollector.kt | 7 ++++++- .../veshv/impl/adapters/LoggingSinkProvider.kt | 3 +-- .../collectors/veshv/impl/adapters/kafka/KafkaSink.kt | 6 +++--- 5 files changed, 27 insertions(+), 13 deletions(-) (limited to 'hv-collector-core/src/main/kotlin') diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 53fd7c3a..e4f02000 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -25,7 +25,13 @@ import org.onap.dcae.collectors.veshv.model.VesMessage import reactor.core.publisher.Flux interface Sink { - fun send(messages: Flux): Flux + fun send(messages: Flux): Flux +} + +interface Metrics { + fun notifyBytesReceived(size: Int) + fun notifyMessageReceived(size: Int) + fun notifyMessageSent(topic: String) } @FunctionalInterface diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 73f4d09d..8785180b 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.factory import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.impl.MessageValidator @@ -38,7 +39,9 @@ import java.util.concurrent.atomic.AtomicReference * @since May 2018 */ class CollectorFactory(val configuration: ConfigurationProvider, - private val sinkProvider: SinkProvider) { + private val sinkProvider: SinkProvider, + private val metrics: Metrics) { + fun createVesHvCollectorProvider(): CollectorProvider { val collector: AtomicReference = AtomicReference() createVesHvCollector().subscribe(collector::set) @@ -50,11 +53,12 @@ class CollectorFactory(val configuration: ConfigurationProvider, private fun createVesHvCollector(config: CollectorConfiguration): Collector { return VesHvCollector( - { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) }, - VesDecoder(), - MessageValidator(), - Router(config.routing), - sinkProvider(config)) + wireChunkDecoderSupplier = { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) }, + protobufDecoder = VesDecoder(), + validator = MessageValidator(), + router = Router(config.routing), + sink = sinkProvider(config), + metrics = metrics) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 965943f6..222eaefa 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector +import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder @@ -40,18 +41,22 @@ internal class VesHvCollector( private val protobufDecoder: VesDecoder, private val validator: MessageValidator, private val router: Router, - private val sink: Sink) : Collector { + private val sink: Sink, + private val metrics: Metrics) : Collector { override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux): Mono = wireChunkDecoderSupplier(alloc).let { wireDecoder -> dataStream + .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } .concatMap(wireDecoder::decode) + .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } .filter(WireFrame::isValid) .map(WireFrame::payload) .map(protobufDecoder::decode) .filter(validator::isValid) .flatMap(this::findRoute) .compose(sink::send) + .doOnNext { metrics.notifyMessageSent(it.topic) } .doOnTerminate { releaseBuffersMemory(wireDecoder) } .then() } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index b943e4e5..a5c41046 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -40,10 +40,9 @@ internal class LoggingSinkProvider : SinkProvider { private val totalMessages = AtomicLong() private val totalBytes = AtomicLong() - override fun send(messages: Flux): Flux = + override fun send(messages: Flux): Flux = messages .doOnNext(this::logMessage) - .map { it.message } private fun logMessage(msg: RoutedMessage) { val msgs = totalMessages.addAndGet(1) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index 6142fa3c..0a548a52 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -35,7 +35,7 @@ import reactor.kafka.sender.SenderResult */ internal class KafkaSink(private val sender: KafkaSender) : Sink { - override fun send(messages: Flux): Flux { + override fun send(messages: Flux): Flux { val records = messages.map(this::vesToKafkaRecord) return sender.send(records) .doOnNext(::logException) @@ -43,14 +43,14 @@ internal class KafkaSink(private val sender: KafkaSender { + private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord { return SenderRecord.create( msg.topic, msg.partition, System.currentTimeMillis(), msg.message.header, msg.message, - msg.message) + msg) } private fun logException(senderResult: SenderResult) { -- cgit 1.2.3-korg