summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBogumil Zebek <bogumil.zebek@nokia.com>2020-04-21 14:26:56 +0200
committerZebek Bogumil <bogumil.zebek@nokia.com>2020-04-21 14:26:56 +0200
commit37622e3c6acaf455a9a4a5874f190e8ff17a693e (patch)
treedbe0599a61cf57422bb27c3978ca88256f69fd06
parentb09aee8b731c27e2ae3106ab3508c533cac02cda (diff)
Remove deprecated method
Once deprecated, classes, and interfaces, and their members should be avoided, rather than used, inherited or extended. Deprecation is a warning that the class or interface has been superseded, and will eventually be removed. The deprecation period allows you to make a smooth transition away from the aging, soon-to-be-retired technology. Issue-ID: INT-1517 Signed-off-by: Zebek Bogumil <bogumil.zebek@nokia.com> Change-Id: I985959d614c6f2544b0515ed52d6343f194e4720
-rw-r--r--netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/StoreService.java11
1 files changed, 8 insertions, 3 deletions
diff --git a/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/StoreService.java b/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/StoreService.java
index 1b99220..6bd8390 100644
--- a/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/StoreService.java
+++ b/netconfsimulator/src/main/java/org/onap/netconfsimulator/kafka/StoreService.java
@@ -28,6 +28,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Service;
+import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
@@ -53,7 +54,7 @@ public class StoreService {
String clientId = Long.toString(Instant.now().getEpochSecond());
try (Consumer<String, String> consumer = consumerFactory.createConsumer(clientId, clientId)) {
consumer.subscribe(TOPICS_TO_SUBSCRIBE);
- ConsumerRecords<String, String> consumerRecords = consumer.poll(CONSUMING_DURATION_IN_MS);
+ ConsumerRecords<String, String> consumerRecords = pollConsumerRecords(consumer);
consumerRecords.forEach(
consumerRecord ->
messages.add(new Message(consumerRecord.timestamp(), consumerRecord.value())));
@@ -65,7 +66,7 @@ public class StoreService {
List<Message> getLastMessages(long offset) {
List<Message> messages = new ArrayList<>();
try (Consumer<String, String> consumer = createConsumer(offset)) {
- ConsumerRecords<String, String> consumerRecords = consumer.poll(CONSUMING_DURATION_IN_MS);
+ ConsumerRecords<String, String> consumerRecords = pollConsumerRecords(consumer);
consumerRecords.forEach(consumerRecord ->
messages.add(new Message(consumerRecord.timestamp(), consumerRecord.value())));
}
@@ -82,10 +83,14 @@ public class StoreService {
private void seekConsumerTo(Consumer<String, String> consumer, long offsetFromLastIndex) {
consumer.seekToEnd(consumer.assignment());
- consumer.poll(CONSUMING_DURATION_IN_MS);
+ pollConsumerRecords(consumer);
TopicPartition topicPartition = consumer.assignment().iterator().next();
long topicCurrentSize = consumer.position(topicPartition);
long indexToSeek = offsetFromLastIndex > topicCurrentSize ? 0 : topicCurrentSize - offsetFromLastIndex;
consumer.seek(topicPartition, indexToSeek);
}
+
+ private ConsumerRecords<String, String> pollConsumerRecords(Consumer<String, String> consumer) {
+ return consumer.poll(Duration.ofMillis(CONSUMING_DURATION_IN_MS));
+ }
}