author    Guobiao Mo <guobiaomo@chinamobile.com>    2019-05-13 11:58:33 -0700
committer Guobiao Mo <guobiaomo@chinamobile.com>    2019-05-13 11:58:33 -0700
commit    3208e0c943742fef5e6692202063dba4e8ab96fd (patch)
tree      4c2249402badfcc430425cd2bd7369b9f1465543 /components/datalake-handler/feeder/src/main/java/org
parent    59e2cb0714953e91f5a6c29c58fb935f44975442 (diff)
Support HDFS as a data store
Issue-ID: DCAEGEN2-1498
Change-Id: Id203275bce01bd4a4d6ec131fb9696d78eda82f5
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java/org')
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java    23
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java                      6
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java            96
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java                   12
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java                 7
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java         2
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java                189
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java               2
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java                  51
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java                20
10 files changed, 309 insertions, 99 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index d59c0fc1..9106185e 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -41,6 +41,16 @@ import lombok.Setter;
@EnableAutoConfiguration
public class ApplicationConfiguration {
+ //App general
+ private boolean async;
+ private boolean enableSSL;
+
+ private String timestampLabel;
+ private String rawDataLabel;
+
+ private String defaultTopicName;
+
+ //DMaaP
private String dmaapZookeeperHostPort;
private String dmaapKafkaHostPort;
private String dmaapKafkaGroup;
@@ -51,13 +61,10 @@ public class ApplicationConfiguration {
private int kafkaConsumerCount;
- private boolean async;
- private boolean enableSSL;
-
- private String timestampLabel;
- private String rawDataLabel;
-
- private String defaultTopicName;
-
private String elasticsearchType;
+
+ //HDFS
+ private int hdfsBufferSize;
+ private long hdfsFlushInterval;
+ private int hdfsBatchSize;
}
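
The fields removed lower in the class are only relocated to the top; the genuinely new knobs are the three HDFS settings at the end, which HdfsService reads through its getters. A minimal sketch of exercising them in a test, assuming the class keeps its class-level lombok @Getter/@Setter (the values below are illustrative, not defaults shipped with this change):

    ApplicationConfiguration config = new ApplicationConfiguration();
    config.setHdfsBufferSize(4096);       // buffer size passed to FileSystem.create()
    config.setHdfsFlushInterval(30000L);  // ms of inactivity before flushStall() writes a topic's buffer
    config.setHdfsBatchSize(500);         // messages buffered per topic before an eager flush
    assert config.getHdfsBatchSize() == 500;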
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
index 15ffc8a3..deaa0969 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
@@ -60,7 +60,11 @@ public class TopicConfig {
}
}
-
+
+ public boolean supportHdfs() {
+ return containDb("HDFS");
+ }
+
public boolean supportElasticsearch() {
return containDb("Elasticsearch");//TODO string hard codes
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
index f5ee5b79..7c237766 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
@@ -19,17 +19,16 @@
*/
package org.onap.datalake.feeder.service;
-
+
import java.util.ArrayList;
import java.util.List;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-
+
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.dto.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +41,9 @@ import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.JsonLongDocument;
-import com.couchbase.client.java.document.json.JsonObject;
+import com.couchbase.client.java.document.json.JsonObject;
+import com.couchbase.client.java.env.CouchbaseEnvironment;
+import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import rx.Observable;
import rx.functions.Func1;
@@ -63,65 +64,69 @@ public class CouchbaseService {
@Autowired
private DbService dbService;
-
+
Bucket bucket;
private boolean isReady = false;
@PostConstruct
private void init() {
- // Initialize Couchbase Connection
- try {
- Db couchbase = dbService.getCouchbase();
- Cluster cluster = CouchbaseCluster.create(couchbase.getHost());
- cluster.authenticate(couchbase.getLogin(), couchbase.getPass());
- bucket = cluster.openBucket(couchbase.getDatabase());
- log.info("Connect to Couchbase {}", couchbase.getHost());
- // Create a N1QL Primary Index (but ignore if it exists)
- bucket.bucketManager().createN1qlPrimaryIndex(true, false);
- isReady = true;
- }
- catch(Exception ex)
- {
- isReady = false;
- }
+ // Initialize Couchbase Connection
+ try {
+ Db couchbase = dbService.getCouchbase();
+
+ //this tunes the SDK (to customize connection timeout)
+ CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder().connectTimeout(60000) // 60s, default is 5s
+ .build();
+ Cluster cluster = CouchbaseCluster.create(env, couchbase.getHost());
+ cluster.authenticate(couchbase.getLogin(), couchbase.getPass());
+ bucket = cluster.openBucket(couchbase.getDatabase());
+ // Create a N1QL Primary Index (but ignore if it exists)
+ bucket.bucketManager().createN1qlPrimaryIndex(true, false);
+
+ log.info("Connected to Couchbase {}", couchbase.getHost());
+ isReady = true;
+ } catch (Exception ex) {
+ log.error("error connection to Couchbase.", ex);
+ isReady = false;
+ }
}
@PreDestroy
- public void cleanUp() {
+ public void cleanUp() {
bucket.close();
- }
+ }
- public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
- List<JsonDocument> documents= new ArrayList<>(jsons.size());
- for(JSONObject json : jsons) {
+ public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
+ List<JsonDocument> documents = new ArrayList<>(jsons.size());
+ for (JSONObject json : jsons) {
//convert to Couchbase JsonObject from org.json JSONObject
- JsonObject jsonObject = JsonObject.fromJson(json.toString());
+ JsonObject jsonObject = JsonObject.fromJson(json.toString());
long timestamp = jsonObject.getLong(config.getTimestampLabel());//this is Kafka time stamp, which is added in StoreService.messageToJson()
//setup TTL
- int expiry = (int) (timestamp/1000L) + topic.getTtl()*3600*24; //in second
-
+ int expiry = (int) (timestamp / 1000L) + topic.getTtl() * 3600 * 24; //in second
+
String id = getId(topic, json);
JsonDocument doc = JsonDocument.create(id, expiry, jsonObject);
documents.add(doc);
}
try {
saveDocuments(documents);
- }catch(Exception e) {
+ } catch (Exception e) {
log.error("error saving to Couchbase.", e);
}
- log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size());
+ log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size());
}
public String getId(TopicConfig topic, JSONObject json) {
//if this topic requires extract id from JSON
String id = topic.getMessageId(json);
- if(id != null) {
+ if (id != null) {
return id;
}
-
- String topicStr= topic.getName();
+
+ String topicStr = topic.getName();
//String id = topicStr+":"+timestamp+":"+UUID.randomUUID();
//https://forums.couchbase.com/t/how-to-set-an-auto-increment-id/4892/2
@@ -129,24 +134,19 @@ public class CouchbaseService {
// increment by 1, initialize at 0 if counter doc not found
//TODO how slow is this compared with above UUID approach?
JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345
- id = topicStr +":"+ nextIdNumber.content();
-
+ id = topicStr + ":" + nextIdNumber.content();
+
return id;
}
-
+
//https://docs.couchbase.com/java-sdk/2.7/document-operations.html
- private void saveDocuments(List<JsonDocument> documents) {
- Observable
- .from(documents)
- .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
- @Override
- public Observable<JsonDocument> call(final JsonDocument docToInsert) {
- return bucket.async().insert(docToInsert);
- }
- })
- .last()
- .toBlocking()
- .single();
+ private void saveDocuments(List<JsonDocument> documents) {
+ Observable.from(documents).flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
+ @Override
+ public Observable<JsonDocument> call(final JsonDocument docToInsert) {
+ return bucket.async().insert(docToInsert);
+ }
+ }).last().toBlocking().single();
}
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
index e859270f..58bb433a 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
@@ -28,7 +28,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
- * Service for Dbs
+ * Service for Dbs
*
* @author Guobiao Mo
*
@@ -38,11 +38,11 @@ public class DbService {
@Autowired
private DbRepository dbRepository;
-
+
public Db getDb(String name) {
Optional<Db> ret = dbRepository.findById(name);
return ret.isPresent() ? ret.get() : null;
- }
+ }
public Db getCouchbase() {
return getDb("Couchbase");
@@ -58,6 +58,10 @@ public class DbService {
public Db getDruid() {
return getDb("Druid");
- }
+ }
+
+ public Db getHdfs() {
+ return getDb("HDFS");
+ }
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
index de8c9e89..0caec79a 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
@@ -22,6 +22,7 @@ package org.onap.datalake.feeder.service;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -65,9 +66,7 @@ public class DmaapService {
ZooKeeper zk = new ZooKeeper(config.getDmaapZookeeperHostPort(), 10000, watcher);
List<String> topics = zk.getChildren("/brokers/topics", false);
String[] excludes = config.getDmaapKafkaExclude();
- for (String exclude : excludes) {
- topics.remove(exclude);
- }
+ topics.removeAll(Arrays.asList(excludes));
return topics;
} catch (Exception e) {
log.error("Can not get topic list from Zookeeper, for testing, going to use hard coded topic list.", e);
@@ -81,7 +80,7 @@ public class DmaapService {
return Collections.emptyList();
}
- List<String> ret = new ArrayList<>();
+ List<String> ret = new ArrayList<>(allTopics.size());
for (String topicStr : allTopics) {
TopicConfig topicConfig = topicService.getEffectiveTopic(topicStr, true);
if (topicConfig.isEnabled()) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
index c354f175..f1bed604 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
@@ -84,7 +84,7 @@ public class ElasticsearchService {
// Initialize the Connection
client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
- log.info("Connect to Elasticsearch Host {}", elasticsearchHost);
+ log.info("Connected to Elasticsearch Host {}", elasticsearchHost);
listener = new ActionListener<BulkResponse>() {
@Override
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java
new file mode 100644
index 00000000..e8d29106
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java
@@ -0,0 +1,189 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.dto.TopicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Service to write data to HDFS
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class HdfsService {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ ApplicationConfiguration config;
+
+ @Autowired
+ private DbService dbService;
+
+ FileSystem fileSystem;
+ private boolean isReady = false;
+
+ private ThreadLocal<Map<String, Buffer>> bufferLocal = ThreadLocal.withInitial(HashMap::new);
+ private ThreadLocal<SimpleDateFormat> dayFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
+ private ThreadLocal<SimpleDateFormat> timeFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS"));
+
+ @Setter
+ @Getter
+ private class Buffer {
+ long lastFlush;
+ List<String> data;
+
+ public Buffer() {
+ lastFlush = Long.MIN_VALUE;
+ data = new ArrayList<>();
+ }
+
+ public void flush(String topic) {
+ try {
+ saveMessages(topic, data);
+ data.clear();
+ lastFlush = System.currentTimeMillis();
+ log.debug("done flush, topic={}, buffer size={}", topic, data.size());
+ } catch (IOException e) {
+ log.error("error saving to HDFS." + topic, e);
+ }
+ }
+
+ public void flushStall(String topic) {
+ if (!data.isEmpty() && System.currentTimeMillis() > lastFlush + config.getHdfsFlushInterval()) {
+ log.debug("going to flushStall topic={}, buffer size={}", topic, data.size());
+ flush(topic);
+ }
+ }
+
+ private void saveMessages(String topic, List<String> bufferList) throws IOException {
+
+ String thread = Thread.currentThread().getName();
+ Date date = new Date();
+ String day = dayFormat.get().format(date);
+ String time = timeFormat.get().format(date);
+ String filePath = String.format("/datalake/%s/%s/%s-%s", topic, day, time, thread);
+ Path path = new Path(filePath);
+ log.debug("writing to HDFS {}", filePath);
+
+ // Create a new file and write data to it.
+ FSDataOutputStream out = fileSystem.create(path, true, config.getHdfsBufferSize());
+
+ bufferList.stream().forEach(message -> {
+ try {
+ out.writeUTF(message);
+ out.write('\n');
+ } catch (IOException e) {
+ log.error("error writing to HDFS.", e);
+ }
+ });
+
+ out.close();
+ }
+ }
+
+ @PostConstruct
+ private void init() {
+ // Initialize HDFS Connection
+ try {
+ Db hdfs = dbService.getHdfs();
+
+ //Get configuration of Hadoop system
+ Configuration hdfsConfig = new Configuration();
+
+ int port = hdfs.getPort() == null ? 8020 : hdfs.getPort();
+
+ String hdfsuri = String.format("hdfs://%s:%s", hdfs.getHost(), port);
+ hdfsConfig.set("fs.defaultFS", hdfsuri);
+
+ log.info("Connecting to -- {}", hdfsuri);
+
+ fileSystem = FileSystem.get(hdfsConfig);
+
+ isReady = true;
+ } catch (Exception ex) {
+ log.error("error connection to HDFS.", ex);
+ isReady = false;
+ }
+ }
+
+ @PreDestroy
+ public void cleanUp() {
+ try {
+ flush();
+ fileSystem.close();
+ } catch (IOException e) {
+ log.error("fileSystem.close() at cleanUp.", e);
+ }
+ }
+
+ public void flush() {
+ bufferLocal.get().forEach((topic, buffer) -> buffer.flush(topic));
+ }
+
+ //if no new data comes in for a topic for a while, need to flush its buffer
+ public void flushStall() {
+ bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic));
+ }
+
+ public void saveMessages(TopicConfig topic, List<Pair<Long, String>> messages) {
+ String topicStr = topic.getName();
+
+ Map<String, Buffer> bufferMap = bufferLocal.get();
+ final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());
+
+ List<String> bufferData = buffer.getData();
+
+ messages.stream().forEach(message -> bufferData.add(message.getRight()));//note that message left is not used
+
+ if (bufferData.size() >= config.getHdfsBatchSize()) {
+ buffer.flush(topicStr);
+ }
+ }
+
+}
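
In short, each pull thread owns its own topic-to-Buffer map (the ThreadLocal above), raw message text is appended per topic, and a buffer is written to /datalake/<topic>/<day>/<time>-<thread> either when it reaches hdfsBatchSize or when flushStall() finds hdfsFlushInterval has elapsed with data pending. A rough caller-side sketch, assuming an injected hdfsService and a resolved topicConfig (topic name and payloads invented for illustration):

    // Runs on a Kafka pull thread, so the ThreadLocal buffer map belongs to this thread.
    List<Pair<Long, String>> batch = Arrays.asList(
            Pair.of(System.currentTimeMillis(), "{\"event\":\"alarm-1\"}"),
            Pair.of(System.currentTimeMillis(), "{\"event\":\"alarm-2\"}"));
    hdfsService.saveMessages(topicConfig, batch); // buffers per topic; flushes once hdfsBatchSize is reached
    hdfsService.flushStall();                     // called periodically from the poll loop (see PullThread below)
    // On shutdown, the @PreDestroy cleanUp() flushes remaining buffers and closes the FileSystem.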
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
index c5408951..32d21c62 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
@@ -135,7 +135,7 @@ public class MongodbService {
}
public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
- if (dbReady == false)
+ if (dbReady == false)//TOD throw exception
return;
List<Document> documents = new ArrayList<>(jsons.size());
for (JSONObject json : jsons) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
index b3a6d29a..1154b3a9 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
@@ -54,7 +54,7 @@ import org.springframework.stereotype.Service;
*/
@Service
-@Scope(value=ConfigurableBeanFactory.SCOPE_PROTOTYPE)
+@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class PullThread implements Runnable {
@Autowired
@@ -90,7 +90,8 @@ public class PullThread implements Runnable {
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getDmaapKafkaHostPort());
- consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup());
+ consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup());
+ consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(id));
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
@@ -119,28 +120,29 @@ public class PullThread implements Runnable {
while (active.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout()));
- if (records != null) {
- List<Pair<Long, String>> messages = new ArrayList<>(records.count());
- for (TopicPartition partition : records.partitions()) {
- messages.clear();
- List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
- for (ConsumerRecord<String, String> record : partitionRecords) {
- messages.add(Pair.of(record.timestamp(), record.value()));
- //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
- }
- storeService.saveMessages(partition.topic(), messages);
- log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
-
- if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
- long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
- consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
- }
- }
-
- if (async) {//for high Throughput, async commit offset in batch to Kafka
- consumer.commitAsync();
- }
- }
+ if (records != null) {
+ List<Pair<Long, String>> messages = new ArrayList<>(records.count());
+ for (TopicPartition partition : records.partitions()) {
+ messages.clear();
+ List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
+ for (ConsumerRecord<String, String> record : partitionRecords) {
+ messages.add(Pair.of(record.timestamp(), record.value()));
+ //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
+ }
+ storeService.saveMessages(partition.topic(), messages);
+ log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
+
+ if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
+ long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+ consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+ }
+ }
+
+ if (async) {//for high Throughput, async commit offset in batch to Kafka
+ consumer.commitAsync();
+ }
+ }
+ storeService.flushStall();
}
} catch (Exception e) {
log.error("Puller {} run(): exception={}", id, e.getMessage());
@@ -153,6 +155,7 @@ public class PullThread implements Runnable {
public void shutdown() {
active.set(false);
consumer.wakeup();
+ consumer.unsubscribe();
}
private class DummyRebalanceListener implements ConsumerRebalanceListener {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index 449dacfc..2d00a9b8 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
import org.apache.commons.lang3.tuple.Pair;
@@ -35,7 +34,6 @@ import org.json.JSONException;
import org.json.JSONObject;
import org.json.XML;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
-import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.enumeration.DataFormat;
import org.slf4j.Logger;
@@ -77,6 +75,9 @@ public class StoreService {
@Autowired
private ElasticsearchService elasticsearchService;
+ @Autowired
+ private HdfsService hdfsService;
+
private Map<String, TopicConfig> topicMap = new HashMap<>();
private ObjectMapper yamlReader;
@@ -86,10 +87,6 @@ public class StoreService {
yamlReader = new ObjectMapper(new YAMLFactory());
}
- @PreDestroy
- public void cleanUp() {
- }
-
public void saveMessages(String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text
if (messages == null || messages.isEmpty()) {
return;
@@ -109,7 +106,7 @@ public class StoreService {
}
}
- saveJsons(topic, docs);
+ saveJsons(topic, docs, messages);
}
private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws JSONException, JsonParseException, JsonMappingException, IOException {
@@ -161,7 +158,7 @@ public class StoreService {
return json;
}
- private void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
+ private void saveJsons(TopicConfig topic, List<JSONObject> jsons, List<Pair<Long, String>> messages) {
if (topic.supportMongoDB()) {
mongodbService.saveJsons(topic, jsons);
}
@@ -173,6 +170,13 @@ public class StoreService {
if (topic.supportElasticsearch()) {
elasticsearchService.saveJsons(topic, jsons);
}
+
+ if (topic.supportHdfs()) {
+ hdfsService.saveMessages(topic, messages);
+ }
}
+ public void flushStall() {
+ hdfsService.flushStall();
+ }
}
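
Taken together with the PullThread change, the reason saveJsons() now also receives the raw messages is that the document stores consume the parsed JSONObject list while the HDFS sink persists the original Kafka payload text, so both forms are carried along from saveMessages(). A condensed, hypothetical view of one poll-loop iteration for an HDFS-enabled topic (topic name and payload are made up):

    long ts = System.currentTimeMillis();
    List<Pair<Long, String>> records = Arrays.asList(Pair.of(ts, "{\"event\":\"example\"}"));
    storeService.saveMessages("unauthenticated.SEC_FAULT_OUTPUT", records); // parse to JSON, route to each enabled sink
    storeService.flushStall();                                              // delegates to hdfsService.flushStall()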