author     Guobiao Mo <guobiaomo@chinamobile.com>   2019-05-13 11:58:33 -0700
committer  Guobiao Mo <guobiaomo@chinamobile.com>   2019-05-13 11:58:33 -0700
commit     3208e0c943742fef5e6692202063dba4e8ab96fd (patch)
tree       4c2249402badfcc430425cd2bd7369b9f1465543 /components/datalake-handler/feeder
parent     59e2cb0714953e91f5a6c29c58fb935f44975442 (diff)
Support HDFS as a data store
Issue-ID: DCAEGEN2-1498
Change-Id: Id203275bce01bd4a4d6ec131fb9696d78eda82f5
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder')
19 files changed, 579 insertions, 334 deletions
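For orientation: the patch introduces a new HdfsService that connects to HDFS via fs.defaultFS and appends each Kafka message as a line in a per-topic, per-day file. The following is a minimal sketch of that write path, distilled from the HdfsService code further down in this diff; the class name and the hard-coded values here are illustrative only and are not part of the patch.

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteSketch {

    public static void write(String topic, List<String> messages) throws IOException {
        Configuration conf = new Configuration();
        // host name matches the 'dlhdfs' row added to init_db.sql; 8020 is the fallback port used by HdfsService
        conf.set("fs.defaultFS", "hdfs://dlhdfs:8020");
        FileSystem fs = FileSystem.get(conf);

        // one file per flush: /datalake/<topic>/<yyyy-MM-dd>/<timestamp>-<thread>
        Date now = new Date();
        String day = new SimpleDateFormat("yyyy-MM-dd").format(now);
        String time = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS").format(now);
        Path path = new Path(String.format("/datalake/%s/%s/%s-%s", topic, day, time,
                Thread.currentThread().getName()));

        // 4096 matches the hdfsBufferSize default in application.properties
        try (FSDataOutputStream out = fs.create(path, true, 4096)) {
            for (String message : messages) {
                out.writeUTF(message);
                out.write('\n');
            }
        }
    }
}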
diff --git a/components/datalake-handler/feeder/pom.xml b/components/datalake-handler/feeder/pom.xml index 63a2af9a..8c285f84 100644 --- a/components/datalake-handler/feeder/pom.xml +++ b/components/datalake-handler/feeder/pom.xml @@ -1,192 +1,197 @@ <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.onap.dcaegen2.services.components</groupId> - <artifactId>datalake-handler</artifactId> - <version>1.0.0-SNAPSHOT</version> - </parent> - - <groupId>org.onap.dcaegen2.services.components.datalake-handler</groupId> - <artifactId>feeder</artifactId> - <packaging>jar</packaging> - <name>DataLake Feeder</name> - - - <dependencies> - - <dependency> - <groupId>org.mariadb.jdbc</groupId> - <artifactId>mariadb-java-client</artifactId> - </dependency> - - <dependency> - <groupId>org.json</groupId> - <artifactId>json</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - </dependency> - - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-web</artifactId> - </dependency> - - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-actuator</artifactId> - </dependency> - - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-data-jpa</artifactId> - </dependency> - - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-data-couchbase</artifactId> - </dependency> - - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-test</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-configuration-processor</artifactId> - </dependency> - - <dependency> - <groupId>org.elasticsearch.client</groupId> - <artifactId>elasticsearch-rest-high-level-client</artifactId> - </dependency> - - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.dataformat</groupId> - <artifactId>jackson-dataformat-yaml</artifactId> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.dataformat</groupId> - <artifactId>jackson-dataformat-xml</artifactId> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </dependency> - - <dependency> - <groupId>com.google.code.gson</groupId> - <artifactId>gson</artifactId> - </dependency> - - <dependency> - <groupId>org.projectlombok</groupId> - <artifactId>lombok</artifactId> - </dependency> - - <dependency> - <groupId>io.druid</groupId> - <artifactId>tranquility-core_2.11</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.velocity</groupId> - <artifactId>velocity-engine-core</artifactId> - </dependency> - - - <dependency> - <groupId>org.hibernate</groupId> - <artifactId>hibernate-core</artifactId> - <version>5.3.7.Final</version> - </dependency> - - <!-- jsr303 validation --> - <dependency> - <groupId>javax.validation</groupId> - <artifactId>validation-api</artifactId> - 
<version>2.0.1.Final</version> - </dependency> - - <dependency> - <groupId>org.hibernate</groupId> - <artifactId>hibernate-validator</artifactId> - <version>6.0.10.Final</version> - </dependency> - - <dependency> - <groupId>io.springfox</groupId> - <artifactId>springfox-swagger2</artifactId> - <version>2.9.2</version> - <scope>compile</scope> - </dependency> - - <dependency> - <groupId>io.springfox</groupId> - <artifactId>springfox-swagger-ui</artifactId> - <version>2.9.2</version> - <scope>compile</scope> - </dependency> - - <dependency> - <groupId>org.mongodb</groupId> - <artifactId>mongo-java-driver</artifactId> - </dependency> - <dependency> - <groupId>com.couchbase.mock</groupId> - <artifactId>CouchbaseMock</artifactId> - <version>1.5.22</version> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-maven-plugin</artifactId> - <version>${springboot.version}</version> - <executions> - <execution> - <goals> - <goal>repackage</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-failsafe-plugin</artifactId> - <executions> - <execution> - <goals> - <goal>integration-test</goal> - <goal>verify</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.onap.dcaegen2.services.components</groupId> + <artifactId>datalake-handler</artifactId> + <version>1.0.0-SNAPSHOT</version> + </parent> + + <groupId>org.onap.dcaegen2.services.components.datalake-handler</groupId> + <artifactId>feeder</artifactId> + <packaging>jar</packaging> + <name>DataLake Feeder</name> + + + <dependencies> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </dependency> + + <dependency> + <groupId>org.mariadb.jdbc</groupId> + <artifactId>mariadb-java-client</artifactId> + </dependency> + + <dependency> + <groupId>org.json</groupId> + <artifactId>json</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + </dependency> + + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-web</artifactId> + </dependency> + + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-actuator</artifactId> + </dependency> + + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-data-jpa</artifactId> + </dependency> + + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-data-couchbase</artifactId> + </dependency> + + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-test</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-configuration-processor</artifactId> + </dependency> + + <dependency> + <groupId>org.elasticsearch.client</groupId> + <artifactId>elasticsearch-rest-high-level-client</artifactId> + </dependency> + + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + + <dependency> + 
<groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-yaml</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-xml</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </dependency> + + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + </dependency> + + <dependency> + <groupId>io.druid</groupId> + <artifactId>tranquility-core_2.11</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.velocity</groupId> + <artifactId>velocity-engine-core</artifactId> + </dependency> + + + <dependency> + <groupId>org.hibernate</groupId> + <artifactId>hibernate-core</artifactId> + <version>5.3.7.Final</version> + </dependency> + + <!-- jsr303 validation --> + <dependency> + <groupId>javax.validation</groupId> + <artifactId>validation-api</artifactId> + <version>2.0.1.Final</version> + </dependency> + + <dependency> + <groupId>org.hibernate</groupId> + <artifactId>hibernate-validator</artifactId> + <version>6.0.10.Final</version> + </dependency> + + <dependency> + <groupId>io.springfox</groupId> + <artifactId>springfox-swagger2</artifactId> + <version>2.9.2</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>io.springfox</groupId> + <artifactId>springfox-swagger-ui</artifactId> + <version>2.9.2</version> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.mongodb</groupId> + <artifactId>mongo-java-driver</artifactId> + </dependency> + <dependency> + <groupId>com.couchbase.mock</groupId> + <artifactId>CouchbaseMock</artifactId> + <version>1.5.22</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-maven-plugin</artifactId> + <version>${springboot.version}</version> + <executions> + <execution> + <goals> + <goal>repackage</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-failsafe-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql index 6688d684..04299e6d 100644 --- a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql +++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql @@ -82,6 +82,7 @@ insert into db (`name`,`host`,`login`,`pass`,`database_name`) values ('Couchbase insert into db (`name`,`host`) values ('Elasticsearch','dl_es');
insert into db (`name`,`host`,`port`,`database_name`) values ('MongoDB','dl_mongodb',27017,'datalake');
insert into db (`name`,`host`) values ('Druid','dl_druid');
+insert into db (`name`,`host`) values ('HDFS','dlhdfs');
-- in production, default enabled should be off
@@ -94,3 +95,4 @@ insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','_DL_DEFA
insert into `map_db_topic`(`db_name`,`topic_name`) values ('Elasticsearch','_DL_DEFAULT_');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('Druid','_DL_DEFAULT_');
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('HDFS','_DL_DEFAULT_');
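These two rows wire HDFS into the existing name-based routing: a topic is stored to HDFS only when the string 'HDFS' appears in its list of mapped databases, which is what the new TopicConfig.supportHdfs() checks. A simplified sketch of that lookup follows; the class and field names here are illustrative, the real classes are in the diff below.

import java.util.Arrays;
import java.util.List;

class TopicRoutingSketch {

    // names of the `db` rows mapped to this topic via map_db_topic
    private final List<String> dbs = Arrays.asList("Elasticsearch", "HDFS");

    boolean containDb(String dbName) {
        return dbs.contains(dbName);
    }

    boolean supportHdfs() {
        // must match the `name` column inserted above
        return containDb("HDFS");
    }
}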
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java index d59c0fc1..9106185e 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java @@ -41,6 +41,16 @@ import lombok.Setter; @EnableAutoConfiguration public class ApplicationConfiguration { + //App general + private boolean async; + private boolean enableSSL; + + private String timestampLabel; + private String rawDataLabel; + + private String defaultTopicName; + + //DMaaP private String dmaapZookeeperHostPort; private String dmaapKafkaHostPort; private String dmaapKafkaGroup; @@ -51,13 +61,10 @@ public class ApplicationConfiguration { private int kafkaConsumerCount; - private boolean async; - private boolean enableSSL; - - private String timestampLabel; - private String rawDataLabel; - - private String defaultTopicName; - private String elasticsearchType; + + //HDFS + private int hdfsBufferSize; + private long hdfsFlushInterval; + private int hdfsBatchSize; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java index 15ffc8a3..deaa0969 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java @@ -60,7 +60,11 @@ public class TopicConfig { } } - + + public boolean supportHdfs() { + return containDb("HDFS"); + } + public boolean supportElasticsearch() { return containDb("Elasticsearch");//TODO string hard codes } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java index f5ee5b79..7c237766 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java @@ -19,17 +19,16 @@ */ package org.onap.datalake.feeder.service; - + import java.util.ArrayList; import java.util.List; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; - + import org.json.JSONObject; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Db; -import org.onap.datalake.feeder.domain.Topic; import org.onap.datalake.feeder.dto.TopicConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +41,9 @@ import com.couchbase.client.java.Cluster; import com.couchbase.client.java.CouchbaseCluster; import com.couchbase.client.java.document.JsonDocument; import com.couchbase.client.java.document.JsonLongDocument; -import com.couchbase.client.java.document.json.JsonObject; +import com.couchbase.client.java.document.json.JsonObject; +import com.couchbase.client.java.env.CouchbaseEnvironment; +import com.couchbase.client.java.env.DefaultCouchbaseEnvironment; import rx.Observable; import rx.functions.Func1; @@ -63,65 +64,69 @@ public class CouchbaseService { @Autowired private DbService dbService; - + Bucket bucket; 
private boolean isReady = false; @PostConstruct private void init() { - // Initialize Couchbase Connection - try { - Db couchbase = dbService.getCouchbase(); - Cluster cluster = CouchbaseCluster.create(couchbase.getHost()); - cluster.authenticate(couchbase.getLogin(), couchbase.getPass()); - bucket = cluster.openBucket(couchbase.getDatabase()); - log.info("Connect to Couchbase {}", couchbase.getHost()); - // Create a N1QL Primary Index (but ignore if it exists) - bucket.bucketManager().createN1qlPrimaryIndex(true, false); - isReady = true; - } - catch(Exception ex) - { - isReady = false; - } + // Initialize Couchbase Connection + try { + Db couchbase = dbService.getCouchbase(); + + //this tunes the SDK (to customize connection timeout) + CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder().connectTimeout(60000) // 60s, default is 5s + .build(); + Cluster cluster = CouchbaseCluster.create(env, couchbase.getHost()); + cluster.authenticate(couchbase.getLogin(), couchbase.getPass()); + bucket = cluster.openBucket(couchbase.getDatabase()); + // Create a N1QL Primary Index (but ignore if it exists) + bucket.bucketManager().createN1qlPrimaryIndex(true, false); + + log.info("Connected to Couchbase {}", couchbase.getHost()); + isReady = true; + } catch (Exception ex) { + log.error("error connection to Couchbase.", ex); + isReady = false; + } } @PreDestroy - public void cleanUp() { + public void cleanUp() { bucket.close(); - } + } - public void saveJsons(TopicConfig topic, List<JSONObject> jsons) { - List<JsonDocument> documents= new ArrayList<>(jsons.size()); - for(JSONObject json : jsons) { + public void saveJsons(TopicConfig topic, List<JSONObject> jsons) { + List<JsonDocument> documents = new ArrayList<>(jsons.size()); + for (JSONObject json : jsons) { //convert to Couchbase JsonObject from org.json JSONObject - JsonObject jsonObject = JsonObject.fromJson(json.toString()); + JsonObject jsonObject = JsonObject.fromJson(json.toString()); long timestamp = jsonObject.getLong(config.getTimestampLabel());//this is Kafka time stamp, which is added in StoreService.messageToJson() //setup TTL - int expiry = (int) (timestamp/1000L) + topic.getTtl()*3600*24; //in second - + int expiry = (int) (timestamp / 1000L) + topic.getTtl() * 3600 * 24; //in second + String id = getId(topic, json); JsonDocument doc = JsonDocument.create(id, expiry, jsonObject); documents.add(doc); } try { saveDocuments(documents); - }catch(Exception e) { + } catch (Exception e) { log.error("error saving to Couchbase.", e); } - log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size()); + log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size()); } public String getId(TopicConfig topic, JSONObject json) { //if this topic requires extract id from JSON String id = topic.getMessageId(json); - if(id != null) { + if (id != null) { return id; } - - String topicStr= topic.getName(); + + String topicStr = topic.getName(); //String id = topicStr+":"+timestamp+":"+UUID.randomUUID(); //https://forums.couchbase.com/t/how-to-set-an-auto-increment-id/4892/2 @@ -129,24 +134,19 @@ public class CouchbaseService { // increment by 1, initialize at 0 if counter doc not found //TODO how slow is this compared with above UUID approach? 
JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345 - id = topicStr +":"+ nextIdNumber.content(); - + id = topicStr + ":" + nextIdNumber.content(); + return id; } - + //https://docs.couchbase.com/java-sdk/2.7/document-operations.html - private void saveDocuments(List<JsonDocument> documents) { - Observable - .from(documents) - .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() { - @Override - public Observable<JsonDocument> call(final JsonDocument docToInsert) { - return bucket.async().insert(docToInsert); - } - }) - .last() - .toBlocking() - .single(); + private void saveDocuments(List<JsonDocument> documents) { + Observable.from(documents).flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() { + @Override + public Observable<JsonDocument> call(final JsonDocument docToInsert) { + return bucket.async().insert(docToInsert); + } + }).last().toBlocking().single(); } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java index e859270f..58bb433a 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java @@ -28,7 +28,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** - * Service for Dbs + * Service for Dbs * * @author Guobiao Mo * @@ -38,11 +38,11 @@ public class DbService { @Autowired private DbRepository dbRepository; - + public Db getDb(String name) { Optional<Db> ret = dbRepository.findById(name); return ret.isPresent() ? ret.get() : null; - } + } public Db getCouchbase() { return getDb("Couchbase"); @@ -58,6 +58,10 @@ public class DbService { public Db getDruid() { return getDb("Druid"); - } + } + + public Db getHdfs() { + return getDb("HDFS"); + } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java index de8c9e89..0caec79a 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java @@ -22,6 +22,7 @@ package org.onap.datalake.feeder.service; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -65,9 +66,7 @@ public class DmaapService { ZooKeeper zk = new ZooKeeper(config.getDmaapZookeeperHostPort(), 10000, watcher); List<String> topics = zk.getChildren("/brokers/topics", false); String[] excludes = config.getDmaapKafkaExclude(); - for (String exclude : excludes) { - topics.remove(exclude); - } + topics.removeAll(Arrays.asList(excludes)); return topics; } catch (Exception e) { log.error("Can not get topic list from Zookeeper, for testing, going to use hard coded topic list.", e); @@ -81,7 +80,7 @@ public class DmaapService { return Collections.emptyList(); } - List<String> ret = new ArrayList<>(); + List<String> ret = new ArrayList<>(allTopics.size()); for (String topicStr : allTopics) { TopicConfig topicConfig = topicService.getEffectiveTopic(topicStr, true); if (topicConfig.isEnabled()) { diff --git 
a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java index c354f175..f1bed604 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java @@ -84,7 +84,7 @@ public class ElasticsearchService { // Initialize the Connection client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http"))); - log.info("Connect to Elasticsearch Host {}", elasticsearchHost); + log.info("Connected to Elasticsearch Host {}", elasticsearchHost); listener = new ActionListener<BulkResponse>() { @Override diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java new file mode 100644 index 00000000..e8d29106 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java @@ -0,0 +1,189 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.onap.datalake.feeder.domain.Db; +import org.onap.datalake.feeder.dto.TopicConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import lombok.Getter; +import lombok.Setter; + +/** + * Service to write data to HDFS + * + * @author Guobiao Mo + * + */ +@Service +public class HdfsService { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + ApplicationConfiguration config; + + @Autowired + private DbService dbService; + + FileSystem fileSystem; + private boolean isReady = false; + + private ThreadLocal<Map<String, Buffer>> bufferLocal = ThreadLocal.withInitial(HashMap::new); + private ThreadLocal<SimpleDateFormat> dayFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd")); + private ThreadLocal<SimpleDateFormat> timeFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS")); + + @Setter + @Getter + private class Buffer { + long lastFlush; + List<String> data; + + public Buffer() { + lastFlush = Long.MIN_VALUE; + data = new ArrayList<>(); + } + + public void flush(String topic) { + try { + saveMessages(topic, data); + data.clear(); + lastFlush = System.currentTimeMillis(); + log.debug("done flush, topic={}, buffer size={}", topic, data.size()); + } catch (IOException e) { + log.error("error saving to HDFS." + topic, e); + } + } + + public void flushStall(String topic) { + if (!data.isEmpty() && System.currentTimeMillis() > lastFlush + config.getHdfsFlushInterval()) { + log.debug("going to flushStall topic={}, buffer size={}", topic, data.size()); + flush(topic); + } + } + + private void saveMessages(String topic, List<String> bufferList) throws IOException { + + String thread = Thread.currentThread().getName(); + Date date = new Date(); + String day = dayFormat.get().format(date); + String time = timeFormat.get().format(date); + String filePath = String.format("/datalake/%s/%s/%s-%s", topic, day, time, thread); + Path path = new Path(filePath); + log.debug("writing to HDFS {}", filePath); + + // Create a new file and write data to it. + FSDataOutputStream out = fileSystem.create(path, true, config.getHdfsBufferSize()); + + bufferList.stream().forEach(message -> { + try { + out.writeUTF(message); + out.write('\n'); + } catch (IOException e) { + log.error("error writing to HDFS.", e); + } + }); + + out.close(); + } + } + + @PostConstruct + private void init() { + // Initialize HDFS Connection + try { + Db hdfs = dbService.getHdfs(); + + //Get configuration of Hadoop system + Configuration hdfsConfig = new Configuration(); + + int port = hdfs.getPort() == null ? 
8020 : hdfs.getPort(); + + String hdfsuri = String.format("hdfs://%s:%s", hdfs.getHost(), port); + hdfsConfig.set("fs.defaultFS", hdfsuri); + + log.info("Connecting to -- {}", hdfsuri); + + fileSystem = FileSystem.get(hdfsConfig); + + isReady = true; + } catch (Exception ex) { + log.error("error connection to HDFS.", ex); + isReady = false; + } + } + + @PreDestroy + public void cleanUp() { + try { + flush(); + fileSystem.close(); + } catch (IOException e) { + log.error("fileSystem.close() at cleanUp.", e); + } + } + + public void flush() { + bufferLocal.get().forEach((topic, buffer) -> buffer.flush(topic)); + } + + //if no new data comes in for a topic for a while, need to flush its buffer + public void flushStall() { + bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic)); + } + + public void saveMessages(TopicConfig topic, List<Pair<Long, String>> messages) { + String topicStr = topic.getName(); + + Map<String, Buffer> bufferMap = bufferLocal.get(); + final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer()); + + List<String> bufferData = buffer.getData(); + + messages.stream().forEach(message -> bufferData.add(message.getRight()));//note that message left is not used + + if (bufferData.size() >= config.getHdfsBatchSize()) { + buffer.flush(topicStr); + } + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java index c5408951..32d21c62 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java @@ -135,7 +135,7 @@ public class MongodbService { } public void saveJsons(TopicConfig topic, List<JSONObject> jsons) { - if (dbReady == false) + if (dbReady == false)//TOD throw exception return; List<Document> documents = new ArrayList<>(jsons.size()); for (JSONObject json : jsons) { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java index b3a6d29a..1154b3a9 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java @@ -54,7 +54,7 @@ import org.springframework.stereotype.Service; */ @Service -@Scope(value=ConfigurableBeanFactory.SCOPE_PROTOTYPE) +@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class PullThread implements Runnable { @Autowired @@ -90,7 +90,8 @@ public class PullThread implements Runnable { Properties consumerConfig = new Properties(); consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getDmaapKafkaHostPort()); - consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup()); + consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup()); + consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(id)); consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); @@ -119,28 
+120,29 @@ public class PullThread implements Runnable { while (active.get()) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout())); - if (records != null) { - List<Pair<Long, String>> messages = new ArrayList<>(records.count()); - for (TopicPartition partition : records.partitions()) { - messages.clear(); - List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); - for (ConsumerRecord<String, String> record : partitionRecords) { - messages.add(Pair.of(record.timestamp(), record.value())); - //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value()); - } - storeService.saveMessages(partition.topic(), messages); - log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB - - if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit - long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); - consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); - } - } - - if (async) {//for high Throughput, async commit offset in batch to Kafka - consumer.commitAsync(); - } - } + if (records != null) { + List<Pair<Long, String>> messages = new ArrayList<>(records.count()); + for (TopicPartition partition : records.partitions()) { + messages.clear(); + List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); + for (ConsumerRecord<String, String> record : partitionRecords) { + messages.add(Pair.of(record.timestamp(), record.value())); + //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value()); + } + storeService.saveMessages(partition.topic(), messages); + log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB + + if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit + long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); + consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); + } + } + + if (async) {//for high Throughput, async commit offset in batch to Kafka + consumer.commitAsync(); + } + } + storeService.flushStall(); } } catch (Exception e) { log.error("Puller {} run(): exception={}", id, e.getMessage()); @@ -153,6 +155,7 @@ public class PullThread implements Runnable { public void shutdown() { active.set(false); consumer.wakeup(); + consumer.unsubscribe(); } private class DummyRebalanceListener implements ConsumerRebalanceListener { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java index 449dacfc..2d00a9b8 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import org.apache.commons.lang3.tuple.Pair; @@ -35,7 +34,6 @@ import org.json.JSONException; import 
org.json.JSONObject; import org.json.XML; import org.onap.datalake.feeder.config.ApplicationConfiguration; -import org.onap.datalake.feeder.domain.Topic; import org.onap.datalake.feeder.dto.TopicConfig; import org.onap.datalake.feeder.enumeration.DataFormat; import org.slf4j.Logger; @@ -77,6 +75,9 @@ public class StoreService { @Autowired private ElasticsearchService elasticsearchService; + @Autowired + private HdfsService hdfsService; + private Map<String, TopicConfig> topicMap = new HashMap<>(); private ObjectMapper yamlReader; @@ -86,10 +87,6 @@ public class StoreService { yamlReader = new ObjectMapper(new YAMLFactory()); } - @PreDestroy - public void cleanUp() { - } - public void saveMessages(String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text if (messages == null || messages.isEmpty()) { return; @@ -109,7 +106,7 @@ public class StoreService { } } - saveJsons(topic, docs); + saveJsons(topic, docs, messages); } private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws JSONException, JsonParseException, JsonMappingException, IOException { @@ -161,7 +158,7 @@ public class StoreService { return json; } - private void saveJsons(TopicConfig topic, List<JSONObject> jsons) { + private void saveJsons(TopicConfig topic, List<JSONObject> jsons, List<Pair<Long, String>> messages) { if (topic.supportMongoDB()) { mongodbService.saveJsons(topic, jsons); } @@ -173,6 +170,13 @@ public class StoreService { if (topic.supportElasticsearch()) { elasticsearchService.saveJsons(topic, jsons); } + + if (topic.supportHdfs()) { + hdfsService.saveMessages(topic, messages); + } } + public void flushStall() { + hdfsService.flushStall(); + } } diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties index e1b83999..b9d6b9e9 100644 --- a/components/datalake-handler/feeder/src/main/resources/application.properties +++ b/components/datalake-handler/feeder/src/main/resources/application.properties @@ -1,8 +1,20 @@ - +#####################App general server.port = 1680 server.servlet.context-path = /datalake/v1 -# Spring connection to MariaDB for ORM +#tolerate inconsistency when system crash, see PullThread.run() +async=true + +#SSL global flag, if enabled, still need to check each individual DB SSL flag +enableSSL=false + +#names for extra fields that DL adds to each record +timestampLabel=datalake_ts_ +rawDataLabel=datalake_text_ + +defaultTopicName=_DL_DEFAULT_ + +#####################Spring connection to MariaDB for ORM #spring.jpa.hibernate.ddl-auto=update spring.jpa.hibernate.ddl-auto=none spring.jpa.show-sql=false @@ -13,42 +25,32 @@ spring.datasource.username=dl spring.datasource.password=dl1234 -#For Beijing lab -#dmaapZookeeperHostPort=zookeeper.mr01.onap.vip:80 -#dmaapKafkaHostPort=kafka.mr01.onap.vip:80 -#spring.couchbase.bootstrap-hosts=172.30.1.74 -#couchbaseHost=172.30.1.74 - - -#DMaaP +#####################DMaaP #dmaapZookeeperHostPort=127.0.0.1:2181 #dmaapKafkaHostPort=127.0.0.1:9092 dmaapZookeeperHostPort=message-router-zookeeper:2181 dmaapKafkaHostPort=message-router-kafka:9092 -dmaapKafkaGroup=dlgroup10 +dmaapKafkaGroup=dlgroup19 +#in second dmaapKafkaTimeout=60 dmaapKafkaExclude[0]=__consumer_offsets -dmaapKafkaExclude[1]=msgrtr.apinode.metrics.dmaap +dmaapKafkaExclude[1]=__transaction_state +dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap #check for new topics dmaapCheckNewTopicIntervalInSec=3000 -kafkaConsumerCount=1 - -#tolerate inconsistency 
when system crash, see PullThread.run() -async=true - -#SSL global flag, if enabled, still need to check each individual DB SSL flag -enableSSL=false +kafkaConsumerCount=3 -#names for extra fields that DL adds to each record -timestampLabel=datalake_ts_ -rawDataLabel=datalake_text_ +#####################Elasticsearch +elasticsearchType=doc -defaultTopicName=_DL_DEFAULT_ +#####################HDFS +hdfsBufferSize=4096 +#how often we flush stall updates, in millisecond +hdfsFlushInterval=10000 +hdfsBatchSize=250 -elasticsearchType=doc - -#Logging +#####################Logging logging.level.org.springframework.web=ERROR logging.level.com.att.nsa.apiClient.http=ERROR logging.level.org.onap.datalake=DEBUG diff --git a/components/datalake-handler/feeder/src/main/resources/logback.xml b/components/datalake-handler/feeder/src/main/resources/logback.xml index 28fbafcd..320e9db8 100644 --- a/components/datalake-handler/feeder/src/main/resources/logback.xml +++ b/components/datalake-handler/feeder/src/main/resources/logback.xml @@ -9,7 +9,7 @@ </layout>
</appender>
- <logger name="com.mkyong.web" level="debug"
+ <logger name="org.onap.datalake" level="debug"
additivity="false">
<appender-ref ref="STDOUT" />
</logger>
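The HDFS writer buffers messages per topic and flushes on either of two triggers: the buffer reaches hdfsBatchSize, or flushStall() (called by PullThread after every poll) finds data older than hdfsFlushInterval. Below is a condensed sketch of that policy using the defaults from application.properties; the actual HDFS write is omitted, and the class is illustrative rather than part of the patch.

import java.util.ArrayList;
import java.util.List;

class FlushPolicySketch {

    static final int BATCH_SIZE = 250;          // hdfsBatchSize
    static final long FLUSH_INTERVAL = 10_000L; // hdfsFlushInterval, in milliseconds

    long lastFlush = Long.MIN_VALUE;
    final List<String> data = new ArrayList<>();

    void add(String message) {
        data.add(message);
        if (data.size() >= BATCH_SIZE) {
            flush(); // size-triggered flush, as in HdfsService.saveMessages()
        }
    }

    void flushStall() { // invoked after each Kafka poll, see PullThread.run()
        if (!data.isEmpty() && System.currentTimeMillis() > lastFlush + FLUSH_INTERVAL) {
            flush(); // time-triggered flush for idle topics
        }
    }

    void flush() {
        // write buffered data to HDFS here, then reset
        data.clear();
        lastFlush = System.currentTimeMillis();
    }
}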
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java index c4a5b1bf..7243a8e6 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java @@ -26,7 +26,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -60,12 +59,20 @@ public class ApplicationConfigurationTest { assertTrue(config.getDmaapCheckNewTopicIntervalInSec() > 0); assertTrue(config.getKafkaConsumerCount() > 0); + + assertNotNull(config.getDmaapKafkaExclude()); + assertNotNull(config.isAsync()); assertNotNull(config.isEnableSSL()); assertNotNull(config.getDefaultTopicName()); assertNotNull(config.getRawDataLabel()); assertNotNull(config.getTimestampLabel()); - assertEquals(null, config.getElasticsearchType()); + assertNotNull(config.getElasticsearchType()); + + //HDFS + assertTrue(config.getHdfsBatchSize()>0); + assertTrue(config.getHdfsBufferSize()>0); + assertTrue(config.getHdfsFlushInterval()>0); } } diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java index dc9feedc..bb31cd74 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java @@ -99,6 +99,7 @@ public class TopicConfigTest { assertFalse(testTopicConfig.supportCouchbase()); assertFalse(testTopicConfig.supportDruid()); assertFalse(testTopicConfig.supportMongoDB()); + assertFalse(testTopicConfig.supportHdfs()); testTopic.getDbs().remove(new Db("Elasticsearch")); testTopicConfig = testTopic.getTopicConfig(); diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/DbServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/DbServiceTest.java index 4948001f..8aa60abc 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/DbServiceTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/DbServiceTest.java @@ -91,4 +91,11 @@ public class DbServiceTest { assertEquals(dbService.getDruid(), new Db(name)); } + @Test + public void testGetHdfs() { + String name = "HDFS"; + when(dbRepository.findById(name)).thenReturn(Optional.of(new Db(name))); + assertEquals(dbService.getHdfs(), new Db(name)); + } + } diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullServiceTest.java index 020bcc55..8519bfbb 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullServiceTest.java +++ 
b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullServiceTest.java @@ -46,8 +46,6 @@ public class PullServiceTest { @Mock private ApplicationConfiguration config; - private boolean isRunning = false; - @Mock private ExecutorService executorService; diff --git a/components/datalake-handler/feeder/src/test/resources/application.properties b/components/datalake-handler/feeder/src/test/resources/application.properties index b9077056..75a26187 100644 --- a/components/datalake-handler/feeder/src/test/resources/application.properties +++ b/components/datalake-handler/feeder/src/test/resources/application.properties @@ -1,35 +1,48 @@ - +#####################App general server.port = 1680 +server.servlet.context-path = /datalake/v1 + +#tolerate inconsistency when system crash, see PullThread.run() +async=true + +#SSL global flag, if enabled, still need to check each individual DB SSL flag +enableSSL=false + +#names for extra fields that DL adds to each record +timestampLabel=datalake_ts_ +rawDataLabel=datalake_text_ + +defaultTopicName=_DL_DEFAULT_ -#DMaaP + +#####################DMaaP #dmaapZookeeperHostPort=127.0.0.1:2181 #dmaapKafkaHostPort=127.0.0.1:9092 dmaapZookeeperHostPort=message-router-zookeeper:2181 dmaapKafkaHostPort=message-router-kafka:9092 -dmaapKafkaGroup=dlgroup10 +dmaapKafkaGroup=dlgroup19 +#in second dmaapKafkaTimeout=60 +dmaapKafkaExclude[0]=__consumer_offsets +dmaapKafkaExclude[1]=__transaction_state +dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap #check for new topics dmaapCheckNewTopicIntervalInSec=3000 kafkaConsumerCount=1 -#tolerate inconsistency when system crash, see PullThread.run() -async=true - -#SSL global flag, if enabled, still need to check each individual DB SSL flag -enableSSL=false - -#names for extra fields that DL adds to each record -timestampLabel=datalake_ts_ -rawDataLabel=datalake_text_ +#####################Elasticsearch +elasticsearchType=doc -defaultTopicName=_DL_DEFAULT_ +#####################HDFS +hdfsBufferSize=4096 +#how often we flush stall updates, in millisecond +hdfsFlushInterval=10000 +hdfsBatchSize=250 - -#Logging +#####################Logging logging.level.org.springframework.web=ERROR logging.level.com.att.nsa.apiClient.http=ERROR logging.level.org.onap.datalake=DEBUG - |