diff options
author | Filip Krzywka <filip.krzywka@nokia.com> | 2019-04-18 14:27:05 +0200 |
---|---|---|
committer | Filip Krzywka <filip.krzywka@nokia.com> | 2019-04-24 08:37:10 +0200 |
commit | 4988554ea65db50dbbb50c8c80171f7910548571 (patch) | |
tree | 02ae8976848d87a947c834d4b1fe0838d0a00034 /sources | |
parent | 482ff719edbb728827976622cef63c876cb6676e (diff) |
Use SASL auth in kafka connections
Change-Id: I55a9289901a6a44f3d07a3cf4e5a028399a5d0dc
Issue-ID: DCAEGEN2-1448
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources')
2 files changed, 29 insertions, 1 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt index 40de8c51..b16ad109 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt @@ -19,7 +19,11 @@ */ package org.onap.dcae.collectors.veshv.impl +import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.plain.internals.PlainSaslServer import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.impl.adapters.kafka.ProtobufSerializer import org.onap.dcae.collectors.veshv.impl.adapters.kafka.VesMessageSerializer @@ -34,6 +38,12 @@ private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f private const val BUFFER_MEMORY_MULTIPLIER = 32 private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024 +private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule" +private const val USERNAME = "admin" +private const val PASSWORD = "admin_secret" +private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;" +private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name + internal fun createKafkaSender(sinkStream: SinkStream) = (sinkStream as KafkaSink).let { kafkaSink -> KafkaSender.create(SenderOptions.create<CommonEventHeader, VesMessage>() @@ -45,6 +55,9 @@ internal fun createKafkaSender(sinkStream: SinkStream) = .producerProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) .producerProperty(ProducerConfig.RETRIES_CONFIG, 1) .producerProperty(ProducerConfig.ACKS_CONFIG, "1") + .producerProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT) + .producerProperty(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM) + .producerProperty(SaslConfigs.SASL_JAAS_CONFIG, JAAS_CONFIG) .stopOnError(false) ) } diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt index b91e7a1c..b5b692d8 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt @@ -19,7 +19,11 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters +import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux @@ -40,10 +44,17 @@ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteAr companion object { private val logger = Logger(KafkaSource::class) + private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule" + private const val USERNAME = "admin" + private const val PASSWORD = "admin_secret" + private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;" + private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name + fun create(bootstrapServers: String, topics: Set<String>): KafkaSource { return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics))) } + fun createReceiverOptions(bootstrapServers: String, topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? { val props = mapOf<String, Any>( @@ -52,7 +63,11 @@ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteAr ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-simulators", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java, - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", + + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SASL_PLAINTEXT, + SaslConfigs.SASL_MECHANISM to PLAIN_MECHANISM, + SaslConfigs.SASL_JAAS_CONFIG to JAAS_CONFIG ) return ReceiverOptions.create<ByteArray, ByteArray>(props) .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } } |