diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-06-13 15:45:00 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-02 09:11:16 +0200 |
commit | 0ba97c7eac5a821c813bfa8ac31b1063956d3824 (patch) | |
tree | 9b937c8ec7b3533f8fd63531170da3ac2fda7fbb /hv-collector-core | |
parent | 85a59b8d29c6f81720fe3d2e59926740173fcae9 (diff) |
Add monitoring support by means of micrometer.io
Closes ONAP-345
Change-Id: I58c145b1d37a6b32fbe5b157723c152eb571a2dd
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core')
6 files changed, 27 insertions, 14 deletions
diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml index ed9f1ad5..a372fb22 100644 --- a/hv-collector-core/pom.xml +++ b/hv-collector-core/pom.xml @@ -73,7 +73,6 @@ <version>${project.parent.version}</version> <scope>compile</scope> </dependency> - <dependency> <groupId>org.jetbrains.kotlin</groupId> <artifactId>kotlin-reflect</artifactId> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 53fd7c3a..e4f02000 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -25,7 +25,13 @@ import org.onap.dcae.collectors.veshv.model.VesMessage import reactor.core.publisher.Flux interface Sink { - fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> + fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> +} + +interface Metrics { + fun notifyBytesReceived(size: Int) + fun notifyMessageReceived(size: Int) + fun notifyMessageSent(topic: String) } @FunctionalInterface diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 73f4d09d..8785180b 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.factory import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.impl.MessageValidator @@ -38,7 +39,9 @@ import java.util.concurrent.atomic.AtomicReference * @since May 2018 */ class CollectorFactory(val configuration: ConfigurationProvider, - private val sinkProvider: SinkProvider) { + private val sinkProvider: SinkProvider, + private val metrics: Metrics) { + fun createVesHvCollectorProvider(): CollectorProvider { val collector: AtomicReference<Collector> = AtomicReference() createVesHvCollector().subscribe(collector::set) @@ -50,11 +53,12 @@ class CollectorFactory(val configuration: ConfigurationProvider, private fun createVesHvCollector(config: CollectorConfiguration): Collector { return VesHvCollector( - { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) }, - VesDecoder(), - MessageValidator(), - Router(config.routing), - sinkProvider(config)) + wireChunkDecoderSupplier = { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) }, + protobufDecoder = VesDecoder(), + validator = MessageValidator(), + router = Router(config.routing), + sink = sinkProvider(config), + metrics = metrics) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 965943f6..222eaefa 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector +import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder @@ -40,18 +41,22 @@ internal class VesHvCollector( private val protobufDecoder: VesDecoder, private val validator: MessageValidator, private val router: Router, - private val sink: Sink) : Collector { + private val sink: Sink, + private val metrics: Metrics) : Collector { override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> = wireChunkDecoderSupplier(alloc).let { wireDecoder -> dataStream + .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } .concatMap(wireDecoder::decode) + .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } .filter(WireFrame::isValid) .map(WireFrame::payload) .map(protobufDecoder::decode) .filter(validator::isValid) .flatMap(this::findRoute) .compose(sink::send) + .doOnNext { metrics.notifyMessageSent(it.topic) } .doOnTerminate { releaseBuffersMemory(wireDecoder) } .then() } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index b943e4e5..a5c41046 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -40,10 +40,9 @@ internal class LoggingSinkProvider : SinkProvider { private val totalMessages = AtomicLong() private val totalBytes = AtomicLong() - override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> = + override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> = messages .doOnNext(this::logMessage) - .map { it.message } private fun logMessage(msg: RoutedMessage) { val msgs = totalMessages.addAndGet(1) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index 6142fa3c..0a548a52 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -35,7 +35,7 @@ import reactor.kafka.sender.SenderResult */ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink { - override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> { + override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { val records = messages.map(this::vesToKafkaRecord) return sender.send(records) .doOnNext(::logException) @@ -43,14 +43,14 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM .map { it.correlationMetadata() } } - private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, VesMessage> { + private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> { return SenderRecord.create( msg.topic, msg.partition, System.currentTimeMillis(), msg.message.header, msg.message, - msg.message) + msg) } private fun logException(senderResult: SenderResult<out Any>) { |