diff options
author | 2019-04-27 03:03:16 +0000 | |
---|---|---|
committer | 2019-04-27 03:03:16 +0000 | |
commit | fa92c7eabf5d05d8703e55be71332786e7fbfffc (patch) | |
tree | 01fdbf036c01b247612f36569bc8d3e2a6e37b85 /components/datalake-handler/feeder/src/main | |
parent | 292c6a896f98b010029a83c332ec0672af29b939 (diff) | |
parent | 03169f74b3bb12cdc2648aba07153b078622a558 (diff) |
Merge "Unit test code for datalake seed code"
Diffstat (limited to 'components/datalake-handler/feeder/src/main')
-rw-r--r-- | components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java | 51 |
1 files changed, 26 insertions, 25 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java index e9f36b2d..ce671a90 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java @@ -48,7 +48,7 @@ import org.springframework.stereotype.Service; /** * Thread that pulls messages from DMaaP and save them to Big Data DBs - * + * * @author Guobiao Mo * */ @@ -65,11 +65,11 @@ public class PullThread implements Runnable { @Autowired private ApplicationConfiguration config; - + private final Logger log = LoggerFactory.getLogger(this.getClass()); private KafkaConsumer<String, String> consumer; //<String, String> is key-value type, in our case key is empty, value is JSON text - private int id; + private int id; private final AtomicBoolean active = new AtomicBoolean(false); private boolean async; @@ -112,33 +112,34 @@ public class PullThread implements Runnable { List<String> topics = dmaapService.getActiveTopics(); //TODO get updated topic list within loop log.info("Thread {} going to subscribe to topics: {}", id, topics); - + consumer.subscribe(topics, rebalanceListener); while (active.get()) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout())); - - List<Pair<Long, String>> messages = new ArrayList<>(records.count()); - for (TopicPartition partition : records.partitions()) { - messages.clear(); - List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); - for (ConsumerRecord<String, String> record : partitionRecords) { - messages.add(Pair.of(record.timestamp(), record.value())); - //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value()); - } - storeService.saveMessages(partition.topic(), messages); - log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB - - if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit - long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); - consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); - } - } - - if (async) {//for high Throughput, async commit offset in batch to Kafka - consumer.commitAsync(); - } + if (records != null) { + List<Pair<Long, String>> messages = new ArrayList<>(records.count()); + for (TopicPartition partition : records.partitions()) { + messages.clear(); + List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); + for (ConsumerRecord<String, String> record : partitionRecords) { + messages.add(Pair.of(record.timestamp(), record.value())); + //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value()); + } + storeService.saveMessages(partition.topic(), messages); + log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB + + if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit + long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); + consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); + } + } + + if (async) {//for high Throughput, async commit offset in batch to Kafka + consumer.commitAsync(); + } + } } } catch (Exception e) { log.error("Puller {} run(): exception={}", id, e.getMessage()); |