From 0ba97c7eac5a821c813bfa8ac31b1063956d3824 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Wed, 13 Jun 2018 15:45:00 +0200 Subject: Add monitoring support by means of micrometer.io Closes ONAP-345 Change-Id: I58c145b1d37a6b32fbe5b157723c152eb571a2dd Signed-off-by: Piotr Jaszczyk Issue-ID: DCAEGEN2-601 --- hv-collector-main/pom.xml | 12 ++ .../collectors/veshv/main/MicrometerMetrics.kt | 67 ++++++++ .../org/onap/dcae/collectors/veshv/main/main.kt | 3 +- .../collectors/veshv/main/MicrometerMetricsTest.kt | 191 +++++++++++++++++++++ 4 files changed, 272 insertions(+), 1 deletion(-) create mode 100644 hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt create mode 100644 hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt (limited to 'hv-collector-main') diff --git a/hv-collector-main/pom.xml b/hv-collector-main/pom.xml index a5a35ba3..dbec1def 100644 --- a/hv-collector-main/pom.xml +++ b/hv-collector-main/pom.xml @@ -90,6 +90,14 @@ ${project.parent.version} + + io.arrow-kt + arrow-core + + + io.arrow-kt + arrow-syntax + org.slf4j slf4j-api @@ -108,6 +116,10 @@ runtime ${os.detected.classifier} + + io.micrometer + micrometer-registry-jmx + org.assertj assertj-core diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt new file mode 100644 index 00000000..8a8b6d39 --- /dev/null +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt @@ -0,0 +1,67 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import arrow.syntax.function.memoize +import io.micrometer.core.instrument.Clock +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.jmx.JmxConfig +import io.micrometer.jmx.JmxMeterRegistry +import org.onap.dcae.collectors.veshv.boundary.Metrics + +/** + * @author Piotr Jaszczyk + * @since June 2018 + */ +class MicrometerMetrics( + private val registry: MeterRegistry = JmxMeterRegistry(JmxConfig.DEFAULT, Clock.SYSTEM) +) : Metrics { + + private val receivedBytes = registry.counter("data.received.bytes") + private val receivedMsgCount = registry.counter("messages.received.count") + private val receivedMsgBytes = registry.counter("messages.received.bytes") + private val sentCountTotal = registry.counter("messages.sent.count") + + init { + registry.gauge("messages.processing.count", this) { + (receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0) + } + } + + private val sentCount = { topic: String -> + registry.counter("messages.sent.count", "topic", topic) + }.memoize() + + + override fun notifyBytesReceived(size: Int) { + receivedBytes.increment(size.toDouble()) + } + + override fun notifyMessageReceived(size: Int) { + receivedMsgCount.increment() + receivedMsgBytes.increment(size.toDouble()) + } + + override fun notifyMessageSent(topic: String) { + sentCountTotal.increment() + sentCount(topic).increment() + } +} diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index b7d97028..1f2686ba 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -39,7 +39,8 @@ fun main(args: Array) { val collectorProvider = CollectorFactory( resolveConfigurationProvider(serverConfiguration), - AdapterFactory.kafkaSink() + AdapterFactory.kafkaSink(), + MicrometerMetrics() ).createVesHvCollectorProvider() ServerFactory.createNettyTcpServer(serverConfiguration, collectorProvider).start().block() } catch (ex: WrongArgumentException) { diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt new file mode 100644 index 00000000..675647c4 --- /dev/null +++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -0,0 +1,191 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import arrow.core.Try +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.Gauge +import io.micrometer.core.instrument.search.RequiredSearch +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.data.Percentage +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on + +/** + * @author Piotr Jaszczyk + * @since June 2018 + */ +object MicrometerMetricsTest : Spek({ + val doublePrecision = Percentage.withPercentage(0.5) + lateinit var registry: SimpleMeterRegistry + lateinit var cut: MicrometerMetrics + + beforeEachTest { + registry = SimpleMeterRegistry() + cut = MicrometerMetrics(registry) + } + + fun registrySearch() = RequiredSearch.`in`(registry) + + fun verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) = + Try { + map(search) + }.fold( + { ex -> assertThat(ex).doesNotThrowAnyException() }, + verifier + ) + + fun verifyGauge(name: String, verifier: (Gauge) -> T) = + verifyMeter(registrySearch().name(name), RequiredSearch::gauge, verifier) + + fun verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) = + verifyMeter(search, RequiredSearch::counter, verifier) + + fun verifyCounter(name: String, verifier: (Counter) -> T) = + verifyCounter(registrySearch().name(name), verifier) + + fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) { + registry.meters + .filter { it is Counter } + .filterNot { it.id.name in changedCounters } + .forEach { assertThat((it as Counter).count()).isCloseTo(0.0, doublePrecision) } + } + + describe("notifyBytesReceived") { + + on("data.received.bytes counter") { + val counterName = "data.received.bytes" + + it("should increment counter") { + val bytes = 128 + cut.notifyBytesReceived(bytes) + + verifyCounter(counterName) { counter -> + assertThat(counter.count()).isCloseTo(bytes.toDouble(), doublePrecision) + } + } + + it("should leave all other counters unchanged") { + cut.notifyBytesReceived(128) + verifyAllCountersAreUnchangedBut(counterName) + } + } + } + + describe("notifyMessageReceived") { + on("messages.received.count counter") { + val counterName = "messages.received.count" + + it("should increment counter") { + cut.notifyMessageReceived(777) + + verifyCounter(counterName) { counter -> + assertThat(counter.count()).isCloseTo(1.0, doublePrecision) + } + } + } + + on("messages.received.bytes counter") { + val counterName = "messages.received.bytes" + + it("should increment counter") { + val bytes = 888 + cut.notifyMessageReceived(bytes) + + verifyCounter(counterName) { counter -> + assertThat(counter.count()).isCloseTo(bytes.toDouble(), doublePrecision) + } + } + } + + it("should leave all other counters unchanged") { + cut.notifyMessageReceived(128) + verifyAllCountersAreUnchangedBut("messages.received.count", "messages.received.bytes") + } + } + + describe("notifyMessageSent") { + val topicName = "dmaap_topic_name" + val counterName = "messages.sent.count" + + on("$counterName counter") { + + it("should increment counter") { + cut.notifyMessageSent(topicName) + + verifyCounter(counterName) { counter -> + assertThat(counter.count()).isCloseTo(1.0, doublePrecision) + } + } + } + + on("$counterName[topic=$topicName] counter") { + + it("should increment counter") { + cut.notifyMessageSent(topicName) + + verifyCounter(registrySearch().name(counterName).tag("topic", topicName)) { counter -> + assertThat(counter.count()).isCloseTo(1.0, doublePrecision) + } + } + } + + it("should leave all other counters unchanged") { + cut.notifyMessageSent(topicName) + verifyAllCountersAreUnchangedBut(counterName) + } + } + + describe("processing gauge") { + it("should show difference between sent and received messages") { + + on("positive difference") { + cut.notifyMessageReceived(128) + cut.notifyMessageReceived(256) + cut.notifyMessageReceived(256) + cut.notifyMessageSent("hvranmeas") + verifyGauge("messages.processing.count") { gauge -> + assertThat(gauge.value()).isCloseTo(2.0, doublePrecision) + } + } + + on("zero difference") { + cut.notifyMessageReceived(128) + cut.notifyMessageSent("hvranmeas") + verifyGauge("messages.processing.count") { gauge -> + assertThat(gauge.value()).isCloseTo(0.0, doublePrecision) + } + } + + on("negative difference") { + cut.notifyMessageReceived(128) + cut.notifyMessageSent("calltrace") + cut.notifyMessageSent("hvranmeas") + verifyGauge("messages.processing.count") { gauge -> + assertThat(gauge.value()).isCloseTo(0.0, doublePrecision) + } + } + } + } + +}) -- cgit 1.2.3-korg