diff options
author | rameshiyer27 <ramesh.murugan.iyer@est.tech> | 2024-01-13 21:26:09 +0000 |
---|---|---|
committer | rameshiyer27 <ramesh.murugan.iyer@est.tech> | 2024-01-14 18:44:47 +0000 |
commit | f2e4da7e296548fb3980fd212e3a67dc83254e1d (patch) | |
tree | 6c86423e586a94509889ce34fa08560d277752c4 /csit/resources/tests | |
parent | b9d434aeef048c4ea2cf9bd8a27681d375ec5b85 (diff) |
Add kafka support in Policy CSIT
Issue-ID: POLICY-4402
Signed-off-by: zrrmmua <ramesh.murugan.iyer@est.tech>
Change-Id: I802c19a3c9817d304164eba634adb8c119aa4ced
Diffstat (limited to 'csit/resources/tests')
-rw-r--r-- | csit/resources/tests/apex-pdp-common.robot | 4 | ||||
-rw-r--r-- | csit/resources/tests/apex-pdp-test.robot | 46 | ||||
-rw-r--r-- | csit/resources/tests/apex-slas.robot | 14 | ||||
-rw-r--r-- | csit/resources/tests/common-library.robot | 6 | ||||
-rw-r--r-- | csit/resources/tests/data/onap.policies.apex.pnf.Test.json | 38 | ||||
-rw-r--r-- | csit/resources/tests/data/onap.policies.apex.pnf.metadataSet.Test.json | 38 | ||||
-rw-r--r-- | csit/resources/tests/distribution-test.robot | 8 | ||||
-rw-r--r-- | csit/resources/tests/drools-applications-test.robot | 29 | ||||
-rwxr-xr-x | csit/resources/tests/kafka_consumer.py | 66 | ||||
-rwxr-xr-x | csit/resources/tests/kafka_producer.py | 41 | ||||
-rwxr-xr-x | csit/resources/tests/make_topics.py | 41 | ||||
-rw-r--r-- | csit/resources/tests/pap-test.robot | 1 | ||||
-rw-r--r-- | csit/resources/tests/policy-clamp-test.robot | 1 | ||||
-rw-r--r-- | csit/resources/tests/xacml-pdp-test.robot | 11 |
14 files changed, 262 insertions, 82 deletions
diff --git a/csit/resources/tests/apex-pdp-common.robot b/csit/resources/tests/apex-pdp-common.robot index 8ae63af6..e6458318 100644 --- a/csit/resources/tests/apex-pdp-common.robot +++ b/csit/resources/tests/apex-pdp-common.robot @@ -27,8 +27,8 @@ RunEventOnApexEngine CheckLogMessage [Documentation] Read log messages received and check for expected content. - [Arguments] ${status} ${expectedMsg} - ${result}= CheckTopic APEX-CL-MGT ${status} + [Arguments] ${topic} ${status} ${expectedMsg} + ${result}= CheckKafkaTopic ${topic} ${status} Should Contain ${result} ${expectedMsg} ValidatePolicyExecution diff --git a/csit/resources/tests/apex-pdp-test.robot b/csit/resources/tests/apex-pdp-test.robot index b0232263..5e4ea34b 100644 --- a/csit/resources/tests/apex-pdp-test.robot +++ b/csit/resources/tests/apex-pdp-test.robot @@ -31,17 +31,17 @@ ExecuteApexTestPnfPolicy CreatePolicy /policy/api/v1/policytypes/onap.policies.native.Apex/versions/1.0.0/policies 200 ${postjson} ${policyName} 1.0.0 DeployPolicy Wait Until Keyword Succeeds 2 min 5 sec QueryPolicyStatus ${policyName} defaultGroup apex ${pdpName} onap.policies.native.Apex - GetTopic APEX-CL-MGT - Wait Until Keyword Succeeds 2 min 5 sec TriggerAndVerifyTestPnfPolicy + GetKafkaTopic apex-cl-mgt + Wait Until Keyword Succeeds 2 min 5 sec TriggerAndVerifyTestPnfPolicy apex-cl-mgt -ExecuteApexTestVnfPolicy - Set Test Variable ${policyName} onap.policies.apex.vnf.Test - ${postjson}= Get File ${CURDIR}/data/${policyName}.json - CreatePolicy /policy/api/v1/policytypes/onap.policies.native.Apex/versions/1.0.0/policies 200 ${postjson} ${policyName} 1.0.0 - DeployPolicy - Wait Until Keyword Succeeds 2 min 5 sec QueryPolicyStatus ${policyName} defaultGroup apex ${pdpName} onap.policies.native.Apex - GetTopic APEX-CL-MGT - Wait Until Keyword Succeeds 2 min 5 sec TriggerAndVerifyTestVnfPolicy +#ExecuteApexTestVnfPolicy +# Set Test Variable ${policyName} onap.policies.apex.vnf.Test +# ${postjson}= Get File ${CURDIR}/data/${policyName}.json +# CreatePolicy /policy/api/v1/policytypes/onap.policies.native.Apex/versions/1.0.0/policies 200 ${postjson} ${policyName} 1.0.0 +# DeployPolicy +# Wait Until Keyword Succeeds 2 min 5 sec QueryPolicyStatus ${policyName} defaultGroup apex ${pdpName} onap.policies.native.Apex +# GetTopic apex-cl-mgt +# Wait Until Keyword Succeeds 2 min 5 sec TriggerAndVerifyTestVnfPolicy ExecuteApexTestPnfPolicyWithMetadataSet Set Test Variable ${policyName} onap.policies.apex.pnf.metadataSet.Test @@ -51,17 +51,17 @@ ExecuteApexTestPnfPolicyWithMetadataSet CreateNodeTemplate /policy/api/v1/nodetemplates 200 ${postjson} 1 DeployPolicy Wait Until Keyword Succeeds 2 min 5 sec QueryPolicyStatus ${policyName} defaultGroup apex ${pdpName} onap.policies.native.Apex - GetTopic APEX-CL-MGT2 - Wait Until Keyword Succeeds 2 min 5 sec TriggerAndVerifyTestPnfPolicy + GetKafkaTopic apex-cl-mgt2 + Wait Until Keyword Succeeds 2 min 5 sec TriggerAndVerifyTestPnfPolicy apex-cl-mgt2 Metrics [Documentation] Verify policy-apex-pdp is exporting prometheus metrics ${auth}= PolicyAdminAuth ${resp}= PerformGetRequest ${APEX_IP} /metrics 200 null ${auth} - Should Contain ${resp.text} pdpa_policy_deployments_total{operation="deploy",status="TOTAL",} 4.0 - Should Contain ${resp.text} pdpa_policy_deployments_total{operation="deploy",status="SUCCESS",} 4.0 - Should Contain ${resp.text} pdpa_policy_executions_total{status="SUCCESS",} 3.0 - Should Contain ${resp.text} pdpa_policy_executions_total{status="TOTAL",} 3.0 + Should Contain ${resp.text} pdpa_policy_deployments_total{operation="deploy",status="TOTAL",} 3.0 + Should Contain ${resp.text} pdpa_policy_deployments_total{operation="deploy",status="SUCCESS",} 3.0 + Should Contain ${resp.text} pdpa_policy_executions_total{status="SUCCESS",} 6.0 + Should Contain ${resp.text} pdpa_policy_executions_total{status="TOTAL",} 6.0 Should Match ${resp.text} *pdpa_engine_event_executions{engine_instance_id="NSOApexEngine-*:0.0.1",}* Should Match ${resp.text} *pdpa_engine_event_executions{engine_instance_id="MyApexEngine-*:0.0.1",}* Should Match ${resp.text} *pdpa_engine_state{engine_instance_id=*,} 2.0* @@ -78,18 +78,16 @@ Metrics TriggerAndVerifyTestPnfPolicy [Documentation] Send TestPnf policy trigger event to DMaaP and read notifications to verify policy execution - Create Session apexSession http://${DMAAP_IP} max_retries=1 + [Arguments] ${topic} ${data}= Get Binary File ${CURDIR}/data/VesEventForPnfPolicy.json - &{headers}= Create Dictionary Content-Type=application/json Accept=application/json - ${resp}= POST On Session apexSession /events/unauthenticated.DCAE_CL_OUTPUT data=${data} headers=${headers} - Should Be Equal As Strings ${resp.status_code} 200 - Run Keyword CheckLogMessage ACTIVE VES event has been received. Going to fetch details from AAI. - Run Keyword CheckLogMessage SUCCESS Received response from AAI successfully. Hostname in AAI matches with the one in Ves event. Going to make the update-config request to CDS. - Run Keyword CheckLogMessage FINAL_SUCCESS Successfully processed the VES event. Hostname is updated. + ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_cl_output ${data} + Run Keyword CheckLogMessage ${topic} ACTIVE VES event has been received. Going to fetch details from AAI. + Run Keyword CheckLogMessage ${topic} SUCCESS Received response from AAI successfully. Hostname in AAI matches with the one in Ves event. Going to make the update-config request to CDS. + Run Keyword CheckLogMessage ${topic} FINAL_SUCCESS Successfully processed the VES event. Hostname is updated. TriggerAndVerifyTestVnfPolicy [Documentation] Send TestVnf policy trigger event to DMaaP and read notifications to verify policy execution - Create Session apexSession http://${DMAAP_IP} max_retries=1 + Create Session apexSession http://${KAFKA_IP} max_retries=1 ${data}= Get Binary File ${CURDIR}/data/VesEventForVnfPolicy.json &{headers}= Create Dictionary Content-Type=application/json Accept=application/json ${resp}= POST On Session apexSession /events/unauthenticated.DCAE_POLICY_EXAMPLE_OUTPUT data=${data} headers=${headers} diff --git a/csit/resources/tests/apex-slas.robot b/csit/resources/tests/apex-slas.robot index 408b0add..4191bb2a 100644 --- a/csit/resources/tests/apex-slas.robot +++ b/csit/resources/tests/apex-slas.robot @@ -23,13 +23,10 @@ ValidatePolicyExecutionAndEventRateLowComplexity CreatePolicy /policy/api/v1/policytypes/onap.policies.native.Apex/versions/1.0.0/policies 200 ${postjson} ${policyName} 1.0.0 DeployPolicy Wait Until Keyword Succeeds 2 min 5 sec QueryPolicyStatus ${policyName} defaultGroup apex ${pdpName} onap.policies.native.Apex - GetTopic APEX-CL-MGT - Create Session apexSession http://${DMAAP_IP} max_retries=1 + GetKafkaTopic apex-cl-mgt ${data}= Get Binary File ${CURDIR}/data/VesEventForPnfPolicy.json - &{headers}= Create Dictionary Content-Type=application/json Accept=application/json ${eventStartTime}= Get Current Date - ${resp}= POST On Session apexSession /events/unauthenticated.DCAE_CL_OUTPUT data=${data} headers=${headers} - Should Be Equal As Strings ${resp.status_code} 200 + ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_cl_output ${data} ${eventEndTime}= Get Current Date ValidateEventExecution ${eventStartTime} ${eventEndTime} 10 @@ -53,13 +50,10 @@ ValidatePolicyExecutionAndEventRateHighComplexity CreateNodeTemplate /policy/api/v1/nodetemplates 200 ${postjson} 1 DeployPolicy Wait Until Keyword Succeeds 2 min 5 sec QueryPolicyStatus ${policyName} defaultGroup apex ${pdpName} onap.policies.native.Apex - GetTopic APEX-CL-MGT2 - Create Session apexSession http://${DMAAP_IP} max_retries=1 + GetKafkaTopic apex-cl-mgt2 ${data}= Get Binary File ${CURDIR}/data/VesEventForVnfPolicy.json - &{headers}= Create Dictionary Content-Type=application/json Accept=application/json ${eventStartTime}= Get Current Date - ${resp}= POST On Session apexSession /events/unauthenticated.DCAE_POLICY_EXAMPLE_OUTPUT data=${data} headers=${headers} - Should Be Equal As Strings ${resp.status_code} 200 + ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_policy_example_output ${data} ${eventEndTime}= Get Current Date ValidateEventExecution ${eventStartTime} ${eventEndTime} 0.2 diff --git a/csit/resources/tests/common-library.robot b/csit/resources/tests/common-library.robot index f5db8e0e..531572d2 100644 --- a/csit/resources/tests/common-library.robot +++ b/csit/resources/tests/common-library.robot @@ -150,9 +150,9 @@ CheckTopic CheckKafkaTopic [Arguments] ${topic} ${expected_status} - ${resp}= Run Process ${CURDIR}/kafka_consumer.py ${topic} 30 ${expected_status} - Log to console Received response from kafka ${resp.stdout} - Should Contain ${resp.text} ${expected_status} + ${resp}= Run Process ${CURDIR}/kafka_consumer.py ${topic} 60 ${expected_status} + Should Contain ${resp.stdout} ${expected_status} + [Return] ${resp.stdout} GetKafkaTopic [Arguments] ${topic} diff --git a/csit/resources/tests/data/onap.policies.apex.pnf.Test.json b/csit/resources/tests/data/onap.policies.apex.pnf.Test.json index 05522936..fc9b521c 100644 --- a/csit/resources/tests/data/onap.policies.apex.pnf.Test.json +++ b/csit/resources/tests/data/onap.policies.apex.pnf.Test.json @@ -32,7 +32,7 @@ "taskParameters": [ { "key": "logUrl", - "value": "http://message-router:3904/events/APEX-CL-MGT" + "value": "http://localhost:8082/topics/apex-cl-mgt" } ] }, @@ -2775,10 +2775,23 @@ "eventInputParameters": { "SimpleCL_DCAEConsumer": { "carrierTechnologyParameters": { - "carrierTechnology": "RESTCLIENT", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restclient.RestClientCarrierTechnologyParameters", + "carrierTechnology": "KAFKA", + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.kafka.KafkaCarrierTechnologyParameters", "parameters": { - "url": "http://message-router:3904/events/unauthenticated.DCAE_CL_OUTPUT/cl/apex?timeout=30000" + "bootstrapServers": "kafka:9092", + "groupId": "apex-grp", + "enableAutoCommit": true, + "autoCommitTime": 1000, + "sessionTimeout": 30000, + "consumerPollTime": 100, + "consumerTopicList": [ + "unauthenticated.dcae_cl_output" + ], + "keyDeserializer": "org.apache.kafka.common.serialization.StringDeserializer", + "valueDeserializer": "org.apache.kafka.common.serialization.StringDeserializer", + "kafkaProperties": [ + + ] } }, "eventProtocolParameters": { @@ -2876,10 +2889,21 @@ }, "SimpleCL_logOutputter": { "carrierTechnologyParameters": { - "carrierTechnology": "RESTCLIENT", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restclient.RestClientCarrierTechnologyParameters", + "carrierTechnology": "KAFKA", + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.kafka.KafkaCarrierTechnologyParameters", "parameters": { - "url": "http://message-router:3904/events/APEX-CL-MGT" + "bootstrapServers": "kafka:9092", + "acks": "all", + "retries": 0, + "batchSize": 16384, + "lingerTime": 1, + "bufferMemory": 33554432, + "producerTopic": "apex-cl-mgt", + "keySerializer": "org.apache.kafka.common.serialization.StringSerializer", + "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer", + "kafkaProperties": [ + + ] } }, "eventProtocolParameters": { diff --git a/csit/resources/tests/data/onap.policies.apex.pnf.metadataSet.Test.json b/csit/resources/tests/data/onap.policies.apex.pnf.metadataSet.Test.json index 20338fff..d9cd87e2 100644 --- a/csit/resources/tests/data/onap.policies.apex.pnf.metadataSet.Test.json +++ b/csit/resources/tests/data/onap.policies.apex.pnf.metadataSet.Test.json @@ -36,7 +36,7 @@ "taskParameters": [ { "key": "logUrl", - "value": "http://message-router:3904/events/APEX-CL-MGT2" + "value": "http://kafka:9092/topics/apex-cl-mgt2" } ] } @@ -44,10 +44,23 @@ "eventInputParameters": { "SimpleCL_DCAEConsumer2": { "carrierTechnologyParameters": { - "carrierTechnology": "RESTCLIENT", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restclient.RestClientCarrierTechnologyParameters", + "carrierTechnology": "KAFKA", + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.kafka.KafkaCarrierTechnologyParameters", "parameters": { - "url": "http://message-router:3904/events/unauthenticated.DCAE_CL_OUTPUT/cl/apex?timeout=30000" + "bootstrapServers": "kafka:9092", + "groupId": "apex-grp2", + "enableAutoCommit": true, + "autoCommitTime": 1000, + "sessionTimeout": 30000, + "consumerPollTime": 100, + "consumerTopicList": [ + "unauthenticated.dcae_cl_output" + ], + "keyDeserializer": "org.apache.kafka.common.serialization.StringDeserializer", + "valueDeserializer": "org.apache.kafka.common.serialization.StringDeserializer", + "kafkaProperties": [ + + ] } }, "eventProtocolParameters": { @@ -145,10 +158,21 @@ }, "SimpleCL_logOutputter2": { "carrierTechnologyParameters": { - "carrierTechnology": "RESTCLIENT", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restclient.RestClientCarrierTechnologyParameters", + "carrierTechnology": "KAFKA", + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.kafka.KafkaCarrierTechnologyParameters", "parameters": { - "url": "http://message-router:3904/events/APEX-CL-MGT2" + "bootstrapServers": "kafka:9092", + "acks": "all", + "retries": 0, + "batchSize": 16384, + "lingerTime": 1, + "bufferMemory": 33554432, + "producerTopic": "apex-cl-mgt2", + "keySerializer": "org.apache.kafka.common.serialization.StringSerializer", + "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer", + "kafkaProperties": [ + + ] } }, "eventProtocolParameters": { diff --git a/csit/resources/tests/distribution-test.robot b/csit/resources/tests/distribution-test.robot index b8b49248..c85be4fd 100644 --- a/csit/resources/tests/distribution-test.robot +++ b/csit/resources/tests/distribution-test.robot @@ -20,11 +20,11 @@ MetricsAfterExecution [Documentation] Verify policy-distribution is exporting prometheus metrics after execution ${hcauth}= PolicyAdminAuth ${resp}= PerformGetRequest ${DISTRIBUTION_IP} /metrics 200 null ${hcauth} - Should Contain ${resp.text} total_distribution_received_count_total 1.0 - Should Contain ${resp.text} distribution_success_count_total 1.0 + Should Contain ${resp.text} total_distribution_received_count_total 2.0 + Should Contain ${resp.text} distribution_success_count_total 2.0 Should Contain ${resp.text} distribution_failure_count_total 0.0 - Should Contain ${resp.text} total_download_received_count_total 1.0 - Should Contain ${resp.text} download_success_count_total 1.0 + Should Contain ${resp.text} total_download_received_count_total 2.0 + Should Contain ${resp.text} download_success_count_total 2.0 Should Contain ${resp.text} download_failure_count_total 0.0 *** Keywords *** diff --git a/csit/resources/tests/drools-applications-test.robot b/csit/resources/tests/drools-applications-test.robot index 81ec65e0..5c6a1b5c 100644 --- a/csit/resources/tests/drools-applications-test.robot +++ b/csit/resources/tests/drools-applications-test.robot @@ -29,8 +29,8 @@ Healthcheck MakeTopics [Documentation] Creates the Policy topics - GetTopic POLICY-PDP-PAP - GetTopic POLICY-CL-MGT + GetKafkaTopic policy-pdp-pap + GetKafkaTopic policy-cl-mgt CreateVcpeXacmlPolicy [Documentation] Create VCPE Policy for Xacml @@ -59,27 +59,20 @@ CreateVfwDroolsPolicy DeployXacmlPolicies [Documentation] Deploys the Policies to Xacml PerformPostRequest /policy/pap/v1/pdps/deployments/batch null ${POLICY_PAP_IP} deploy.xacml.policies.json ${CURDIR}/data json 202 - ${result}= CheckTopic POLICY-PDP-PAP PDP_UPDATE - Sleep 5s - ${result}= CheckTopic POLICY-PDP-PAP ACTIVE - Should Contain ${result} responseTo - Should Contain ${result} xacml - Should Contain ${result} restart - Should Contain ${result} onap.restart.tca + Sleep 5s + ${result}= CheckKafkaTopic policy-notification onap.vfirewall.tca + Should Contain ${result} deployed-policies Should Contain ${result} onap.scaleout.tca - Should Contain ${result} onap.vfirewall.tca + Should Contain ${result} onap.restart.tca DeployDroolsPolicies [Documentation] Deploys the Policies to Drools PerformPostRequest /policy/pap/v1/pdps/deployments/batch null ${POLICY_PAP_IP} deploy.drools.policies.json ${CURDIR}/data json 202 - ${result}= CheckTopic POLICY-PDP-PAP PDP_UPDATE - Sleep 5s - ${result}= CheckTopic POLICY-PDP-PAP ACTIVE - Should Contain ${result} responseTo - Should Contain ${result} drools - Should Contain ${result} operational.restart + Sleep 5s + ${result}= CheckKafkaTopic policy-notification operational.modifyconfig + Should Contain ${result} deployed-policies Should Contain ${result} operational.scaleout - Should Contain ${result} operational.modifyconfig + Should Contain ${result} operational.restart #VcpeExecute # [Documentation] Executes VCPE Policy @@ -179,7 +172,7 @@ OnSet ${data}= Get File ${file} Create Session session http://${DMAAP_IP} max_retries=1 ${headers}= Create Dictionary Content-Type=application/json - ${resp}= POST On Session session /events/unauthenticated.DCAE_CL_OUTPUT headers=${headers} data=${data} + ${resp}= POST On Session session /events/unauthenticateddcae_cl_output headers=${headers} data=${data} Log Response from dmaap ${resp.text} Status Should Be OK [Return] ${resp.text} diff --git a/csit/resources/tests/kafka_consumer.py b/csit/resources/tests/kafka_consumer.py new file mode 100755 index 00000000..595e3db7 --- /dev/null +++ b/csit/resources/tests/kafka_consumer.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +# +# ============LICENSE_START==================================================== +# Copyright (C) 2023-2024 Nordix Foundation. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END====================================================== + +# Python utility to fetch kafka topic and look for required messages. +# Accepts the arguments {topic_name} and {list of expected values} and {timeout} to verify the kafka topic. + + +from confluent_kafka import Consumer, KafkaException +import sys +import time + + +def consume_kafka_topic(topic, expected_values, timeout): + config = { + 'bootstrap.servers': 'localhost:29092', + 'group.id': 'testgrp', + 'auto.offset.reset': 'earliest' + } + consumer = Consumer(config) + consumer.subscribe([topic]) + try: + start_time = time.time() + while time.time() - start_time < timeout: + msg = consumer.poll(1.0) + if msg is None: + continue + if msg.error(): + if msg.error().code() == KafkaException._PARTITION_EOF: + sys.stderr.write(f"Reached end of topic {msg.topic()} / partition {msg.partition()}\n") + print('ERROR') + sys.exit(404) + else: + # Error + raise KafkaException(msg.error()) + else: + # Message received + message = msg.value().decode('utf-8') + if expected_values in message: + print(message) + sys.exit(200) + finally: + consumer.close() + + +if __name__ == '__main__': + topic_name = sys.argv[1] + timeout = int(sys.argv[2]) # timeout in seconds for verifying the kafka topic + expected_values = sys.argv[3] + consume_kafka_topic(topic_name, expected_values, timeout) diff --git a/csit/resources/tests/kafka_producer.py b/csit/resources/tests/kafka_producer.py new file mode 100755 index 00000000..e6f01c21 --- /dev/null +++ b/csit/resources/tests/kafka_producer.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +# +# ============LICENSE_START==================================================== +# Copyright (C) 2023-2024 Nordix Foundation. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END====================================================== + +# Python utility to produce a message on a kafka topic +# Accepts the arguments {topic_name} and {message} + +from confluent_kafka import Producer +import sys + +def post_to_kafka(topic, message): + conf = {'bootstrap.servers': 'localhost:29092'} + + producer = Producer(conf) + try: + producer.produce(topic, value=message.encode('utf-8')) + producer.flush() + print('Message posted to Kafka topic: {}'.format(topic)) + except Exception as e: + print('Failed to post message: {}'.format(str(e))) + finally: + producer.flush() + +if __name__ == '__main__': + post_to_kafka(sys.argv[1], sys.argv[2]) diff --git a/csit/resources/tests/make_topics.py b/csit/resources/tests/make_topics.py new file mode 100755 index 00000000..64a230eb --- /dev/null +++ b/csit/resources/tests/make_topics.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +# +# ============LICENSE_START==================================================== +# Copyright (C) 2023-2024 Nordix Foundation. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END====================================================== + +# Python utility to create a new kafka topic +# Accepts the argument {topic_name} + +from confluent_kafka.admin import AdminClient, NewTopic +import sys + +def create_topic(bootstrap_servers, topic_name, num_partitions=2, replication_factor=2): + admin_client = AdminClient({'bootstrap.servers': bootstrap_servers}) + + # Define the topic configuration + topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor) + + # Create the topic + admin_client.create_topics([topic]) + + +if __name__ == '__main__': + topic_name = sys.argv[1] + bootstrap_servers = 'localhost:29092' + + create_topic(bootstrap_servers, topic_name) diff --git a/csit/resources/tests/pap-test.robot b/csit/resources/tests/pap-test.robot index 82328437..74d299a5 100644 --- a/csit/resources/tests/pap-test.robot +++ b/csit/resources/tests/pap-test.robot @@ -47,6 +47,7 @@ Healthcheck Consolidated Healthcheck [Documentation] Verify policy consolidated health check + sleep 20 ${resp}= GetReq /policy/pap/v1/components/healthcheck Should Be Equal As Strings ${resp.json()['healthy']} True diff --git a/csit/resources/tests/policy-clamp-test.robot b/csit/resources/tests/policy-clamp-test.robot index d5fec492..10f93893 100644 --- a/csit/resources/tests/policy-clamp-test.robot +++ b/csit/resources/tests/policy-clamp-test.robot @@ -97,6 +97,7 @@ QueryPolicies QueryPolicyTypes [Documentation] Verify the new policy types created ${auth}= Create List policyadmin zb!XztG34 + sleep 10 Log Creating session http://${POLICY_API_IP}}:6969 ${session}= Create Session policy http://${POLICY_API_IP} auth=${auth} ${headers}= Create Dictionary Accept=application/json Content-Type=application/json diff --git a/csit/resources/tests/xacml-pdp-test.robot b/csit/resources/tests/xacml-pdp-test.robot index 331a7fa5..e500a04c 100644 --- a/csit/resources/tests/xacml-pdp-test.robot +++ b/csit/resources/tests/xacml-pdp-test.robot @@ -19,7 +19,7 @@ Metrics MakeTopics [Documentation] Creates the Policy topics - GetTopic POLICY-PDP-PAP + GetKafkaTopic policy-pdp-pap ExecuteXacmlPolicy CreateMonitorPolicy @@ -56,12 +56,9 @@ DeployPolicies ${postjson}= Get file ${CURDIR}/data/vCPE.policy.input.tosca.deploy.json ${policyadmin}= PolicyAdminAuth PerformPostRequest ${POLICY_PAP_IP} /policy/pap/v1/pdps/policies 202 ${postjson} null ${policyadmin} - ${result}= CheckTopic POLICY-PDP-PAP PDP_UPDATE - Sleep 5s - ${result}= CheckTopic POLICY-PDP-PAP ACTIVE - Should Contain ${result} responseTo - Should Contain ${result} xacml - Should Contain ${result} onap.restart.tca + sleep 20s + ${result}= CheckKafkaTopic policy-notification onap.restart.tca + Should Contain ${result} deployed-policies GetAbbreviatedDecisionResult [Documentation] Get Decision with abbreviated results from Policy Xacml PDP |