summaryrefslogtreecommitdiffstats
path: root/csit/resources/tests
diff options
context:
space:
mode:
authorrameshiyer27 <ramesh.murugan.iyer@est.tech>2024-01-21 14:24:03 +0000
committerrameshiyer27 <ramesh.murugan.iyer@est.tech>2024-01-22 14:55:45 +0000
commit31c61d495474985b8cc3460464f888651d0919ed (patch)
tree72ac562f9bd64196fda57edd8ca6a62f8877b046 /csit/resources/tests
parentcaa7adc30ed054d2a5cfea4a1b9a265d5cfb6785 (diff)
Add kafka support in K8s CSIT
Issue-ID: POLICY-4402 Signed-off-by: zrrmmua <ramesh.murugan.iyer@est.tech> Change-Id: I29ef966ed5eb70997574269ff6180f68d754383b
Diffstat (limited to 'csit/resources/tests')
-rw-r--r--csit/resources/tests/apex-pdp-test.robot4
-rw-r--r--csit/resources/tests/apex-slas-10.robot4
-rw-r--r--csit/resources/tests/apex-slas-3.robot4
-rw-r--r--csit/resources/tests/apex-slas.robot4
-rw-r--r--csit/resources/tests/common-library.robot4
-rw-r--r--csit/resources/tests/data/AcK8s.json6
-rw-r--r--csit/resources/tests/data/acelement-usecase.yaml45
-rw-r--r--csit/resources/tests/drools-applications-test.robot2
-rwxr-xr-xcsit/resources/tests/kafka_consumer.py7
-rwxr-xr-xcsit/resources/tests/kafka_producer.py6
-rwxr-xr-xcsit/resources/tests/make_topics.py2
-rw-r--r--csit/resources/tests/policy-clamp-test.robot3
12 files changed, 56 insertions, 35 deletions
diff --git a/csit/resources/tests/apex-pdp-test.robot b/csit/resources/tests/apex-pdp-test.robot
index 37bcff6f..42be4415 100644
--- a/csit/resources/tests/apex-pdp-test.robot
+++ b/csit/resources/tests/apex-pdp-test.robot
@@ -80,7 +80,7 @@ TriggerAndVerifyTestPnfPolicy
[Documentation] Send TestPnf policy trigger event to Kafka and read notifications to verify policy execution
[Arguments] ${topic}
${data}= Get Binary File ${CURDIR}/data/VesEventForPnfPolicy.json
- ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_cl_output ${data}
+ ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_cl_output ${data} ${KAFKA_IP}
Run Keyword CheckLogMessage ${topic} ACTIVE VES event has been received. Going to fetch details from AAI.
Run Keyword CheckLogMessage ${topic} SUCCESS Received response from AAI successfully. Hostname in AAI matches with the one in Ves event. Going to make the update-config request to CDS.
Run Keyword CheckLogMessage ${topic} FINAL_SUCCESS Successfully processed the VES event. Hostname is updated.
@@ -89,7 +89,7 @@ TriggerAndVerifyTestVnfPolicy
[Documentation] Send TestVnf policy trigger event to Kafka and read notifications to verify policy execution
[Arguments] ${topic}
${data}= Get Binary File ${CURDIR}/data/VesEventForVnfPolicy.json
- ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_policy_example_output ${data}
+ ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_policy_example_output ${data} ${KAFKA_IP}
Run Keyword CheckLogMessage ${topic} ACTIVE VES event has been received. Going to fetch VNF details from AAI.
Run Keyword CheckLogMessage ${topic} SUCCESS VNF details are received from AAI successfully. Sending ConfigModify request to CDS.
Run Keyword CheckLogMessage ${topic} SUCCESS ConfigModify request is successful. Sending restart request to CDS.
diff --git a/csit/resources/tests/apex-slas-10.robot b/csit/resources/tests/apex-slas-10.robot
index 833bb837..53779774 100644
--- a/csit/resources/tests/apex-slas-10.robot
+++ b/csit/resources/tests/apex-slas-10.robot
@@ -28,7 +28,7 @@ ValidatePolicyExecutionAndEventRateLowComplexity
GetKafkaTopic apex-cl-mgt2
${data}= Get Binary File ${CURDIR}/data/VesEventForVnfPolicy.json
${eventStartTime}= Get Current Date
- ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_policy_example_output ${data}
+ ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_policy_example_output ${data} ${KAFKA_IP}
${eventEndTime}= Get Current Date
ValidateEventExecution ${eventStartTime} ${eventEndTime} 100
@@ -58,7 +58,7 @@ ValidatePolicyExecutionAndEventRateModerateComplexity
GetKafkaTopic apex-cl-mgt
${data}= Get Binary File ${CURDIR}/data/VesEventForPnfPolicy.json
${eventStartTime}= Get Current Date
- ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_cl_output ${data}
+ ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_cl_output ${data} ${KAFKA_IP}
${eventEndTime}= Get Current Date
ValidateEventExecution ${eventStartTime} ${eventEndTime} 10
diff --git a/csit/resources/tests/apex-slas-3.robot b/csit/resources/tests/apex-slas-3.robot
index c37c1cda..048e2e76 100644
--- a/csit/resources/tests/apex-slas-3.robot
+++ b/csit/resources/tests/apex-slas-3.robot
@@ -26,7 +26,7 @@ ValidatePolicyExecutionAndEventRateLowComplexity
GetKafkaTopic apex-cl-mgt
${data}= Get Binary File ${CURDIR}/data/VesEventForPnfPolicy.json
${eventStartTime}= Get Current Date
- ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_cl_output ${data}
+ ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_cl_output ${data} ${KAFKA_IP}
${eventEndTime}= Get Current Date
ValidateEventExecution ${eventStartTime} ${eventEndTime} 30
@@ -42,7 +42,7 @@ ValidatePolicyExecutionAndEventRateHighComplexity
GetKafkaTopic apex-cl-mgt2
${data}= Get Binary File ${CURDIR}/data/VesEventForVnfPolicy.json
${eventStartTime}= Get Current Date
- ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_policy_example_output ${data}
+ ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_policy_example_output ${data} ${KAFKA_IP}
${eventEndTime}= Get Current Date
ValidateEventExecution ${eventStartTime} ${eventEndTime} 0.6
diff --git a/csit/resources/tests/apex-slas.robot b/csit/resources/tests/apex-slas.robot
index 4191bb2a..69b24521 100644
--- a/csit/resources/tests/apex-slas.robot
+++ b/csit/resources/tests/apex-slas.robot
@@ -26,7 +26,7 @@ ValidatePolicyExecutionAndEventRateLowComplexity
GetKafkaTopic apex-cl-mgt
${data}= Get Binary File ${CURDIR}/data/VesEventForPnfPolicy.json
${eventStartTime}= Get Current Date
- ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_cl_output ${data}
+ ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_cl_output ${data} ${KAFKA_IP}
${eventEndTime}= Get Current Date
ValidateEventExecution ${eventStartTime} ${eventEndTime} 10
@@ -53,7 +53,7 @@ ValidatePolicyExecutionAndEventRateHighComplexity
GetKafkaTopic apex-cl-mgt2
${data}= Get Binary File ${CURDIR}/data/VesEventForVnfPolicy.json
${eventStartTime}= Get Current Date
- ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_policy_example_output ${data}
+ ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_policy_example_output ${data} ${KAFKA_IP}
${eventEndTime}= Get Current Date
ValidateEventExecution ${eventStartTime} ${eventEndTime} 0.2
diff --git a/csit/resources/tests/common-library.robot b/csit/resources/tests/common-library.robot
index 8693be14..bc140897 100644
--- a/csit/resources/tests/common-library.robot
+++ b/csit/resources/tests/common-library.robot
@@ -133,10 +133,10 @@ ValidateResponseTime
CheckKafkaTopic
[Arguments] ${topic} ${expected_status}
- ${resp}= Run Process ${CURDIR}/kafka_consumer.py ${topic} 60 ${expected_status}
+ ${resp}= Run Process ${CURDIR}/kafka_consumer.py ${topic} 60 ${expected_status} ${KAFKA_IP}
Should Contain ${resp.stdout} ${expected_status}
[Return] ${resp.stdout}
GetKafkaTopic
[Arguments] ${topic}
- ${resp}= Run Process ${CURDIR}/make_topics.py ${topic} \ No newline at end of file
+ ${resp}= Run Process ${CURDIR}/make_topics.py ${topic} ${KAFKA_IP} \ No newline at end of file
diff --git a/csit/resources/tests/data/AcK8s.json b/csit/resources/tests/data/AcK8s.json
index f9ef5b0c..3a7e3a33 100644
--- a/csit/resources/tests/data/AcK8s.json
+++ b/csit/resources/tests/data/AcK8s.json
@@ -129,7 +129,7 @@
},
"httpMethod": "POST",
"path": "/onap/policy/clamp/acelement/v2/activate",
- "body": "{ \"receiverId\": { \"name\": \"onap.policy.clamp.ac.startertobridge\", \"version\": \"1.0.0\" }, \"timerMs\": 20000, \"elementType\": \"STARTER\", \"topicParameterGroup\": { \"server\": \"message-router:3904\", \"listenerTopic\": \"POLICY_UPDATE_MSG\", \"publisherTopic\": \"AC_ELEMENT_MSG\", \"fetchTimeout\": 15000, \"topicCommInfrastructure\": \"dmaap\" } }",
+ "body": "{ \"receiverId\": { \"name\": \"onap.policy.clamp.ac.startertobridge\", \"version\": \"1.0.0\" }, \"timerMs\": 20000, \"elementType\": \"STARTER\", \"topicParameterGroup\": { \"server\": \"kafka:9092\", \"listenerTopic\": \"policy_update_msg\", \"publisherTopic\": \"ac_element_msg\", \"fetchTimeout\": 15000, \"topicCommInfrastructure\": \"kafka\" } }",
"expectedResponse": 201
}
]
@@ -164,7 +164,7 @@
},
"httpMethod": "POST",
"path": "/onap/policy/clamp/acelement/v2/activate",
- "body": "{ \"receiverId\": { \"name\": \"onap.policy.clamp.ac.bridgetosink\", \"version\": \"1.0.0\" }, \"timerMs\": 20000, \"elementType\": \"BRIDGE\", \"topicParameterGroup\": { \"server\": \"message-router:3904\", \"listenerTopic\": \"POLICY_UPDATE_MSG\", \"publisherTopic\": \"AC_ELEMENT_MSG\", \"fetchTimeout\": 15000, \"topicCommInfrastructure\": \"dmaap\" } }",
+ "body": "{ \"receiverId\": { \"name\": \"onap.policy.clamp.ac.bridgetosink\", \"version\": \"1.0.0\" }, \"timerMs\": 20000, \"elementType\": \"BRIDGE\", \"topicParameterGroup\": { \"server\": \"kafka:9092\", \"listenerTopic\": \"policy_update_msg\", \"publisherTopic\": \"ac_element_msg\", \"fetchTimeout\": 15000, \"topicCommInfrastructure\": \"kafka\" } }",
"expectedResponse": 201
}
]
@@ -199,7 +199,7 @@
},
"httpMethod": "POST",
"path": "/onap/policy/clamp/acelement/v2/activate",
- "body": "{ \"receiverId\": { \"name\": \"onap.policy.clamp.ac.sink\", \"version\": \"1.0.0\" }, \"timerMs\": 20000, \"elementType\": \"SINK\", \"topicParameterGroup\": { \"server\": \"message-router\", \"listenerTopic\": \"POLICY_UPDATE_MSG\", \"publisherTopic\": \"AC_ELEMENT_MSG\", \"fetchTimeout\": 15000, \"topicCommInfrastructure\": \"dmaap\" } }",
+ "body": "{ \"receiverId\": { \"name\": \"onap.policy.clamp.ac.sink\", \"version\": \"1.0.0\" }, \"timerMs\": 20000, \"elementType\": \"SINK\", \"topicParameterGroup\": { \"server\": \"kafka:9092\", \"listenerTopic\": \"policy_update_msg\", \"publisherTopic\": \"ac_element_msg\", \"fetchTimeout\": 15000, \"topicCommInfrastructure\": \"kafka\" } }",
"expectedResponse": 201
}
]
diff --git a/csit/resources/tests/data/acelement-usecase.yaml b/csit/resources/tests/data/acelement-usecase.yaml
index 94deb943..937ed6e6 100644
--- a/csit/resources/tests/data/acelement-usecase.yaml
+++ b/csit/resources/tests/data/acelement-usecase.yaml
@@ -1084,14 +1084,24 @@ topology_template:
eventInputParameters:
DmaapConsumer:
carrierTechnologyParameters:
- carrierTechnology: RESTCLIENT
- parameterClassName: org.onap.policy.apex.plugins.event.carrier.restclient.RestClientCarrierTechnologyParameters
+ carrierTechnology: KAFKA
+ parameterClassName: org.onap.policy.apex.plugins.event.carrier.kafka.KafkaCarrierTechnologyParameters
parameters:
- url: http://message-router:3904/events/AC_ELEMENT_MSG/APEX/1?timeout=30000
+ bootstrapServers: kafka:9092
+ groupId: clamp-grp
+ enableAutoCommit: true
+ autoCommitTime: 1000
+ sessionTimeout: 30000
+ consumerPollTime: 100
+ consumerTopicList:
+ - ac_element_msg
+ keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
+ valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer
+ kafkaProperties: [ ]
eventProtocolParameters:
- eventProtocol: JSON
- parameters:
- pojoField: DmaapResponseEvent
+ eventProtocol: JSON
+ parameters:
+ pojoField: DmaapResponseEvent
eventName: AcElementEvent
eventNameFilter: AcElementEvent
eventOutputParameters:
@@ -1104,14 +1114,23 @@ topology_template:
eventProtocol: JSON
DmaapReplyProducer:
carrierTechnologyParameters:
- carrierTechnology: RESTCLIENT
- parameterClassName: org.onap.policy.apex.plugins.event.carrier.restclient.RestClientCarrierTechnologyParameters
+ carrierTechnology: KAFKA
+ parameterClassName: org.onap.policy.apex.plugins.event.carrier.kafka.KafkaCarrierTechnologyParameters
parameters:
- url: http://message-router:3904/events/POLICY_UPDATE_MSG
+ bootstrapServers: kafka:9092
+ acks: all
+ retries: 0
+ batchSize: 16384
+ lingerTime: 1
+ bufferMemory: 33554432
+ producerTopic: policy_update_msg
+ keySerializer: org.apache.kafka.common.serialization.StringSerializer
+ valueSerializer: org.apache.kafka.common.serialization.StringSerializer
+ kafkaProperties: [ ]
eventProtocolParameters:
- eventProtocol: JSON
- parameters:
- pojoField: DmaapResponseStatusEvent
- eventNameFilter: (LogEvent|DmaapResponseStatusEvent)
+ eventProtocol: JSON
+ parameters:
+ pojoField: DmaapResponseStatusEvent
+ eventNameFilter: LogEvent|DmaapResponseStatusEvent
name: onap.policies.native.apex.ac.element
version: 1.0.0
diff --git a/csit/resources/tests/drools-applications-test.robot b/csit/resources/tests/drools-applications-test.robot
index 40497008..a43f1422 100644
--- a/csit/resources/tests/drools-applications-test.robot
+++ b/csit/resources/tests/drools-applications-test.robot
@@ -170,6 +170,6 @@ PerformPostRequest
OnSet
[Arguments] ${file}
${data}= Get File ${file}
- ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_cl_output ${data}
+ ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_cl_output ${data} ${KAFKA_IP}
Log Response from kafka ${resp.stdout}
[Return] ${resp.stdout}
diff --git a/csit/resources/tests/kafka_consumer.py b/csit/resources/tests/kafka_consumer.py
index 595e3db7..fa173f47 100755
--- a/csit/resources/tests/kafka_consumer.py
+++ b/csit/resources/tests/kafka_consumer.py
@@ -27,9 +27,9 @@ import sys
import time
-def consume_kafka_topic(topic, expected_values, timeout):
+def consume_kafka_topic(topic, expected_values, timeout, bootstrap_server):
config = {
- 'bootstrap.servers': 'localhost:29092',
+ 'bootstrap.servers': bootstrap_server,
'group.id': 'testgrp',
'auto.offset.reset': 'earliest'
}
@@ -63,4 +63,5 @@ if __name__ == '__main__':
topic_name = sys.argv[1]
timeout = int(sys.argv[2]) # timeout in seconds for verifying the kafka topic
expected_values = sys.argv[3]
- consume_kafka_topic(topic_name, expected_values, timeout)
+ bootstrap_server = sys.argv[4]
+ consume_kafka_topic(topic_name, expected_values, timeout, bootstrap_server)
diff --git a/csit/resources/tests/kafka_producer.py b/csit/resources/tests/kafka_producer.py
index e6f01c21..a7d45647 100755
--- a/csit/resources/tests/kafka_producer.py
+++ b/csit/resources/tests/kafka_producer.py
@@ -24,8 +24,8 @@
from confluent_kafka import Producer
import sys
-def post_to_kafka(topic, message):
- conf = {'bootstrap.servers': 'localhost:29092'}
+def post_to_kafka(topic, message, bootstrap_server):
+ conf = {'bootstrap.servers': bootstrap_server}
producer = Producer(conf)
try:
@@ -38,4 +38,4 @@ def post_to_kafka(topic, message):
producer.flush()
if __name__ == '__main__':
- post_to_kafka(sys.argv[1], sys.argv[2])
+ post_to_kafka(sys.argv[1], sys.argv[2], sys.argv[3])
diff --git a/csit/resources/tests/make_topics.py b/csit/resources/tests/make_topics.py
index 64a230eb..82b1dc39 100755
--- a/csit/resources/tests/make_topics.py
+++ b/csit/resources/tests/make_topics.py
@@ -36,6 +36,6 @@ def create_topic(bootstrap_servers, topic_name, num_partitions=2, replication_fa
if __name__ == '__main__':
topic_name = sys.argv[1]
- bootstrap_servers = 'localhost:29092'
+ bootstrap_servers = sys.argv[2]
create_topic(bootstrap_servers, topic_name)
diff --git a/csit/resources/tests/policy-clamp-test.robot b/csit/resources/tests/policy-clamp-test.robot
index 10f93893..123249a9 100644
--- a/csit/resources/tests/policy-clamp-test.robot
+++ b/csit/resources/tests/policy-clamp-test.robot
@@ -85,6 +85,7 @@ DeployAutomationComposition
QueryPolicies
[Documentation] Verify the new policies deployed
${auth}= Create List policyadmin zb!XztG34
+ Sleep 10s
Log Creating session http://${POLICY_PAP_IP}
${session}= Create Session policy http://${POLICY_PAP_IP} auth=${auth}
${headers}= Create Dictionary Accept=application/json Content-Type=application/json
@@ -97,7 +98,7 @@ QueryPolicies
QueryPolicyTypes
[Documentation] Verify the new policy types created
${auth}= Create List policyadmin zb!XztG34
- sleep 10
+ Sleep 10s
Log Creating session http://${POLICY_API_IP}}:6969
${session}= Create Session policy http://${POLICY_API_IP} auth=${auth}
${headers}= Create Dictionary Accept=application/json Content-Type=application/json