diff options
Diffstat (limited to 'sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt')
-rw-r--r-- | sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt | 11 |
1 files changed, 4 insertions, 7 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt index 10dedbdf..7bab9676 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt @@ -19,11 +19,9 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters -import arrow.effects.IO import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer -import org.onap.dcae.collectors.veshv.utils.arrow.evaluateIo import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.kafka.receiver.KafkaReceiver import reactor.kafka.receiver.ReceiverOptions @@ -34,11 +32,10 @@ import reactor.kafka.receiver.ReceiverOptions */ class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) { - fun start(): IO<Consumer> = IO { - val consumer = Consumer() - receiver.receive().map(consumer::update).evaluateIo().subscribe() - consumer - } + fun start() = Consumer() + .also { consumer -> + receiver.receive().map(consumer::update) + } companion object { private val logger = Logger(KafkaSource::class) |