From b8a725cbb580dc1e1df89a9c36cb27f1d8ce9d6c Mon Sep 17 00:00:00 2001 From: DR695H Date: Tue, 18 Jun 2019 17:11:30 -0400 Subject: add sasl support to kafka Change-Id: I372bcc8a478d137f58b6ab9017a555f584e48f30 Issue-ID: TEST-158 Signed-off-by: DR695H --- robotframework-onap/ONAPLibrary/KafkaKeywords.py | 58 +++++++++++++++++------- robotframework-onap/setup.py | 2 +- 2 files changed, 42 insertions(+), 18 deletions(-) (limited to 'robotframework-onap') diff --git a/robotframework-onap/ONAPLibrary/KafkaKeywords.py b/robotframework-onap/ONAPLibrary/KafkaKeywords.py index 6da5b52..41ef976 100644 --- a/robotframework-onap/ONAPLibrary/KafkaKeywords.py +++ b/robotframework-onap/ONAPLibrary/KafkaKeywords.py @@ -11,8 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from pykafka import KafkaClient -from pykafka.common import OffsetType + +from kafka import KafkaConsumer +from kafka import KafkaProducer +import ssl from robot.api.deco import keyword from robot import utils @@ -25,9 +27,16 @@ class KafkaKeywords(object): self._cache = utils.ConnectionCache('No Kafka Environments created') @keyword - def connect(self, alias, kafka_host, kafka_version="1.0.0"): + def connect(self, alias, kafka_host, sasl_user, sasl_password): """connect to the specified kafka server""" - client = KafkaClient(hosts=kafka_host, broker_version=kafka_version) + client = { + "bootstrap_servers": kafka_host, + "sasl_plain_username": sasl_user, + "sasl_plain_password": sasl_password, + "security_protocol": 'SASL_SSL', + "ssl_context": ssl.create_default_context(), + "sasl_mechanism": 'PLAIN' + } self._cache.register(client, alias=alias) @keyword @@ -35,12 +44,18 @@ class KafkaKeywords(object): assert topic assert value - producer = self._get_producer(alias, topic) - return producer.produce(value, key) + producer = self._get_producer(alias) + return producer.send(topic, value=value, key=key) - def _get_producer(self, alias, topic_name): - topic = self._cache.switch(alias).topics[topic_name] - prod = topic.get_sync_producer() + def _get_producer(self, alias): + cache = self._cache.switch(alias) + prod = KafkaProducer(bootstrap_servers=cache['bootstrap_servers'], + sasl_plain_username=cache['sasl_plain_username'], + sasl_plain_password=cache['sasl_password'], + security_protocol=cache['security_protocol'], + ssl_context=cache['ssl_context'], + sasl_mechanism=cache['sasl_mechanism'], + request_timeout_ms=5000) return prod @keyword @@ -48,7 +63,8 @@ class KafkaKeywords(object): assert topic_name consumer = self._get_consumer(alias, topic_name, consumer_group) - msg = consumer.consume() + msg = next(consumer) + consumer.close(autocommit=True) if msg is None: return None else: @@ -63,14 +79,22 @@ class KafkaKeywords(object): else: cgn = topic_name - topic = self._cache.switch(alias).topics[topic_name] + cache = self._cache.switch(alias) + + consumer = KafkaConsumer(bootstrap_servers=cache['bootstrap_servers'], + sasl_plain_username=cache['sasl_plain_username'], + sasl_plain_password=cache['sasl_password'], + security_protocol=cache['security_protocol'], + ssl_context=cache['ssl_context'], + sasl_mechanism=cache['sasl_mechanism'], + group_id=cgn, + request_timeout_ms=5000) - offset_type = OffsetType.LATEST if set_offset_to_earliest: - offset_type = OffsetType.EARLIEST + consumer.seek_to_beginning() + else: + consumer.seek_to_end() - c = topic.get_simple_consumer( - consumer_group=cgn, auto_offset_reset=offset_type, auto_commit_enable=True, - reset_offset_on_start=True, consumer_timeout_ms=5000) + consumer.topics() - return c + return consumer diff --git a/robotframework-onap/setup.py b/robotframework-onap/setup.py index ddc8f79..76a0f2c 100644 --- a/robotframework-onap/setup.py +++ b/robotframework-onap/setup.py @@ -39,7 +39,7 @@ setup( 'requests', 'future', 'robotframework-requests', - 'pykafka', + 'kafka-python', 'urllib3' ], # what we need packages=['eteutils', 'loadtest', 'vcpeutils', 'ONAPLibrary'], # The name of your scripts package -- cgit 1.2.3-korg