aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-main/src/main
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-06-13 15:45:00 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-02 09:11:16 +0200
commit0ba97c7eac5a821c813bfa8ac31b1063956d3824 (patch)
tree9b937c8ec7b3533f8fd63531170da3ac2fda7fbb /hv-collector-main/src/main
parent85a59b8d29c6f81720fe3d2e59926740173fcae9 (diff)
Add monitoring support by means of micrometer.io
Closes ONAP-345 Change-Id: I58c145b1d37a6b32fbe5b157723c152eb571a2dd Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-main/src/main')
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt67
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt3
2 files changed, 69 insertions, 1 deletions
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt
new file mode 100644
index 00000000..8a8b6d39
--- /dev/null
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetrics.kt
@@ -0,0 +1,67 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import arrow.syntax.function.memoize
+import io.micrometer.core.instrument.Clock
+import io.micrometer.core.instrument.Counter
+import io.micrometer.core.instrument.MeterRegistry
+import io.micrometer.jmx.JmxConfig
+import io.micrometer.jmx.JmxMeterRegistry
+import org.onap.dcae.collectors.veshv.boundary.Metrics
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class MicrometerMetrics(
+ private val registry: MeterRegistry = JmxMeterRegistry(JmxConfig.DEFAULT, Clock.SYSTEM)
+) : Metrics {
+
+ private val receivedBytes = registry.counter("data.received.bytes")
+ private val receivedMsgCount = registry.counter("messages.received.count")
+ private val receivedMsgBytes = registry.counter("messages.received.bytes")
+ private val sentCountTotal = registry.counter("messages.sent.count")
+
+ init {
+ registry.gauge("messages.processing.count", this) {
+ (receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0)
+ }
+ }
+
+ private val sentCount = { topic: String ->
+ registry.counter("messages.sent.count", "topic", topic)
+ }.memoize<String, Counter>()
+
+
+ override fun notifyBytesReceived(size: Int) {
+ receivedBytes.increment(size.toDouble())
+ }
+
+ override fun notifyMessageReceived(size: Int) {
+ receivedMsgCount.increment()
+ receivedMsgBytes.increment(size.toDouble())
+ }
+
+ override fun notifyMessageSent(topic: String) {
+ sentCountTotal.increment()
+ sentCount(topic).increment()
+ }
+}
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index b7d97028..1f2686ba 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -39,7 +39,8 @@ fun main(args: Array<String>) {
val collectorProvider = CollectorFactory(
resolveConfigurationProvider(serverConfiguration),
- AdapterFactory.kafkaSink()
+ AdapterFactory.kafkaSink(),
+ MicrometerMetrics()
).createVesHvCollectorProvider()
ServerFactory.createNettyTcpServer(serverConfiguration, collectorProvider).start().block()
} catch (ex: WrongArgumentException) {