diff options
Diffstat (limited to 'sources/hv-collector-kafka-consumer')
13 files changed, 732 insertions, 21 deletions
diff --git a/sources/hv-collector-kafka-consumer/pom.xml b/sources/hv-collector-kafka-consumer/pom.xml index 45a32729..7ffb5ebf 100644 --- a/sources/hv-collector-kafka-consumer/pom.xml +++ b/sources/hv-collector-kafka-consumer/pom.xml @@ -15,7 +15,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>hv-collector-sources</artifactId> - <version>1.2.0-SNAPSHOT</version> + <version>1.3.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> @@ -65,6 +65,13 @@ <groupId>${project.parent.groupId}</groupId> <artifactId>hv-collector-commandline</artifactId> <version>${project.parent.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-kafka</artifactId> + <version>${project.parent.version}</version> + <scope>compile</scope> </dependency> <dependency> <groupId>${project.parent.groupId}</groupId> @@ -73,23 +80,35 @@ <scope>test</scope> </dependency> <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>io.micrometer</groupId> + <artifactId>micrometer-registry-prometheus</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor.netty</groupId> + <artifactId>reactor-netty</artifactId> + </dependency> + <dependency> <groupId>org.jetbrains.kotlin</groupId> <artifactId>kotlin-stdlib-jdk8</artifactId> </dependency> <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <optional>true</optional> + <groupId>org.jetbrains.kotlinx</groupId> + <artifactId>kotlinx-coroutines-core</artifactId> + <scope>compile</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> <dependency> - <groupId>ch.qos.logback</groupId> - <artifactId>logback-classic</artifactId> - <scope>runtime</scope> + <groupId>org.jetbrains.kotlinx</groupId> + <artifactId>kotlinx-coroutines-test</artifactId> + <scope>test</scope> </dependency> - </dependencies> </project> diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt new file mode 100644 index 00000000..be7b5cca --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfiguration.kt @@ -0,0 +1,58 @@ +/* +* ============LICENSE_START======================================================= +* dcaegen2-collectors-veshv +* ================================================================================ +* Copyright (C) 2019 NOKIA +* ================================================================================ +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.dcae.collectors.veshv.kafkaconsumer.config + +import arrow.core.Option +import org.apache.commons.cli.CommandLine +import org.apache.commons.cli.DefaultParser +import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration +import org.onap.dcae.collectors.veshv.commandline.CommandLineOption +import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.* +import org.onap.dcae.collectors.veshv.commandline.hasOption +import org.onap.dcae.collectors.veshv.commandline.intValue +import org.onap.dcae.collectors.veshv.commandline.stringValue +import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding +import java.net.InetSocketAddress + +internal class ArgKafkaConsumerConfiguration : ArgBasedConfiguration<KafkaConsumerConfiguration>(DefaultParser()) { + override val cmdLineOptionsList: List<CommandLineOption> = listOf( + LISTEN_PORT, + KAFKA_TOPICS, + KAFKA_SERVERS, + DISABLE_PROCESSING + ) + + override fun getConfiguration(cmdLine: CommandLine): Option<KafkaConsumerConfiguration> = + binding { + val listenPort = cmdLine.intValue(LISTEN_PORT).bind() + val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS) + .map { it.split(',').toSet() } + .bind() + val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS).bind() + val disableProcessing = cmdLine.hasOption(DISABLE_PROCESSING) + + KafkaConsumerConfiguration( + InetSocketAddress(listenPort), + kafkaTopics, + kafkaBootstrapServers, + disableProcessing + ) + } +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt new file mode 100644 index 00000000..cdd4c30a --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/KafkaConsumerConfiguration.kt @@ -0,0 +1,29 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.config + +import java.net.InetSocketAddress + +internal data class KafkaConsumerConfiguration( + val apiAddress: InetSocketAddress, + val kafkaTopics: Set<String>, + val kafkaBootstrapServers: String, + val disableProcessing: Boolean +) diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt new file mode 100644 index 00000000..dd24345d --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.impl + +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer + +internal class KafkaSource(private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>, + private val topics: Set<String>, + private val dispatcher: CoroutineDispatcher = Dispatchers.IO) { + suspend fun start(offsetConsumer: OffsetConsumer, updateInterval: Long = 500L): Job = + GlobalScope.launch(dispatcher) { + kafkaConsumer.subscribe(topics) + val topicPartitions = kafkaConsumer.assignment() + while (isActive) { + kafkaConsumer.endOffsets(topicPartitions) + .forEach { (topicPartition, offset) -> + offsetConsumer.update(topicPartition, offset) + } + kafkaConsumer.commitSync() + delay(updateInterval) + } + } +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt index fa15587c..7e77bae9 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018-2019 NOKIA + * Copyright (C) 2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,4 +19,26 @@ */ package org.onap.dcae.collectors.veshv.kafkaconsumer -fun main(args: Array<String>) = println("Guten tag") +import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried +import org.onap.dcae.collectors.veshv.kafkaconsumer.config.ArgKafkaConsumerConfiguration +import org.onap.dcae.collectors.veshv.kafkaconsumer.config.KafkaConsumerConfiguration +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.MicrometerMetrics +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.http.PrometheusApiServer +import org.onap.dcae.collectors.veshv.utils.process.ExitCode +import org.onap.dcae.collectors.veshv.utils.process.ExitSuccess + +private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.kafkaconsumer" +const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" + +fun main(args: Array<String>): Unit = + ArgKafkaConsumerConfiguration().parse(args) + .fold(handleWrongArgumentErrorCurried(PROGRAM_NAME), ::startApp) + .let(ExitCode::doExit) + + +private fun startApp(config: KafkaConsumerConfiguration): ExitSuccess { + PrometheusApiServer(config.apiAddress, MicrometerMetrics.INSTANCE) + .start().block()!!.await().block() // TODO refactor netty server logic + + return ExitSuccess +} diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt index b7ea126f..e576a88f 100644 --- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/SampleTest.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018-2019 NOKIA + * Copyright (C) 2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,14 +17,9 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.kafkaconsumer +package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import kotlin.test.assertTrue - -object SampleTest : Spek({ - describe("sample test") { - assertTrue(true) - } -}) +interface Metrics { + fun notifyOffsetChanged(offset: Long, topic: String, partition: Int = 0) + fun notifyMessageTravelTime(messageSentTimeMicros: Long) +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt new file mode 100644 index 00000000..da1225e9 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt @@ -0,0 +1,59 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics + +import io.micrometer.core.instrument.Timer +import io.micrometer.prometheus.PrometheusConfig +import io.micrometer.prometheus.PrometheusMeterRegistry +import org.onap.dcae.collectors.veshv.utils.TimeUtils +import reactor.core.publisher.Mono +import java.time.Duration +import java.time.Instant +import java.util.concurrent.atomic.AtomicLong + +internal class MicrometerMetrics constructor( + private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) +) : Metrics { + + private val currentOffset = registry.gauge(name("consumer.offset"), AtomicLong(0)) + private val travelTime = Timer.builder(name("travel.time")) + .publishPercentileHistogram(true) + .register(registry) + + fun lastStatus(): Mono<String> = Mono.fromCallable { + registry.scrape() + } + + override fun notifyOffsetChanged(offset: Long, topic: String, partition: Int) { + // TODO use topic and partition + currentOffset.lazySet(offset) + } + + override fun notifyMessageTravelTime(messageSentTimeMicros: Long) { + travelTime.record(Duration.between(TimeUtils.epochMicroToInstant(messageSentTimeMicros), Instant.now())) + } + + companion object { + val INSTANCE by lazy { MicrometerMetrics() } + + private const val PREFIX = "hv-kafka-consumer" + private fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}" + } +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/http/PrometheusApiServer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/http/PrometheusApiServer.kt new file mode 100644 index 00000000..29a17fc1 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/http/PrometheusApiServer.kt @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.http + +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.MicrometerMetrics +import org.onap.dcae.collectors.veshv.utils.NettyServerHandle +import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono +import reactor.netty.http.server.HttpServer +import reactor.netty.http.server.HttpServerRequest +import reactor.netty.http.server.HttpServerResponse +import java.net.InetSocketAddress + +internal class PrometheusApiServer(private val listenAddress: InetSocketAddress, + private val metrics: MicrometerMetrics) { + + private val logger = Logger(PrometheusApiServer::class) + + fun start(): Mono<NettyServerHandle> = + HttpServer.create() + .tcpConfiguration { it.addressSupplier { listenAddress } } + .route { it.get("/monitoring/prometheus", ::metricsHandler) } + .bind() + .map { NettyServerHandle(it) } + .doOnSuccess(::logServerStarted) + + + private fun metricsHandler(_req: HttpServerRequest, resp: HttpServerResponse) = + resp.sendString(metrics.lastStatus()) + + + private fun logServerStarted(handle: ServerHandle) = + logger.info { + "Kafka Consumer API server is up and listening on ${handle.host}:${handle.port}" + } +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt new file mode 100644 index 00000000..1481a224 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt @@ -0,0 +1,42 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.state + +import org.apache.kafka.common.TopicPartition +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics +import org.onap.dcae.collectors.veshv.utils.logging.Logger + + +internal class OffsetConsumer(private val metrics: Metrics) { + + fun update(topicPartition: TopicPartition, offset: Long) { + logger.trace { + "Current consumer offset $offset for topic ${topicPartition.topic()} " + + "on partition ${topicPartition.partition()}" + } + metrics.notifyOffsetChanged(offset, topicPartition.topic(), topicPartition.partition()) + } + + fun reset() = Unit + + companion object { + val logger = Logger(OffsetConsumer::class) + } +} diff --git a/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml b/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml new file mode 100644 index 00000000..da0f7f4b --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml @@ -0,0 +1,94 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START======================================================= + ~ dcaegen2-collectors-veshv + ~ ================================================================================ + ~ Copyright (C) 2019 NOKIA + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END========================================================= +--> +<configuration> + <property name="COMPONENT_NAME" + value="hv-ves-kafka-consumer-app"/> + <property name="COMPONENT_SHORT_NAME" + value="kafka-consumer-app"/> + + <property name="LOG_FILENAME" value="${COMPONENT_SHORT_NAME}"/> + <property name="LOG_PATH" value="/var/log/ONAP/${COMPONENT_NAME}"/> + <property name="ARCHIVE" value="${LOG_PATH}/archive"/> + + <property name="p_tim" value="%date{"yyyy-MM-dd'T'HH:mm:ss.SSSXXX", UTC}"/> + <property name="p_thr" value="%thread"/> + <property name="p_lvl" value="%highlight(%-5level)"/> + <property name="p_log" value="%50.50logger"/> + <property name="p_mdc" value="%replace(%replace(%mdc){'\t', '\\\\t'}){'\n', '\\\\n'}"/> + <property name="p_msg" value="%replace(%replace(%msg){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="p_exc" value="%replace(%replace(%rootException){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="p_mak" value="%replace(%replace(%marker){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="SIMPLE_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_log}\t +| ${p_lvl}\t +| %msg\t +| %rootException%n"/> + <property name="READABLE_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_log}\t +| ${p_lvl}\t +| %msg\t +| ${p_mak}\t +| %rootException\t +| ${p_mdc}\t +| ${p_thr}%n"/> + <property name="ONAP_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_thr}\t +| ${p_lvl}\t +| ${p_log}\t +| ${p_mdc}\t +| ${p_msg}\t +| ${p_exc}\t +| ${p_mak}%n"/> + <property name="LOG_PATTERN_IN_USE" value="${SIMPLE_LOG_PATTERN}"/> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>${LOG_PATTERN_IN_USE}</pattern> + </encoder> + </appender> + + <appender name="ROLLING-FILE" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <encoder> + <pattern>${LOG_PATTERN_IN_USE}</pattern> + </encoder> + <file>${LOG_PATH}/${LOG_FILENAME}.log</file> + <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> + <FileNamePattern>${ARCHIVE}/${LOG_FILENAME}.%d{yyyy-MM-dd}.%i.log.gz</FileNamePattern> + <maxFileSize>50MB</maxFileSize> + <maxHistory>30</maxHistory> + <totalSizeCap>10GB</totalSizeCap> + </rollingPolicy> + </appender> + + <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="ROLLING-FILE"/> + </root> +</configuration>
\ No newline at end of file diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt new file mode 100644 index 00000000..b0eb7a52 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt @@ -0,0 +1,154 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.impl + +import com.nhaarman.mockitokotlin2.argumentCaptor +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.times +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.verifyZeroInteractions +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.test.TestCoroutineDispatcher +import kotlinx.coroutines.test.runBlockingTest +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.TopicPartition +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer + +@ExperimentalCoroutinesApi +object KafkaSourceTest : Spek({ + given("KafkaSource") { + val testDispatcher = TestCoroutineDispatcher() + val mockedKafkaConsumer = mock<KafkaConsumer<ByteArray, ByteArray>>() + afterEachTest { + testDispatcher.cleanupTestCoroutines() + } + given("single topicName and partition") { + val topics = setOf("topicName") + val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher) + val mockedOffsetConsumer = mock<OffsetConsumer>() + on("started KafkaSource") { + val topicPartition = createTopicPartition("topicName") + val topicPartitions = setOf(topicPartition) + whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions) + whenever(mockedKafkaConsumer.endOffsets(topicPartitions)) + .thenReturn(mapOf<TopicPartition, Long>(topicPartition to newOffset)) + + runBlockingTest { + val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs) + job.cancelAndJoin() + } + + it("should call update function on topicName") { + verify(mockedOffsetConsumer).update(topicPartition, newOffset) + } + } + } + + given("two topics with partition") { + val topics = setOf(topicName1, topicName2) + val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher) + val mockedOffsetConsumer = mock<OffsetConsumer>() + + on("started KafkaSource for two iteration of while loop") { + val topicPartitionArgumentCaptor = argumentCaptor<TopicPartition>() + val offsetArgumentCaptor = argumentCaptor<Long>() + val topicPartitionArgumentCaptorAfterInterval = argumentCaptor<TopicPartition>() + val offsetArgumentCaptorAfterInterval = argumentCaptor<Long>() + val topicPartition1 = createTopicPartition(topicName1) + val topicPartition2 = createTopicPartition(topicName2) + val topicPartitions = setOf(topicPartition1, topicPartition2) + whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions) + val partitionToOffset1 = + mapOf(topicPartition1 to newOffset, + topicPartition2 to anotherNewOffset) + val partitionToOffset2 = + mapOf(topicPartition1 to anotherNewOffset, + topicPartition2 to newOffset) + whenever(mockedKafkaConsumer.endOffsets(topicPartitions)) + .thenReturn(partitionToOffset1, partitionToOffset2) + + runBlockingTest { + val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs) + verify(mockedOffsetConsumer, times(topicsAmount)).update(topicPartitionArgumentCaptor.capture(), + offsetArgumentCaptor.capture()) + + testDispatcher.advanceTimeBy(updateIntervalInMs) + + verify(mockedOffsetConsumer, times(topicsAmountAfterInterval)) + .update(topicPartitionArgumentCaptorAfterInterval.capture(), offsetArgumentCaptorAfterInterval.capture()) + + it("should calls update function with proper arguments - before interval") { + assertThat(topicPartitionArgumentCaptor.firstValue).isEqualTo(topicPartition1) + assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset) + assertThat(topicPartitionArgumentCaptor.secondValue).isEqualTo(topicPartition2) + assertThat(offsetArgumentCaptor.secondValue).isEqualTo(anotherNewOffset) + } + it("should calls update function with proper arguments - after interval") { + assertThat(topicPartitionArgumentCaptorAfterInterval.thirdValue).isEqualTo(topicPartition1) + assertThat(offsetArgumentCaptorAfterInterval.thirdValue).isEqualTo(anotherNewOffset) + assertThat(topicPartitionArgumentCaptorAfterInterval.lastValue).isEqualTo(topicPartition2) + assertThat(offsetArgumentCaptorAfterInterval.lastValue).isEqualTo(newOffset) + } + job.cancelAndJoin() + } + } + } + + given("empty topicName list") { + val emptyTopics = emptySet<String>() + val kafkaSource = KafkaSource(mockedKafkaConsumer, emptyTopics, testDispatcher) + val mockedOffsetConsumer = mock<OffsetConsumer>() + + on("call of function start") { + val emptyTopicPartitions = setOf(null) + whenever(mockedKafkaConsumer.assignment()).thenReturn(emptyTopicPartitions) + whenever(mockedKafkaConsumer.endOffsets(emptyTopicPartitions)) + .thenReturn(emptyMap()) + + runBlockingTest { + val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs) + job.cancelAndJoin() + } + + it("should not interact with OffsetConsumer") { + verifyZeroInteractions(mockedOffsetConsumer) + } + } + } + + } +}) + +private const val updateIntervalInMs = 10L +private const val partitionNumber = 0 +private const val newOffset = 2L +private const val anotherNewOffset = 10L +private const val topicName1 = "topicName1" +private const val topicName2 = "topicName2" +private const val topicsAmount = 2 +private const val topicsAmountAfterInterval = 4 +fun createTopicPartition(topic: String) = TopicPartition(topic, partitionNumber)
\ No newline at end of file diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt new file mode 100644 index 00000000..96ba588f --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt @@ -0,0 +1,85 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics + +import io.micrometer.prometheus.PrometheusConfig +import io.micrometer.prometheus.PrometheusMeterRegistry +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.data.Percentage +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge +import org.onap.dcae.collectors.veshv.tests.utils.verifyTimer +import java.time.Instant +import java.util.concurrent.TimeUnit + +object MicrometerMetricsTest : Spek({ + val PREFIX = "hv-kafka-consumer" + val doublePrecision = Percentage.withPercentage(0.5) + lateinit var registry: PrometheusMeterRegistry + lateinit var cut: MicrometerMetrics + + beforeEachTest { + registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) + cut = MicrometerMetrics(registry) + } + + describe("Timers") { + val arbitraryMessageTravelTime = 100L + val messageSentTimeMicros = Instant.now().minusMillis(arbitraryMessageTravelTime).toEpochMilli() * 1000 + val timerName = "$PREFIX.travel.time" + + on("notifyMessageTravelTime") { + it("should update timer $timerName") { + + val timeBeforeNotifyMicros = Instant.now().toEpochMilli() * 1000 + cut.notifyMessageTravelTime(messageSentTimeMicros) + val timeAfterNotifyMicros = Instant.now().toEpochMilli() * 1000 + + registry.verifyTimer(timerName) { timer -> + val travelTimeBeforeNotify = (timeBeforeNotifyMicros - messageSentTimeMicros).toDouble() + val travelTimeAfterNotify = (timeAfterNotifyMicros - messageSentTimeMicros).toDouble() + assertThat(timer.totalTime(TimeUnit.MICROSECONDS)) + .isLessThanOrEqualTo(travelTimeAfterNotify) + .isGreaterThanOrEqualTo(travelTimeBeforeNotify) + + } + } + } + } + + describe("Gauges") { + val gaugeName = "$PREFIX.consumer.offset" + + on("notifyOffsetChanged") { + val offset = 966L + + it("should update $gaugeName") { + cut.notifyOffsetChanged(offset, "sample_topic", 1) + + registry.verifyGauge(gaugeName) { + assertThat(it.value()).isCloseTo(offset.toDouble(), doublePrecision) + } + } + } + } +}) diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt new file mode 100644 index 00000000..242f27be --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt @@ -0,0 +1,52 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.state + +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify +import org.apache.kafka.common.TopicPartition +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics + +object OffsetConsumerTest : Spek({ + describe("OffsetConsumer with metrics") { + val mockedMetrics = mock<Metrics>() + val offsetConsumer = OffsetConsumer(mockedMetrics) + given("topicName with partition"){ + val topicPartition = TopicPartition(topicName, partitionNumber) + + on("new update method call") { + offsetConsumer.update(topicPartition, newOffset) + + it("should notify message newOffset metric") { + verify(mockedMetrics).notifyOffsetChanged(newOffset, topicName, partitionNumber) + } + } + } + } +}) + +private const val partitionNumber = 1 +private const val newOffset: Long = 99 +private const val topicName = "sample-topicName" |