aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt')
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt6
1 files changed, 3 insertions, 3 deletions
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
index f7703b86..d53609ca 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
@@ -19,10 +19,10 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
+import arrow.effects.IO
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.Mono
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
import java.util.*
@@ -33,10 +33,10 @@ import java.util.*
*/
class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
- fun start(): Mono<Consumer> = Mono.create { sink ->
+ fun start(): IO<Consumer> = IO {
val consumer = Consumer()
receiver.receive().subscribe(consumer::update)
- sink.success(consumer)
+ consumer
}
companion object {