From 4988554ea65db50dbbb50c8c80171f7910548571 Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Thu, 18 Apr 2019 14:27:05 +0200 Subject: Use SASL auth in kafka connections Change-Id: I55a9289901a6a44f3d07a3cf4e5a028399a5d0dc Issue-ID: DCAEGEN2-1448 Signed-off-by: Filip Krzywka --- .../simulators/dcaeapp/impl/adapters/KafkaSource.kt | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) (limited to 'sources/hv-collector-dcae-app-simulator') diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt index b91e7a1c..b5b692d8 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt @@ -19,7 +19,11 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters +import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux @@ -40,10 +44,17 @@ internal class KafkaSource(private val receiver: KafkaReceiver).name + fun create(bootstrapServers: String, topics: Set): KafkaSource { return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics))) } + fun createReceiverOptions(bootstrapServers: String, topics: Set): ReceiverOptions? { val props = mapOf( @@ -52,7 +63,11 @@ internal class KafkaSource(private val receiver: KafkaReceiver(props) .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } } -- cgit 1.2.3-korg