summaryrefslogtreecommitdiffstats
path: root/csit/resources/tests/kafka_consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'csit/resources/tests/kafka_consumer.py')
-rwxr-xr-xcsit/resources/tests/kafka_consumer.py42
1 files changed, 18 insertions, 24 deletions
diff --git a/csit/resources/tests/kafka_consumer.py b/csit/resources/tests/kafka_consumer.py
index fa173f47..53a2e39f 100755
--- a/csit/resources/tests/kafka_consumer.py
+++ b/csit/resources/tests/kafka_consumer.py
@@ -27,34 +27,28 @@ import sys
import time
-def consume_kafka_topic(topic, expected_values, timeout, bootstrap_server):
+def consume_kafka_topic(topic, expected_msg, sec_timeout, bootstrap_server):
config = {
- 'bootstrap.servers': bootstrap_server,
- 'group.id': 'testgrp',
- 'auto.offset.reset': 'earliest'
+ 'bootstrap.servers': bootstrap_server,
+ 'group.id': 'testgrp',
+ 'auto.offset.reset': 'earliest'
}
consumer = Consumer(config)
consumer.subscribe([topic])
try:
start_time = time.time()
- while time.time() - start_time < timeout:
- msg = consumer.poll(1.0)
- if msg is None:
- continue
- if msg.error():
- if msg.error().code() == KafkaException._PARTITION_EOF:
- sys.stderr.write(f"Reached end of topic {msg.topic()} / partition {msg.partition()}\n")
- print('ERROR')
- sys.exit(404)
- else:
- # Error
- raise KafkaException(msg.error())
- else:
- # Message received
- message = msg.value().decode('utf-8')
- if expected_values in message:
- print(message)
- sys.exit(200)
+ while time.time() - start_time < sec_timeout:
+ msg = consumer.poll(1.0)
+ if msg is None:
+ continue
+ if msg.error():
+ raise KafkaException(msg.error())
+ else:
+ # Message received
+ message = msg.value().decode('utf-8')
+ if expected_msg in message:
+ print(message)
+ sys.exit(200)
finally:
consumer.close()
@@ -63,5 +57,5 @@ if __name__ == '__main__':
topic_name = sys.argv[1]
timeout = int(sys.argv[2]) # timeout in seconds for verifying the kafka topic
expected_values = sys.argv[3]
- bootstrap_server = sys.argv[4]
- consume_kafka_topic(topic_name, expected_values, timeout, bootstrap_server)
+ server = sys.argv[4]
+ consume_kafka_topic(topic_name, expected_values, timeout, server)