aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactory.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactory.kt')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactory.kt25
1 files changed, 18 insertions, 7 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactory.kt
index 1c4acf64..2fcc99e5 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactory.kt
@@ -3,6 +3,7 @@
* dcaegen2-collectors-veshv
* ================================================================================
* Copyright (C) 2019 NOKIA
+ * Copyright (C) 2022 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,13 +25,17 @@ import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.plain.internals.PlainSaslServer
-import org.jetbrains.annotations.Nullable
+import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.utils.applyIf
import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.kafka.sender.SenderOptions
+import java.lang.Boolean.parseBoolean
+
+
+
internal object KafkaSenderOptionsFactory {
@@ -40,6 +45,7 @@ internal object KafkaSenderOptionsFactory {
private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule"
private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name
+ private val USE_SCRAM = parseBoolean(System.getenv().getOrDefault("USE_SCRAM", "false"))
fun createSenderOptions(kafkaSink: KafkaSink): SenderOptions<CommonEventHeader, VesMessage> =
SenderOptions.create<CommonEventHeader, VesMessage>()
@@ -52,14 +58,19 @@ internal object KafkaSenderOptionsFactory {
.producerProperty(ProducerConfig.RETRIES_CONFIG, 1)
.producerProperty(ProducerConfig.ACKS_CONFIG, "1")
.stopOnError(false)
- .applyIf(kafkaSink.aafCredentials() != null) {
- producerProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT)
- .producerProperty(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM)
- .producerProperty(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig(kafkaSink.aafCredentials()!!))
- }
+ .applyIf(kafkaSink.aafCredentials() != null) {
+ producerProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT)
+ .producerProperty(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM)
+ .producerProperty(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig(kafkaSink.aafCredentials()!!))
+ }
+ .applyIf(USE_SCRAM) {
+ producerProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT)
+ .producerProperty(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName())
+ .producerProperty(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG"))
+ }
private fun jaasConfig(aafCredentials: AafCredentials) =
- """$LOGIN_MODULE_CLASS required username="${aafCredentials.username().jaasEscape()}" password="${aafCredentials.password().jaasEscape()}";"""
+ """$LOGIN_MODULE_CLASS required username="${aafCredentials.username().jaasEscape()}" password="${aafCredentials.password().jaasEscape()}";"""
private fun String?.jaasEscape() = this?.replace("\"", "\\\"")