summaryrefslogtreecommitdiffstats
path: root/csit/resources/scripts/kafka_consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'csit/resources/scripts/kafka_consumer.py')
-rwxr-xr-xcsit/resources/scripts/kafka_consumer.py71
1 files changed, 71 insertions, 0 deletions
diff --git a/csit/resources/scripts/kafka_consumer.py b/csit/resources/scripts/kafka_consumer.py
new file mode 100755
index 00000000..80b6167a
--- /dev/null
+++ b/csit/resources/scripts/kafka_consumer.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python3
+#
+# ============LICENSE_START====================================================
+# Copyright (C) 2023 Nordix Foundation.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END======================================================
+
+# Python utility to fetch kafka topic and look for required messages.
+# Accepts the arguments {topic_name} and {list of expected values} and {timeout} to verify the kafka topic.
+
+
+from confluent_kafka import Consumer, KafkaException
+import sys
+import time
+
+def consume_kafka_topic(topic, expected_values, timeout):
+ config = {
+ 'bootstrap.servers': 'localhost:29092',
+ 'group.id': 'testgrp',
+ 'auto.offset.reset': 'earliest'
+ }
+ consumer = Consumer(config)
+ consumer.subscribe([topic])
+ try:
+ start_time = time.time()
+ while time.time() - start_time < timeout:
+ msg = consumer.poll(1.0)
+ if msg is None:
+ continue
+ if msg.error():
+ if msg.error().code() == KafkaException._PARTITION_EOF:
+ sys.stderr.write(f"Reached end of topic {msg.topic()} / partition {msg.partition()}\n")
+ print('ERROR')
+ sys.exit(404)
+ else:
+ # Error
+ raise KafkaException(msg.error())
+ else:
+ # Message received
+ message = msg.value().decode('utf-8')
+ if verify_msg(expected_values, message):
+ print(message)
+ sys.exit(200)
+ finally:
+ consumer.close()
+
+def verify_msg(expected_values, message):
+ for item in expected_values:
+ if item not in message:
+ return False
+ return True
+
+
+if __name__ == '__main__':
+ topic_name = sys.argv[1]
+ timeout = sys.argv[2] # timeout in seconds for verifying the kafka topic
+ expected_values = sys.argv[3:]
+ consume_kafka_topic(topic_name, expected_values, timeout)