diff options
Diffstat (limited to 'sources/hv-collector-main')
7 files changed, 164 insertions, 94 deletions
diff --git a/sources/hv-collector-main/pom.xml b/sources/hv-collector-main/pom.xml index a94d6346..9e124efd 100644 --- a/sources/hv-collector-main/pom.xml +++ b/sources/hv-collector-main/pom.xml @@ -135,7 +135,7 @@ --> <dependency> <groupId>io.micrometer</groupId> - <artifactId>micrometer-registry-jmx</artifactId> + <artifactId>micrometer-registry-prometheus</artifactId> </dependency> </dependencies> diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt deleted file mode 100644 index 8a8b6d39..00000000 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.main - -import arrow.syntax.function.memoize -import io.micrometer.core.instrument.Clock -import io.micrometer.core.instrument.Counter -import io.micrometer.core.instrument.MeterRegistry -import io.micrometer.jmx.JmxConfig -import io.micrometer.jmx.JmxMeterRegistry -import org.onap.dcae.collectors.veshv.boundary.Metrics - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since June 2018 - */ -class MicrometerMetrics( - private val registry: MeterRegistry = JmxMeterRegistry(JmxConfig.DEFAULT, Clock.SYSTEM) -) : Metrics { - - private val receivedBytes = registry.counter("data.received.bytes") - private val receivedMsgCount = registry.counter("messages.received.count") - private val receivedMsgBytes = registry.counter("messages.received.bytes") - private val sentCountTotal = registry.counter("messages.sent.count") - - init { - registry.gauge("messages.processing.count", this) { - (receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0) - } - } - - private val sentCount = { topic: String -> - registry.counter("messages.sent.count", "topic", topic) - }.memoize<String, Counter>() - - - override fun notifyBytesReceived(size: Int) { - receivedBytes.increment(size.toDouble()) - } - - override fun notifyMessageReceived(size: Int) { - receivedMsgCount.increment() - receivedMsgBytes.increment(size.toDouble()) - } - - override fun notifyMessageSent(topic: String) { - sentCountTotal.increment() - sentCount(topic).increment() - } -} diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt new file mode 100644 index 00000000..cf903591 --- /dev/null +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt @@ -0,0 +1,90 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main.metrics + +import arrow.syntax.function.memoize +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics +import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics +import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics +import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics +import io.micrometer.core.instrument.binder.system.ProcessorMetrics +import io.micrometer.prometheus.PrometheusConfig +import io.micrometer.prometheus.PrometheusMeterRegistry +import org.onap.dcae.collectors.veshv.boundary.Metrics + + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +class MicrometerMetrics internal constructor( + private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) +) : Metrics { + + private val receivedBytes = registry.counter(name(DATA, RECEIVED, BYTES)) + private val receivedMsgCount = registry.counter(name(MESSAGES, RECEIVED, COUNT)) + private val receivedMsgBytes = registry.counter(name(MESSAGES, RECEIVED, BYTES)) + private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT)) + + init { + registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) { + (receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0) + } + ClassLoaderMetrics().bindTo(registry) + JvmMemoryMetrics().bindTo(registry) + JvmGcMetrics().bindTo(registry) + ProcessorMetrics().bindTo(registry) + JvmThreadMetrics().bindTo(registry) + } + + private val sentCount = { topic: String -> + registry.counter("hvves.messages.sent.topic.count", "topic", topic) + }.memoize<String, Counter>() + + val metricsProvider = MicrometerPrometheusMetricsProvider(registry) + + override fun notifyBytesReceived(size: Int) { + receivedBytes.increment(size.toDouble()) + } + + override fun notifyMessageReceived(size: Int) { + receivedMsgCount.increment() + receivedMsgBytes.increment(size.toDouble()) + } + + override fun notifyMessageSent(topic: String) { + sentCountTotal.increment() + sentCount(topic).increment() + } + + companion object { + val INSTANCE = MicrometerMetrics() + internal const val PREFIX = "hvves" + internal const val MESSAGES = "messages" + internal const val RECEIVED = "received" + internal const val BYTES = "bytes" + internal const val COUNT = "count" + internal const val DATA = "data" + internal const val SENT = "sent" + internal const val PROCESSING = "processing" + fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}" + } +} diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerPrometheusMetricsProvider.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerPrometheusMetricsProvider.kt new file mode 100644 index 00000000..2af8e38f --- /dev/null +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerPrometheusMetricsProvider.kt @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main.metrics + +import io.micrometer.prometheus.PrometheusMeterRegistry +import org.onap.dcae.collectors.veshv.healthcheck.ports.PrometheusMetricsProvider +import reactor.core.publisher.Mono + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since December 2018 + */ +class MicrometerPrometheusMetricsProvider(private val registry: PrometheusMeterRegistry) : PrometheusMetricsProvider { + override fun lastStatus(): Mono<String> = Mono.fromCallable { + registry.scrape() + } +} diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt index ae59da69..00123f1a 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.main.servers import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer +import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.utils.ServerHandle @@ -34,6 +35,7 @@ object HealthCheckServer : ServerStarter() { private fun createHealthCheckServer(config: ServerConfiguration) = HealthCheckApiServer( HealthState.INSTANCE, + MicrometerMetrics.INSTANCE.metricsProvider, config.healthCheckApiListenAddress) override fun serverStartedMessage(handle: ServerHandle) = diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index d788c164..b35dc53d 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -24,7 +24,7 @@ import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.factory.ServerFactory import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory -import org.onap.dcae.collectors.veshv.main.MicrometerMetrics +import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.utils.ServerHandle @@ -40,7 +40,7 @@ object VesServer : ServerStarter() { val collectorProvider = CollectorFactory( AdapterFactory.consulConfigurationProvider(config.configurationProviderParams), sink, - MicrometerMetrics(), + MicrometerMetrics.INSTANCE, config.maximumPayloadSizeBytes ).createVesHvCollectorProvider() diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt index a379933e..66326ddc 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -24,12 +24,17 @@ import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.Gauge import io.micrometer.core.instrument.search.RequiredSearch import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import io.micrometer.prometheus.PrometheusConfig +import io.micrometer.prometheus.PrometheusMeterRegistry import org.assertj.core.api.Assertions.assertThat import org.assertj.core.data.Percentage import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.healthcheck.ports.PrometheusMetricsProvider +import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics +import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -37,11 +42,11 @@ import org.jetbrains.spek.api.dsl.on */ object MicrometerMetricsTest : Spek({ val doublePrecision = Percentage.withPercentage(0.5) - lateinit var registry: SimpleMeterRegistry + lateinit var registry: PrometheusMeterRegistry lateinit var cut: MicrometerMetrics beforeEachTest { - registry = SimpleMeterRegistry() + registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) cut = MicrometerMetrics(registry) } @@ -67,14 +72,17 @@ object MicrometerMetricsTest : Spek({ fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) { registry.meters .filter { it is Counter } + .map { it as Counter } .filterNot { it.id.name in changedCounters } - .forEach { assertThat((it as Counter).count()).isCloseTo(0.0, doublePrecision) } + .forEach { + assertThat(it.count()).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision) + } } describe("notifyBytesReceived") { - on("data.received.bytes counter") { - val counterName = "data.received.bytes" + on("$PREFIX.data.received.bytes counter") { + val counterName = "$PREFIX.data.received.bytes" it("should increment counter") { val bytes = 128 @@ -93,8 +101,8 @@ object MicrometerMetricsTest : Spek({ } describe("notifyMessageReceived") { - on("messages.received.count counter") { - val counterName = "messages.received.count" + on("$PREFIX.messages.received.count counter") { + val counterName = "$PREFIX.messages.received.count" it("should increment counter") { cut.notifyMessageReceived(777) @@ -105,8 +113,8 @@ object MicrometerMetricsTest : Spek({ } } - on("messages.received.bytes counter") { - val counterName = "messages.received.bytes" + on("$PREFIX.messages.received.bytes counter") { + val counterName = "$PREFIX.messages.received.bytes" it("should increment counter") { val bytes = 888 @@ -120,39 +128,42 @@ object MicrometerMetricsTest : Spek({ it("should leave all other counters unchanged") { cut.notifyMessageReceived(128) - verifyAllCountersAreUnchangedBut("messages.received.count", "messages.received.bytes") + verifyAllCountersAreUnchangedBut("$PREFIX.messages.received.count", "$PREFIX.messages.received.bytes") } } describe("notifyMessageSent") { - val topicName = "dmaap_topic_name" - val counterName = "messages.sent.count" + val topicName1 = "PERF3GPP" + val topicName2 = "CALLTRACE" - on("$counterName counter") { + on("$PREFIX.messages.sent.count counter") { + val counterName = "$PREFIX.messages.sent.count" it("should increment counter") { - cut.notifyMessageSent(topicName) + cut.notifyMessageSent(topicName1) verifyCounter(counterName) { counter -> assertThat(counter.count()).isCloseTo(1.0, doublePrecision) } + verifyAllCountersAreUnchangedBut(counterName, "$PREFIX.messages.sent.topic.count") } } - on("$counterName[topic=$topicName] counter") { - - it("should increment counter") { - cut.notifyMessageSent(topicName) + on("$PREFIX.messages.sent.topic.count counter") { + val counterName = "$PREFIX.messages.sent.topic.count" + it("should handle counters for different topics") { + cut.notifyMessageSent(topicName1) + cut.notifyMessageSent(topicName2) + cut.notifyMessageSent(topicName2) - verifyCounter(registrySearch().name(counterName).tag("topic", topicName)) { counter -> + verifyCounter(registrySearch().name(counterName).tag("topic", topicName1)) { counter -> assertThat(counter.count()).isCloseTo(1.0, doublePrecision) } - } - } - it("should leave all other counters unchanged") { - cut.notifyMessageSent(topicName) - verifyAllCountersAreUnchangedBut(counterName) + verifyCounter(registrySearch().name(counterName).tag("topic", topicName2)) { counter -> + assertThat(counter.count()).isCloseTo(2.0, doublePrecision) + } + } } } |