diff options
Diffstat (limited to 'sources')
8 files changed, 199 insertions, 104 deletions
diff --git a/sources/hv-collector-kafka-consumer/pom.xml b/sources/hv-collector-kafka-consumer/pom.xml index ef09c063..c7645edf 100644 --- a/sources/hv-collector-kafka-consumer/pom.xml +++ b/sources/hv-collector-kafka-consumer/pom.xml @@ -74,6 +74,13 @@ <scope>compile</scope> </dependency> <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-test-utils</artifactId> + <version>${project.parent.version}</version> + <scope>test</scope> + </dependency> + + <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-registry-prometheus</artifactId> </dependency> @@ -91,30 +98,6 @@ <scope>runtime</scope> </dependency> <dependency> - <groupId>com.nhaarman.mockitokotlin2</groupId> - <artifactId>mockito-kotlin</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-test</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.jetbrains.spek</groupId> - <artifactId>spek-api</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.jetbrains.spek</groupId> - <artifactId>spek-junit-platform-engine</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>io.micrometer</groupId> - <artifactId>micrometer-registry-prometheus</artifactId> - </dependency> - <dependency> <groupId>io.projectreactor.netty</groupId> <artifactId>reactor-netty</artifactId> </dependency> diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt index 64a7fb3e..2fabf30e 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt @@ -20,5 +20,6 @@ package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics internal interface Metrics { - fun notifyOffsetChanged(size: Long) -} + fun notifyOffsetChanged(offset: Long) + fun notifyMessageTravelTime(messageSentTimeMicros: Long) +}
\ No newline at end of file diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt index f137d074..748e43fc 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt @@ -19,22 +19,40 @@ */ package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics +import io.micrometer.core.instrument.Timer import io.micrometer.prometheus.PrometheusConfig import io.micrometer.prometheus.PrometheusMeterRegistry +import org.onap.dcae.collectors.veshv.utils.TimeUtils import reactor.core.publisher.Mono +import java.time.Duration +import java.time.Instant +import java.util.concurrent.atomic.AtomicLong internal class MicrometerMetrics constructor( private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) ) : Metrics { - override fun notifyOffsetChanged(size: Long) { - // TODO implementation here - } + + private val currentOffset = registry.gauge(name("consumer.offset"), AtomicLong(0)) + private val travelTime = Timer.builder(name("travel.time")) + .publishPercentileHistogram(true) + .register(registry) fun lastStatus(): Mono<String> = Mono.fromCallable { registry.scrape() } + override fun notifyOffsetChanged(offset: Long) { + currentOffset.lazySet(offset) + } + + override fun notifyMessageTravelTime(messageSentTimeMicros: Long) { + travelTime.record(Duration.between(TimeUtils.epochMicroToInstant(messageSentTimeMicros), Instant.now())) + } + companion object { val INSTANCE by lazy { MicrometerMetrics() } + + private const val PREFIX = "hv-kafka-consumer" + private fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}" } } diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt deleted file mode 100644 index b7ea126f..00000000 --- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018-2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.kafkaconsumer - -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import kotlin.test.assertTrue - -object SampleTest : Spek({ - describe("sample test") { - assertTrue(true) - } -}) diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt new file mode 100644 index 00000000..41587867 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt @@ -0,0 +1,85 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics + +import io.micrometer.prometheus.PrometheusConfig +import io.micrometer.prometheus.PrometheusMeterRegistry +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.data.Percentage +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge +import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer +import java.time.Instant +import java.util.concurrent.TimeUnit + +object MicrometerMetricsTest : Spek({ + val PREFIX = "hv-kafka-consumer" + val doublePrecision = Percentage.withPercentage(0.5) + lateinit var registry: PrometheusMeterRegistry + lateinit var cut: MicrometerMetrics + + beforeEachTest { + registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) + cut = MicrometerMetrics(registry) + } + + describe("Timers") { + val arbitraryMessageTravelTime = 100L + val messageSentTimeMicros = Instant.now().minusMillis(arbitraryMessageTravelTime).toEpochMilli() * 1000 + val timerName = "$PREFIX.travel.time" + + on("notifyMessageTravelTime") { + it("should update timer $timerName") { + + val timeBeforeNotifyMicros = Instant.now().toEpochMilli() * 1000 + cut.notifyMessageTravelTime(messageSentTimeMicros) + val timeAfterNotifyMicros = Instant.now().toEpochMilli() * 1000 + + registry.verifyTimer(timerName) { timer -> + val travelTimeBeforeNotify = (timeBeforeNotifyMicros - messageSentTimeMicros).toDouble() + val travelTimeAfterNotify = (timeAfterNotifyMicros - messageSentTimeMicros).toDouble() + assertThat(timer.totalTime(TimeUnit.MICROSECONDS)) + .isLessThanOrEqualTo(travelTimeAfterNotify) + .isGreaterThanOrEqualTo(travelTimeBeforeNotify) + + } + } + } + } + + describe("Gauges") { + val gaugeName = "$PREFIX.consumer.offset" + + on("notifyOffsetChanged") { + val offset = 966L + + it("should update $gaugeName") { + cut.notifyOffsetChanged(offset) + + registry.verifyGauge(gaugeName) { + assertThat(it.value()).isCloseTo(offset.toDouble(), doublePrecision) + } + } + } + } +}) diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt index f260f158..66f3a5fc 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -20,12 +20,10 @@ package org.onap.dcae.collectors.veshv.main import arrow.core.Option -import arrow.core.Try import io.micrometer.core.instrument.Counter -import io.micrometer.core.instrument.Gauge import io.micrometer.core.instrument.Meter +import io.micrometer.core.instrument.Tags import io.micrometer.core.instrument.Timer -import io.micrometer.core.instrument.search.RequiredSearch import io.micrometer.prometheus.PrometheusConfig import io.micrometer.prometheus.PrometheusMeterRegistry import org.assertj.core.api.Assertions.assertThat @@ -43,6 +41,9 @@ import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame +import org.onap.dcae.collectors.veshv.tests.utils.verifyCounter +import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge +import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer import org.onap.dcae.collectors.veshv.tests.utils.vesEvent import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame import org.onap.ves.VesEventOuterClass @@ -65,29 +66,6 @@ object MicrometerMetricsTest : Spek({ cut = MicrometerMetrics(registry) } - fun registrySearch(counterName: String) = RequiredSearch.`in`(registry).name(counterName) - - fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) = - Try { - map(search) - }.fold( - { ex -> assertThat(ex).doesNotThrowAnyException() }, - verifier - ) - - fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) = - verifyMeter(registrySearch(name), RequiredSearch::gauge, verifier) - - fun <T> verifyTimer(name: String, verifier: (Timer) -> T) = - verifyMeter(registrySearch(name), RequiredSearch::timer, verifier) - - fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) = - verifyMeter(search, RequiredSearch::counter, verifier) - - fun <T> verifyCounter(name: String, verifier: (Counter) -> T) = - verifyCounter(registrySearch(name), verifier) - - fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) { fun <T : Meter> verifyAllMetersAreUnchangedBut( clazz: KClass<T>, @@ -120,7 +98,7 @@ object MicrometerMetricsTest : Spek({ val bytes = 128 cut.notifyBytesReceived(bytes) - verifyCounter(counterName) { + registry.verifyCounter(counterName) { assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision) } } @@ -139,7 +117,7 @@ object MicrometerMetricsTest : Spek({ it("should increment counter") { cut.notifyMessageReceived(emptyWireProtocolFrame()) - verifyCounter(counterName) { + registry.verifyCounter(counterName) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) } } @@ -152,7 +130,7 @@ object MicrometerMetricsTest : Spek({ val bytes = 888 cut.notifyMessageReceived(emptyWireProtocolFrame().copy(payloadSize = bytes)) - verifyCounter(counterName) { + registry.verifyCounter(counterName) { assertThat(it.count()).isCloseTo(bytes.toDouble(), doublePrecision) } } @@ -177,7 +155,7 @@ object MicrometerMetricsTest : Spek({ it("should increment counter") { cut.notifyMessageSent(routedMessage(topicName1)) - verifyCounter(counterName) { + registry.verifyCounter(counterName) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) } verifyCountersAndTimersAreUnchangedBut( @@ -196,11 +174,11 @@ object MicrometerMetricsTest : Spek({ cut.notifyMessageSent(routedMessage(topicName2)) cut.notifyMessageSent(routedMessage(topicName2)) - verifyCounter(registrySearch(counterName).tag("topic", topicName1)) { + registry.verifyCounter(counterName, Tags.of("topic", topicName1)) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) } - verifyCounter(registrySearch(counterName).tag("topic", topicName2)) { + registry.verifyCounter(counterName, Tags.of("topic", topicName2)) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } } @@ -214,7 +192,7 @@ object MicrometerMetricsTest : Spek({ cut.notifyMessageSent(routedMessageReceivedAt(topicName1, Instant.now().minusMillis(processingTimeMs))) - verifyTimer(counterName) { timer -> + registry.verifyTimer(counterName) { timer -> assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble()) } verifyCountersAndTimersAreUnchangedBut( @@ -233,7 +211,7 @@ object MicrometerMetricsTest : Spek({ cut.notifyMessageSent(routedMessageSentAt(topicName1, Instant.now().minusMillis(latencyMs))) - verifyTimer(counterName) { timer -> + registry.verifyTimer(counterName) { timer -> assertThat(timer.mean(TimeUnit.MILLISECONDS)) .isGreaterThanOrEqualTo(latencyMs.toDouble()) .isLessThanOrEqualTo(latencyMs + 10000.0) @@ -256,7 +234,7 @@ object MicrometerMetricsTest : Spek({ cut.notifyMessageDropped(ROUTE_NOT_FOUND) cut.notifyMessageDropped(INVALID_MESSAGE) - verifyCounter(counterName) { + registry.verifyCounter(counterName) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.messages.dropped.cause") @@ -271,11 +249,11 @@ object MicrometerMetricsTest : Spek({ cut.notifyMessageDropped(INVALID_MESSAGE) cut.notifyMessageDropped(INVALID_MESSAGE) - verifyCounter(registrySearch(counterName).tag("cause", ROUTE_NOT_FOUND.tag)) { + registry.verifyCounter(counterName, Tags.of("cause", ROUTE_NOT_FOUND.tag)) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) } - verifyCounter(registrySearch(counterName).tag("cause", INVALID_MESSAGE.tag)) { + registry.verifyCounter(counterName, Tags.of("cause", INVALID_MESSAGE.tag)) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } } @@ -290,7 +268,7 @@ object MicrometerMetricsTest : Spek({ cut.notifyClientConnected() cut.notifyClientConnected() - verifyCounter(counterName) { + registry.verifyCounter(counterName) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } verifyCountersAndTimersAreUnchangedBut(counterName) @@ -307,7 +285,7 @@ object MicrometerMetricsTest : Spek({ cut.notifyClientDisconnected() cut.notifyClientDisconnected() - verifyCounter(counterName) { + registry.verifyCounter(counterName) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } verifyCountersAndTimersAreUnchangedBut(counterName) @@ -324,7 +302,7 @@ object MicrometerMetricsTest : Spek({ cut.notifyClientRejected(INVALID_WIRE_FRAME_MARKER) cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE) - verifyCounter(counterName) { + registry.verifyCounter(counterName) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } verifyCountersAndTimersAreUnchangedBut(counterName, "$PREFIX.clients.rejected.cause") @@ -338,11 +316,11 @@ object MicrometerMetricsTest : Spek({ cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE) cut.notifyClientRejected(PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE) - verifyCounter(registrySearch(counterName).tag("cause", INVALID_WIRE_FRAME_MARKER.tag)) { + registry.verifyCounter(counterName, Tags.of("cause", INVALID_WIRE_FRAME_MARKER.tag)) { assertThat(it.count()).isCloseTo(1.0, doublePrecision) } - verifyCounter(registrySearch(counterName).tag("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) { + registry.verifyCounter(counterName, Tags.of("cause", PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE.tag)) { assertThat(it.count()).isCloseTo(2.0, doublePrecision) } } @@ -359,7 +337,7 @@ object MicrometerMetricsTest : Spek({ cut.notifyClientConnected() cut.notifyClientDisconnected() - verifyGauge(gaugeName) { + registry.verifyGauge(gaugeName) { assertThat(it.value()).isCloseTo(2.0, doublePrecision) } } @@ -368,7 +346,7 @@ object MicrometerMetricsTest : Spek({ cut.notifyClientDisconnected() cut.notifyClientDisconnected() - verifyGauge(gaugeName) { + registry.verifyGauge(gaugeName) { assertThat(it.value()).isCloseTo(0.0, doublePrecision) } } @@ -376,7 +354,7 @@ object MicrometerMetricsTest : Spek({ it("should calculate negative difference between connected and disconnected clients") { cut.notifyClientDisconnected() - verifyGauge(gaugeName) { + registry.verifyGauge(gaugeName) { assertThat(it.value()).isCloseTo(0.0, doublePrecision) } } diff --git a/sources/hv-collector-test-utils/pom.xml b/sources/hv-collector-test-utils/pom.xml index bf70e180..97737e84 100644 --- a/sources/hv-collector-test-utils/pom.xml +++ b/sources/hv-collector-test-utils/pom.xml @@ -86,5 +86,10 @@ <artifactId>logback-classic</artifactId> <scope>compile</scope> </dependency> + <dependency> + <groupId>io.micrometer</groupId> + <artifactId>micrometer-registry-prometheus</artifactId> + <scope>compile</scope> + </dependency> </dependencies> </project>
\ No newline at end of file diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/metrics.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/metrics.kt new file mode 100644 index 00000000..1aefdb34 --- /dev/null +++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/metrics.kt @@ -0,0 +1,55 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.utils + +import arrow.core.Try +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.Gauge +import io.micrometer.core.instrument.Tags +import io.micrometer.core.instrument.Timer +import io.micrometer.core.instrument.search.RequiredSearch +import io.micrometer.prometheus.PrometheusMeterRegistry +import org.assertj.core.api.Assertions + + +fun <T> PrometheusMeterRegistry.verifyGauge(name: String, verifier: (Gauge) -> T) = + verifyMeter(findMeter(name), RequiredSearch::gauge, verifier) + +fun <T> PrometheusMeterRegistry.verifyTimer(name: String, verifier: (Timer) -> T) = + verifyMeter(findMeter(name), RequiredSearch::timer, verifier) + +fun <T> PrometheusMeterRegistry.verifyCounter(name: String, verifier: (Counter) -> T) = + verifyCounter(findMeter(name), verifier) + +fun <T> PrometheusMeterRegistry.verifyCounter(name: String, tags: Tags, verifier: (Counter) -> T) = + verifyCounter(findMeter(name).tags(tags), verifier) + +private fun PrometheusMeterRegistry.findMeter(meterName: String) = RequiredSearch.`in`(this).name(meterName) + +private fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) = + verifyMeter(search, RequiredSearch::counter, verifier) + +private inline fun <M, T> verifyMeter(search: RequiredSearch, + map: (RequiredSearch) -> M, + verifier: (M) -> T) = + Try { map(search) }.fold( + { ex -> Assertions.assertThat(ex).doesNotThrowAnyException() }, + verifier + )
\ No newline at end of file |