diff options
author | DR695H <dr695h@att.com> | 2019-06-18 17:11:30 -0400 |
---|---|---|
committer | DR695H <dr695h@att.com> | 2019-06-18 17:13:02 -0400 |
commit | b8a725cbb580dc1e1df89a9c36cb27f1d8ce9d6c (patch) | |
tree | ba4ea14fec54a76ed0a805b56a0996bc81448cab /robotframework-onap/ONAPLibrary/KafkaKeywords.py | |
parent | 7aeb828932bbf73e4ffb6a35263e3fe2e50bfb80 (diff) |
add sasl support to kafka
Change-Id: I372bcc8a478d137f58b6ab9017a555f584e48f30
Issue-ID: TEST-158
Signed-off-by: DR695H <dr695h@att.com>
Diffstat (limited to 'robotframework-onap/ONAPLibrary/KafkaKeywords.py')
-rw-r--r-- | robotframework-onap/ONAPLibrary/KafkaKeywords.py | 58 |
1 files changed, 41 insertions, 17 deletions
diff --git a/robotframework-onap/ONAPLibrary/KafkaKeywords.py b/robotframework-onap/ONAPLibrary/KafkaKeywords.py index 6da5b52..41ef976 100644 --- a/robotframework-onap/ONAPLibrary/KafkaKeywords.py +++ b/robotframework-onap/ONAPLibrary/KafkaKeywords.py @@ -11,8 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from pykafka import KafkaClient -from pykafka.common import OffsetType + +from kafka import KafkaConsumer +from kafka import KafkaProducer +import ssl from robot.api.deco import keyword from robot import utils @@ -25,9 +27,16 @@ class KafkaKeywords(object): self._cache = utils.ConnectionCache('No Kafka Environments created') @keyword - def connect(self, alias, kafka_host, kafka_version="1.0.0"): + def connect(self, alias, kafka_host, sasl_user, sasl_password): """connect to the specified kafka server""" - client = KafkaClient(hosts=kafka_host, broker_version=kafka_version) + client = { + "bootstrap_servers": kafka_host, + "sasl_plain_username": sasl_user, + "sasl_plain_password": sasl_password, + "security_protocol": 'SASL_SSL', + "ssl_context": ssl.create_default_context(), + "sasl_mechanism": 'PLAIN' + } self._cache.register(client, alias=alias) @keyword @@ -35,12 +44,18 @@ class KafkaKeywords(object): assert topic assert value - producer = self._get_producer(alias, topic) - return producer.produce(value, key) + producer = self._get_producer(alias) + return producer.send(topic, value=value, key=key) - def _get_producer(self, alias, topic_name): - topic = self._cache.switch(alias).topics[topic_name] - prod = topic.get_sync_producer() + def _get_producer(self, alias): + cache = self._cache.switch(alias) + prod = KafkaProducer(bootstrap_servers=cache['bootstrap_servers'], + sasl_plain_username=cache['sasl_plain_username'], + sasl_plain_password=cache['sasl_password'], + security_protocol=cache['security_protocol'], + ssl_context=cache['ssl_context'], + sasl_mechanism=cache['sasl_mechanism'], + request_timeout_ms=5000) return prod @keyword @@ -48,7 +63,8 @@ class KafkaKeywords(object): assert topic_name consumer = self._get_consumer(alias, topic_name, consumer_group) - msg = consumer.consume() + msg = next(consumer) + consumer.close(autocommit=True) if msg is None: return None else: @@ -63,14 +79,22 @@ class KafkaKeywords(object): else: cgn = topic_name - topic = self._cache.switch(alias).topics[topic_name] + cache = self._cache.switch(alias) + + consumer = KafkaConsumer(bootstrap_servers=cache['bootstrap_servers'], + sasl_plain_username=cache['sasl_plain_username'], + sasl_plain_password=cache['sasl_password'], + security_protocol=cache['security_protocol'], + ssl_context=cache['ssl_context'], + sasl_mechanism=cache['sasl_mechanism'], + group_id=cgn, + request_timeout_ms=5000) - offset_type = OffsetType.LATEST if set_offset_to_earliest: - offset_type = OffsetType.EARLIEST + consumer.seek_to_beginning() + else: + consumer.seek_to_end() - c = topic.get_simple_consumer( - consumer_group=cgn, auto_offset_reset=offset_type, auto_commit_enable=True, - reset_offset_on_start=True, consumer_timeout_ms=5000) + consumer.topics() - return c + return consumer |