diff options
Diffstat (limited to 'sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt')
-rw-r--r-- | sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt | 17 |
1 files changed, 15 insertions, 2 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt index 725248ce..a6d1eddb 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt @@ -62,6 +62,19 @@ class Consumer : ConsumerStateProvider { } class ConsumerFactory(private val kafkaBootstrapServers: String) { - fun createConsumerForTopics(kafkaTopics: Set<String>): Consumer = - KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start() + fun createConsumersForTopics(kafkaTopics: Set<String>): Map<String, Consumer> = + KafkaSource.create(kafkaBootstrapServers, kafkaTopics).let { kafkaSource -> + val topicToConsumer = kafkaTopics.associate { it to Consumer() } + kafkaSource.start() + .map { + val topic = it.topic() + topicToConsumer.get(topic)?.update(it) + ?: logger.warn { "No consumer configured for topic $topic" } + }.subscribe() + topicToConsumer + } + + companion object { + private val logger = Logger(ConsumerFactory::class) + } } |