aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt')
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt35
1 files changed, 16 insertions, 19 deletions
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
index 869c5ab6..08bb149f 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
@@ -19,17 +19,24 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
-import arrow.core.Option
import arrow.effects.IO
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.kafka.receiver.ReceiverRecord
+import java.util.concurrent.ConcurrentLinkedQueue
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
+class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>){
+ val messagesCount: Int by lazy {
+ messages.size
+ }
-class ConsumerState(val msgCount: Long, val lastKey: Option<ByteArray>, val lastValue: Option<ByteArray>)
+ val consumedMessages: List<ByteArray> by lazy {
+ messages.toList()
+ }
+}
interface ConsumerStateProvider {
fun currentState(): ConsumerState
@@ -37,31 +44,21 @@ interface ConsumerStateProvider {
}
class Consumer : ConsumerStateProvider {
- private var msgCount = 0L
- private var lastKey: ByteArray? = null
- private var lastValue: ByteArray? = null
- override fun currentState() =
- ConsumerState(msgCount, Option.fromNullable(lastKey), Option.fromNullable(lastValue))
+ private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue()
- override fun reset() = IO {
- synchronized(this) {
- msgCount = 0
- lastKey = null
- lastValue = null
- }
+ override fun currentState(): ConsumerState = ConsumerState(consumedMessages)
+
+ override fun reset(): IO<Unit> = IO {
+ consumedMessages.clear()
}
fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
-
- synchronized(this) {
- msgCount++
- lastKey = record.key()
- lastValue = record.value()
- }
+ consumedMessages.add(record.value())
}
+
companion object {
private val logger = Logger(Consumer::class)
}