aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-dcae-app-simulator/src/main/kotlin
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-dcae-app-simulator/src/main/kotlin')
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt54
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt26
2 files changed, 72 insertions, 8 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
new file mode 100644
index 00000000..1b664edc
--- /dev/null
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018,2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
+
+import org.onap.dcae.collectors.veshv.kafka.api.KafkaPropertiesFactory
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Flux
+import reactor.kafka.receiver.KafkaReceiver
+import reactor.kafka.receiver.ReceiverOptions
+import reactor.kafka.receiver.ReceiverRecord
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
+ fun start(): Flux<ReceiverRecord<ByteArray, ByteArray>> =
+ receiver.receive()
+ .doOnNext { it.receiverOffset().acknowledge() }
+ .also { logger.info { "Started Kafka source" } }
+
+ companion object {
+ private val logger = Logger(KafkaSource::class)
+
+ fun create(bootstrapServers: String, topics: Set<String>) =
+ KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics)))
+
+ fun createReceiverOptions(bootstrapServers: String,
+ topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? {
+ val props = KafkaPropertiesFactory.create(bootstrapServers)
+ return ReceiverOptions.create<ByteArray, ByteArray>(props)
+ .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } }
+ .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } }
+ .subscription(topics)
+ }
+ }
+}
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
index 8a7aafbe..6ee640a4 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
@@ -20,8 +20,7 @@
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.onap.dcae.collectors.veshv.kafka.api.ConsumerFactory
-import org.onap.dcae.collectors.veshv.kafka.api.KafkaConsumer
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import java.util.concurrent.ConcurrentLinkedQueue
@@ -41,9 +40,10 @@ internal class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArr
internal interface ConsumerStateProvider {
fun currentState(): ConsumerState
+ fun reset()
}
-internal class Consumer : KafkaConsumer, ConsumerStateProvider {
+internal class Consumer : ConsumerStateProvider {
private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue()
@@ -51,7 +51,7 @@ internal class Consumer : KafkaConsumer, ConsumerStateProvider {
override fun reset() = consumedMessages.clear()
- override fun update(record: ConsumerRecord<ByteArray, ByteArray>) {
+ fun update(record: ConsumerRecord<ByteArray, ByteArray>) {
logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
consumedMessages.add(record.value())
}
@@ -63,9 +63,19 @@ internal class Consumer : KafkaConsumer, ConsumerStateProvider {
internal class DcaeAppConsumerFactory(private val kafkaBootstrapServers: String) {
- private val consumerProvider = { Consumer() }
-
fun createConsumersFor(topics: Set<String>) =
- ConsumerFactory.createConsumersForTopics(kafkaBootstrapServers, topics, consumerProvider)
- .mapValues { it.value as Consumer }
+ KafkaSource.create(kafkaBootstrapServers, topics).let { kafkaSource ->
+ val topicToConsumer = topics.associateWith { Consumer() }
+ kafkaSource.start()
+ .map {
+ val topic = it.topic()
+ topicToConsumer.get(topic)?.update(it)
+ ?: logger.warn { "No consumer configured for topic $topic" }
+ }.subscribe()
+ topicToConsumer
+ }
+
+ companion object {
+ private val logger = Logger(DcaeAppConsumerFactory::class)
+ }
}