diff options
14 files changed, 357 insertions, 18 deletions
diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml index ed9f1ad5..a372fb22 100644 --- a/hv-collector-core/pom.xml +++ b/hv-collector-core/pom.xml @@ -73,7 +73,6 @@ <version>${project.parent.version}</version> <scope>compile</scope> </dependency> - <dependency> <groupId>org.jetbrains.kotlin</groupId> <artifactId>kotlin-reflect</artifactId> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 53fd7c3a..e4f02000 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -25,7 +25,13 @@ import org.onap.dcae.collectors.veshv.model.VesMessage import reactor.core.publisher.Flux interface Sink { - fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> + fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> +} + +interface Metrics { + fun notifyBytesReceived(size: Int) + fun notifyMessageReceived(size: Int) + fun notifyMessageSent(topic: String) } @FunctionalInterface diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 73f4d09d..8785180b 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.factory import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.impl.MessageValidator @@ -38,7 +39,9 @@ import java.util.concurrent.atomic.AtomicReference * @since May 2018 */ class CollectorFactory(val configuration: ConfigurationProvider, - private val sinkProvider: SinkProvider) { + private val sinkProvider: SinkProvider, + private val metrics: Metrics) { + fun createVesHvCollectorProvider(): CollectorProvider { val collector: AtomicReference<Collector> = AtomicReference() createVesHvCollector().subscribe(collector::set) @@ -50,11 +53,12 @@ class CollectorFactory(val configuration: ConfigurationProvider, private fun createVesHvCollector(config: CollectorConfiguration): Collector { return VesHvCollector( - { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) }, - VesDecoder(), - MessageValidator(), - Router(config.routing), - sinkProvider(config)) + wireChunkDecoderSupplier = { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) }, + protobufDecoder = VesDecoder(), + validator = MessageValidator(), + router = Router(config.routing), + sink = sinkProvider(config), + metrics = metrics) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 965943f6..222eaefa 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector +import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder @@ -40,18 +41,22 @@ internal class VesHvCollector( private val protobufDecoder: VesDecoder, private val validator: MessageValidator, private val router: Router, - private val sink: Sink) : Collector { + private val sink: Sink, + private val metrics: Metrics) : Collector { override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> = wireChunkDecoderSupplier(alloc).let { wireDecoder -> dataStream + .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } .concatMap(wireDecoder::decode) + .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } .filter(WireFrame::isValid) .map(WireFrame::payload) .map(protobufDecoder::decode) .filter(validator::isValid) .flatMap(this::findRoute) .compose(sink::send) + .doOnNext { metrics.notifyMessageSent(it.topic) } .doOnTerminate { releaseBuffersMemory(wireDecoder) } .then() } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index b943e4e5..a5c41046 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -40,10 +40,9 @@ internal class LoggingSinkProvider : SinkProvider { private val totalMessages = AtomicLong() private val totalBytes = AtomicLong() - override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> = + override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> = messages .doOnNext(this::logMessage) - .map { it.message } private fun logMessage(msg: RoutedMessage) { val msgs = totalMessages.addAndGet(1) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index 6142fa3c..0a548a52 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -35,7 +35,7 @@ import reactor.kafka.sender.SenderResult */ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink { - override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> { + override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { val records = messages.map(this::vesToKafkaRecord) return sender.send(records) .doOnNext(::logException) @@ -43,14 +43,14 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM .map { it.correlationMetadata() } } - private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, VesMessage> { + private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> { return SenderRecord.create( msg.topic, msg.partition, System.currentTimeMillis(), msg.message.header, msg.message, - msg.message) + msg) } private fun logException(senderResult: SenderResult<out Any>) { diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index c4e9874f..5099ae4c 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -26,6 +26,7 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider +import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics import org.onap.dcae.collectors.veshv.tests.fakes.FakeSink import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.Exceptions @@ -40,7 +41,8 @@ internal class Sut { val configurationProvider = FakeConfigurationProvider() val sink = FakeSink() val alloc = UnpooledByteBufAllocator.DEFAULT - private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink)) + val metrics = FakeMetrics() + private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics) val collectorProvider = collectorFactory.createVesHvCollectorProvider() val collector: Collector diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt new file mode 100644 index 00000000..cfc44bcd --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt @@ -0,0 +1,38 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.fakes + +import org.onap.dcae.collectors.veshv.boundary.Metrics + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +class FakeMetrics: Metrics { + override fun notifyBytesReceived(size: Int) { + } + + override fun notifyMessageReceived(size: Int) { + } + + override fun notifyMessageSent(topic: String) { + } + +}
\ No newline at end of file diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt index 5d592e42..b0dbd0f5 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -36,7 +36,7 @@ class FakeSink : Sink { val sentMessages: List<RoutedMessage> get() = sent.toList() - override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> { - return messages.doOnNext(sent::addLast).map { it.message } + override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { + return messages.doOnNext(sent::addLast) } } diff --git a/hv-collector-main/pom.xml b/hv-collector-main/pom.xml index a5a35ba3..dbec1def 100644 --- a/hv-collector-main/pom.xml +++ b/hv-collector-main/pom.xml @@ -91,6 +91,14 @@ </dependency> <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-core</artifactId> + </dependency> + <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-syntax</artifactId> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> @@ -109,6 +117,10 @@ <classifier>${os.detected.classifier}</classifier> </dependency> <dependency> + <groupId>io.micrometer</groupId> + <artifactId>micrometer-registry-jmx</artifactId> + </dependency> + <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> </dependency> diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt new file mode 100644 index 00000000..8a8b6d39 --- /dev/null +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt @@ -0,0 +1,67 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import arrow.syntax.function.memoize +import io.micrometer.core.instrument.Clock +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.jmx.JmxConfig +import io.micrometer.jmx.JmxMeterRegistry +import org.onap.dcae.collectors.veshv.boundary.Metrics + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +class MicrometerMetrics( + private val registry: MeterRegistry = JmxMeterRegistry(JmxConfig.DEFAULT, Clock.SYSTEM) +) : Metrics { + + private val receivedBytes = registry.counter("data.received.bytes") + private val receivedMsgCount = registry.counter("messages.received.count") + private val receivedMsgBytes = registry.counter("messages.received.bytes") + private val sentCountTotal = registry.counter("messages.sent.count") + + init { + registry.gauge("messages.processing.count", this) { + (receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0) + } + } + + private val sentCount = { topic: String -> + registry.counter("messages.sent.count", "topic", topic) + }.memoize<String, Counter>() + + + override fun notifyBytesReceived(size: Int) { + receivedBytes.increment(size.toDouble()) + } + + override fun notifyMessageReceived(size: Int) { + receivedMsgCount.increment() + receivedMsgBytes.increment(size.toDouble()) + } + + override fun notifyMessageSent(topic: String) { + sentCountTotal.increment() + sentCount(topic).increment() + } +} diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index b7d97028..1f2686ba 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -39,7 +39,8 @@ fun main(args: Array<String>) { val collectorProvider = CollectorFactory( resolveConfigurationProvider(serverConfiguration), - AdapterFactory.kafkaSink() + AdapterFactory.kafkaSink(), + MicrometerMetrics() ).createVesHvCollectorProvider() ServerFactory.createNettyTcpServer(serverConfiguration, collectorProvider).start().block() } catch (ex: WrongArgumentException) { diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt new file mode 100644 index 00000000..675647c4 --- /dev/null +++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -0,0 +1,191 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import arrow.core.Try +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.Gauge +import io.micrometer.core.instrument.search.RequiredSearch +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.data.Percentage +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +object MicrometerMetricsTest : Spek({ + val doublePrecision = Percentage.withPercentage(0.5) + lateinit var registry: SimpleMeterRegistry + lateinit var cut: MicrometerMetrics + + beforeEachTest { + registry = SimpleMeterRegistry() + cut = MicrometerMetrics(registry) + } + + fun registrySearch() = RequiredSearch.`in`(registry) + + fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) = + Try { + map(search) + }.fold( + { ex -> assertThat(ex).doesNotThrowAnyException() }, + verifier + ) + + fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) = + verifyMeter(registrySearch().name(name), RequiredSearch::gauge, verifier) + + fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) = + verifyMeter(search, RequiredSearch::counter, verifier) + + fun <T> verifyCounter(name: String, verifier: (Counter) -> T) = + verifyCounter(registrySearch().name(name), verifier) + + fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) { + registry.meters + .filter { it is Counter } + .filterNot { it.id.name in changedCounters } + .forEach { assertThat((it as Counter).count()).isCloseTo(0.0, doublePrecision) } + } + + describe("notifyBytesReceived") { + + on("data.received.bytes counter") { + val counterName = "data.received.bytes" + + it("should increment counter") { + val bytes = 128 + cut.notifyBytesReceived(bytes) + + verifyCounter(counterName) { counter -> + assertThat(counter.count()).isCloseTo(bytes.toDouble(), doublePrecision) + } + } + + it("should leave all other counters unchanged") { + cut.notifyBytesReceived(128) + verifyAllCountersAreUnchangedBut(counterName) + } + } + } + + describe("notifyMessageReceived") { + on("messages.received.count counter") { + val counterName = "messages.received.count" + + it("should increment counter") { + cut.notifyMessageReceived(777) + + verifyCounter(counterName) { counter -> + assertThat(counter.count()).isCloseTo(1.0, doublePrecision) + } + } + } + + on("messages.received.bytes counter") { + val counterName = "messages.received.bytes" + + it("should increment counter") { + val bytes = 888 + cut.notifyMessageReceived(bytes) + + verifyCounter(counterName) { counter -> + assertThat(counter.count()).isCloseTo(bytes.toDouble(), doublePrecision) + } + } + } + + it("should leave all other counters unchanged") { + cut.notifyMessageReceived(128) + verifyAllCountersAreUnchangedBut("messages.received.count", "messages.received.bytes") + } + } + + describe("notifyMessageSent") { + val topicName = "dmaap_topic_name" + val counterName = "messages.sent.count" + + on("$counterName counter") { + + it("should increment counter") { + cut.notifyMessageSent(topicName) + + verifyCounter(counterName) { counter -> + assertThat(counter.count()).isCloseTo(1.0, doublePrecision) + } + } + } + + on("$counterName[topic=$topicName] counter") { + + it("should increment counter") { + cut.notifyMessageSent(topicName) + + verifyCounter(registrySearch().name(counterName).tag("topic", topicName)) { counter -> + assertThat(counter.count()).isCloseTo(1.0, doublePrecision) + } + } + } + + it("should leave all other counters unchanged") { + cut.notifyMessageSent(topicName) + verifyAllCountersAreUnchangedBut(counterName) + } + } + + describe("processing gauge") { + it("should show difference between sent and received messages") { + + on("positive difference") { + cut.notifyMessageReceived(128) + cut.notifyMessageReceived(256) + cut.notifyMessageReceived(256) + cut.notifyMessageSent("hvranmeas") + verifyGauge("messages.processing.count") { gauge -> + assertThat(gauge.value()).isCloseTo(2.0, doublePrecision) + } + } + + on("zero difference") { + cut.notifyMessageReceived(128) + cut.notifyMessageSent("hvranmeas") + verifyGauge("messages.processing.count") { gauge -> + assertThat(gauge.value()).isCloseTo(0.0, doublePrecision) + } + } + + on("negative difference") { + cut.notifyMessageReceived(128) + cut.notifyMessageSent("calltrace") + cut.notifyMessageSent("hvranmeas") + verifyGauge("messages.processing.count") { gauge -> + assertThat(gauge.value()).isCloseTo(0.0, doublePrecision) + } + } + } + } + +}) @@ -529,6 +529,16 @@ <version>${kotlin.version}</version> </dependency> <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-core</artifactId> + <version>0.7.2</version> + </dependency> + <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-syntax</artifactId> + <version>0.7.2</version> + </dependency> + <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.3.0-alpha4</version> @@ -582,6 +592,11 @@ <artifactId>ratpack-core</artifactId> <version>1.5.4</version> </dependency> + <dependency> + <groupId>io.micrometer</groupId> + <artifactId>micrometer-registry-jmx</artifactId> + <version>1.0.5</version> + </dependency> <!-- Test dependencies --> |