author    Yan Yang <yangyanyj@chinamobile.com>    2019-04-27 03:03:16 +0000
committer Gerrit Code Review <gerrit@onap.org>    2019-04-27 03:03:16 +0000
commit    fa92c7eabf5d05d8703e55be71332786e7fbfffc (patch)
tree      01fdbf036c01b247612f36569bc8d3e2a6e37b85 /components/datalake-handler/feeder/src/main/java/org
parent    292c6a896f98b010029a83c332ec0672af29b939 (diff)
parent    03169f74b3bb12cdc2648aba07153b078622a558 (diff)
Merge "Unit test code for datalake seed code"
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java/org')
-rw-r--r--  components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java | 51
1 file changed, 26 insertions(+), 25 deletions(-)
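The merged change ("Unit test code for datalake seed code") accompanies a defensive null check around the Kafka poll loop, shown in the diff below. A minimal, hypothetical JUnit/Mockito sketch of how that guard could be exercised; the class name, test name, and mock wiring are illustrative assumptions, not taken from the merged test code:

import static org.mockito.Mockito.*;

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Test;

// Hypothetical sketch: checks that a null result from poll() is tolerated,
// mirroring the "if (records != null)" guard added to PullThread.run().
public class PullThreadNullGuardTest {

    @Test
    public void nullPollResultIsSkippedWithoutException() {
        @SuppressWarnings("unchecked")
        KafkaConsumer<String, String> consumer = mock(KafkaConsumer.class);
        when(consumer.poll(any(Duration.class))).thenReturn(null);

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
        if (records != null) { // same guard as in the diff below
            throw new AssertionError("expected null records to be skipped");
        }
        // Reaching here without a NullPointerException is the point of the guard.
    }
}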
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
index e9f36b2d..ce671a90 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
@@ -48,7 +48,7 @@ import org.springframework.stereotype.Service;
/**
* Thread that pulls messages from DMaaP and save them to Big Data DBs
- *
+ *
* @author Guobiao Mo
*
*/
@@ -65,11 +65,11 @@ public class PullThread implements Runnable {
@Autowired
private ApplicationConfiguration config;
-
+
private final Logger log = LoggerFactory.getLogger(this.getClass());
private KafkaConsumer<String, String> consumer; //<String, String> is key-value type, in our case key is empty, value is JSON text
- private int id;
+ private int id;
private final AtomicBoolean active = new AtomicBoolean(false);
private boolean async;
@@ -112,33 +112,34 @@ public class PullThread implements Runnable {
List<String> topics = dmaapService.getActiveTopics(); //TODO get updated topic list within loop
log.info("Thread {} going to subscribe to topics: {}", id, topics);
-
+
consumer.subscribe(topics, rebalanceListener);
while (active.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout()));
-
- List<Pair<Long, String>> messages = new ArrayList<>(records.count());
- for (TopicPartition partition : records.partitions()) {
- messages.clear();
- List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
- for (ConsumerRecord<String, String> record : partitionRecords) {
- messages.add(Pair.of(record.timestamp(), record.value()));
- //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
- }
- storeService.saveMessages(partition.topic(), messages);
- log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
-
- if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
- long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
- consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
- }
- }
-
- if (async) {//for high Throughput, async commit offset in batch to Kafka
- consumer.commitAsync();
- }
+ if (records != null) {
+ List<Pair<Long, String>> messages = new ArrayList<>(records.count());
+ for (TopicPartition partition : records.partitions()) {
+ messages.clear();
+ List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
+ for (ConsumerRecord<String, String> record : partitionRecords) {
+ messages.add(Pair.of(record.timestamp(), record.value()));
+ //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
+ }
+ storeService.saveMessages(partition.topic(), messages);
+ log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
+
+ if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
+ long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+ consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+ }
+ }
+
+ if (async) {//for high Throughput, async commit offset in batch to Kafka
+ consumer.commitAsync();
+ }
+ }
}
} catch (Exception e) {
log.error("Puller {} run(): exception={}", id, e.getMessage());