diff options
Diffstat (limited to 'csit')
-rwxr-xr-x | csit/resources/scripts/kafka_consumer.py | 71 | ||||
-rwxr-xr-x | csit/resources/scripts/kafka_producer.py | 41 | ||||
-rwxr-xr-x | csit/resources/scripts/make_topics.py | 41 | ||||
-rwxr-xr-x | csit/resources/scripts/prepare-robot-env.sh | 4 | ||||
-rw-r--r-- | csit/resources/tests/common-library.robot | 10 |
5 files changed, 167 insertions, 0 deletions
diff --git a/csit/resources/scripts/kafka_consumer.py b/csit/resources/scripts/kafka_consumer.py new file mode 100755 index 00000000..80b6167a --- /dev/null +++ b/csit/resources/scripts/kafka_consumer.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +# +# ============LICENSE_START==================================================== +# Copyright (C) 2023 Nordix Foundation. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END====================================================== + +# Python utility to fetch kafka topic and look for required messages. +# Accepts the arguments {topic_name} and {list of expected values} and {timeout} to verify the kafka topic. + + +from confluent_kafka import Consumer, KafkaException +import sys +import time + +def consume_kafka_topic(topic, expected_values, timeout): + config = { + 'bootstrap.servers': 'localhost:29092', + 'group.id': 'testgrp', + 'auto.offset.reset': 'earliest' + } + consumer = Consumer(config) + consumer.subscribe([topic]) + try: + start_time = time.time() + while time.time() - start_time < timeout: + msg = consumer.poll(1.0) + if msg is None: + continue + if msg.error(): + if msg.error().code() == KafkaException._PARTITION_EOF: + sys.stderr.write(f"Reached end of topic {msg.topic()} / partition {msg.partition()}\n") + print('ERROR') + sys.exit(404) + else: + # Error + raise KafkaException(msg.error()) + else: + # Message received + message = msg.value().decode('utf-8') + if verify_msg(expected_values, message): + print(message) + sys.exit(200) + finally: + consumer.close() + +def verify_msg(expected_values, message): + for item in expected_values: + if item not in message: + return False + return True + + +if __name__ == '__main__': + topic_name = sys.argv[1] + timeout = sys.argv[2] # timeout in seconds for verifying the kafka topic + expected_values = sys.argv[3:] + consume_kafka_topic(topic_name, expected_values, timeout) diff --git a/csit/resources/scripts/kafka_producer.py b/csit/resources/scripts/kafka_producer.py new file mode 100755 index 00000000..ff129872 --- /dev/null +++ b/csit/resources/scripts/kafka_producer.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +# +# ============LICENSE_START==================================================== +# Copyright (C) 2023 Nordix Foundation. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END====================================================== + +# Python utility to produce a message on a kafka topic +# Accepts the arguments {topic_name} and {message} + +from confluent_kafka import Producer +import sys + +def post_to_kafka(topic, message): + conf = {'bootstrap.servers': 'localhost:29092'} + + producer = Producer(conf) + try: + producer.produce(topic, value=message.encode('utf-8')) + producer.flush() + print('Message posted to Kafka topic: {}'.format(topic)) + except Exception as e: + print('Failed to post message: {}'.format(str(e))) + finally: + producer.flush() + +if __name__ == '__main__': + post_to_kafka(sys.argv[1], sys.argv[2]) diff --git a/csit/resources/scripts/make_topics.py b/csit/resources/scripts/make_topics.py new file mode 100755 index 00000000..daee4341 --- /dev/null +++ b/csit/resources/scripts/make_topics.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +# +# ============LICENSE_START==================================================== +# Copyright (C) 2023 Nordix Foundation. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END====================================================== + +# Python utility to create a new kafka topic +# Accepts the argument {topic_name} + +from confluent_kafka.admin import AdminClient, NewTopic +import sys + +def create_topic(bootstrap_servers, topic_name, num_partitions=2, replication_factor=2): + admin_client = AdminClient({'bootstrap.servers': bootstrap_servers}) + + # Define the topic configuration + topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor) + + # Create the topic + admin_client.create_topics([topic]) + + +if __name__ == '__main__': + topic_name = sys.argv[1] + bootstrap_servers = 'localhost:29092' + + create_topic(bootstrap_servers, topic_name) diff --git a/csit/resources/scripts/prepare-robot-env.sh b/csit/resources/scripts/prepare-robot-env.sh index 2281d235..2b773802 100755 --- a/csit/resources/scripts/prepare-robot-env.sh +++ b/csit/resources/scripts/prepare-robot-env.sh @@ -44,6 +44,10 @@ mkdir -p "${ROBOT_VENV}"/src/onap rm -rf "${ROBOT_VENV}"/src/onap/testsuite python3 -m pip install -qq --upgrade --extra-index-url="https://nexus3.onap.org/repository/PyPi.staging/simple" 'robotframework-onap==0.6.0.*' --pre +# install confluent-kafka +echo "Installing python confluent-kafka library" +python3 -m pip install -qq confluent-kafka + echo "Uninstall docker-py and reinstall docker." python3 -m pip uninstall -y -qq docker python3 -m pip install -U -qq docker diff --git a/csit/resources/tests/common-library.robot b/csit/resources/tests/common-library.robot index 8c279176..f5db8e0e 100644 --- a/csit/resources/tests/common-library.robot +++ b/csit/resources/tests/common-library.robot @@ -147,3 +147,13 @@ CheckTopic Status Should Be OK ${resp} Should Contain ${resp.text} ${expected_status} [Return] ${resp.text} + +CheckKafkaTopic + [Arguments] ${topic} ${expected_status} + ${resp}= Run Process ${CURDIR}/kafka_consumer.py ${topic} 30 ${expected_status} + Log to console Received response from kafka ${resp.stdout} + Should Contain ${resp.text} ${expected_status} + +GetKafkaTopic + [Arguments] ${topic} + ${resp}= Run Process ${CURDIR}/make_topics.py ${topic}
\ No newline at end of file |