diff options
Diffstat (limited to 'hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt')
-rw-r--r-- | hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt | 35 |
1 files changed, 16 insertions, 19 deletions
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt index 869c5ab6..08bb149f 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt @@ -19,17 +19,24 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka -import arrow.core.Option import arrow.effects.IO import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.kafka.receiver.ReceiverRecord +import java.util.concurrent.ConcurrentLinkedQueue /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ +class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>){ + val messagesCount: Int by lazy { + messages.size + } -class ConsumerState(val msgCount: Long, val lastKey: Option<ByteArray>, val lastValue: Option<ByteArray>) + val consumedMessages: List<ByteArray> by lazy { + messages.toList() + } +} interface ConsumerStateProvider { fun currentState(): ConsumerState @@ -37,31 +44,21 @@ interface ConsumerStateProvider { } class Consumer : ConsumerStateProvider { - private var msgCount = 0L - private var lastKey: ByteArray? = null - private var lastValue: ByteArray? = null - override fun currentState() = - ConsumerState(msgCount, Option.fromNullable(lastKey), Option.fromNullable(lastValue)) + private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue() - override fun reset() = IO { - synchronized(this) { - msgCount = 0 - lastKey = null - lastValue = null - } + override fun currentState(): ConsumerState = ConsumerState(consumedMessages) + + override fun reset(): IO<Unit> = IO { + consumedMessages.clear() } fun update(record: ReceiverRecord<ByteArray, ByteArray>) { logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" } - - synchronized(this) { - msgCount++ - lastKey = record.key() - lastValue = record.value() - } + consumedMessages.add(record.value()) } + companion object { private val logger = Logger(Consumer::class) } |