From 4988554ea65db50dbbb50c8c80171f7910548571 Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Thu, 18 Apr 2019 14:27:05 +0200 Subject: Use SASL auth in kafka connections Change-Id: I55a9289901a6a44f3d07a3cf4e5a028399a5d0dc Issue-ID: DCAEGEN2-1448 Signed-off-by: Filip Krzywka --- development/docker-compose.yml | 43 +++++++++++++++------- .../org/onap/dcae/collectors/veshv/impl/kafka.kt | 13 +++++++ .../dcaeapp/impl/adapters/KafkaSource.kt | 17 ++++++++- 3 files changed, 58 insertions(+), 15 deletions(-) diff --git a/development/docker-compose.yml b/development/docker-compose.yml index 9272c618..1319e394 100644 --- a/development/docker-compose.yml +++ b/development/docker-compose.yml @@ -6,22 +6,37 @@ services: # message-router-zookeeper: - image: wurstmeister/zookeeper + image: nexus3.onap.org:10001/onap/dmaap/zookeeper:4.0.0 ports: - "2181:2181" - message-router-kafka: - # image: nexus3.onap.org:10001/onap/dmaap/kafka01101:0.0.1 - image: wurstmeister/kafka + message-router-kafka-0: + image: nexus3.onap.org:10001/onap/dmaap/kafka111:0.0.6 ports: - "9092:9092" + - "9093:9093" +# command: "start-kafka.sh" environment: - KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' - KAFKA_ZOOKEEPER_CONNECT: "message-router-zookeeper:2181" - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT" - KAFKA_ADVERTISED_LISTENERS: "INTERNAL_PLAINTEXT://message-router-kafka:9092" - KAFKA_LISTENERS: "INTERNAL_PLAINTEXT://0.0.0.0:9092" - KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL_PLAINTEXT" + HOST_IP: 127.0.0.1 + KAFKA_BROKER_ID: 0 + ENDPOINT_PORT: 30490 + KAFKA_ZOOKEEPER_CONNECT: "message-router-zookeeper:2181" + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + KAFKA_DELETE_TOPIC_ENABLE: "true" + + KAFKA_LISTENERS: "INTERNAL_SASL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_SASL_PLAINTEXT://0.0.0.0:9093" + KAFKA_ADVERTISED_LISTENERS: "INTERNAL_SASL_PLAINTEXT://:9092,EXTERNAL_SASL_PLAINTEXT://:9093" + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL_SASL_PLAINTEXT:SASL_PLAINTEXT,EXTERNAL_SASL_PLAINTEXT:SASL_PLAINTEXT" + KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL_SASL_PLAINTEXT" + KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN" + KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "PLAIN" + KAFKA_AUTHORIZER_CLASS_NAME: "org.onap.dmaap.kafkaAuthorize.KafkaCustomAuthorizer" + + aaf_locate_url: https://aaf-locate:8095 + KAFKA_LOG_DIRS: /opt/kafka/data + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_DEFAULT_REPLICATION_FACTOR: 1 + KAFKA_NUM_PARTITIONS: 1 volumes: - /var/run/docker.sock:/var/run/docker.sock depends_on: @@ -47,7 +62,7 @@ services: "perf3gpp": { "type": "kafka", "kafka_info": { - "bootstrap_servers": "message-router-kafka:9092", + "bootstrap_servers": "message-router-kafka-0:9093", "topic_name": "HV_VES_PERF3GPP" } } @@ -88,7 +103,7 @@ services: retries: 3 start_period: 15s depends_on: - - message-router-kafka + - message-router-kafka-0 - config-binding-service volumes: - ./configuration/:/etc/ves-hv/configuration/ @@ -129,10 +144,10 @@ services: ports: - "6064:6064/tcp" command: ["--listen-port", "6064", - "--kafka-bootstrap-servers", "message-router-kafka:9092", + "--kafka-bootstrap-servers", "message-router-kafka-0:9092", "--kafka-topics", "HV_VES_PERF3GPP"] depends_on: - - message-router-kafka + - message-router-kafka-0 # # Monitoring diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt index 40de8c51..b16ad109 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt @@ -19,7 +19,11 @@ */ package org.onap.dcae.collectors.veshv.impl +import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.plain.internals.PlainSaslServer import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.impl.adapters.kafka.ProtobufSerializer import org.onap.dcae.collectors.veshv.impl.adapters.kafka.VesMessageSerializer @@ -34,6 +38,12 @@ private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f private const val BUFFER_MEMORY_MULTIPLIER = 32 private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024 +private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule" +private const val USERNAME = "admin" +private const val PASSWORD = "admin_secret" +private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;" +private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum).name + internal fun createKafkaSender(sinkStream: SinkStream) = (sinkStream as KafkaSink).let { kafkaSink -> KafkaSender.create(SenderOptions.create() @@ -45,6 +55,9 @@ internal fun createKafkaSender(sinkStream: SinkStream) = .producerProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1) .producerProperty(ProducerConfig.RETRIES_CONFIG, 1) .producerProperty(ProducerConfig.ACKS_CONFIG, "1") + .producerProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT) + .producerProperty(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM) + .producerProperty(SaslConfigs.SASL_JAAS_CONFIG, JAAS_CONFIG) .stopOnError(false) ) } diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt index b91e7a1c..b5b692d8 100644 --- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt +++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt @@ -19,7 +19,11 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters +import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux @@ -40,10 +44,17 @@ internal class KafkaSource(private val receiver: KafkaReceiver).name + fun create(bootstrapServers: String, topics: Set): KafkaSource { return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics))) } + fun createReceiverOptions(bootstrapServers: String, topics: Set): ReceiverOptions? { val props = mapOf( @@ -52,7 +63,11 @@ internal class KafkaSource(private val receiver: KafkaReceiver(props) .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } } -- cgit 1.2.3-korg