aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2019-04-18 14:27:05 +0200
committerFilip Krzywka <filip.krzywka@nokia.com>2019-04-24 08:37:10 +0200
commit4988554ea65db50dbbb50c8c80171f7910548571 (patch)
tree02ae8976848d87a947c834d4b1fe0838d0a00034 /sources/hv-collector-core/src
parent482ff719edbb728827976622cef63c876cb6676e (diff)
Use SASL auth in kafka connections
Change-Id: I55a9289901a6a44f3d07a3cf4e5a028399a5d0dc Issue-ID: DCAEGEN2-1448 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt13
1 files changed, 13 insertions, 0 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt
index 40de8c51..b16ad109 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt
@@ -19,7 +19,11 @@
*/
package org.onap.dcae.collectors.veshv.impl
+import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.plain.internals.PlainSaslServer
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.ProtobufSerializer
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.VesMessageSerializer
@@ -34,6 +38,12 @@ private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f
private const val BUFFER_MEMORY_MULTIPLIER = 32
private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024
+private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule"
+private const val USERNAME = "admin"
+private const val PASSWORD = "admin_secret"
+private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;"
+private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name
+
internal fun createKafkaSender(sinkStream: SinkStream) =
(sinkStream as KafkaSink).let { kafkaSink ->
KafkaSender.create(SenderOptions.create<CommonEventHeader, VesMessage>()
@@ -45,6 +55,9 @@ internal fun createKafkaSender(sinkStream: SinkStream) =
.producerProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
.producerProperty(ProducerConfig.RETRIES_CONFIG, 1)
.producerProperty(ProducerConfig.ACKS_CONFIG, "1")
+ .producerProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT)
+ .producerProperty(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM)
+ .producerProperty(SaslConfigs.SASL_JAAS_CONFIG, JAAS_CONFIG)
.stopOnError(false)
)
}