summaryrefslogtreecommitdiffstats
path: root/robotframework-onap/ONAPLibrary/KafkaKeywords.py
diff options
context:
space:
mode:
authorDR695H <dr695h@att.com>2019-06-18 17:11:30 -0400
committerDR695H <dr695h@att.com>2019-06-18 17:13:02 -0400
commitb8a725cbb580dc1e1df89a9c36cb27f1d8ce9d6c (patch)
treeba4ea14fec54a76ed0a805b56a0996bc81448cab /robotframework-onap/ONAPLibrary/KafkaKeywords.py
parent7aeb828932bbf73e4ffb6a35263e3fe2e50bfb80 (diff)
add sasl support to kafka
Change-Id: I372bcc8a478d137f58b6ab9017a555f584e48f30 Issue-ID: TEST-158 Signed-off-by: DR695H <dr695h@att.com>
Diffstat (limited to 'robotframework-onap/ONAPLibrary/KafkaKeywords.py')
-rw-r--r--robotframework-onap/ONAPLibrary/KafkaKeywords.py58
1 files changed, 41 insertions, 17 deletions
diff --git a/robotframework-onap/ONAPLibrary/KafkaKeywords.py b/robotframework-onap/ONAPLibrary/KafkaKeywords.py
index 6da5b52..41ef976 100644
--- a/robotframework-onap/ONAPLibrary/KafkaKeywords.py
+++ b/robotframework-onap/ONAPLibrary/KafkaKeywords.py
@@ -11,8 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from pykafka import KafkaClient
-from pykafka.common import OffsetType
+
+from kafka import KafkaConsumer
+from kafka import KafkaProducer
+import ssl
from robot.api.deco import keyword
from robot import utils
@@ -25,9 +27,16 @@ class KafkaKeywords(object):
self._cache = utils.ConnectionCache('No Kafka Environments created')
@keyword
- def connect(self, alias, kafka_host, kafka_version="1.0.0"):
+ def connect(self, alias, kafka_host, sasl_user, sasl_password):
"""connect to the specified kafka server"""
- client = KafkaClient(hosts=kafka_host, broker_version=kafka_version)
+ client = {
+ "bootstrap_servers": kafka_host,
+ "sasl_plain_username": sasl_user,
+ "sasl_plain_password": sasl_password,
+ "security_protocol": 'SASL_SSL',
+ "ssl_context": ssl.create_default_context(),
+ "sasl_mechanism": 'PLAIN'
+ }
self._cache.register(client, alias=alias)
@keyword
@@ -35,12 +44,18 @@ class KafkaKeywords(object):
assert topic
assert value
- producer = self._get_producer(alias, topic)
- return producer.produce(value, key)
+ producer = self._get_producer(alias)
+ return producer.send(topic, value=value, key=key)
- def _get_producer(self, alias, topic_name):
- topic = self._cache.switch(alias).topics[topic_name]
- prod = topic.get_sync_producer()
+ def _get_producer(self, alias):
+ cache = self._cache.switch(alias)
+ prod = KafkaProducer(bootstrap_servers=cache['bootstrap_servers'],
+ sasl_plain_username=cache['sasl_plain_username'],
+ sasl_plain_password=cache['sasl_password'],
+ security_protocol=cache['security_protocol'],
+ ssl_context=cache['ssl_context'],
+ sasl_mechanism=cache['sasl_mechanism'],
+ request_timeout_ms=5000)
return prod
@keyword
@@ -48,7 +63,8 @@ class KafkaKeywords(object):
assert topic_name
consumer = self._get_consumer(alias, topic_name, consumer_group)
- msg = consumer.consume()
+ msg = next(consumer)
+ consumer.close(autocommit=True)
if msg is None:
return None
else:
@@ -63,14 +79,22 @@ class KafkaKeywords(object):
else:
cgn = topic_name
- topic = self._cache.switch(alias).topics[topic_name]
+ cache = self._cache.switch(alias)
+
+ consumer = KafkaConsumer(bootstrap_servers=cache['bootstrap_servers'],
+ sasl_plain_username=cache['sasl_plain_username'],
+ sasl_plain_password=cache['sasl_password'],
+ security_protocol=cache['security_protocol'],
+ ssl_context=cache['ssl_context'],
+ sasl_mechanism=cache['sasl_mechanism'],
+ group_id=cgn,
+ request_timeout_ms=5000)
- offset_type = OffsetType.LATEST
if set_offset_to_earliest:
- offset_type = OffsetType.EARLIEST
+ consumer.seek_to_beginning()
+ else:
+ consumer.seek_to_end()
- c = topic.get_simple_consumer(
- consumer_group=cgn, auto_offset_reset=offset_type, auto_commit_enable=True,
- reset_offset_on_start=True, consumer_timeout_ms=5000)
+ consumer.topics()
- return c
+ return consumer