diff options
Diffstat (limited to 'csit/resources/tests/kafka_consumer.py')
-rwxr-xr-x | csit/resources/tests/kafka_consumer.py | 42 |
1 files changed, 18 insertions, 24 deletions
diff --git a/csit/resources/tests/kafka_consumer.py b/csit/resources/tests/kafka_consumer.py index fa173f47..53a2e39f 100755 --- a/csit/resources/tests/kafka_consumer.py +++ b/csit/resources/tests/kafka_consumer.py @@ -27,34 +27,28 @@ import sys import time -def consume_kafka_topic(topic, expected_values, timeout, bootstrap_server): +def consume_kafka_topic(topic, expected_msg, sec_timeout, bootstrap_server): config = { - 'bootstrap.servers': bootstrap_server, - 'group.id': 'testgrp', - 'auto.offset.reset': 'earliest' + 'bootstrap.servers': bootstrap_server, + 'group.id': 'testgrp', + 'auto.offset.reset': 'earliest' } consumer = Consumer(config) consumer.subscribe([topic]) try: start_time = time.time() - while time.time() - start_time < timeout: - msg = consumer.poll(1.0) - if msg is None: - continue - if msg.error(): - if msg.error().code() == KafkaException._PARTITION_EOF: - sys.stderr.write(f"Reached end of topic {msg.topic()} / partition {msg.partition()}\n") - print('ERROR') - sys.exit(404) - else: - # Error - raise KafkaException(msg.error()) - else: - # Message received - message = msg.value().decode('utf-8') - if expected_values in message: - print(message) - sys.exit(200) + while time.time() - start_time < sec_timeout: + msg = consumer.poll(1.0) + if msg is None: + continue + if msg.error(): + raise KafkaException(msg.error()) + else: + # Message received + message = msg.value().decode('utf-8') + if expected_msg in message: + print(message) + sys.exit(200) finally: consumer.close() @@ -63,5 +57,5 @@ if __name__ == '__main__': topic_name = sys.argv[1] timeout = int(sys.argv[2]) # timeout in seconds for verifying the kafka topic expected_values = sys.argv[3] - bootstrap_server = sys.argv[4] - consume_kafka_topic(topic_name, expected_values, timeout, bootstrap_server) + server = sys.argv[4] + consume_kafka_topic(topic_name, expected_values, timeout, server) |