diff options
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java/org')
10 files changed, 309 insertions, 99 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java index d59c0fc1..9106185e 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java @@ -41,6 +41,16 @@ import lombok.Setter; @EnableAutoConfiguration public class ApplicationConfiguration { + //App general + private boolean async; + private boolean enableSSL; + + private String timestampLabel; + private String rawDataLabel; + + private String defaultTopicName; + + //DMaaP private String dmaapZookeeperHostPort; private String dmaapKafkaHostPort; private String dmaapKafkaGroup; @@ -51,13 +61,10 @@ public class ApplicationConfiguration { private int kafkaConsumerCount; - private boolean async; - private boolean enableSSL; - - private String timestampLabel; - private String rawDataLabel; - - private String defaultTopicName; - private String elasticsearchType; + + //HDFS + private int hdfsBufferSize; + private long hdfsFlushInterval; + private int hdfsBatchSize; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java index 15ffc8a3..deaa0969 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java @@ -60,7 +60,11 @@ public class TopicConfig { } } - + + public boolean supportHdfs() { + return containDb("HDFS"); + } + public boolean supportElasticsearch() { return containDb("Elasticsearch");//TODO string hard codes } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java index f5ee5b79..7c237766 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java @@ -19,17 +19,16 @@ */ package org.onap.datalake.feeder.service; - + import java.util.ArrayList; import java.util.List; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; - + import org.json.JSONObject; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Db; -import org.onap.datalake.feeder.domain.Topic; import org.onap.datalake.feeder.dto.TopicConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +41,9 @@ import com.couchbase.client.java.Cluster; import com.couchbase.client.java.CouchbaseCluster; import com.couchbase.client.java.document.JsonDocument; import com.couchbase.client.java.document.JsonLongDocument; -import com.couchbase.client.java.document.json.JsonObject; +import com.couchbase.client.java.document.json.JsonObject; +import com.couchbase.client.java.env.CouchbaseEnvironment; +import com.couchbase.client.java.env.DefaultCouchbaseEnvironment; import rx.Observable; import rx.functions.Func1; @@ -63,65 +64,69 @@ public class CouchbaseService { @Autowired private DbService dbService; - + Bucket bucket; private boolean isReady = false; @PostConstruct private void init() { - // Initialize Couchbase Connection - try { - Db couchbase = dbService.getCouchbase(); - Cluster cluster = CouchbaseCluster.create(couchbase.getHost()); - cluster.authenticate(couchbase.getLogin(), couchbase.getPass()); - bucket = cluster.openBucket(couchbase.getDatabase()); - log.info("Connect to Couchbase {}", couchbase.getHost()); - // Create a N1QL Primary Index (but ignore if it exists) - bucket.bucketManager().createN1qlPrimaryIndex(true, false); - isReady = true; - } - catch(Exception ex) - { - isReady = false; - } + // Initialize Couchbase Connection + try { + Db couchbase = dbService.getCouchbase(); + + //this tunes the SDK (to customize connection timeout) + CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder().connectTimeout(60000) // 60s, default is 5s + .build(); + Cluster cluster = CouchbaseCluster.create(env, couchbase.getHost()); + cluster.authenticate(couchbase.getLogin(), couchbase.getPass()); + bucket = cluster.openBucket(couchbase.getDatabase()); + // Create a N1QL Primary Index (but ignore if it exists) + bucket.bucketManager().createN1qlPrimaryIndex(true, false); + + log.info("Connected to Couchbase {}", couchbase.getHost()); + isReady = true; + } catch (Exception ex) { + log.error("error connection to Couchbase.", ex); + isReady = false; + } } @PreDestroy - public void cleanUp() { + public void cleanUp() { bucket.close(); - } + } - public void saveJsons(TopicConfig topic, List<JSONObject> jsons) { - List<JsonDocument> documents= new ArrayList<>(jsons.size()); - for(JSONObject json : jsons) { + public void saveJsons(TopicConfig topic, List<JSONObject> jsons) { + List<JsonDocument> documents = new ArrayList<>(jsons.size()); + for (JSONObject json : jsons) { //convert to Couchbase JsonObject from org.json JSONObject - JsonObject jsonObject = JsonObject.fromJson(json.toString()); + JsonObject jsonObject = JsonObject.fromJson(json.toString()); long timestamp = jsonObject.getLong(config.getTimestampLabel());//this is Kafka time stamp, which is added in StoreService.messageToJson() //setup TTL - int expiry = (int) (timestamp/1000L) + topic.getTtl()*3600*24; //in second - + int expiry = (int) (timestamp / 1000L) + topic.getTtl() * 3600 * 24; //in second + String id = getId(topic, json); JsonDocument doc = JsonDocument.create(id, expiry, jsonObject); documents.add(doc); } try { saveDocuments(documents); - }catch(Exception e) { + } catch (Exception e) { log.error("error saving to Couchbase.", e); } - log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size()); + log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size()); } public String getId(TopicConfig topic, JSONObject json) { //if this topic requires extract id from JSON String id = topic.getMessageId(json); - if(id != null) { + if (id != null) { return id; } - - String topicStr= topic.getName(); + + String topicStr = topic.getName(); //String id = topicStr+":"+timestamp+":"+UUID.randomUUID(); //https://forums.couchbase.com/t/how-to-set-an-auto-increment-id/4892/2 @@ -129,24 +134,19 @@ public class CouchbaseService { // increment by 1, initialize at 0 if counter doc not found //TODO how slow is this compared with above UUID approach? JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345 - id = topicStr +":"+ nextIdNumber.content(); - + id = topicStr + ":" + nextIdNumber.content(); + return id; } - + //https://docs.couchbase.com/java-sdk/2.7/document-operations.html - private void saveDocuments(List<JsonDocument> documents) { - Observable - .from(documents) - .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() { - @Override - public Observable<JsonDocument> call(final JsonDocument docToInsert) { - return bucket.async().insert(docToInsert); - } - }) - .last() - .toBlocking() - .single(); + private void saveDocuments(List<JsonDocument> documents) { + Observable.from(documents).flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() { + @Override + public Observable<JsonDocument> call(final JsonDocument docToInsert) { + return bucket.async().insert(docToInsert); + } + }).last().toBlocking().single(); } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java index e859270f..58bb433a 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java @@ -28,7 +28,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** - * Service for Dbs + * Service for Dbs * * @author Guobiao Mo * @@ -38,11 +38,11 @@ public class DbService { @Autowired private DbRepository dbRepository; - + public Db getDb(String name) { Optional<Db> ret = dbRepository.findById(name); return ret.isPresent() ? ret.get() : null; - } + } public Db getCouchbase() { return getDb("Couchbase"); @@ -58,6 +58,10 @@ public class DbService { public Db getDruid() { return getDb("Druid"); - } + } + + public Db getHdfs() { + return getDb("HDFS"); + } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java index de8c9e89..0caec79a 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java @@ -22,6 +22,7 @@ package org.onap.datalake.feeder.service; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -65,9 +66,7 @@ public class DmaapService { ZooKeeper zk = new ZooKeeper(config.getDmaapZookeeperHostPort(), 10000, watcher); List<String> topics = zk.getChildren("/brokers/topics", false); String[] excludes = config.getDmaapKafkaExclude(); - for (String exclude : excludes) { - topics.remove(exclude); - } + topics.removeAll(Arrays.asList(excludes)); return topics; } catch (Exception e) { log.error("Can not get topic list from Zookeeper, for testing, going to use hard coded topic list.", e); @@ -81,7 +80,7 @@ public class DmaapService { return Collections.emptyList(); } - List<String> ret = new ArrayList<>(); + List<String> ret = new ArrayList<>(allTopics.size()); for (String topicStr : allTopics) { TopicConfig topicConfig = topicService.getEffectiveTopic(topicStr, true); if (topicConfig.isEnabled()) { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java index c354f175..f1bed604 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java @@ -84,7 +84,7 @@ public class ElasticsearchService { // Initialize the Connection client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http"))); - log.info("Connect to Elasticsearch Host {}", elasticsearchHost); + log.info("Connected to Elasticsearch Host {}", elasticsearchHost); listener = new ActionListener<BulkResponse>() { @Override diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java new file mode 100644 index 00000000..e8d29106 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/HdfsService.java @@ -0,0 +1,189 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.onap.datalake.feeder.domain.Db; +import org.onap.datalake.feeder.dto.TopicConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import lombok.Getter; +import lombok.Setter; + +/** + * Service to write data to HDFS + * + * @author Guobiao Mo + * + */ +@Service +public class HdfsService { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + ApplicationConfiguration config; + + @Autowired + private DbService dbService; + + FileSystem fileSystem; + private boolean isReady = false; + + private ThreadLocal<Map<String, Buffer>> bufferLocal = ThreadLocal.withInitial(HashMap::new); + private ThreadLocal<SimpleDateFormat> dayFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd")); + private ThreadLocal<SimpleDateFormat> timeFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS")); + + @Setter + @Getter + private class Buffer { + long lastFlush; + List<String> data; + + public Buffer() { + lastFlush = Long.MIN_VALUE; + data = new ArrayList<>(); + } + + public void flush(String topic) { + try { + saveMessages(topic, data); + data.clear(); + lastFlush = System.currentTimeMillis(); + log.debug("done flush, topic={}, buffer size={}", topic, data.size()); + } catch (IOException e) { + log.error("error saving to HDFS." + topic, e); + } + } + + public void flushStall(String topic) { + if (!data.isEmpty() && System.currentTimeMillis() > lastFlush + config.getHdfsFlushInterval()) { + log.debug("going to flushStall topic={}, buffer size={}", topic, data.size()); + flush(topic); + } + } + + private void saveMessages(String topic, List<String> bufferList) throws IOException { + + String thread = Thread.currentThread().getName(); + Date date = new Date(); + String day = dayFormat.get().format(date); + String time = timeFormat.get().format(date); + String filePath = String.format("/datalake/%s/%s/%s-%s", topic, day, time, thread); + Path path = new Path(filePath); + log.debug("writing to HDFS {}", filePath); + + // Create a new file and write data to it. + FSDataOutputStream out = fileSystem.create(path, true, config.getHdfsBufferSize()); + + bufferList.stream().forEach(message -> { + try { + out.writeUTF(message); + out.write('\n'); + } catch (IOException e) { + log.error("error writing to HDFS.", e); + } + }); + + out.close(); + } + } + + @PostConstruct + private void init() { + // Initialize HDFS Connection + try { + Db hdfs = dbService.getHdfs(); + + //Get configuration of Hadoop system + Configuration hdfsConfig = new Configuration(); + + int port = hdfs.getPort() == null ? 8020 : hdfs.getPort(); + + String hdfsuri = String.format("hdfs://%s:%s", hdfs.getHost(), port); + hdfsConfig.set("fs.defaultFS", hdfsuri); + + log.info("Connecting to -- {}", hdfsuri); + + fileSystem = FileSystem.get(hdfsConfig); + + isReady = true; + } catch (Exception ex) { + log.error("error connection to HDFS.", ex); + isReady = false; + } + } + + @PreDestroy + public void cleanUp() { + try { + flush(); + fileSystem.close(); + } catch (IOException e) { + log.error("fileSystem.close() at cleanUp.", e); + } + } + + public void flush() { + bufferLocal.get().forEach((topic, buffer) -> buffer.flush(topic)); + } + + //if no new data comes in for a topic for a while, need to flush its buffer + public void flushStall() { + bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic)); + } + + public void saveMessages(TopicConfig topic, List<Pair<Long, String>> messages) { + String topicStr = topic.getName(); + + Map<String, Buffer> bufferMap = bufferLocal.get(); + final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer()); + + List<String> bufferData = buffer.getData(); + + messages.stream().forEach(message -> bufferData.add(message.getRight()));//note that message left is not used + + if (bufferData.size() >= config.getHdfsBatchSize()) { + buffer.flush(topicStr); + } + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java index c5408951..32d21c62 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java @@ -135,7 +135,7 @@ public class MongodbService { } public void saveJsons(TopicConfig topic, List<JSONObject> jsons) { - if (dbReady == false) + if (dbReady == false)//TOD throw exception return; List<Document> documents = new ArrayList<>(jsons.size()); for (JSONObject json : jsons) { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java index b3a6d29a..1154b3a9 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java @@ -54,7 +54,7 @@ import org.springframework.stereotype.Service; */ @Service -@Scope(value=ConfigurableBeanFactory.SCOPE_PROTOTYPE) +@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class PullThread implements Runnable { @Autowired @@ -90,7 +90,8 @@ public class PullThread implements Runnable { Properties consumerConfig = new Properties(); consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getDmaapKafkaHostPort()); - consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup()); + consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup()); + consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(id)); consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); @@ -119,28 +120,29 @@ public class PullThread implements Runnable { while (active.get()) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout())); - if (records != null) { - List<Pair<Long, String>> messages = new ArrayList<>(records.count()); - for (TopicPartition partition : records.partitions()) { - messages.clear(); - List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); - for (ConsumerRecord<String, String> record : partitionRecords) { - messages.add(Pair.of(record.timestamp(), record.value())); - //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value()); - } - storeService.saveMessages(partition.topic(), messages); - log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB - - if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit - long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); - consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); - } - } - - if (async) {//for high Throughput, async commit offset in batch to Kafka - consumer.commitAsync(); - } - } + if (records != null) { + List<Pair<Long, String>> messages = new ArrayList<>(records.count()); + for (TopicPartition partition : records.partitions()) { + messages.clear(); + List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); + for (ConsumerRecord<String, String> record : partitionRecords) { + messages.add(Pair.of(record.timestamp(), record.value())); + //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value()); + } + storeService.saveMessages(partition.topic(), messages); + log.info("saved to topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB + + if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit + long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); + consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); + } + } + + if (async) {//for high Throughput, async commit offset in batch to Kafka + consumer.commitAsync(); + } + } + storeService.flushStall(); } } catch (Exception e) { log.error("Puller {} run(): exception={}", id, e.getMessage()); @@ -153,6 +155,7 @@ public class PullThread implements Runnable { public void shutdown() { active.set(false); consumer.wakeup(); + consumer.unsubscribe(); } private class DummyRebalanceListener implements ConsumerRebalanceListener { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java index 449dacfc..2d00a9b8 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import org.apache.commons.lang3.tuple.Pair; @@ -35,7 +34,6 @@ import org.json.JSONException; import org.json.JSONObject; import org.json.XML; import org.onap.datalake.feeder.config.ApplicationConfiguration; -import org.onap.datalake.feeder.domain.Topic; import org.onap.datalake.feeder.dto.TopicConfig; import org.onap.datalake.feeder.enumeration.DataFormat; import org.slf4j.Logger; @@ -77,6 +75,9 @@ public class StoreService { @Autowired private ElasticsearchService elasticsearchService; + @Autowired + private HdfsService hdfsService; + private Map<String, TopicConfig> topicMap = new HashMap<>(); private ObjectMapper yamlReader; @@ -86,10 +87,6 @@ public class StoreService { yamlReader = new ObjectMapper(new YAMLFactory()); } - @PreDestroy - public void cleanUp() { - } - public void saveMessages(String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text if (messages == null || messages.isEmpty()) { return; @@ -109,7 +106,7 @@ public class StoreService { } } - saveJsons(topic, docs); + saveJsons(topic, docs, messages); } private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws JSONException, JsonParseException, JsonMappingException, IOException { @@ -161,7 +158,7 @@ public class StoreService { return json; } - private void saveJsons(TopicConfig topic, List<JSONObject> jsons) { + private void saveJsons(TopicConfig topic, List<JSONObject> jsons, List<Pair<Long, String>> messages) { if (topic.supportMongoDB()) { mongodbService.saveJsons(topic, jsons); } @@ -173,6 +170,13 @@ public class StoreService { if (topic.supportElasticsearch()) { elasticsearchService.saveJsons(topic, jsons); } + + if (topic.supportHdfs()) { + hdfsService.saveMessages(topic, messages); + } } + public void flushStall() { + hdfsService.flushStall(); + } } |