From 7ad3fbdb3ae959261f48073ce24de289516d5b26 Mon Sep 17 00:00:00 2001 From: marekpl Date: Mon, 29 Jul 2019 13:29:49 +0200 Subject: KafkaKeywords update KafkaKeywords update Issue-ID: DCAEGEN2-565 Signed-off-by: marekpl Change-Id: I21f13a43347c510f1cb45437ae659e9b41e3f9ac --- robotframework-onap/ONAPLibrary/KafkaKeywords.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/robotframework-onap/ONAPLibrary/KafkaKeywords.py b/robotframework-onap/ONAPLibrary/KafkaKeywords.py index c178bc8..f5adce5 100644 --- a/robotframework-onap/ONAPLibrary/KafkaKeywords.py +++ b/robotframework-onap/ONAPLibrary/KafkaKeywords.py @@ -71,7 +71,7 @@ class KafkaKeywords(object): else: return msg.value - def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=True): + def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=False): assert topic_name # default to the topic as group name @@ -91,14 +91,15 @@ class KafkaKeywords(object): group_id=cgn, request_timeout_ms=10001) - consumer.assign([TopicPartition(str(topic_name), 0),TopicPartition(str(topic_name), 1),TopicPartition(str(topic_name), 2)]) - consumer.poll() + partitions = [TopicPartition(str(topic_name), 0), TopicPartition(str(topic_name), 1), TopicPartition(str(topic_name), 2)] + consumer.assign(partitions) + last = consumer.end_offsets(partitions) + offset = max(last.values()) if set_offset_to_earliest: consumer.seek_to_beginning() else: - consumer.seek_to_end() - - consumer.topics() + for tp in partitions: + consumer.seek(tp, offset - 1) return consumer -- cgit 1.2.3-korg