diff options
author | marekpl <marek.pondel@nokia.com> | 2019-07-29 13:29:49 +0200 |
---|---|---|
committer | marekpl <marek.pondel@nokia.com> | 2019-07-29 13:29:49 +0200 |
commit | 7ad3fbdb3ae959261f48073ce24de289516d5b26 (patch) | |
tree | 15cf1f43f5910e66ab1af4b0a6b1bc07ce1e8e34 | |
parent | 8fc1a7e09a6be2b463700f3523ef9b5f8089d945 (diff) |
KafkaKeywords update
KafkaKeywords update
Issue-ID: DCAEGEN2-565
Signed-off-by: marekpl <marek.pondel@nokia.com>
Change-Id: I21f13a43347c510f1cb45437ae659e9b41e3f9ac
-rw-r--r-- | robotframework-onap/ONAPLibrary/KafkaKeywords.py | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/robotframework-onap/ONAPLibrary/KafkaKeywords.py b/robotframework-onap/ONAPLibrary/KafkaKeywords.py index c178bc8..f5adce5 100644 --- a/robotframework-onap/ONAPLibrary/KafkaKeywords.py +++ b/robotframework-onap/ONAPLibrary/KafkaKeywords.py @@ -71,7 +71,7 @@ class KafkaKeywords(object): else: return msg.value - def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=True): + def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=False): assert topic_name # default to the topic as group name @@ -91,14 +91,15 @@ class KafkaKeywords(object): group_id=cgn, request_timeout_ms=10001) - consumer.assign([TopicPartition(str(topic_name), 0),TopicPartition(str(topic_name), 1),TopicPartition(str(topic_name), 2)]) - consumer.poll() + partitions = [TopicPartition(str(topic_name), 0), TopicPartition(str(topic_name), 1), TopicPartition(str(topic_name), 2)] + consumer.assign(partitions) + last = consumer.end_offsets(partitions) + offset = max(last.values()) if set_offset_to_earliest: consumer.seek_to_beginning() else: - consumer.seek_to_end() - - consumer.topics() + for tp in partitions: + consumer.seek(tp, offset - 1) return consumer |