aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-dcae-app-simulator
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-dcae-app-simulator')
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt17
1 files changed, 16 insertions, 1 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
index b91e7a1c..b5b692d8 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
@@ -19,7 +19,11 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
+import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
@@ -40,10 +44,17 @@ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteAr
companion object {
private val logger = Logger(KafkaSource::class)
+ private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule"
+ private const val USERNAME = "admin"
+ private const val PASSWORD = "admin_secret"
+ private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;"
+ private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name
+
fun create(bootstrapServers: String, topics: Set<String>): KafkaSource {
return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics)))
}
+
fun createReceiverOptions(bootstrapServers: String,
topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? {
val props = mapOf<String, Any>(
@@ -52,7 +63,11 @@ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteAr
ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-simulators",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
- ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest"
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
+
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SASL_PLAINTEXT,
+ SaslConfigs.SASL_MECHANISM to PLAIN_MECHANISM,
+ SaslConfigs.SASL_JAAS_CONFIG to JAAS_CONFIG
)
return ReceiverOptions.create<ByteArray, ByteArray>(props)
.addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } }