diff options
Diffstat (limited to 'robotframework-onap/ONAPLibrary/KafkaKeywords.py')
-rw-r--r-- | robotframework-onap/ONAPLibrary/KafkaKeywords.py | 58 |
1 files changed, 41 insertions, 17 deletions
diff --git a/robotframework-onap/ONAPLibrary/KafkaKeywords.py b/robotframework-onap/ONAPLibrary/KafkaKeywords.py index 6da5b52..41ef976 100644 --- a/robotframework-onap/ONAPLibrary/KafkaKeywords.py +++ b/robotframework-onap/ONAPLibrary/KafkaKeywords.py @@ -11,8 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from pykafka import KafkaClient -from pykafka.common import OffsetType + +from kafka import KafkaConsumer +from kafka import KafkaProducer +import ssl from robot.api.deco import keyword from robot import utils @@ -25,9 +27,16 @@ class KafkaKeywords(object): self._cache = utils.ConnectionCache('No Kafka Environments created') @keyword - def connect(self, alias, kafka_host, kafka_version="1.0.0"): + def connect(self, alias, kafka_host, sasl_user, sasl_password): """connect to the specified kafka server""" - client = KafkaClient(hosts=kafka_host, broker_version=kafka_version) + client = { + "bootstrap_servers": kafka_host, + "sasl_plain_username": sasl_user, + "sasl_plain_password": sasl_password, + "security_protocol": 'SASL_SSL', + "ssl_context": ssl.create_default_context(), + "sasl_mechanism": 'PLAIN' + } self._cache.register(client, alias=alias) @keyword @@ -35,12 +44,18 @@ class KafkaKeywords(object): assert topic assert value - producer = self._get_producer(alias, topic) - return producer.produce(value, key) + producer = self._get_producer(alias) + return producer.send(topic, value=value, key=key) - def _get_producer(self, alias, topic_name): - topic = self._cache.switch(alias).topics[topic_name] - prod = topic.get_sync_producer() + def _get_producer(self, alias): + cache = self._cache.switch(alias) + prod = KafkaProducer(bootstrap_servers=cache['bootstrap_servers'], + sasl_plain_username=cache['sasl_plain_username'], + sasl_plain_password=cache['sasl_password'], + security_protocol=cache['security_protocol'], + ssl_context=cache['ssl_context'], + sasl_mechanism=cache['sasl_mechanism'], + request_timeout_ms=5000) return prod @keyword @@ -48,7 +63,8 @@ class KafkaKeywords(object): assert topic_name consumer = self._get_consumer(alias, topic_name, consumer_group) - msg = consumer.consume() + msg = next(consumer) + consumer.close(autocommit=True) if msg is None: return None else: @@ -63,14 +79,22 @@ class KafkaKeywords(object): else: cgn = topic_name - topic = self._cache.switch(alias).topics[topic_name] + cache = self._cache.switch(alias) + + consumer = KafkaConsumer(bootstrap_servers=cache['bootstrap_servers'], + sasl_plain_username=cache['sasl_plain_username'], + sasl_plain_password=cache['sasl_password'], + security_protocol=cache['security_protocol'], + ssl_context=cache['ssl_context'], + sasl_mechanism=cache['sasl_mechanism'], + group_id=cgn, + request_timeout_ms=5000) - offset_type = OffsetType.LATEST if set_offset_to_earliest: - offset_type = OffsetType.EARLIEST + consumer.seek_to_beginning() + else: + consumer.seek_to_end() - c = topic.get_simple_consumer( - consumer_group=cgn, auto_offset_reset=offset_type, auto_commit_enable=True, - reset_offset_on_start=True, consumer_timeout_ms=5000) + consumer.topics() - return c + return consumer |