summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--robotframework-onap/ONAPLibrary/KafkaKeywords.py13
1 files changed, 7 insertions, 6 deletions
diff --git a/robotframework-onap/ONAPLibrary/KafkaKeywords.py b/robotframework-onap/ONAPLibrary/KafkaKeywords.py
index c178bc8..f5adce5 100644
--- a/robotframework-onap/ONAPLibrary/KafkaKeywords.py
+++ b/robotframework-onap/ONAPLibrary/KafkaKeywords.py
@@ -71,7 +71,7 @@ class KafkaKeywords(object):
else:
return msg.value
- def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=True):
+ def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=False):
assert topic_name
# default to the topic as group name
@@ -91,14 +91,15 @@ class KafkaKeywords(object):
group_id=cgn,
request_timeout_ms=10001)
- consumer.assign([TopicPartition(str(topic_name), 0),TopicPartition(str(topic_name), 1),TopicPartition(str(topic_name), 2)])
- consumer.poll()
+ partitions = [TopicPartition(str(topic_name), 0), TopicPartition(str(topic_name), 1), TopicPartition(str(topic_name), 2)]
+ consumer.assign(partitions)
+ last = consumer.end_offsets(partitions)
+ offset = max(last.values())
if set_offset_to_earliest:
consumer.seek_to_beginning()
else:
- consumer.seek_to_end()
-
- consumer.topics()
+ for tp in partitions:
+ consumer.seek(tp, offset - 1)
return consumer