summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder
diff options
context:
space:
mode:
authorGuobiao Mo <guobiaomo@chinamobile.com>2019-05-13 11:58:33 -0700
committerGuobiao Mo <guobiaomo@chinamobile.com>2019-05-13 11:58:33 -0700
commit3208e0c943742fef5e6692202063dba4e8ab96fd (patch)
tree4c2249402badfcc430425cd2bd7369b9f1465543 /components/datalake-handler/feeder
parent59e2cb0714953e91f5a6c29c58fb935f44975442 (diff)
Support HDFS as a data store
Issue-ID: DCAEGEN2-1498 Change-Id: Id203275bce01bd4a4d6ec131fb9696d78eda82f5 Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder')
-rw-r--r--components/datalake-handler/feeder/pom.xml381
-rw-r--r--components/datalake-handler/feeder/src/assembly/scripts/init_db.sql2
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java23
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java6
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java96
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java12
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java7
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java2
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java189
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java2
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java51
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java20
-rw-r--r--components/datalake-handler/feeder/src/main/resources/application.properties54
-rw-r--r--components/datalake-handler/feeder/src/main/resources/logback.xml2
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java11
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java1
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/DbServiceTest.java7
-rw-r--r--components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullServiceTest.java2
-rw-r--r--components/datalake-handler/feeder/src/test/resources/application.properties45
19 files changed, 579 insertions, 334 deletions
diff --git a/components/datalake-handler/feeder/pom.xml b/components/datalake-handler/feeder/pom.xml
index 63a2af9a..8c285f84 100644
--- a/components/datalake-handler/feeder/pom.xml
+++ b/components/datalake-handler/feeder/pom.xml
@@ -1,192 +1,197 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.onap.dcaegen2.services.components</groupId>
- <artifactId>datalake-handler</artifactId>
- <version>1.0.0-SNAPSHOT</version>
- </parent>
-
- <groupId>org.onap.dcaegen2.services.components.datalake-handler</groupId>
- <artifactId>feeder</artifactId>
- <packaging>jar</packaging>
- <name>DataLake Feeder</name>
-
-
- <dependencies>
-
- <dependency>
- <groupId>org.mariadb.jdbc</groupId>
- <artifactId>mariadb-java-client</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.json</groupId>
- <artifactId>json</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-actuator</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-jpa</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-couchbase</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-configuration-processor</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-high-level-client</artifactId>
- </dependency>
-
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-yaml</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-xml</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
-
- <dependency>
- <groupId>io.druid</groupId>
- <artifactId>tranquility-core_2.11</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.velocity</groupId>
- <artifactId>velocity-engine-core</artifactId>
- </dependency>
-
-
- <dependency>
- <groupId>org.hibernate</groupId>
- <artifactId>hibernate-core</artifactId>
- <version>5.3.7.Final</version>
- </dependency>
-
- <!-- jsr303 validation -->
- <dependency>
- <groupId>javax.validation</groupId>
- <artifactId>validation-api</artifactId>
- <version>2.0.1.Final</version>
- </dependency>
-
- <dependency>
- <groupId>org.hibernate</groupId>
- <artifactId>hibernate-validator</artifactId>
- <version>6.0.10.Final</version>
- </dependency>
-
- <dependency>
- <groupId>io.springfox</groupId>
- <artifactId>springfox-swagger2</artifactId>
- <version>2.9.2</version>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
- <groupId>io.springfox</groupId>
- <artifactId>springfox-swagger-ui</artifactId>
- <version>2.9.2</version>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
- <groupId>org.mongodb</groupId>
- <artifactId>mongo-java-driver</artifactId>
- </dependency>
- <dependency>
- <groupId>com.couchbase.mock</groupId>
- <artifactId>CouchbaseMock</artifactId>
- <version>1.5.22</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <version>${springboot.version}</version>
- <executions>
- <execution>
- <goals>
- <goal>repackage</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <artifactId>maven-failsafe-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>integration-test</goal>
- <goal>verify</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onap.dcaegen2.services.components</groupId>
+ <artifactId>datalake-handler</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.onap.dcaegen2.services.components.datalake-handler</groupId>
+ <artifactId>feeder</artifactId>
+ <packaging>jar</packaging>
+ <name>DataLake Feeder</name>
+
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mariadb.jdbc</groupId>
+ <artifactId>mariadb-java-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.json</groupId>
+ <artifactId>json</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-actuator</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-jpa</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-couchbase</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-configuration-processor</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>elasticsearch-rest-high-level-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-xml</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.druid</groupId>
+ <artifactId>tranquility-core_2.11</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.velocity</groupId>
+ <artifactId>velocity-engine-core</artifactId>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.hibernate</groupId>
+ <artifactId>hibernate-core</artifactId>
+ <version>5.3.7.Final</version>
+ </dependency>
+
+ <!-- jsr303 validation -->
+ <dependency>
+ <groupId>javax.validation</groupId>
+ <artifactId>validation-api</artifactId>
+ <version>2.0.1.Final</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hibernate</groupId>
+ <artifactId>hibernate-validator</artifactId>
+ <version>6.0.10.Final</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.springfox</groupId>
+ <artifactId>springfox-swagger2</artifactId>
+ <version>2.9.2</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>io.springfox</groupId>
+ <artifactId>springfox-swagger-ui</artifactId>
+ <version>2.9.2</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongo-java-driver</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.couchbase.mock</groupId>
+ <artifactId>CouchbaseMock</artifactId>
+ <version>1.5.22</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <version>${springboot.version}</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>repackage</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
index 6688d684..04299e6d 100644
--- a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
+++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
@@ -82,6 +82,7 @@ insert into db (`name`,`host`,`login`,`pass`,`database_name`) values ('Couchbase
insert into db (`name`,`host`) values ('Elasticsearch','dl_es');
insert into db (`name`,`host`,`port`,`database_name`) values ('MongoDB','dl_mongodb',27017,'datalake');
insert into db (`name`,`host`) values ('Druid','dl_druid');
+insert into db (`name`,`host`) values ('HDFS','dlhdfs');
-- in production, default enabled should be off
@@ -94,3 +95,4 @@ insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','_DL_DEFA
insert into `map_db_topic`(`db_name`,`topic_name`) values ('Elasticsearch','_DL_DEFAULT_');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('Druid','_DL_DEFAULT_');
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('HDFS','_DL_DEFAULT_');
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index d59c0fc1..9106185e 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -41,6 +41,16 @@ import lombok.Setter;
@EnableAutoConfiguration
public class ApplicationConfiguration {
+ //App general
+ private boolean async;
+ private boolean enableSSL;
+
+ private String timestampLabel;
+ private String rawDataLabel;
+
+ private String defaultTopicName;
+
+ //DMaaP
private String dmaapZookeeperHostPort;
private String dmaapKafkaHostPort;
private String dmaapKafkaGroup;
@@ -51,13 +61,10 @@ public class ApplicationConfiguration {
private int kafkaConsumerCount;
- private boolean async;
- private boolean enableSSL;
-
- private String timestampLabel;
- private String rawDataLabel;
-
- private String defaultTopicName;
-
private String elasticsearchType;
+
+ //HDFS
+ private int hdfsBufferSize;
+ private long hdfsFlushInterval;
+ private int hdfsBatchSize;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
index 15ffc8a3..deaa0969 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
@@ -60,7 +60,11 @@ public class TopicConfig {
}
}
-
+
+ public boolean supportHdfs() {
+ return containDb("HDFS");
+ }
+
public boolean supportElasticsearch() {
return containDb("Elasticsearch");//TODO string hard codes
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
index f5ee5b79..7c237766 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
@@ -19,17 +19,16 @@
*/
package org.onap.datalake.feeder.service;
-
+
import java.util.ArrayList;
import java.util.List;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-
+
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.dto.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +41,9 @@ import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.JsonLongDocument;
-import com.couchbase.client.java.document.json.JsonObject;
+import com.couchbase.client.java.document.json.JsonObject;
+import com.couchbase.client.java.env.CouchbaseEnvironment;
+import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import rx.Observable;
import rx.functions.Func1;
@@ -63,65 +64,69 @@ public class CouchbaseService {
@Autowired
private DbService dbService;
-
+
Bucket bucket;
private boolean isReady = false;
@PostConstruct
private void init() {
- // Initialize Couchbase Connection
- try {
- Db couchbase = dbService.getCouchbase();
- Cluster cluster = CouchbaseCluster.create(couchbase.getHost());
- cluster.authenticate(couchbase.getLogin(), couchbase.getPass());
- bucket = cluster.openBucket(couchbase.getDatabase());
- log.info("Connect to Couchbase {}", couchbase.getHost());
- // Create a N1QL Primary Index (but ignore if it exists)
- bucket.bucketManager().createN1qlPrimaryIndex(true, false);
- isReady = true;
- }
- catch(Exception ex)
- {
- isReady = false;
- }
+ // Initialize Couchbase Connection
+ try {
+ Db couchbase = dbService.getCouchbase();
+
+ //this tunes the SDK (to customize connection timeout)
+ CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder().connectTimeout(60000) // 60s, default is 5s
+ .build();
+ Cluster cluster = CouchbaseCluster.create(env, couchbase.getHost());
+ cluster.authenticate(couchbase.getLogin(), couchbase.getPass());
+ bucket = cluster.openBucket(couchbase.getDatabase());
+ // Create a N1QL Primary Index (but ignore if it exists)
+ bucket.bucketManager().createN1qlPrimaryIndex(true, false);
+
+ log.info("Connected to Couchbase {}", couchbase.getHost());
+ isReady = true;
+ } catch (Exception ex) {
+ log.error("error connection to Couchbase.", ex);
+ isReady = false;
+ }
}
@PreDestroy
- public void cleanUp() {
+ public void cleanUp() {
bucket.close();
- }
+ }
- public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
- List<JsonDocument> documents= new ArrayList<>(jsons.size());
- for(JSONObject json : jsons) {
+ public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
+ List<JsonDocument> documents = new ArrayList<>(jsons.size());
+ for (JSONObject json : jsons) {
//convert to Couchbase JsonObject from org.json JSONObject
- JsonObject jsonObject = JsonObject.fromJson(json.toString());
+ JsonObject jsonObject = JsonObject.fromJson(json.toString());
long timestamp = jsonObject.getLong(config.getTimestampLabel());//this is Kafka time stamp, which is added in StoreService.messageToJson()
//setup TTL
- int expiry = (int) (timestamp/1000L) + topic.getTtl()*3600*24; //in second
-
+ int expiry = (int) (timestamp / 1000L) + topic.getTtl() * 3600 * 24; //in second
+
String id = getId(topic, json);
JsonDocument doc = JsonDocument.create(id, expiry, jsonObject);
documents.add(doc);
}
try {
saveDocuments(documents);
- }catch(Exception e) {
+ } catch (Exception e) {
log.error("error saving to Couchbase.", e);
}
- log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size());
+ log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size());
}
public String getId(TopicConfig topic, JSONObject json) {
//if this topic requires extract id from JSON
String id = topic.getMessageId(json);
- if(id != null) {
+ if (id != null) {
return id;
}
-
- String topicStr= topic.getName();
+
+ String topicStr = topic.getName();
//String id = topicStr+":"+timestamp+":"+UUID.randomUUID();
//https://forums.couchbase.com/t/how-to-set-an-auto-increment-id/4892/2
@@ -129,24 +134,19 @@ public class CouchbaseService {
// increment by 1, initialize at 0 if counter doc not found
//TODO how slow is this compared with above UUID approach?
JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345
- id = topicStr +":"+ nextIdNumber.content();
-
+ id = topicStr + ":" + nextIdNumber.content();
+
return id;
}
-
+
//https://docs.couchbase.com/java-sdk/2.7/document-operations.html
- private void saveDocuments(List<JsonDocument> documents) {
- Observable
- .from(documents)
- .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
- @Override
- public Observable<JsonDocument> call(final JsonDocument docToInsert) {
- return bucket.async().insert(docToInsert);
- }
- })
- .last()
- .toBlocking()
- .single();
+ private void saveDocuments(List<JsonDocument> documents) {
+ Observable.from(documents).flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
+ @Override
+ public Observable<JsonDocument> call(final JsonDocument docToInsert) {
+ return bucket.async().insert(docToInsert);
+ }
+ }).last().toBlocking().single();
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
index e859270f..58bb433a 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
@@ -28,7 +28,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
- * Service for Dbs
+ * Service for Dbs
*
* @author Guobiao Mo
*
@@ -38,11 +38,11 @@ public class DbService {
@Autowired
private DbRepository dbRepository;
-
+
public Db getDb(String name) {
Optional<Db> ret = dbRepository.findById(name);
return ret.isPresent() ? ret.get() : null;
- }
+ }
public Db getCouchbase() {
return getDb("Couchbase");
@@ -58,6 +58,10 @@ public class DbService {
public Db getDruid() {
return getDb("Druid");
- }
+ }
+
+ public Db getHdfs() {
+ return getDb("HDFS");
+ }
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
index de8c9e89..0caec79a 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
@@ -22,6 +22,7 @@ package org.onap.datalake.feeder.service;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -65,9 +66,7 @@ public class DmaapService {
ZooKeeper zk = new ZooKeeper(config.getDmaapZookeeperHostPort(), 10000, watcher);
List<String> topics = zk.getChildren("/brokers/topics", false);
String[] excludes = config.getDmaapKafkaExclude();
- for (String exclude : excludes) {
- topics.remove(exclude);
- }
+ topics.removeAll(Arrays.asList(excludes));
return topics;
} catch (Exception e) {
log.error("Can not get topic list from Zookeeper, for testing, going to use hard coded topic list.", e);
@@ -81,7 +80,7 @@ public class DmaapService {
return Collections.emptyList();
}
- List<String> ret = new ArrayList<>();
+ List<String> ret = new ArrayList<>(allTopics.size());
for (String topicStr : allTopics) {
TopicConfig topicConfig = topicService.getEffectiveTopic(topicStr, true);
if (topicConfig.isEnabled()) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
index c354f175..f1bed604 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
@@ -84,7 +84,7 @@ public class ElasticsearchService {
// Initialize the Connection
client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
- log.info("Connect to Elasticsearch Host {}", elasticsearchHost);
+ log.info("Connected to Elasticsearch Host {}", elasticsearchHost);
listener = new ActionListener<BulkResponse>() {
@Override
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java
new file mode 100644
index 00000000..e8d29106
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java
@@ -0,0 +1,189 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.dto.TopicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Service to write data to HDFS
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class HdfsService {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ ApplicationConfiguration config;
+
+ @Autowired
+ private DbService dbService;
+
+ FileSystem fileSystem;
+ private boolean isReady = false;
+
+ private ThreadLocal<Map<String, Buffer>> bufferLocal = ThreadLocal.withInitial(HashMap::new);
+ private ThreadLocal<SimpleDateFormat> dayFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
+ private ThreadLocal<SimpleDateFormat> timeFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS"));
+
+ @Setter
+ @Getter
+ private class Buffer {
+ long lastFlush;
+ List<String> data;
+
+ public Buffer() {
+ lastFlush = Long.MIN_VALUE;
+ data = new ArrayList<>();
+ }
+
+ public void flush(String topic) {
+ try {
+ saveMessages(topic, data);
+ data.clear();
+ lastFlush = System.currentTimeMillis();
+ log.debug("done flush, topic={}, buffer size={}", topic, data.size());
+ } catch (IOException e) {
+ log.error("error saving to HDFS." + topic, e);
+ }
+ }
+
+ public void flushStall(String topic) {
+ if (!data.isEmpty() && System.currentTimeMillis() > lastFlush + config.getHdfsFlushInterval()) {
+ log.debug("going to flushStall topic={}, buffer size={}", topic, data.size());
+ flush(topic);
+ }
+ }
+
+ private void saveMessages(String topic, List<String> bufferList) throws IOException {
+
+ String thread = Thread.currentThread().getName();
+ Date date = new Date();
+ String day = dayFormat.get().format(date);
+ String time = timeFormat.get().format(date);
+ String filePath = String.format("/datalake/%s/%s/%s-%s", topic, day, time, thread);
+ Path path = new Path(filePath);
+ log.debug("writing to HDFS {}", filePath);
+
+ // Create a new file and write data to it.
+ FSDataOutputStream out = fileSystem.create(path, true, config.getHdfsBufferSize());
+
+ bufferList.stream().forEach(message -> {
+ try {
+ out.writeUTF(message);
+ out.write('\n');
+ } catch (IOException e) {
+ log.error("error writing to HDFS.", e);
+ }
+ });
+
+ out.close();
+ }
+ }
+
+ @PostConstruct
+ private void init() {
+ // Initialize HDFS Connection
+ try {
+ Db hdfs = dbService.getHdfs();
+
+ //Get configuration of Hadoop system
+ Configuration hdfsConfig = new Configuration();
+
+ int port = hdfs.getPort() == null ? 8020 : hdfs.getPort();
+
+ String hdfsuri = String.format("hdfs://%s:%s", hdfs.getHost(), port);
+ hdfsConfig.set("fs.defaultFS", hdfsuri);
+
+ log.info("Connecting to -- {}", hdfsuri);
+
+ fileSystem = FileSystem.get(hdfsConfig);
+
+ isReady = true;
+ } catch (Exception ex) {
+ log.error("error connection to HDFS.", ex);
+ isReady = false;
+ }
+ }
+
+ @PreDestroy
+ public void cleanUp() {
+ try {
+ flush();
+ fileSystem.close();
+ } catch (IOException e) {
+ log.error("fileSystem.close() at cleanUp.", e);
+ }
+ }
+
+ public void flush() {
+ bufferLocal.get().forEach((topic, buffer) -> buffer.flush(topic));
+ }
+
+ //if no new data comes in for a topic for a while, need to flush its buffer
+ public void flushStall() {
+ bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic));
+ }
+
+ public void saveMessages(TopicConfig topic, List<Pair<Long, String>> messages) {
+ String topicStr = topic.getName();
+
+ Map<String, Buffer> bufferMap = bufferLocal.get();
+ final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());
+
+ List<String> bufferData = buffer.getData();
+
+ messages.stream().forEach(message -> bufferData.add(message.getRight()));//note that message left is not used
+
+ if (bufferData.size() >= config.getHdfsBatchSize()) {
+ buffer.flush(topicStr);
+ }
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
index c5408951..32d21c62 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
@@ -135,7 +135,7 @@ public class MongodbService {
}
public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
- if (dbReady == false)
+ if (dbReady == false)//TOD throw exception
return;
List<Document> documents = new ArrayList<>(jsons.size());
for (JSONObject json : jsons) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
index b3a6d29a..1154b3a9 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
@@ -54,7 +54,7 @@ import org.springframework.stereotype.Service;
*/
@Service
-@Scope(value=ConfigurableBeanFactory.SCOPE_PROTOTYPE)
+@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class PullThread implements Runnable {
@Autowired
@@ -90,7 +90,8 @@ public class PullThread implements Runnable {
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getDmaapKafkaHostPort());
- consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup());
+ consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup());
+ consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(id));
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
@@ -119,28 +120,29 @@ public class PullThread implements Runnable {
while (active.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout()));
- if (records != null) {
- List<Pair<Long, String>> messages = new ArrayList<>(records.count());
- for (TopicPartition partition : records.partitions()) {
- messages.clear();
- List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
- for (ConsumerRecord<String, String> record : partitionRecords) {
- messages.add(Pair.of(record.timestamp(), record.value()));
- //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
- }
- storeService.saveMessages(partition.topic(), messages);
- log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
-
- if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
- long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
- consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
- }
- }
-
- if (async) {//for high Throughput, async commit offset in batch to Kafka
- consumer.commitAsync();
- }
- }
+ if (records != null) {
+ List<Pair<Long, String>> messages = new ArrayList<>(records.count());
+ for (TopicPartition partition : records.partitions()) {
+ messages.clear();
+ List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
+ for (ConsumerRecord<String, String> record : partitionRecords) {
+ messages.add(Pair.of(record.timestamp(), record.value()));
+ //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
+ }
+ storeService.saveMessages(partition.topic(), messages);
+ log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
+
+ if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
+ long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+ consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+ }
+ }
+
+ if (async) {//for high Throughput, async commit offset in batch to Kafka
+ consumer.commitAsync();
+ }
+ }
+ storeService.flushStall();
}
} catch (Exception e) {
log.error("Puller {} run(): exception={}", id, e.getMessage());
@@ -153,6 +155,7 @@ public class PullThread implements Runnable {
public void shutdown() {
active.set(false);
consumer.wakeup();
+ consumer.unsubscribe();
}
private class DummyRebalanceListener implements ConsumerRebalanceListener {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index 449dacfc..2d00a9b8 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
import org.apache.commons.lang3.tuple.Pair;
@@ -35,7 +34,6 @@ import org.json.JSONException;
import org.json.JSONObject;
import org.json.XML;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
-import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.enumeration.DataFormat;
import org.slf4j.Logger;
@@ -77,6 +75,9 @@ public class StoreService {
@Autowired
private ElasticsearchService elasticsearchService;
+ @Autowired
+ private HdfsService hdfsService;
+
private Map<String, TopicConfig> topicMap = new HashMap<>();
private ObjectMapper yamlReader;
@@ -86,10 +87,6 @@ public class StoreService {
yamlReader = new ObjectMapper(new YAMLFactory());
}
- @PreDestroy
- public void cleanUp() {
- }
-
public void saveMessages(String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text
if (messages == null || messages.isEmpty()) {
return;
@@ -109,7 +106,7 @@ public class StoreService {
}
}
- saveJsons(topic, docs);
+ saveJsons(topic, docs, messages);
}
private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws JSONException, JsonParseException, JsonMappingException, IOException {
@@ -161,7 +158,7 @@ public class StoreService {
return json;
}
- private void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
+ private void saveJsons(TopicConfig topic, List<JSONObject> jsons, List<Pair<Long, String>> messages) {
if (topic.supportMongoDB()) {
mongodbService.saveJsons(topic, jsons);
}
@@ -173,6 +170,13 @@ public class StoreService {
if (topic.supportElasticsearch()) {
elasticsearchService.saveJsons(topic, jsons);
}
+
+ if (topic.supportHdfs()) {
+ hdfsService.saveMessages(topic, messages);
+ }
}
+ public void flushStall() {
+ hdfsService.flushStall();
+ }
}
diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties
index e1b83999..b9d6b9e9 100644
--- a/components/datalake-handler/feeder/src/main/resources/application.properties
+++ b/components/datalake-handler/feeder/src/main/resources/application.properties
@@ -1,8 +1,20 @@
-
+#####################App general
server.port = 1680
server.servlet.context-path = /datalake/v1
-# Spring connection to MariaDB for ORM
+#tolerate inconsistency when system crash, see PullThread.run()
+async=true
+
+#SSL global flag, if enabled, still need to check each individual DB SSL flag
+enableSSL=false
+
+#names for extra fields that DL adds to each record
+timestampLabel=datalake_ts_
+rawDataLabel=datalake_text_
+
+defaultTopicName=_DL_DEFAULT_
+
+#####################Spring connection to MariaDB for ORM
#spring.jpa.hibernate.ddl-auto=update
spring.jpa.hibernate.ddl-auto=none
spring.jpa.show-sql=false
@@ -13,42 +25,32 @@ spring.datasource.username=dl
spring.datasource.password=dl1234
-#For Beijing lab
-#dmaapZookeeperHostPort=zookeeper.mr01.onap.vip:80
-#dmaapKafkaHostPort=kafka.mr01.onap.vip:80
-#spring.couchbase.bootstrap-hosts=172.30.1.74
-#couchbaseHost=172.30.1.74
-
-
-#DMaaP
+#####################DMaaP
#dmaapZookeeperHostPort=127.0.0.1:2181
#dmaapKafkaHostPort=127.0.0.1:9092
dmaapZookeeperHostPort=message-router-zookeeper:2181
dmaapKafkaHostPort=message-router-kafka:9092
-dmaapKafkaGroup=dlgroup10
+dmaapKafkaGroup=dlgroup19
+#in second
dmaapKafkaTimeout=60
dmaapKafkaExclude[0]=__consumer_offsets
-dmaapKafkaExclude[1]=msgrtr.apinode.metrics.dmaap
+dmaapKafkaExclude[1]=__transaction_state
+dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap
#check for new topics
dmaapCheckNewTopicIntervalInSec=3000
-kafkaConsumerCount=1
-
-#tolerate inconsistency when system crash, see PullThread.run()
-async=true
-
-#SSL global flag, if enabled, still need to check each individual DB SSL flag
-enableSSL=false
+kafkaConsumerCount=3
-#names for extra fields that DL adds to each record
-timestampLabel=datalake_ts_
-rawDataLabel=datalake_text_
+#####################Elasticsearch
+elasticsearchType=doc
-defaultTopicName=_DL_DEFAULT_
+#####################HDFS
+hdfsBufferSize=4096
+#how often we flush stall updates, in millisecond
+hdfsFlushInterval=10000
+hdfsBatchSize=250
-elasticsearchType=doc
-
-#Logging
+#####################Logging
logging.level.org.springframework.web=ERROR
logging.level.com.att.nsa.apiClient.http=ERROR
logging.level.org.onap.datalake=DEBUG
diff --git a/components/datalake-handler/feeder/src/main/resources/logback.xml b/components/datalake-handler/feeder/src/main/resources/logback.xml
index 28fbafcd..320e9db8 100644
--- a/components/datalake-handler/feeder/src/main/resources/logback.xml
+++ b/components/datalake-handler/feeder/src/main/resources/logback.xml
@@ -9,7 +9,7 @@
</layout>
</appender>
- <logger name="com.mkyong.web" level="debug"
+ <logger name="org.onap.datalake" level="debug"
additivity="false">
<appender-ref ref="STDOUT" />
</logger>
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
index c4a5b1bf..7243a8e6 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
@@ -26,7 +26,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -60,12 +59,20 @@ public class ApplicationConfigurationTest {
assertTrue(config.getDmaapCheckNewTopicIntervalInSec() > 0);
assertTrue(config.getKafkaConsumerCount() > 0);
+
+ assertNotNull(config.getDmaapKafkaExclude());
+
assertNotNull(config.isAsync());
assertNotNull(config.isEnableSSL());
assertNotNull(config.getDefaultTopicName());
assertNotNull(config.getRawDataLabel());
assertNotNull(config.getTimestampLabel());
- assertEquals(null, config.getElasticsearchType());
+ assertNotNull(config.getElasticsearchType());
+
+ //HDFS
+ assertTrue(config.getHdfsBatchSize()>0);
+ assertTrue(config.getHdfsBufferSize()>0);
+ assertTrue(config.getHdfsFlushInterval()>0);
}
}
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java
index dc9feedc..bb31cd74 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java
@@ -99,6 +99,7 @@ public class TopicConfigTest {
assertFalse(testTopicConfig.supportCouchbase());
assertFalse(testTopicConfig.supportDruid());
assertFalse(testTopicConfig.supportMongoDB());
+ assertFalse(testTopicConfig.supportHdfs());
testTopic.getDbs().remove(new Db("Elasticsearch"));
testTopicConfig = testTopic.getTopicConfig();
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/DbServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/DbServiceTest.java
index 4948001f..8aa60abc 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/DbServiceTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/DbServiceTest.java
@@ -91,4 +91,11 @@ public class DbServiceTest {
assertEquals(dbService.getDruid(), new Db(name));
}
+ @Test
+ public void testGetHdfs() {
+ String name = "HDFS";
+ when(dbRepository.findById(name)).thenReturn(Optional.of(new Db(name)));
+ assertEquals(dbService.getHdfs(), new Db(name));
+ }
+
}
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullServiceTest.java
index 020bcc55..8519bfbb 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullServiceTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullServiceTest.java
@@ -46,8 +46,6 @@ public class PullServiceTest {
@Mock
private ApplicationConfiguration config;
- private boolean isRunning = false;
-
@Mock
private ExecutorService executorService;
diff --git a/components/datalake-handler/feeder/src/test/resources/application.properties b/components/datalake-handler/feeder/src/test/resources/application.properties
index b9077056..75a26187 100644
--- a/components/datalake-handler/feeder/src/test/resources/application.properties
+++ b/components/datalake-handler/feeder/src/test/resources/application.properties
@@ -1,35 +1,48 @@
-
+#####################App general
server.port = 1680
+server.servlet.context-path = /datalake/v1
+
+#tolerate inconsistency when system crash, see PullThread.run()
+async=true
+
+#SSL global flag, if enabled, still need to check each individual DB SSL flag
+enableSSL=false
+
+#names for extra fields that DL adds to each record
+timestampLabel=datalake_ts_
+rawDataLabel=datalake_text_
+
+defaultTopicName=_DL_DEFAULT_
-#DMaaP
+
+#####################DMaaP
#dmaapZookeeperHostPort=127.0.0.1:2181
#dmaapKafkaHostPort=127.0.0.1:9092
dmaapZookeeperHostPort=message-router-zookeeper:2181
dmaapKafkaHostPort=message-router-kafka:9092
-dmaapKafkaGroup=dlgroup10
+dmaapKafkaGroup=dlgroup19
+#in second
dmaapKafkaTimeout=60
+dmaapKafkaExclude[0]=__consumer_offsets
+dmaapKafkaExclude[1]=__transaction_state
+dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap
#check for new topics
dmaapCheckNewTopicIntervalInSec=3000
kafkaConsumerCount=1
-#tolerate inconsistency when system crash, see PullThread.run()
-async=true
-
-#SSL global flag, if enabled, still need to check each individual DB SSL flag
-enableSSL=false
-
-#names for extra fields that DL adds to each record
-timestampLabel=datalake_ts_
-rawDataLabel=datalake_text_
+#####################Elasticsearch
+elasticsearchType=doc
-defaultTopicName=_DL_DEFAULT_
+#####################HDFS
+hdfsBufferSize=4096
+#how often we flush stall updates, in millisecond
+hdfsFlushInterval=10000
+hdfsBatchSize=250
-
-#Logging
+#####################Logging
logging.level.org.springframework.web=ERROR
logging.level.com.att.nsa.apiClient.http=ERROR
logging.level.org.onap.datalake=DEBUG
-