diff options
Diffstat (limited to 'csit/resources/scripts/kafka_consumer.py')
-rwxr-xr-x | csit/resources/scripts/kafka_consumer.py | 71 |
1 files changed, 71 insertions, 0 deletions
diff --git a/csit/resources/scripts/kafka_consumer.py b/csit/resources/scripts/kafka_consumer.py new file mode 100755 index 00000000..80b6167a --- /dev/null +++ b/csit/resources/scripts/kafka_consumer.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +# +# ============LICENSE_START==================================================== +# Copyright (C) 2023 Nordix Foundation. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END====================================================== + +# Python utility to fetch kafka topic and look for required messages. +# Accepts the arguments {topic_name} and {list of expected values} and {timeout} to verify the kafka topic. + + +from confluent_kafka import Consumer, KafkaException +import sys +import time + +def consume_kafka_topic(topic, expected_values, timeout): + config = { + 'bootstrap.servers': 'localhost:29092', + 'group.id': 'testgrp', + 'auto.offset.reset': 'earliest' + } + consumer = Consumer(config) + consumer.subscribe([topic]) + try: + start_time = time.time() + while time.time() - start_time < timeout: + msg = consumer.poll(1.0) + if msg is None: + continue + if msg.error(): + if msg.error().code() == KafkaException._PARTITION_EOF: + sys.stderr.write(f"Reached end of topic {msg.topic()} / partition {msg.partition()}\n") + print('ERROR') + sys.exit(404) + else: + # Error + raise KafkaException(msg.error()) + else: + # Message received + message = msg.value().decode('utf-8') + if verify_msg(expected_values, message): + print(message) + sys.exit(200) + finally: + consumer.close() + +def verify_msg(expected_values, message): + for item in expected_values: + if item not in message: + return False + return True + + +if __name__ == '__main__': + topic_name = sys.argv[1] + timeout = sys.argv[2] # timeout in seconds for verifying the kafka topic + expected_values = sys.argv[3:] + consume_kafka_topic(topic_name, expected_values, timeout) |