summaryrefslogtreecommitdiffstats
path: root/robotframework-onap/ONAPLibrary/KafkaKeywords.py
diff options
context:
space:
mode:
authormarekpl <marek.pondel@nokia.com>2019-07-22 17:53:27 +0200
committermarekpl <marek.pondel@nokia.com>2019-07-22 17:55:26 +0200
commit7d87825261adb3582282b510408136e4faca8459 (patch)
treea46dc28bfa679d69832f99aafcb8572d050772a2 /robotframework-onap/ONAPLibrary/KafkaKeywords.py
parent4c8dc6a28644cbd44084296907d54c66b447432e (diff)
hvves related utils update
hvves related utils update Issue-ID: DCAEGEN2-565 Signed-off-by: marekpl <marek.pondel@nokia.com> Change-Id: I97d10ef25671ceb6ea6a5987fe0e4f9f19504d39
Diffstat (limited to 'robotframework-onap/ONAPLibrary/KafkaKeywords.py')
-rw-r--r--robotframework-onap/ONAPLibrary/KafkaKeywords.py13
1 files changed, 8 insertions, 5 deletions
diff --git a/robotframework-onap/ONAPLibrary/KafkaKeywords.py b/robotframework-onap/ONAPLibrary/KafkaKeywords.py
index 41ef976..0e15cfc 100644
--- a/robotframework-onap/ONAPLibrary/KafkaKeywords.py
+++ b/robotframework-onap/ONAPLibrary/KafkaKeywords.py
@@ -33,7 +33,7 @@ class KafkaKeywords(object):
"bootstrap_servers": kafka_host,
"sasl_plain_username": sasl_user,
"sasl_plain_password": sasl_password,
- "security_protocol": 'SASL_SSL',
+ "security_protocol": 'SASL_PLAINTEXT',
"ssl_context": ssl.create_default_context(),
"sasl_mechanism": 'PLAIN'
}
@@ -51,7 +51,7 @@ class KafkaKeywords(object):
cache = self._cache.switch(alias)
prod = KafkaProducer(bootstrap_servers=cache['bootstrap_servers'],
sasl_plain_username=cache['sasl_plain_username'],
- sasl_plain_password=cache['sasl_password'],
+ sasl_plain_password=cache['sasl_plain_password'],
security_protocol=cache['security_protocol'],
ssl_context=cache['ssl_context'],
sasl_mechanism=cache['sasl_mechanism'],
@@ -70,7 +70,7 @@ class KafkaKeywords(object):
else:
return msg.value
- def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=False):
+ def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=True):
assert topic_name
# default to the topic as group name
@@ -83,12 +83,15 @@ class KafkaKeywords(object):
consumer = KafkaConsumer(bootstrap_servers=cache['bootstrap_servers'],
sasl_plain_username=cache['sasl_plain_username'],
- sasl_plain_password=cache['sasl_password'],
+ sasl_plain_password=cache['sasl_plain_password'],
security_protocol=cache['security_protocol'],
ssl_context=cache['ssl_context'],
sasl_mechanism=cache['sasl_mechanism'],
group_id=cgn,
- request_timeout_ms=5000)
+ request_timeout_ms=10001)
+
+ consumer.assign([TopicPartition(str(topic_name), 0),TopicPartition(str(topic_name), 1),TopicPartition(str(topic_name), 2)])
+ consumer.poll()
if set_offset_to_earliest:
consumer.seek_to_beginning()