From f0a4690a93e9ceed517530996f46b2ebfa7348b9 Mon Sep 17 00:00:00 2001 From: Guobiao Mo Date: Wed, 19 Jun 2019 12:59:46 -0700 Subject: Connect to secured Kafka Issue-ID: DCAEGEN2-1632 Change-Id: I85767916fbf3b8f874ac367b7161b50a272d8595 Signed-off-by: Guobiao Mo --- .../datalake/feeder/config/ApplicationConfiguration.java | 4 ++++ .../main/java/org/onap/datalake/feeder/service/Puller.java | 14 ++++++++------ .../feeder/src/main/resources/application.properties | 10 ++++++---- .../java/org/onap/datalake/feeder/service/PullerTest.java | 3 +++ 4 files changed, 21 insertions(+), 10 deletions(-) (limited to 'components/datalake-handler/feeder/src') diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java index 2bcd0a39..fa9f7d98 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java @@ -54,6 +54,10 @@ public class ApplicationConfiguration { private String dmaapZookeeperHostPort; private String dmaapKafkaHostPort; private String dmaapKafkaGroup; + private String dmaapKafkaLogin; + private String dmaapKafkaPass; + private String dmaapKafkaSecurityProtocol; + private long dmaapKafkaTimeout; private String[] dmaapKafkaExclude; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java index 9e4ab455..e7121ddb 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java @@ -26,11 +26,12 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.PostConstruct; +import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -42,8 +43,6 @@ import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.config.ConfigurableBeanFactory; -import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Service; /** @@ -90,9 +89,12 @@ public class Puller implements Runnable { consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor"); consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - // consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); - // consumerConfig.put("sasl.mechanism", "PLAIN"); - + if (StringUtils.isNotBlank(config.getDmaapKafkaLogin())) { + String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + config.getDmaapKafkaLogin() + " password=" + config.getDmaapKafkaPass() + " serviceName=kafka;"; + consumerConfig.put("sasl.jaas.config", jaas); + consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.getDmaapKafkaSecurityProtocol()); + consumerConfig.put("sasl.mechanism", "PLAIN"); + } return consumerConfig; } diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties index faf27583..60fcb1a2 100644 --- a/components/datalake-handler/feeder/src/main/resources/application.properties +++ b/components/datalake-handler/feeder/src/main/resources/application.properties @@ -26,18 +26,20 @@ spring.datasource.password=dl1234 #####################DMaaP -#dmaapZookeeperHostPort=127.0.0.1:2181 -#dmaapKafkaHostPort=127.0.0.1:9092 dmaapZookeeperHostPort=message-router-zookeeper:2181 dmaapKafkaHostPort=message-router-kafka:9092 dmaapKafkaGroup=dlgroup44 +#dmaapKafkaLogin=admin +#dmaapKafkaPass=admin-secret +#dmaapKafkaSecurityProtocol=SASL_PLAINTEXT + #in second -dmaapKafkaTimeout=60 +dmaapKafkaTimeout=10 dmaapKafkaExclude[0]=__consumer_offsets dmaapKafkaExclude[1]=__transaction_state #dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap #check for new topics , in millisecond -dmaapCheckNewTopicInterval=60000 +dmaapCheckNewTopicInterval=10000 kafkaConsumerCount=3 diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullerTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullerTest.java index fab5d4cd..4a5553fc 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullerTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullerTest.java @@ -73,6 +73,9 @@ public class PullerTest { when(config.getDmaapKafkaHostPort()).thenReturn("test:1000"); when(config.getDmaapKafkaGroup()).thenReturn("test"); + when(config.getDmaapKafkaLogin()).thenReturn("login"); + when(config.getDmaapKafkaPass()).thenReturn("pass"); + when(config.getDmaapKafkaSecurityProtocol()).thenReturn("TEXT"); Thread thread = new Thread(puller); thread.start(); -- cgit 1.2.3-korg