diff options
Diffstat (limited to 'hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt')
-rw-r--r-- | hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt | 6 |
1 files changed, 3 insertions, 3 deletions
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt index f7703b86..d53609ca 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt @@ -19,10 +19,10 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka +import arrow.effects.IO import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.core.publisher.Mono import reactor.kafka.receiver.KafkaReceiver import reactor.kafka.receiver.ReceiverOptions import java.util.* @@ -33,10 +33,10 @@ import java.util.* */ class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) { - fun start(): Mono<Consumer> = Mono.create { sink -> + fun start(): IO<Consumer> = IO { val consumer = Consumer() receiver.receive().subscribe(consumer::update) - sink.success(consumer) + consumer } companion object { |