summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormarekpl <marek.pondel@nokia.com>2019-07-29 13:29:49 +0200
committermarekpl <marek.pondel@nokia.com>2019-07-29 13:29:49 +0200
commit7ad3fbdb3ae959261f48073ce24de289516d5b26 (patch)
tree15cf1f43f5910e66ab1af4b0a6b1bc07ce1e8e34
parent8fc1a7e09a6be2b463700f3523ef9b5f8089d945 (diff)
KafkaKeywords update
KafkaKeywords update Issue-ID: DCAEGEN2-565 Signed-off-by: marekpl <marek.pondel@nokia.com> Change-Id: I21f13a43347c510f1cb45437ae659e9b41e3f9ac
-rw-r--r--robotframework-onap/ONAPLibrary/KafkaKeywords.py13
1 files changed, 7 insertions, 6 deletions
diff --git a/robotframework-onap/ONAPLibrary/KafkaKeywords.py b/robotframework-onap/ONAPLibrary/KafkaKeywords.py
index c178bc8..f5adce5 100644
--- a/robotframework-onap/ONAPLibrary/KafkaKeywords.py
+++ b/robotframework-onap/ONAPLibrary/KafkaKeywords.py
@@ -71,7 +71,7 @@ class KafkaKeywords(object):
else:
return msg.value
- def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=True):
+ def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=False):
assert topic_name
# default to the topic as group name
@@ -91,14 +91,15 @@ class KafkaKeywords(object):
group_id=cgn,
request_timeout_ms=10001)
- consumer.assign([TopicPartition(str(topic_name), 0),TopicPartition(str(topic_name), 1),TopicPartition(str(topic_name), 2)])
- consumer.poll()
+ partitions = [TopicPartition(str(topic_name), 0), TopicPartition(str(topic_name), 1), TopicPartition(str(topic_name), 2)]
+ consumer.assign(partitions)
+ last = consumer.end_offsets(partitions)
+ offset = max(last.values())
if set_offset_to_earliest:
consumer.seek_to_beginning()
else:
- consumer.seek_to_end()
-
- consumer.topics()
+ for tp in partitions:
+ consumer.seek(tp, offset - 1)
return consumer