aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrameshiyer27 <ramesh.murugan.iyer@est.tech>2023-12-14 14:17:35 +0000
committerrameshiyer27 <ramesh.murugan.iyer@est.tech>2023-12-14 14:17:35 +0000
commitf2609a349565f74237024f2f546e03ea5c772cb5 (patch)
treeacdd9010ce27231dae1e42aed03472d173855799
parentdae767c98f1f15da96d543f029c846d95a5de76a (diff)
Add kafka docker container for policy CSITs
Replaced dmaap with kafka in CLAMP docker tests. Issue-ID: POLICY-4201 Signed-off-by: zrrmmua <ramesh.murugan.iyer@est.tech> Change-Id: I4d05e24d3ececf2253ebc39785882be00bf9eaf4
-rwxr-xr-xcompose/config/clamp/A1pmsParticipantParameters.yaml12
-rw-r--r--compose/config/clamp/AcRuntimeParameters.yaml12
-rw-r--r--compose/config/clamp/HttpParticipantParameters.yaml12
-rwxr-xr-xcompose/config/clamp/KserveParticipantParameters.yaml12
-rw-r--r--compose/config/clamp/KubernetesParticipantParameters.yaml12
-rw-r--r--compose/config/clamp/PolicyParticipantParameters.yaml12
-rw-r--r--compose/config/clamp/SimulatorParticipantParameters.yaml12
-rw-r--r--compose/docker-compose.yml52
-rwxr-xr-xcsit/resources/scripts/kafka_consumer.py71
-rwxr-xr-xcsit/resources/scripts/kafka_producer.py41
-rwxr-xr-xcsit/resources/scripts/make_topics.py41
-rwxr-xr-xcsit/resources/scripts/prepare-robot-env.sh4
-rw-r--r--csit/resources/tests/common-library.robot10
13 files changed, 247 insertions, 56 deletions
diff --git a/compose/config/clamp/A1pmsParticipantParameters.yaml b/compose/config/clamp/A1pmsParticipantParameters.yaml
index f905484a..44902d89 100755
--- a/compose/config/clamp/A1pmsParticipantParameters.yaml
+++ b/compose/config/clamp/A1pmsParticipantParameters.yaml
@@ -28,16 +28,16 @@ participant:
participantId: 101c62b3-8918-41b9-a747-d21eb79c6c00
clampAutomationCompositionTopics:
topicSources:
- - topic: POLICY-ACRUNTIME-PARTICIPANT
+ - topic: policy-acruntime-participant
servers:
- - ${topicServer:message-router}
- topicCommInfrastructure: dmaap
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: kafka
fetchTimeout: 15000
topicSinks:
- - topic: POLICY-ACRUNTIME-PARTICIPANT
+ - topic: policy-acruntime-participant
servers:
- - ${topicServer:message-router}
- topicCommInfrastructure: dmaap
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: kafka
management:
diff --git a/compose/config/clamp/AcRuntimeParameters.yaml b/compose/config/clamp/AcRuntimeParameters.yaml
index dc9a9846..b800e4b0 100644
--- a/compose/config/clamp/AcRuntimeParameters.yaml
+++ b/compose/config/clamp/AcRuntimeParameters.yaml
@@ -46,18 +46,18 @@ runtime:
topicParameterGroup:
topicSources:
-
- topic: POLICY-ACRUNTIME-PARTICIPANT
+ topic: policy-acruntime-participant
servers:
- - ${topicServer:message-router}
- topicCommInfrastructure: dmaap
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: kafka
fetchTimeout: 15000
useHttps: false
topicSinks:
-
- topic: POLICY-ACRUNTIME-PARTICIPANT
+ topic: policy-acruntime-participant
servers:
- - ${topicServer:message-router}
- topicCommInfrastructure: dmaap
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: kafka
useHttps: false
acmParameters:
toscaElementName: org.onap.policy.clamp.acm.AutomationCompositionElement
diff --git a/compose/config/clamp/HttpParticipantParameters.yaml b/compose/config/clamp/HttpParticipantParameters.yaml
index 878eed75..a26a81d0 100644
--- a/compose/config/clamp/HttpParticipantParameters.yaml
+++ b/compose/config/clamp/HttpParticipantParameters.yaml
@@ -17,17 +17,17 @@ participant:
participantId: 101c62b3-8918-41b9-a747-d21eb79c6c01
clampAutomationCompositionTopics:
topicSources:
- - topic: POLICY-ACRUNTIME-PARTICIPANT
+ - topic: policy-acruntime-participant
servers:
- - ${topicServer:message-router}
- topicCommInfrastructure: dmaap
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: kafka
fetchTimeout: 15000
useHttps: false
topicSinks:
- - topic: POLICY-ACRUNTIME-PARTICIPANT
+ - topic: policy-acruntime-participant
servers:
- - ${topicServer:message-router}
- topicCommInfrastructure: dmaap
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: kafka
useHttps: false
participantSupportedElementTypes:
- typeName: org.onap.policy.clamp.acm.HttpAutomationCompositionElement
diff --git a/compose/config/clamp/KserveParticipantParameters.yaml b/compose/config/clamp/KserveParticipantParameters.yaml
index d83a48f0..fe55f543 100755
--- a/compose/config/clamp/KserveParticipantParameters.yaml
+++ b/compose/config/clamp/KserveParticipantParameters.yaml
@@ -27,16 +27,16 @@ participant:
participantId: 101c62b3-8918-41b9-a747-d21eb79c6c04
clampAutomationCompositionTopics:
topicSources:
- - topic: POLICY-ACRUNTIME-PARTICIPANT
+ - topic: policy-acruntime-participant
servers:
- - ${topicServer:message-router}
- topicCommInfrastructure: dmaap
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: kafka
fetchTimeout: 15000
topicSinks:
- - topic: POLICY-ACRUNTIME-PARTICIPANT
+ - topic: policy-acruntime-participant
servers:
- - ${topicServer:message-router}
- topicCommInfrastructure: dmaap
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: kafka
participantSupportedElementTypes:
-
typeName: org.onap.policy.clamp.acm.KserveAutomationCompositionElement
diff --git a/compose/config/clamp/KubernetesParticipantParameters.yaml b/compose/config/clamp/KubernetesParticipantParameters.yaml
index 3449a5d1..449e9617 100644
--- a/compose/config/clamp/KubernetesParticipantParameters.yaml
+++ b/compose/config/clamp/KubernetesParticipantParameters.yaml
@@ -21,18 +21,18 @@ participant:
clampAutomationCompositionTopics:
topicSources:
-
- topic: POLICY-ACRUNTIME-PARTICIPANT
+ topic: policy-acruntime-participant
servers:
- - ${topicServer:message-router}
- topicCommInfrastructure: dmaap
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: kafka
fetchTimeout: 15000
useHttps: false
topicSinks:
-
- topic: POLICY-ACRUNTIME-PARTICIPANT
+ topic: policy-acruntime-participant
servers:
- - ${topicServer:message-router}
- topicCommInfrastructure: dmaap
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: kafka
useHttps: false
participantSupportedElementTypes:
- typeName: org.onap.policy.clamp.acm.K8SMicroserviceAutomationCompositionElement
diff --git a/compose/config/clamp/PolicyParticipantParameters.yaml b/compose/config/clamp/PolicyParticipantParameters.yaml
index 443a0cae..307e0b60 100644
--- a/compose/config/clamp/PolicyParticipantParameters.yaml
+++ b/compose/config/clamp/PolicyParticipantParameters.yaml
@@ -34,18 +34,18 @@ participant:
clampAutomationCompositionTopics:
topicSources:
-
- topic: POLICY-ACRUNTIME-PARTICIPANT
+ topic: policy-acruntime-participant
servers:
- - ${topicServer:message-router}
- topicCommInfrastructure: dmaap
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: kafka
fetchTimeout: 15000
useHttps: false
topicSinks:
-
- topic: POLICY-ACRUNTIME-PARTICIPANT
+ topic: policy-acruntime-participant
servers:
- - ${topicServer:message-router}
- topicCommInfrastructure: dmaap
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: kafka
useHttps: false
participantSupportedElementTypes:
- typeName: org.onap.policy.clamp.acm.PolicyAutomationCompositionElement
diff --git a/compose/config/clamp/SimulatorParticipantParameters.yaml b/compose/config/clamp/SimulatorParticipantParameters.yaml
index 6ec594f2..16234e17 100644
--- a/compose/config/clamp/SimulatorParticipantParameters.yaml
+++ b/compose/config/clamp/SimulatorParticipantParameters.yaml
@@ -19,17 +19,17 @@ participant:
participantId: ${participantId:101c62b3-8918-41b9-a747-d21eb79c6c90}
clampAutomationCompositionTopics:
topicSources:
- - topic: POLICY-ACRUNTIME-PARTICIPANT
+ - topic: policy-acruntime-participant
servers:
- - ${topicServer:message-router}
- topicCommInfrastructure: dmaap
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: kafka
fetchTimeout: 15000
useHttps: false
topicSinks:
- - topic: POLICY-ACRUNTIME-PARTICIPANT
+ - topic: policy-acruntime-participant
servers:
- - ${topicServer:message-router}
- topicCommInfrastructure: dmaap
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: kafka
useHttps: false
participantSupportedElementTypes:
-
diff --git a/compose/docker-compose.yml b/compose/docker-compose.yml
index f57dac80..3814f215 100644
--- a/compose/docker-compose.yml
+++ b/compose/docker-compose.yml
@@ -241,7 +241,7 @@ services:
container_name: policy-clamp-runtime-acm
depends_on:
- mariadb
- - simulator
+ - kafka
- policy-clamp-ac-http-ppnt
- policy-clamp-ac-k8s-ppnt
- policy-clamp-ac-pf-ppnt
@@ -258,7 +258,7 @@ services:
command: [
'-c', './acm-runtime.sh',
'mariadb', '3306',
- 'message-router', '3904',
+ 'kafka', '9092',
'policy-clamp-ac-http-ppnt', '6969',
'policy-clamp-ac-k8s-ppnt', '6969',
'policy-clamp-ac-pf-ppnt', '6969',
@@ -269,7 +269,7 @@ services:
image: ${CONTAINER_LOCATION}onap/policy-clamp-ac-http-ppnt:${POLICY_CLAMP_VERSION}
container_name: policy-clamp-ac-http-ppnt
depends_on:
- - simulator
+ - kafka
hostname: policy-clamp-ac-http-ppnt
ports:
- "30290:6969"
@@ -280,13 +280,13 @@ services:
entrypoint: /opt/app/policy/bin/wait_for_port.sh
command: [
'-c', './http-participant.sh',
- 'message-router', '3904'
+ 'kafka', '9092'
]
policy-clamp-ac-k8s-ppnt:
image: ${CONTAINER_LOCATION}onap/policy-clamp-ac-k8s-ppnt:${POLICY_CLAMP_VERSION}
container_name: policy-clamp-ac-k8s-ppnt
depends_on:
- - simulator
+ - kafka
hostname: policy-clamp-ac-k8s-ppnt
ports:
- "30295:6969"
@@ -297,13 +297,13 @@ services:
entrypoint: /opt/app/policy/bin/wait_for_port.sh
command: [
'-c', './kubernetes-participant.sh',
- 'message-router', '3904'
+ 'kafka', '9092'
]
policy-clamp-ac-pf-ppnt:
image: ${CONTAINER_LOCATION}onap/policy-clamp-ac-pf-ppnt:${POLICY_CLAMP_VERSION}
container_name: policy-clamp-ac-pf-ppnt
depends_on:
- - simulator
+ - kafka
- api
hostname: policy-clamp-ac-pf-ppnt
ports:
@@ -315,14 +315,14 @@ services:
entrypoint: /opt/app/policy/bin/wait_for_port.sh
command: [
'-c', './policy-participant.sh',
- 'message-router', '3904',
+ 'kafka', '9092',
'api', '6969'
]
policy-clamp-ac-a1pms-ppnt:
image: ${CONTAINER_LOCATION}onap/policy-clamp-ac-a1pms-ppnt:${POLICY_CLAMP_VERSION}
container_name: policy-clamp-ac-a1pms-ppnt
depends_on:
- - simulator
+ - kafka
hostname: policy-clamp-ac-a1pms-ppnt
ports:
- "30296:6969"
@@ -333,13 +333,13 @@ services:
entrypoint: /opt/app/policy/bin/wait_for_port.sh
command: [
'-c', './a1pms-participant.sh',
- 'message-router', '3904'
+ 'kafka', '9092'
]
policy-clamp-ac-kserve-ppnt:
image: ${CONTAINER_LOCATION}onap/policy-clamp-ac-kserve-ppnt:${POLICY_CLAMP_VERSION}
container_name: policy-clamp-ac-kserve-ppnt
depends_on:
- - simulator
+ - kafka
hostname: policy-clamp-ac-kserve-ppnt
ports:
- "30297:6969"
@@ -350,13 +350,13 @@ services:
entrypoint: /opt/app/policy/bin/wait_for_port.sh
command: [
'-c', './kserve-participant.sh',
- 'message-router', '3904'
+ 'kafka', '9092'
]
policy-clamp-ac-sim-ppnt:
image: ${CONTAINER_LOCATION}onap/policy-clamp-ac-sim-ppnt:${POLICY_CLAMP_VERSION}
container_name: policy-clamp-ac-sim-ppnt
depends_on:
- - simulator
+ - kafka
hostname: policy-clamp-ac-sim-ppnt
ports:
- ${SIM_PARTICIPANT_PORT}:6969
@@ -367,7 +367,7 @@ services:
entrypoint: /opt/app/policy/bin/wait_for_port.sh
command: [
'-c', './sim-participant.sh',
- 'message-router', '3904'
+ 'kafka', '9092'
]
prometheus:
image: nexus3.onap.org:10001/prom/prometheus:latest
@@ -389,3 +389,27 @@ services:
- ./metrics/dashboard.yaml:/etc/grafana/provisioning/dashboards/dashboard.yaml
- ./metrics/datasource.yaml:/etc/grafana/provisioning/datasources/datasource.yaml
- ./metrics/dashboards:/var/lib/grafana/dashboards
+
+ zookeeper:
+ image: confluentinc/cp-zookeeper:latest
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+ ports:
+ - 2181:2181
+
+ kafka:
+ image: confluentinc/cp-kafka:latest
+ container_name: kafka
+ depends_on:
+ - zookeeper
+ ports:
+ - 29092:29092
+ - 9092:9092
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+ KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 \ No newline at end of file
diff --git a/csit/resources/scripts/kafka_consumer.py b/csit/resources/scripts/kafka_consumer.py
new file mode 100755
index 00000000..80b6167a
--- /dev/null
+++ b/csit/resources/scripts/kafka_consumer.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python3
+#
+# ============LICENSE_START====================================================
+# Copyright (C) 2023 Nordix Foundation.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END======================================================
+
+# Python utility to fetch kafka topic and look for required messages.
+# Accepts the arguments {topic_name} and {list of expected values} and {timeout} to verify the kafka topic.
+
+
+from confluent_kafka import Consumer, KafkaException
+import sys
+import time
+
+def consume_kafka_topic(topic, expected_values, timeout):
+ config = {
+ 'bootstrap.servers': 'localhost:29092',
+ 'group.id': 'testgrp',
+ 'auto.offset.reset': 'earliest'
+ }
+ consumer = Consumer(config)
+ consumer.subscribe([topic])
+ try:
+ start_time = time.time()
+ while time.time() - start_time < timeout:
+ msg = consumer.poll(1.0)
+ if msg is None:
+ continue
+ if msg.error():
+ if msg.error().code() == KafkaException._PARTITION_EOF:
+ sys.stderr.write(f"Reached end of topic {msg.topic()} / partition {msg.partition()}\n")
+ print('ERROR')
+ sys.exit(404)
+ else:
+ # Error
+ raise KafkaException(msg.error())
+ else:
+ # Message received
+ message = msg.value().decode('utf-8')
+ if verify_msg(expected_values, message):
+ print(message)
+ sys.exit(200)
+ finally:
+ consumer.close()
+
+def verify_msg(expected_values, message):
+ for item in expected_values:
+ if item not in message:
+ return False
+ return True
+
+
+if __name__ == '__main__':
+ topic_name = sys.argv[1]
+ timeout = sys.argv[2] # timeout in seconds for verifying the kafka topic
+ expected_values = sys.argv[3:]
+ consume_kafka_topic(topic_name, expected_values, timeout)
diff --git a/csit/resources/scripts/kafka_producer.py b/csit/resources/scripts/kafka_producer.py
new file mode 100755
index 00000000..ff129872
--- /dev/null
+++ b/csit/resources/scripts/kafka_producer.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python3
+#
+# ============LICENSE_START====================================================
+# Copyright (C) 2023 Nordix Foundation.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END======================================================
+
+# Python utility to produce a message on a kafka topic
+# Accepts the arguments {topic_name} and {message}
+
+from confluent_kafka import Producer
+import sys
+
+def post_to_kafka(topic, message):
+ conf = {'bootstrap.servers': 'localhost:29092'}
+
+ producer = Producer(conf)
+ try:
+ producer.produce(topic, value=message.encode('utf-8'))
+ producer.flush()
+ print('Message posted to Kafka topic: {}'.format(topic))
+ except Exception as e:
+ print('Failed to post message: {}'.format(str(e)))
+ finally:
+ producer.flush()
+
+if __name__ == '__main__':
+ post_to_kafka(sys.argv[1], sys.argv[2])
diff --git a/csit/resources/scripts/make_topics.py b/csit/resources/scripts/make_topics.py
new file mode 100755
index 00000000..daee4341
--- /dev/null
+++ b/csit/resources/scripts/make_topics.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python3
+#
+# ============LICENSE_START====================================================
+# Copyright (C) 2023 Nordix Foundation.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END======================================================
+
+# Python utility to create a new kafka topic
+# Accepts the argument {topic_name}
+
+from confluent_kafka.admin import AdminClient, NewTopic
+import sys
+
+def create_topic(bootstrap_servers, topic_name, num_partitions=2, replication_factor=2):
+ admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})
+
+ # Define the topic configuration
+ topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
+
+ # Create the topic
+ admin_client.create_topics([topic])
+
+
+if __name__ == '__main__':
+ topic_name = sys.argv[1]
+ bootstrap_servers = 'localhost:29092'
+
+ create_topic(bootstrap_servers, topic_name)
diff --git a/csit/resources/scripts/prepare-robot-env.sh b/csit/resources/scripts/prepare-robot-env.sh
index 2281d235..2b773802 100755
--- a/csit/resources/scripts/prepare-robot-env.sh
+++ b/csit/resources/scripts/prepare-robot-env.sh
@@ -44,6 +44,10 @@ mkdir -p "${ROBOT_VENV}"/src/onap
rm -rf "${ROBOT_VENV}"/src/onap/testsuite
python3 -m pip install -qq --upgrade --extra-index-url="https://nexus3.onap.org/repository/PyPi.staging/simple" 'robotframework-onap==0.6.0.*' --pre
+# install confluent-kafka
+echo "Installing python confluent-kafka library"
+python3 -m pip install -qq confluent-kafka
+
echo "Uninstall docker-py and reinstall docker."
python3 -m pip uninstall -y -qq docker
python3 -m pip install -U -qq docker
diff --git a/csit/resources/tests/common-library.robot b/csit/resources/tests/common-library.robot
index 8c279176..f5db8e0e 100644
--- a/csit/resources/tests/common-library.robot
+++ b/csit/resources/tests/common-library.robot
@@ -147,3 +147,13 @@ CheckTopic
Status Should Be OK ${resp}
Should Contain ${resp.text} ${expected_status}
[Return] ${resp.text}
+
+CheckKafkaTopic
+ [Arguments] ${topic} ${expected_status}
+ ${resp}= Run Process ${CURDIR}/kafka_consumer.py ${topic} 30 ${expected_status}
+ Log to console Received response from kafka ${resp.stdout}
+ Should Contain ${resp.text} ${expected_status}
+
+GetKafkaTopic
+ [Arguments] ${topic}
+ ${resp}= Run Process ${CURDIR}/make_topics.py ${topic} \ No newline at end of file