aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--hv-collector-core/pom.xml1
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt8
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt16
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt7
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt6
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt4
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt38
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt4
-rw-r--r--hv-collector-main/pom.xml12
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt67
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt3
-rw-r--r--hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt191
-rw-r--r--pom.xml15
14 files changed, 357 insertions, 18 deletions
diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml
index ed9f1ad5..a372fb22 100644
--- a/hv-collector-core/pom.xml
+++ b/hv-collector-core/pom.xml
@@ -73,7 +73,6 @@
<version>${project.parent.version}</version>
<scope>compile</scope>
</dependency>
-
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 53fd7c3a..e4f02000 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -25,7 +25,13 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
import reactor.core.publisher.Flux
interface Sink {
- fun send(messages: Flux<RoutedMessage>): Flux<VesMessage>
+ fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage>
+}
+
+interface Metrics {
+ fun notifyBytesReceived(size: Int)
+ fun notifyMessageReceived(size: Int)
+ fun notifyMessageSent(topic: String)
}
@FunctionalInterface
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 73f4d09d..8785180b 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.factory
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.impl.MessageValidator
@@ -38,7 +39,9 @@ import java.util.concurrent.atomic.AtomicReference
* @since May 2018
*/
class CollectorFactory(val configuration: ConfigurationProvider,
- private val sinkProvider: SinkProvider) {
+ private val sinkProvider: SinkProvider,
+ private val metrics: Metrics) {
+
fun createVesHvCollectorProvider(): CollectorProvider {
val collector: AtomicReference<Collector> = AtomicReference()
createVesHvCollector().subscribe(collector::set)
@@ -50,11 +53,12 @@ class CollectorFactory(val configuration: ConfigurationProvider,
private fun createVesHvCollector(config: CollectorConfiguration): Collector {
return VesHvCollector(
- { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) },
- VesDecoder(),
- MessageValidator(),
- Router(config.routing),
- sinkProvider(config))
+ wireChunkDecoderSupplier = { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) },
+ protobufDecoder = VesDecoder(),
+ validator = MessageValidator(),
+ router = Router(config.routing),
+ sink = sinkProvider(config),
+ metrics = metrics)
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 965943f6..222eaefa 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
+import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.domain.WireFrame
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
@@ -40,18 +41,22 @@ internal class VesHvCollector(
private val protobufDecoder: VesDecoder,
private val validator: MessageValidator,
private val router: Router,
- private val sink: Sink) : Collector {
+ private val sink: Sink,
+ private val metrics: Metrics) : Collector {
override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
wireChunkDecoderSupplier(alloc).let { wireDecoder ->
dataStream
+ .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
.concatMap(wireDecoder::decode)
+ .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
.filter(WireFrame::isValid)
.map(WireFrame::payload)
.map(protobufDecoder::decode)
.filter(validator::isValid)
.flatMap(this::findRoute)
.compose(sink::send)
+ .doOnNext { metrics.notifyMessageSent(it.topic) }
.doOnTerminate { releaseBuffersMemory(wireDecoder) }
.then()
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
index b943e4e5..a5c41046 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
@@ -40,10 +40,9 @@ internal class LoggingSinkProvider : SinkProvider {
private val totalMessages = AtomicLong()
private val totalBytes = AtomicLong()
- override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> =
+ override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> =
messages
.doOnNext(this::logMessage)
- .map { it.message }
private fun logMessage(msg: RoutedMessage) {
val msgs = totalMessages.addAndGet(1)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index 6142fa3c..0a548a52 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -35,7 +35,7 @@ import reactor.kafka.sender.SenderResult
*/
internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink {
- override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> {
+ override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
val records = messages.map(this::vesToKafkaRecord)
return sender.send(records)
.doOnNext(::logException)
@@ -43,14 +43,14 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
.map { it.correlationMetadata() }
}
- private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, VesMessage> {
+ private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
return SenderRecord.create(
msg.topic,
msg.partition,
System.currentTimeMillis(),
msg.message.header,
msg.message,
- msg.message)
+ msg)
}
private fun logException(senderResult: SenderResult<out Any>) {
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index c4e9874f..5099ae4c 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -26,6 +26,7 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
import org.onap.dcae.collectors.veshv.tests.fakes.FakeSink
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.Exceptions
@@ -40,7 +41,8 @@ internal class Sut {
val configurationProvider = FakeConfigurationProvider()
val sink = FakeSink()
val alloc = UnpooledByteBufAllocator.DEFAULT
- private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink))
+ val metrics = FakeMetrics()
+ private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics)
val collectorProvider = collectorFactory.createVesHvCollectorProvider()
val collector: Collector
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
new file mode 100644
index 00000000..cfc44bcd
--- /dev/null
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
@@ -0,0 +1,38 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.fakes
+
+import org.onap.dcae.collectors.veshv.boundary.Metrics
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class FakeMetrics: Metrics {
+ override fun notifyBytesReceived(size: Int) {
+ }
+
+ override fun notifyMessageReceived(size: Int) {
+ }
+
+ override fun notifyMessageSent(topic: String) {
+ }
+
+} \ No newline at end of file
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index 5d592e42..b0dbd0f5 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -36,7 +36,7 @@ class FakeSink : Sink {
val sentMessages: List<RoutedMessage>
get() = sent.toList()
- override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> {
- return messages.doOnNext(sent::addLast).map { it.message }
+ override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
+ return messages.doOnNext(sent::addLast)
}
}
diff --git a/hv-collector-main/pom.xml b/hv-collector-main/pom.xml
index a5a35ba3..dbec1def 100644
--- a/hv-collector-main/pom.xml
+++ b/hv-collector-main/pom.xml
@@ -91,6 +91,14 @@
</dependency>
<dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-syntax</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
@@ -109,6 +117,10 @@
<classifier>${os.detected.classifier}</classifier>
</dependency>
<dependency>
+ <groupId>io.micrometer</groupId>
+ <artifactId>micrometer-registry-jmx</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
</dependency>
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt
new file mode 100644
index 00000000..8a8b6d39
--- /dev/null
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt
@@ -0,0 +1,67 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import arrow.syntax.function.memoize
+import io.micrometer.core.instrument.Clock
+import io.micrometer.core.instrument.Counter
+import io.micrometer.core.instrument.MeterRegistry
+import io.micrometer.jmx.JmxConfig
+import io.micrometer.jmx.JmxMeterRegistry
+import org.onap.dcae.collectors.veshv.boundary.Metrics
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class MicrometerMetrics(
+ private val registry: MeterRegistry = JmxMeterRegistry(JmxConfig.DEFAULT, Clock.SYSTEM)
+) : Metrics {
+
+ private val receivedBytes = registry.counter("data.received.bytes")
+ private val receivedMsgCount = registry.counter("messages.received.count")
+ private val receivedMsgBytes = registry.counter("messages.received.bytes")
+ private val sentCountTotal = registry.counter("messages.sent.count")
+
+ init {
+ registry.gauge("messages.processing.count", this) {
+ (receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0)
+ }
+ }
+
+ private val sentCount = { topic: String ->
+ registry.counter("messages.sent.count", "topic", topic)
+ }.memoize<String, Counter>()
+
+
+ override fun notifyBytesReceived(size: Int) {
+ receivedBytes.increment(size.toDouble())
+ }
+
+ override fun notifyMessageReceived(size: Int) {
+ receivedMsgCount.increment()
+ receivedMsgBytes.increment(size.toDouble())
+ }
+
+ override fun notifyMessageSent(topic: String) {
+ sentCountTotal.increment()
+ sentCount(topic).increment()
+ }
+}
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index b7d97028..1f2686ba 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -39,7 +39,8 @@ fun main(args: Array<String>) {
val collectorProvider = CollectorFactory(
resolveConfigurationProvider(serverConfiguration),
- AdapterFactory.kafkaSink()
+ AdapterFactory.kafkaSink(),
+ MicrometerMetrics()
).createVesHvCollectorProvider()
ServerFactory.createNettyTcpServer(serverConfiguration, collectorProvider).start().block()
} catch (ex: WrongArgumentException) {
diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
new file mode 100644
index 00000000..675647c4
--- /dev/null
+++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
@@ -0,0 +1,191 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import arrow.core.Try
+import io.micrometer.core.instrument.Counter
+import io.micrometer.core.instrument.Gauge
+import io.micrometer.core.instrument.search.RequiredSearch
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.data.Percentage
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+object MicrometerMetricsTest : Spek({
+ val doublePrecision = Percentage.withPercentage(0.5)
+ lateinit var registry: SimpleMeterRegistry
+ lateinit var cut: MicrometerMetrics
+
+ beforeEachTest {
+ registry = SimpleMeterRegistry()
+ cut = MicrometerMetrics(registry)
+ }
+
+ fun registrySearch() = RequiredSearch.`in`(registry)
+
+ fun <M, T> verifyMeter(search: RequiredSearch, map: (RequiredSearch) -> M, verifier: (M) -> T) =
+ Try {
+ map(search)
+ }.fold(
+ { ex -> assertThat(ex).doesNotThrowAnyException() },
+ verifier
+ )
+
+ fun <T> verifyGauge(name: String, verifier: (Gauge) -> T) =
+ verifyMeter(registrySearch().name(name), RequiredSearch::gauge, verifier)
+
+ fun <T> verifyCounter(search: RequiredSearch, verifier: (Counter) -> T) =
+ verifyMeter(search, RequiredSearch::counter, verifier)
+
+ fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
+ verifyCounter(registrySearch().name(name), verifier)
+
+ fun verifyAllCountersAreUnchangedBut(vararg changedCounters: String) {
+ registry.meters
+ .filter { it is Counter }
+ .filterNot { it.id.name in changedCounters }
+ .forEach { assertThat((it as Counter).count()).isCloseTo(0.0, doublePrecision) }
+ }
+
+ describe("notifyBytesReceived") {
+
+ on("data.received.bytes counter") {
+ val counterName = "data.received.bytes"
+
+ it("should increment counter") {
+ val bytes = 128
+ cut.notifyBytesReceived(bytes)
+
+ verifyCounter(counterName) { counter ->
+ assertThat(counter.count()).isCloseTo(bytes.toDouble(), doublePrecision)
+ }
+ }
+
+ it("should leave all other counters unchanged") {
+ cut.notifyBytesReceived(128)
+ verifyAllCountersAreUnchangedBut(counterName)
+ }
+ }
+ }
+
+ describe("notifyMessageReceived") {
+ on("messages.received.count counter") {
+ val counterName = "messages.received.count"
+
+ it("should increment counter") {
+ cut.notifyMessageReceived(777)
+
+ verifyCounter(counterName) { counter ->
+ assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
+ }
+ }
+ }
+
+ on("messages.received.bytes counter") {
+ val counterName = "messages.received.bytes"
+
+ it("should increment counter") {
+ val bytes = 888
+ cut.notifyMessageReceived(bytes)
+
+ verifyCounter(counterName) { counter ->
+ assertThat(counter.count()).isCloseTo(bytes.toDouble(), doublePrecision)
+ }
+ }
+ }
+
+ it("should leave all other counters unchanged") {
+ cut.notifyMessageReceived(128)
+ verifyAllCountersAreUnchangedBut("messages.received.count", "messages.received.bytes")
+ }
+ }
+
+ describe("notifyMessageSent") {
+ val topicName = "dmaap_topic_name"
+ val counterName = "messages.sent.count"
+
+ on("$counterName counter") {
+
+ it("should increment counter") {
+ cut.notifyMessageSent(topicName)
+
+ verifyCounter(counterName) { counter ->
+ assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
+ }
+ }
+ }
+
+ on("$counterName[topic=$topicName] counter") {
+
+ it("should increment counter") {
+ cut.notifyMessageSent(topicName)
+
+ verifyCounter(registrySearch().name(counterName).tag("topic", topicName)) { counter ->
+ assertThat(counter.count()).isCloseTo(1.0, doublePrecision)
+ }
+ }
+ }
+
+ it("should leave all other counters unchanged") {
+ cut.notifyMessageSent(topicName)
+ verifyAllCountersAreUnchangedBut(counterName)
+ }
+ }
+
+ describe("processing gauge") {
+ it("should show difference between sent and received messages") {
+
+ on("positive difference") {
+ cut.notifyMessageReceived(128)
+ cut.notifyMessageReceived(256)
+ cut.notifyMessageReceived(256)
+ cut.notifyMessageSent("hvranmeas")
+ verifyGauge("messages.processing.count") { gauge ->
+ assertThat(gauge.value()).isCloseTo(2.0, doublePrecision)
+ }
+ }
+
+ on("zero difference") {
+ cut.notifyMessageReceived(128)
+ cut.notifyMessageSent("hvranmeas")
+ verifyGauge("messages.processing.count") { gauge ->
+ assertThat(gauge.value()).isCloseTo(0.0, doublePrecision)
+ }
+ }
+
+ on("negative difference") {
+ cut.notifyMessageReceived(128)
+ cut.notifyMessageSent("calltrace")
+ cut.notifyMessageSent("hvranmeas")
+ verifyGauge("messages.processing.count") { gauge ->
+ assertThat(gauge.value()).isCloseTo(0.0, doublePrecision)
+ }
+ }
+ }
+ }
+
+})
diff --git a/pom.xml b/pom.xml
index 9e33ec56..f478df3c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -529,6 +529,16 @@
<version>${kotlin.version}</version>
</dependency>
<dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-core</artifactId>
+ <version>0.7.2</version>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-syntax</artifactId>
+ <version>0.7.2</version>
+ </dependency>
+ <dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.3.0-alpha4</version>
@@ -582,6 +592,11 @@
<artifactId>ratpack-core</artifactId>
<version>1.5.4</version>
</dependency>
+ <dependency>
+ <groupId>io.micrometer</groupId>
+ <artifactId>micrometer-registry-jmx</artifactId>
+ <version>1.0.5</version>
+ </dependency>
<!-- Test dependencies -->