summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder
diff options
context:
space:
mode:
authorGuobiao Mo <guobiaomo@chinamobile.com>2019-06-19 12:59:46 -0700
committerGuobiao Mo <guobiaomo@chinamobile.com>2019-06-19 12:59:46 -0700
commitf0a4690a93e9ceed517530996f46b2ebfa7348b9 (patch)
treed348ebd713163be281020edbc74ad551d2b3b84c /components/datalake-handler/feeder
parentef09439c31588547a1161365c8ff15677a1376c0 (diff)
Connect to secured Kafka
Issue-ID: DCAEGEN2-1632 Change-Id: I85767916fbf3b8f874ac367b7161b50a272d8595 Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder')
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java4
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java14
-rw-r--r--components/datalake-handler/feeder/src/main/resources/application.properties10
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullerTest.java3
4 files changed, 21 insertions, 10 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index 2bcd0a39..fa9f7d98 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -54,6 +54,10 @@ public class ApplicationConfiguration {
private String dmaapZookeeperHostPort;
private String dmaapKafkaHostPort;
private String dmaapKafkaGroup;
+ private String dmaapKafkaLogin;
+ private String dmaapKafkaPass;
+ private String dmaapKafkaSecurityProtocol;
+
private long dmaapKafkaTimeout;
private String[] dmaapKafkaExclude;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
index 9e4ab455..e7121ddb 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
@@ -26,11 +26,12 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -42,8 +43,6 @@ import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.config.ConfigurableBeanFactory;
-import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
/**
@@ -90,9 +89,12 @@ public class Puller implements Runnable {
consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- // consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
- // consumerConfig.put("sasl.mechanism", "PLAIN");
-
+ if (StringUtils.isNotBlank(config.getDmaapKafkaLogin())) {
+ String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + config.getDmaapKafkaLogin() + " password=" + config.getDmaapKafkaPass() + " serviceName=kafka;";
+ consumerConfig.put("sasl.jaas.config", jaas);
+ consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.getDmaapKafkaSecurityProtocol());
+ consumerConfig.put("sasl.mechanism", "PLAIN");
+ }
return consumerConfig;
}
diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties
index faf27583..60fcb1a2 100644
--- a/components/datalake-handler/feeder/src/main/resources/application.properties
+++ b/components/datalake-handler/feeder/src/main/resources/application.properties
@@ -26,18 +26,20 @@ spring.datasource.password=dl1234
#####################DMaaP
-#dmaapZookeeperHostPort=127.0.0.1:2181
-#dmaapKafkaHostPort=127.0.0.1:9092
dmaapZookeeperHostPort=message-router-zookeeper:2181
dmaapKafkaHostPort=message-router-kafka:9092
dmaapKafkaGroup=dlgroup44
+#dmaapKafkaLogin=admin
+#dmaapKafkaPass=admin-secret
+#dmaapKafkaSecurityProtocol=SASL_PLAINTEXT
+
#in second
-dmaapKafkaTimeout=60
+dmaapKafkaTimeout=10
dmaapKafkaExclude[0]=__consumer_offsets
dmaapKafkaExclude[1]=__transaction_state
#dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap
#check for new topics , in millisecond
-dmaapCheckNewTopicInterval=60000
+dmaapCheckNewTopicInterval=10000
kafkaConsumerCount=3
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullerTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullerTest.java
index fab5d4cd..4a5553fc 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullerTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullerTest.java
@@ -73,6 +73,9 @@ public class PullerTest {
when(config.getDmaapKafkaHostPort()).thenReturn("test:1000");
when(config.getDmaapKafkaGroup()).thenReturn("test");
+ when(config.getDmaapKafkaLogin()).thenReturn("login");
+ when(config.getDmaapKafkaPass()).thenReturn("pass");
+ when(config.getDmaapKafkaSecurityProtocol()).thenReturn("TEXT");
Thread thread = new Thread(puller);
thread.start();