diff options
author | kjaniak <kornel.janiak@nokia.com> | 2019-06-19 08:39:33 +0200 |
---|---|---|
committer | kjaniak <kornel.janiak@nokia.com> | 2019-06-25 15:52:13 +0200 |
commit | 7808010c1a18531ee9b618f934d31816193cac38 (patch) | |
tree | 3680678be6f909054ff3a022aeda35b1ee5e47ac /sources/hv-collector-kafka-consumer/src | |
parent | e52b0ef6c1c3b502f840f08a17b6594e17957db1 (diff) |
Implement message counting in consumer
Issue-ID: DCAEGEN2-1635
Change-Id: I2666de7bad27052d9cefa0f687ad0772d4c9a95d
Signed-off-by: kjaniak <kornel.janiak@nokia.com>
Diffstat (limited to 'sources/hv-collector-kafka-consumer/src')
5 files changed, 189 insertions, 1 deletions
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt index cbdb45dc..64a7fb3e 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt @@ -19,4 +19,6 @@ */ package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics -internal interface Metrics
\ No newline at end of file +internal interface Metrics { + fun notifyOffsetChanged(size: Long) +} diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt index adb1ff1f..f137d074 100644 --- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt @@ -26,6 +26,9 @@ import reactor.core.publisher.Mono internal class MicrometerMetrics constructor( private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) ) : Metrics { + override fun notifyOffsetChanged(size: Long) { + // TODO implementation here + } fun lastStatus(): Mono<String> = Mono.fromCallable { registry.scrape() diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt new file mode 100644 index 00000000..2c6707f9 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt @@ -0,0 +1,41 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.state + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.onap.dcae.collectors.veshv.kafka.api.KafkaConsumer +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics +import org.onap.dcae.collectors.veshv.utils.logging.Logger + + +internal class OffsetConsumer(private val metrics: Metrics): KafkaConsumer { + + override fun update(record: ConsumerRecord<ByteArray, ByteArray>) { + val offset = record.offset() + logger.trace { "Current consumer offset $offset" } + metrics.notifyOffsetChanged(offset) + } + + override fun reset() = Unit + + companion object { + private val logger = Logger(OffsetConsumer::class) + } +} diff --git a/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml b/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml new file mode 100644 index 00000000..da0f7f4b --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/main/resources/logback.xml @@ -0,0 +1,94 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START======================================================= + ~ dcaegen2-collectors-veshv + ~ ================================================================================ + ~ Copyright (C) 2019 NOKIA + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END========================================================= +--> +<configuration> + <property name="COMPONENT_NAME" + value="hv-ves-kafka-consumer-app"/> + <property name="COMPONENT_SHORT_NAME" + value="kafka-consumer-app"/> + + <property name="LOG_FILENAME" value="${COMPONENT_SHORT_NAME}"/> + <property name="LOG_PATH" value="/var/log/ONAP/${COMPONENT_NAME}"/> + <property name="ARCHIVE" value="${LOG_PATH}/archive"/> + + <property name="p_tim" value="%date{"yyyy-MM-dd'T'HH:mm:ss.SSSXXX", UTC}"/> + <property name="p_thr" value="%thread"/> + <property name="p_lvl" value="%highlight(%-5level)"/> + <property name="p_log" value="%50.50logger"/> + <property name="p_mdc" value="%replace(%replace(%mdc){'\t', '\\\\t'}){'\n', '\\\\n'}"/> + <property name="p_msg" value="%replace(%replace(%msg){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="p_exc" value="%replace(%replace(%rootException){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="p_mak" value="%replace(%replace(%marker){'\t', '\\\\t'}){'\n','\\\\n'}"/> + <property name="SIMPLE_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_log}\t +| ${p_lvl}\t +| %msg\t +| %rootException%n"/> + <property name="READABLE_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_log}\t +| ${p_lvl}\t +| %msg\t +| ${p_mak}\t +| %rootException\t +| ${p_mdc}\t +| ${p_thr}%n"/> + <property name="ONAP_LOG_PATTERN" value=" +%nopexception +| ${p_tim}\t +| ${p_thr}\t +| ${p_lvl}\t +| ${p_log}\t +| ${p_mdc}\t +| ${p_msg}\t +| ${p_exc}\t +| ${p_mak}%n"/> + <property name="LOG_PATTERN_IN_USE" value="${SIMPLE_LOG_PATTERN}"/> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>${LOG_PATTERN_IN_USE}</pattern> + </encoder> + </appender> + + <appender name="ROLLING-FILE" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <encoder> + <pattern>${LOG_PATTERN_IN_USE}</pattern> + </encoder> + <file>${LOG_PATH}/${LOG_FILENAME}.log</file> + <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> + <FileNamePattern>${ARCHIVE}/${LOG_FILENAME}.%d{yyyy-MM-dd}.%i.log.gz</FileNamePattern> + <maxFileSize>50MB</maxFileSize> + <maxHistory>30</maxHistory> + <totalSizeCap>10GB</totalSizeCap> + </rollingPolicy> + </appender> + + <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="ROLLING-FILE"/> + </root> +</configuration>
\ No newline at end of file diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt new file mode 100644 index 00000000..6fb42d81 --- /dev/null +++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.kafkaconsumer.state + +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.whenever +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics + +object OffsetConsumerTest : Spek({ + describe("OffsetConsumer with metrics") { + val mockedMetrics = mock<Metrics>() + val offsetConsumer = OffsetConsumer(mockedMetrics) + + on("new update method call") { + val consumerRecord = mock<ConsumerRecord<ByteArray, ByteArray>>() + whenever(consumerRecord.offset()).thenReturn(1) + + offsetConsumer.update(consumerRecord) + + it("should notify message offset metric") { + verify(mockedMetrics).notifyOffsetChanged(1) + } + } + } +}) |