summaryrefslogtreecommitdiffstats
path: root/csit
diff options
context:
space:
mode:
Diffstat (limited to 'csit')
-rwxr-xr-xcsit/resources/scripts/kafka_consumer.py71
-rwxr-xr-xcsit/resources/scripts/kafka_producer.py41
-rwxr-xr-xcsit/resources/scripts/make_topics.py41
-rwxr-xr-xcsit/resources/scripts/prepare-robot-env.sh4
-rw-r--r--csit/resources/tests/common-library.robot10
5 files changed, 167 insertions, 0 deletions
diff --git a/csit/resources/scripts/kafka_consumer.py b/csit/resources/scripts/kafka_consumer.py
new file mode 100755
index 00000000..80b6167a
--- /dev/null
+++ b/csit/resources/scripts/kafka_consumer.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python3
+#
+# ============LICENSE_START====================================================
+# Copyright (C) 2023 Nordix Foundation.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END======================================================
+
+# Python utility to fetch kafka topic and look for required messages.
+# Accepts the arguments {topic_name} and {list of expected values} and {timeout} to verify the kafka topic.
+
+
+from confluent_kafka import Consumer, KafkaException
+import sys
+import time
+
+def consume_kafka_topic(topic, expected_values, timeout):
+ config = {
+ 'bootstrap.servers': 'localhost:29092',
+ 'group.id': 'testgrp',
+ 'auto.offset.reset': 'earliest'
+ }
+ consumer = Consumer(config)
+ consumer.subscribe([topic])
+ try:
+ start_time = time.time()
+ while time.time() - start_time < timeout:
+ msg = consumer.poll(1.0)
+ if msg is None:
+ continue
+ if msg.error():
+ if msg.error().code() == KafkaException._PARTITION_EOF:
+ sys.stderr.write(f"Reached end of topic {msg.topic()} / partition {msg.partition()}\n")
+ print('ERROR')
+ sys.exit(404)
+ else:
+ # Error
+ raise KafkaException(msg.error())
+ else:
+ # Message received
+ message = msg.value().decode('utf-8')
+ if verify_msg(expected_values, message):
+ print(message)
+ sys.exit(200)
+ finally:
+ consumer.close()
+
+def verify_msg(expected_values, message):
+ for item in expected_values:
+ if item not in message:
+ return False
+ return True
+
+
+if __name__ == '__main__':
+ topic_name = sys.argv[1]
+ timeout = sys.argv[2] # timeout in seconds for verifying the kafka topic
+ expected_values = sys.argv[3:]
+ consume_kafka_topic(topic_name, expected_values, timeout)
diff --git a/csit/resources/scripts/kafka_producer.py b/csit/resources/scripts/kafka_producer.py
new file mode 100755
index 00000000..ff129872
--- /dev/null
+++ b/csit/resources/scripts/kafka_producer.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python3
+#
+# ============LICENSE_START====================================================
+# Copyright (C) 2023 Nordix Foundation.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END======================================================
+
+# Python utility to produce a message on a kafka topic
+# Accepts the arguments {topic_name} and {message}
+
+from confluent_kafka import Producer
+import sys
+
+def post_to_kafka(topic, message):
+ conf = {'bootstrap.servers': 'localhost:29092'}
+
+ producer = Producer(conf)
+ try:
+ producer.produce(topic, value=message.encode('utf-8'))
+ producer.flush()
+ print('Message posted to Kafka topic: {}'.format(topic))
+ except Exception as e:
+ print('Failed to post message: {}'.format(str(e)))
+ finally:
+ producer.flush()
+
+if __name__ == '__main__':
+ post_to_kafka(sys.argv[1], sys.argv[2])
diff --git a/csit/resources/scripts/make_topics.py b/csit/resources/scripts/make_topics.py
new file mode 100755
index 00000000..daee4341
--- /dev/null
+++ b/csit/resources/scripts/make_topics.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python3
+#
+# ============LICENSE_START====================================================
+# Copyright (C) 2023 Nordix Foundation.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END======================================================
+
+# Python utility to create a new kafka topic
+# Accepts the argument {topic_name}
+
+from confluent_kafka.admin import AdminClient, NewTopic
+import sys
+
+def create_topic(bootstrap_servers, topic_name, num_partitions=2, replication_factor=2):
+ admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})
+
+ # Define the topic configuration
+ topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
+
+ # Create the topic
+ admin_client.create_topics([topic])
+
+
+if __name__ == '__main__':
+ topic_name = sys.argv[1]
+ bootstrap_servers = 'localhost:29092'
+
+ create_topic(bootstrap_servers, topic_name)
diff --git a/csit/resources/scripts/prepare-robot-env.sh b/csit/resources/scripts/prepare-robot-env.sh
index 2281d235..2b773802 100755
--- a/csit/resources/scripts/prepare-robot-env.sh
+++ b/csit/resources/scripts/prepare-robot-env.sh
@@ -44,6 +44,10 @@ mkdir -p "${ROBOT_VENV}"/src/onap
rm -rf "${ROBOT_VENV}"/src/onap/testsuite
python3 -m pip install -qq --upgrade --extra-index-url="https://nexus3.onap.org/repository/PyPi.staging/simple" 'robotframework-onap==0.6.0.*' --pre
+# install confluent-kafka
+echo "Installing python confluent-kafka library"
+python3 -m pip install -qq confluent-kafka
+
echo "Uninstall docker-py and reinstall docker."
python3 -m pip uninstall -y -qq docker
python3 -m pip install -U -qq docker
diff --git a/csit/resources/tests/common-library.robot b/csit/resources/tests/common-library.robot
index 8c279176..f5db8e0e 100644
--- a/csit/resources/tests/common-library.robot
+++ b/csit/resources/tests/common-library.robot
@@ -147,3 +147,13 @@ CheckTopic
Status Should Be OK ${resp}
Should Contain ${resp.text} ${expected_status}
[Return] ${resp.text}
+
+CheckKafkaTopic
+ [Arguments] ${topic} ${expected_status}
+ ${resp}= Run Process ${CURDIR}/kafka_consumer.py ${topic} 30 ${expected_status}
+ Log to console Received response from kafka ${resp.stdout}
+ Should Contain ${resp.text} ${expected_status}
+
+GetKafkaTopic
+ [Arguments] ${topic}
+ ${resp}= Run Process ${CURDIR}/make_topics.py ${topic} \ No newline at end of file