diff options
author | efiacor <fiachra.corcoran@est.tech> | 2022-02-07 11:18:11 +0000 |
---|---|---|
committer | efiacor <fiachra.corcoran@est.tech> | 2022-02-21 09:28:57 +0000 |
commit | 955431838bef5bc74cb62cb0c550613b8a6f13b6 (patch) | |
tree | 5fb1588e3b1a4cad4435c22df38752fe20f48830 /sources/hv-collector-core/src/main/kotlin | |
parent | fc79bd62956e79fe08f7a5eb36f75dd826a4fc86 (diff) |
[DCAE-HV-VES] Add jaas config for kafka connect
Adding SCRAM config for kafka producer
Modified versioning process in the poms
Signed-off-by: efiacor <fiachra.corcoran@est.tech>
Change-Id: Ia6bc442fa7cbf5b76cbdcc160b76485091f50942
Issue-ID: DCAEGEN2-3038
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin')
-rw-r--r-- | sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactory.kt | 25 |
1 files changed, 18 insertions, 7 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactory.kt index 1c4acf64..2fcc99e5 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSenderOptionsFactory.kt @@ -3,6 +3,7 @@ * dcaegen2-collectors-veshv * ================================================================================ * Copyright (C) 2019 NOKIA + * Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,13 +25,17 @@ import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.plain.internals.PlainSaslServer -import org.jetbrains.annotations.Nullable +import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.utils.applyIf import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.kafka.sender.SenderOptions +import java.lang.Boolean.parseBoolean + + + internal object KafkaSenderOptionsFactory { @@ -40,6 +45,7 @@ internal object KafkaSenderOptionsFactory { private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule" private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name + private val USE_SCRAM = parseBoolean(System.getenv().getOrDefault("USE_SCRAM", "false")) fun createSenderOptions(kafkaSink: KafkaSink): SenderOptions<CommonEventHeader, VesMessage> = SenderOptions.create<CommonEventHeader, VesMessage>() @@ -52,14 +58,19 @@ internal object KafkaSenderOptionsFactory { .producerProperty(ProducerConfig.RETRIES_CONFIG, 1) .producerProperty(ProducerConfig.ACKS_CONFIG, "1") .stopOnError(false) - .applyIf(kafkaSink.aafCredentials() != null) { - producerProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT) - .producerProperty(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM) - .producerProperty(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig(kafkaSink.aafCredentials()!!)) - } + .applyIf(kafkaSink.aafCredentials() != null) { + producerProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT) + .producerProperty(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM) + .producerProperty(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig(kafkaSink.aafCredentials()!!)) + } + .applyIf(USE_SCRAM) { + producerProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT) + .producerProperty(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName()) + .producerProperty(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG")) + } private fun jaasConfig(aafCredentials: AafCredentials) = - """$LOGIN_MODULE_CLASS required username="${aafCredentials.username().jaasEscape()}" password="${aafCredentials.password().jaasEscape()}";""" + """$LOGIN_MODULE_CLASS required username="${aafCredentials.username().jaasEscape()}" password="${aafCredentials.password().jaasEscape()}";""" private fun String?.jaasEscape() = this?.replace("\"", "\\\"") |