summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-kafka-consumer
diff options
context:
space:
mode:
authorJakub Dudycz <jakub.dudycz@nokia.com>2019-06-14 15:48:32 +0200
committerJakub Dudycz <jakub.dudycz@nokia.com>2019-06-19 15:36:50 +0200
commit3b5bfc1ab4d2c1e41a6d9a418240d411a71b4ad2 (patch)
tree72b649521adc90da8fc3f6f3f299666692ab0c9e /sources/hv-collector-kafka-consumer
parentbdc928c898af450b1a39a42b465d2ca7fb6dc138 (diff)
Expose Prometheus metrics in Kafka Consumer
Change-Id: I9d0568a5fc296604d7ac7a45b8bbd0289b845938 Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com> Issue-ID: DCAEGEN2-1626
Diffstat (limited to 'sources/hv-collector-kafka-consumer')
-rw-r--r--sources/hv-collector-kafka-consumer/pom.xml9
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt40
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt24
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt26
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt22
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt37
-rw-r--r--sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/http/PrometheusApiServer.kt54
7 files changed, 209 insertions, 3 deletions
diff --git a/sources/hv-collector-kafka-consumer/pom.xml b/sources/hv-collector-kafka-consumer/pom.xml
index 45a32729..1e20d5b1 100644
--- a/sources/hv-collector-kafka-consumer/pom.xml
+++ b/sources/hv-collector-kafka-consumer/pom.xml
@@ -90,6 +90,13 @@
<artifactId>logback-classic</artifactId>
<scope>runtime</scope>
</dependency>
-
+ <dependency>
+ <groupId>io.micrometer</groupId>
+ <artifactId>micrometer-registry-prometheus</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.netty</groupId>
+ <artifactId>reactor-netty</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt
new file mode 100644
index 00000000..4d65e916
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt
@@ -0,0 +1,40 @@
+/*
+* ============LICENSE_START=======================================================
+* dcaegen2-collectors-veshv
+* ================================================================================
+* Copyright (C) 2019 NOKIA
+* ================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.dcae.collectors.veshv.kafkaconsumer.config
+
+import arrow.core.Option
+import org.apache.commons.cli.CommandLine
+import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.commandline.intValue
+import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
+import java.net.InetSocketAddress
+
+internal class ArgKafkaConsumerConfiguration : ArgBasedConfiguration<KafkaConsumerConfiguration>(DefaultParser()) {
+ override val cmdLineOptionsList: List<CommandLineOption> = listOf(LISTEN_PORT)
+
+ override fun getConfiguration(cmdLine: CommandLine): Option<KafkaConsumerConfiguration> =
+ binding {
+ val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
+ KafkaConsumerConfiguration(InetSocketAddress(listenPort))
+ }
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt
new file mode 100644
index 00000000..ef06a0d1
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt
@@ -0,0 +1,24 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.config
+
+import java.net.InetSocketAddress
+
+internal data class KafkaConsumerConfiguration(val apiAddress: InetSocketAddress)
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
index fa15587c..7e77bae9 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,4 +19,26 @@
*/
package org.onap.dcae.collectors.veshv.kafkaconsumer
-fun main(args: Array<String>) = println("Guten tag")
+import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
+import org.onap.dcae.collectors.veshv.kafkaconsumer.config.ArgKafkaConsumerConfiguration
+import org.onap.dcae.collectors.veshv.kafkaconsumer.config.KafkaConsumerConfiguration
+import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.MicrometerMetrics
+import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.http.PrometheusApiServer
+import org.onap.dcae.collectors.veshv.utils.process.ExitCode
+import org.onap.dcae.collectors.veshv.utils.process.ExitSuccess
+
+private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.kafkaconsumer"
+const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
+
+fun main(args: Array<String>): Unit =
+ ArgKafkaConsumerConfiguration().parse(args)
+ .fold(handleWrongArgumentErrorCurried(PROGRAM_NAME), ::startApp)
+ .let(ExitCode::doExit)
+
+
+private fun startApp(config: KafkaConsumerConfiguration): ExitSuccess {
+ PrometheusApiServer(config.apiAddress, MicrometerMetrics.INSTANCE)
+ .start().block()!!.await().block() // TODO refactor netty server logic
+
+ return ExitSuccess
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt
new file mode 100644
index 00000000..cbdb45dc
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt
@@ -0,0 +1,22 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
+
+internal interface Metrics \ No newline at end of file
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
new file mode 100644
index 00000000..adb1ff1f
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
+
+import io.micrometer.prometheus.PrometheusConfig
+import io.micrometer.prometheus.PrometheusMeterRegistry
+import reactor.core.publisher.Mono
+
+internal class MicrometerMetrics constructor(
+ private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
+) : Metrics {
+
+ fun lastStatus(): Mono<String> = Mono.fromCallable {
+ registry.scrape()
+ }
+
+ companion object {
+ val INSTANCE by lazy { MicrometerMetrics() }
+ }
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/http/PrometheusApiServer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/http/PrometheusApiServer.kt
new file mode 100644
index 00000000..29a17fc1
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/http/PrometheusApiServer.kt
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.http
+
+import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.MicrometerMetrics
+import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
+import reactor.netty.http.server.HttpServer
+import reactor.netty.http.server.HttpServerRequest
+import reactor.netty.http.server.HttpServerResponse
+import java.net.InetSocketAddress
+
+internal class PrometheusApiServer(private val listenAddress: InetSocketAddress,
+ private val metrics: MicrometerMetrics) {
+
+ private val logger = Logger(PrometheusApiServer::class)
+
+ fun start(): Mono<NettyServerHandle> =
+ HttpServer.create()
+ .tcpConfiguration { it.addressSupplier { listenAddress } }
+ .route { it.get("/monitoring/prometheus", ::metricsHandler) }
+ .bind()
+ .map { NettyServerHandle(it) }
+ .doOnSuccess(::logServerStarted)
+
+
+ private fun metricsHandler(_req: HttpServerRequest, resp: HttpServerResponse) =
+ resp.sendString(metrics.lastStatus())
+
+
+ private fun logServerStarted(handle: ServerHandle) =
+ logger.info {
+ "Kafka Consumer API server is up and listening on ${handle.host}:${handle.port}"
+ }
+}