diff options
author | marekpl <marek.pondel@nokia.com> | 2019-07-22 17:53:27 +0200 |
---|---|---|
committer | marekpl <marek.pondel@nokia.com> | 2019-07-22 17:55:26 +0200 |
commit | 7d87825261adb3582282b510408136e4faca8459 (patch) | |
tree | a46dc28bfa679d69832f99aafcb8572d050772a2 /robotframework-onap | |
parent | 4c8dc6a28644cbd44084296907d54c66b447432e (diff) |
hvves related utils update
hvves related utils update
Issue-ID: DCAEGEN2-565
Signed-off-by: marekpl <marek.pondel@nokia.com>
Change-Id: I97d10ef25671ceb6ea6a5987fe0e4f9f19504d39
Diffstat (limited to 'robotframework-onap')
-rw-r--r-- | robotframework-onap/ONAPLibrary/KafkaKeywords.py | 13 |
1 files changed, 8 insertions, 5 deletions
diff --git a/robotframework-onap/ONAPLibrary/KafkaKeywords.py b/robotframework-onap/ONAPLibrary/KafkaKeywords.py index 41ef976..0e15cfc 100644 --- a/robotframework-onap/ONAPLibrary/KafkaKeywords.py +++ b/robotframework-onap/ONAPLibrary/KafkaKeywords.py @@ -33,7 +33,7 @@ class KafkaKeywords(object): "bootstrap_servers": kafka_host, "sasl_plain_username": sasl_user, "sasl_plain_password": sasl_password, - "security_protocol": 'SASL_SSL', + "security_protocol": 'SASL_PLAINTEXT', "ssl_context": ssl.create_default_context(), "sasl_mechanism": 'PLAIN' } @@ -51,7 +51,7 @@ class KafkaKeywords(object): cache = self._cache.switch(alias) prod = KafkaProducer(bootstrap_servers=cache['bootstrap_servers'], sasl_plain_username=cache['sasl_plain_username'], - sasl_plain_password=cache['sasl_password'], + sasl_plain_password=cache['sasl_plain_password'], security_protocol=cache['security_protocol'], ssl_context=cache['ssl_context'], sasl_mechanism=cache['sasl_mechanism'], @@ -70,7 +70,7 @@ class KafkaKeywords(object): else: return msg.value - def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=False): + def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=True): assert topic_name # default to the topic as group name @@ -83,12 +83,15 @@ class KafkaKeywords(object): consumer = KafkaConsumer(bootstrap_servers=cache['bootstrap_servers'], sasl_plain_username=cache['sasl_plain_username'], - sasl_plain_password=cache['sasl_password'], + sasl_plain_password=cache['sasl_plain_password'], security_protocol=cache['security_protocol'], ssl_context=cache['ssl_context'], sasl_mechanism=cache['sasl_mechanism'], group_id=cgn, - request_timeout_ms=5000) + request_timeout_ms=10001) + + consumer.assign([TopicPartition(str(topic_name), 0),TopicPartition(str(topic_name), 1),TopicPartition(str(topic_name), 2)]) + consumer.poll() if set_offset_to_earliest: consumer.seek_to_beginning() |