author | Izabela Zawadzka <izabela.zawadzka@nokia.com> | 2019-07-10 09:29:51 +0200 |
---|---|---|
committer | Izabela Zawadzka <izabela.zawadzka@nokia.com> | 2019-07-12 09:59:06 +0200 |
commit | a313a7fb4ba81e960397ec432ce02c28804a44b4 (patch) | |
tree | 4178df42f3ee571b19872e851e5aaf5d17e270cb | |
parent | dbf9712c95867185931b6f15d12ded7b45437a24 (diff) | |
Use consumers in main
It includes the --disable-processing flag.
Also fixed some issues with the script for the local performance test.
Also added KafkaConsumer::poll in OffsetKafkaConsumer - without it, KafkaConsumer::assignment returns an empty set.
Signed-off-by: Izabela Zawadzka <izabela.zawadzka@nokia.com>
Issue-ID: DCAEGEN2-1657
Change-Id: I95fadb45f321398346094dfa0c4a6e9da954c186
6 files changed, 70 insertions, 13 deletions
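One note on the KafkaConsumer behaviour the commit message refers to: after subscribe(), assignment() stays empty until the consumer has gone through a rebalance inside poll(), whereas a manual assign() populates it immediately (the approach OffsetKafkaConsumer switches to below). A minimal, standalone Kotlin sketch of the difference; the broker address, group id and topic name are placeholders and not part of this change:

```kotlin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import java.time.Duration

fun main() {
    val props = mapOf<String, Any>(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",   // placeholder broker
            ConsumerConfig.GROUP_ID_CONFIG to "assignment-demo",           // placeholder group id
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java)

    KafkaConsumer<ByteArray, ByteArray>(props).use { consumer ->
        // subscribe(): partitions are only handed out during a rebalance, which
        // happens inside poll(), so assignment() is empty right after subscribing.
        consumer.subscribe(listOf("HV_VES_PERF3GPP"))
        println(consumer.assignment())           // []
        consumer.poll(Duration.ofSeconds(5))     // drives the group rebalance
        println(consumer.assignment())           // populated once the rebalance completed

        // assign(): the partition set is known immediately, no poll() needed.
        consumer.unsubscribe()
        consumer.assign(listOf(TopicPartition("HV_VES_PERF3GPP", 0)))
        println(consumer.assignment())           // [HV_VES_PERF3GPP-0]
    }
}
```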
```diff
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt
new file mode 100644
index 00000000..6dddd0f8
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt
@@ -0,0 +1,32 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.api
+
+import kotlinx.coroutines.Job
+import java.time.Duration
+
+interface MetricsKafkaConsumer {
+    suspend fun start(updateInterval: Long = defaultUpdateInterval, pollTimeout: Duration = defaultPollTimeoutMs): Job
+
+    companion object{
+        private const val defaultUpdateInterval = 500L
+        private val defaultPollTimeoutMs: Duration = Duration.ofMillis(10L)
+    }
+}
\ No newline at end of file
```
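For orientation, a throwaway implementation of the new MetricsKafkaConsumer interface could look like the sketch below (the body is a placeholder that only prints; both real implementations in this commit use the same GlobalScope.launch / while(isActive) / delay shape). Callers can rely on the interface defaults of a 500 ms update interval and a 10 ms poll timeout.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.onap.dcae.collectors.veshv.kafkaconsumer.api.MetricsKafkaConsumer
import java.time.Duration

// Placeholder implementation: prints instead of polling Kafka or updating metrics.
internal class LoggingMetricsKafkaConsumer : MetricsKafkaConsumer {
    override suspend fun start(updateInterval: Long, pollTimeout: Duration): Job =
            GlobalScope.launch(Dispatchers.IO) {
                while (isActive) {
                    println("would poll Kafka with timeout $pollTimeout and update metrics here")
                    delay(updateInterval)
                }
            }
}
```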
```diff
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
index d105c4a7..18de6fcc 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
@@ -28,19 +28,23 @@ import kotlinx.coroutines.isActive
 import kotlinx.coroutines.launch
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.TopicPartition
+import org.onap.dcae.collectors.veshv.kafkaconsumer.api.MetricsKafkaConsumer
 import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.time.Duration
 
 internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>,
                                    private val topics: Set<String>,
                                    private val metrics: Metrics,
-                                   private val dispatcher: CoroutineDispatcher = Dispatchers.IO) {
+                                   private val dispatcher: CoroutineDispatcher = Dispatchers.IO)
+    : MetricsKafkaConsumer{
 
-    suspend fun start(updateInterval: Long = 500L): Job =
+    override suspend fun start(updateInterval: Long, pollTimeout: Duration):Job =
             GlobalScope.launch(dispatcher) {
-                kafkaConsumer.subscribe(topics)
-                val topicPartitions = kafkaConsumer.assignment()
+                kafkaConsumer.assign(topics.map { TopicPartition(it, 0) })
                 while (isActive) {
+                    val topicPartitions = kafkaConsumer.assignment()
+
                     kafkaConsumer.endOffsets(topicPartitions)
                             .forEach { (topicPartition, offset) ->
                                 update(topicPartition, offset)
@@ -58,6 +62,6 @@ internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<Byte
     }
 
     companion object {
-        val logger = Logger(OffsetKafkaConsumer::class)
+        private val logger = Logger(OffsetKafkaConsumer::class)
     }
 }
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt
index f47a66d0..7574d61d 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt
@@ -29,21 +29,22 @@ import kotlinx.coroutines.isActive
 import kotlinx.coroutines.launch
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.onap.dcae.collectors.veshv.kafkaconsumer.api.MetricsKafkaConsumer
 import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.ves.VesEventOuterClass
 import java.time.Duration
 
 internal class ProcessingKafkaConsumer (private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>,
                                         private val topics: Set<String>,
                                         private val metrics: Metrics,
-                                        private val dispatcher: CoroutineDispatcher = Dispatchers.IO){
+                                        private val dispatcher: CoroutineDispatcher = Dispatchers.IO)
+    : MetricsKafkaConsumer{
 
-    suspend fun start(updateInterval: Long = 500L, timeout: Duration): Job =
+    override suspend fun start(updateInterval: Long, pollTimeout: Duration):Job =
             GlobalScope.launch(dispatcher){
                 kafkaConsumer.subscribe(topics)
                 while (isActive){
-                    kafkaConsumer.poll(timeout).forEach(::update)
+                    kafkaConsumer.poll(pollTimeout).forEach(::update)
                     kafkaConsumer.commitSync()
                     delay(updateInterval)
                 }
```
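A small detail of the OffsetKafkaConsumer change: the new assign() call maps every topic to TopicPartition(it, 0), i.e. partition 0 only. For topics with more than one partition, the full list can be read from broker metadata; a possible helper, not part of this commit, might look like:

```kotlin
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Expands topic names to every partition reported by broker metadata instead of
// hard-coding partition 0. partitionsFor() may return null for an unknown topic,
// hence orEmpty().
internal fun allPartitions(consumer: KafkaConsumer<ByteArray, ByteArray>,
                           topics: Set<String>): List<TopicPartition> =
        topics.flatMap { topic ->
            consumer.partitionsFor(topic).orEmpty()
                    .map { TopicPartition(it.topic(), it.partition()) }
        }
```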
```diff
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
index 7e77bae9..9bf4310b 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
@@ -19,9 +19,14 @@
  */
 package org.onap.dcae.collectors.veshv.kafkaconsumer
 
+import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
+import org.onap.dcae.collectors.veshv.kafka.api.KafkaPropertiesFactory
 import org.onap.dcae.collectors.veshv.kafkaconsumer.config.ArgKafkaConsumerConfiguration
 import org.onap.dcae.collectors.veshv.kafkaconsumer.config.KafkaConsumerConfiguration
+import org.onap.dcae.collectors.veshv.kafkaconsumer.impl.OffsetKafkaConsumer
+import org.onap.dcae.collectors.veshv.kafkaconsumer.impl.ProcessingKafkaConsumer
 import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.MicrometerMetrics
 import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.http.PrometheusApiServer
 import org.onap.dcae.collectors.veshv.utils.process.ExitCode
@@ -37,6 +42,20 @@ fun main(args: Array<String>): Unit =
 
 private fun startApp(config: KafkaConsumerConfiguration): ExitSuccess {
 
+    val kafkaConsumer = KafkaConsumer<ByteArray, ByteArray>(KafkaPropertiesFactory.create(
+            config.kafkaBootstrapServers)
+    )
+
+    runBlocking {
+        if (config.disableProcessing) {
+            OffsetKafkaConsumer(kafkaConsumer, config.kafkaTopics, MicrometerMetrics.INSTANCE)
+                    .start()
+        } else {
+            ProcessingKafkaConsumer(kafkaConsumer, config.kafkaTopics, MicrometerMetrics.INSTANCE)
+                    .start()
+        }
+    }
+
     PrometheusApiServer(config.apiAddress, MicrometerMetrics.INSTANCE)
             .start().block()!!.await().block() // TODO refactor netty server logic
 
diff --git a/tools/performance/local/docker-compose.yml b/tools/performance/local/docker-compose.yml
index c0dfc478..73d02fd1 100644
--- a/tools/performance/local/docker-compose.yml
+++ b/tools/performance/local/docker-compose.yml
@@ -104,7 +104,7 @@ services:
     image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-kafka-consumer
     ports:
       - "6064:6064/tcp"
-    command: ["--listen-port", "6062"]
+    command: ["--listen-port", "6064", "--kafka-topics", "HV_VES_PERF3GPP", "--kafka-bootstrap-servers", "message-router-kafka-0:9093"]
     depends_on:
       - message-router-kafka-0
 
```
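Reading the main.kt change: start() launches its loop in GlobalScope and returns a Job without joining it, so the runBlocking block completes as soon as the chosen consumer has been started, and execution moves on to the PrometheusApiServer. A stripped-down model of that control flow (names here are illustrative, not from the codebase):

```kotlin
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

suspend fun startConsumerLoop(): Job =
        GlobalScope.launch {
            while (isActive) {
                // poll Kafka / update metrics here
                delay(500L)
            }
        }

fun main() {
    runBlocking {
        // Returns the background Job without joining it, so runBlocking
        // does not wait for the (infinite) consumer loop.
        startConsumerLoop()
    }
    println("consumer started, now the metrics HTTP server would be started")
    Thread.sleep(2_000)  // keep the JVM alive briefly so the background loop visibly runs
}
```

In startApp the process then stays alive because the blocking PrometheusApiServer call at the end never returns in normal operation, so the background consumer Job keeps running.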
```diff
diff --git a/tools/performance/local/local-performance-test.sh b/tools/performance/local/local-performance-test.sh
index 3c885a64..6d08b8e7 100755
--- a/tools/performance/local/local-performance-test.sh
+++ b/tools/performance/local/local-performance-test.sh
@@ -1,10 +1,10 @@
 #!/usr/bin/env bash
 
-SCRIPT_DIRECTORY="$(dirname "$0")"
+SCRIPT_DIRECTORY="$(pwd "$0")"
 CERT_FILE=${CERT_FILE:-/ssl/client.p12}
 CERT_PASS_FILE=${CERT_PASS_FILE:-/ssl/client.pass}
-HV_VES_NETWORK=${HV_VES_NETWORK:-performance_default}
-VOLUME_MAPPING=${VOLUME_MAPPING:-$PWD/../ssl/:/ssl}
+HV_VES_NETWORK=${HV_VES_NETWORK:-local_default}
+VOLUME_MAPPING=${VOLUME_MAPPING:-$SCRIPT_DIRECTORY/../../ssl/:/ssl}
 PRODUCER_IMAGE_NAME=${PRODUCER_IMAGE_NAME:-the-a-team-registry-local.esisoj70.emea.nsn-net.net/onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-rust-client:latest}
 PRODUCER_APP_NAME=hv-ves-producer
 
@@ -115,6 +115,7 @@ function clean(){
     echo "Clearing generated certs"
     cd ../../ssl
     ./gen-certs.sh clean
+    cd "$SCRIPT_DIRECTORY"
 
     echo "Removing HV-VES components"
```