From f2e4da7e296548fb3980fd212e3a67dc83254e1d Mon Sep 17 00:00:00 2001 From: rameshiyer27 Date: Sat, 13 Jan 2024 21:26:09 +0000 Subject: Add kafka support in Policy CSIT Issue-ID: POLICY-4402 Signed-off-by: zrrmmua Change-Id: I802c19a3c9817d304164eba634adb8c119aa4ced --- csit/resources/scripts/kafka_consumer.py | 71 ---------------------- csit/resources/scripts/kafka_producer.py | 41 ------------- csit/resources/scripts/make_topics.py | 41 ------------- csit/resources/scripts/run-test.sh | 5 +- csit/resources/scripts/setup-apex-pdp-postgres.sh | 4 +- csit/resources/scripts/setup-apex-pdp.sh | 6 +- .../resources/scripts/setup-drools-applications.sh | 4 +- csit/resources/scripts/setup-xacml-pdp.sh | 4 +- csit/resources/tests/apex-pdp-common.robot | 4 +- csit/resources/tests/apex-pdp-test.robot | 46 +++++++------- csit/resources/tests/apex-slas.robot | 14 ++--- csit/resources/tests/common-library.robot | 6 +- .../tests/data/onap.policies.apex.pnf.Test.json | 38 +++++++++--- .../onap.policies.apex.pnf.metadataSet.Test.json | 38 +++++++++--- csit/resources/tests/distribution-test.robot | 8 +-- .../resources/tests/drools-applications-test.robot | 29 ++++----- csit/resources/tests/kafka_consumer.py | 66 ++++++++++++++++++++ csit/resources/tests/kafka_producer.py | 41 +++++++++++++ csit/resources/tests/make_topics.py | 41 +++++++++++++ csit/resources/tests/pap-test.robot | 1 + csit/resources/tests/policy-clamp-test.robot | 1 + csit/resources/tests/xacml-pdp-test.robot | 11 ++-- csit/run-k8s-csit.sh | 4 +- 23 files changed, 276 insertions(+), 248 deletions(-) delete mode 100755 csit/resources/scripts/kafka_consumer.py delete mode 100755 csit/resources/scripts/kafka_producer.py delete mode 100755 csit/resources/scripts/make_topics.py create mode 100755 csit/resources/tests/kafka_consumer.py create mode 100755 csit/resources/tests/kafka_producer.py create mode 100755 csit/resources/tests/make_topics.py (limited to 'csit') diff --git a/csit/resources/scripts/kafka_consumer.py b/csit/resources/scripts/kafka_consumer.py deleted file mode 100755 index 80b6167a..00000000 --- a/csit/resources/scripts/kafka_consumer.py +++ /dev/null @@ -1,71 +0,0 @@ -#!/usr/bin/env python3 -# -# ============LICENSE_START==================================================== -# Copyright (C) 2023 Nordix Foundation. -# ============================================================================= -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# SPDX-License-Identifier: Apache-2.0 -# ============LICENSE_END====================================================== - -# Python utility to fetch kafka topic and look for required messages. -# Accepts the arguments {topic_name} and {list of expected values} and {timeout} to verify the kafka topic. - - -from confluent_kafka import Consumer, KafkaException -import sys -import time - -def consume_kafka_topic(topic, expected_values, timeout): - config = { - 'bootstrap.servers': 'localhost:29092', - 'group.id': 'testgrp', - 'auto.offset.reset': 'earliest' - } - consumer = Consumer(config) - consumer.subscribe([topic]) - try: - start_time = time.time() - while time.time() - start_time < timeout: - msg = consumer.poll(1.0) - if msg is None: - continue - if msg.error(): - if msg.error().code() == KafkaException._PARTITION_EOF: - sys.stderr.write(f"Reached end of topic {msg.topic()} / partition {msg.partition()}\n") - print('ERROR') - sys.exit(404) - else: - # Error - raise KafkaException(msg.error()) - else: - # Message received - message = msg.value().decode('utf-8') - if verify_msg(expected_values, message): - print(message) - sys.exit(200) - finally: - consumer.close() - -def verify_msg(expected_values, message): - for item in expected_values: - if item not in message: - return False - return True - - -if __name__ == '__main__': - topic_name = sys.argv[1] - timeout = sys.argv[2] # timeout in seconds for verifying the kafka topic - expected_values = sys.argv[3:] - consume_kafka_topic(topic_name, expected_values, timeout) diff --git a/csit/resources/scripts/kafka_producer.py b/csit/resources/scripts/kafka_producer.py deleted file mode 100755 index ff129872..00000000 --- a/csit/resources/scripts/kafka_producer.py +++ /dev/null @@ -1,41 +0,0 @@ -#!/usr/bin/env python3 -# -# ============LICENSE_START==================================================== -# Copyright (C) 2023 Nordix Foundation. -# ============================================================================= -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# SPDX-License-Identifier: Apache-2.0 -# ============LICENSE_END====================================================== - -# Python utility to produce a message on a kafka topic -# Accepts the arguments {topic_name} and {message} - -from confluent_kafka import Producer -import sys - -def post_to_kafka(topic, message): - conf = {'bootstrap.servers': 'localhost:29092'} - - producer = Producer(conf) - try: - producer.produce(topic, value=message.encode('utf-8')) - producer.flush() - print('Message posted to Kafka topic: {}'.format(topic)) - except Exception as e: - print('Failed to post message: {}'.format(str(e))) - finally: - producer.flush() - -if __name__ == '__main__': - post_to_kafka(sys.argv[1], sys.argv[2]) diff --git a/csit/resources/scripts/make_topics.py b/csit/resources/scripts/make_topics.py deleted file mode 100755 index daee4341..00000000 --- a/csit/resources/scripts/make_topics.py +++ /dev/null @@ -1,41 +0,0 @@ -#!/usr/bin/env python3 -# -# ============LICENSE_START==================================================== -# Copyright (C) 2023 Nordix Foundation. -# ============================================================================= -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# SPDX-License-Identifier: Apache-2.0 -# ============LICENSE_END====================================================== - -# Python utility to create a new kafka topic -# Accepts the argument {topic_name} - -from confluent_kafka.admin import AdminClient, NewTopic -import sys - -def create_topic(bootstrap_servers, topic_name, num_partitions=2, replication_factor=2): - admin_client = AdminClient({'bootstrap.servers': bootstrap_servers}) - - # Define the topic configuration - topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor) - - # Create the topic - admin_client.create_topics([topic]) - - -if __name__ == '__main__': - topic_name = sys.argv[1] - bootstrap_servers = 'localhost:29092' - - create_topic(bootstrap_servers, topic_name) diff --git a/csit/resources/scripts/run-test.sh b/csit/resources/scripts/run-test.sh index 02f06ff4..6bd7c07b 100755 --- a/csit/resources/scripts/run-test.sh +++ b/csit/resources/scripts/run-test.sh @@ -1,7 +1,7 @@ #!/bin/bash # # ============LICENSE_START==================================================== -# Copyright (C) 2023 Nordix Foundation. +# Copyright (C) 2023-2024 Nordix Foundation. # ============================================================================= # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -34,6 +34,7 @@ POLICY_PDPX_IP=policy-xacml-pdp:${DEFAULT_PORT} POLICY_DROOLS_IP=policy-drools-pdp:9696 DISTRIBUTION_IP=policy-distribution:6969 DMAAP_IP=message-router:3904 +KAFKA_IP=kafka:9092 APEX_EVENTS_IP=policy-apex-pdp:23324 PROMETHEUS_IP=prometheus:9090 CLAMP_K8S_TEST=true @@ -43,7 +44,7 @@ DIST_TEMP_FOLDER=/tmp/distribution export ROBOT_VARIABLES= ROBOT_VARIABLES="-v DATA:$DATA -v NODETEMPLATES:$NODETEMPLATES -v POLICY_API_IP:$POLICY_API_IP -v POLICY_RUNTIME_ACM_IP:$POLICY_RUNTIME_ACM_IP -v POLICY_PAP_IP:$POLICY_PAP_IP -v APEX_IP:$APEX_IP --v APEX_EVENTS_IP:$APEX_EVENTS_IP -v DMAAP_IP:$DMAAP_IP -v PROMETHEUS_IP:${PROMETHEUS_IP} +-v APEX_EVENTS_IP:$APEX_EVENTS_IP -v DMAAP_IP:$DMAAP_IP -v KAFKA_IP:$KAFKA_IP -v PROMETHEUS_IP:${PROMETHEUS_IP} -v POLICY_PDPX_IP:$POLICY_PDPX_IP -v POLICY_DROOLS_IP:$POLICY_DROOLS_IP -v TEMP_FOLDER:${DIST_TEMP_FOLDER} -v DISTRIBUTION_IP:$DISTRIBUTION_IP -v CLAMP_K8S_TEST:$CLAMP_K8S_TEST" diff --git a/csit/resources/scripts/setup-apex-pdp-postgres.sh b/csit/resources/scripts/setup-apex-pdp-postgres.sh index f088da3c..150ec8f6 100755 --- a/csit/resources/scripts/setup-apex-pdp-postgres.sh +++ b/csit/resources/scripts/setup-apex-pdp-postgres.sh @@ -44,10 +44,10 @@ do sleep 10s done -export DMAAP_IP="localhost:${DMAAP_PORT}" +export KAFKA_IP="localhost:${KAFKA_PORT}" export SUITES="apex-pdp-test.robot" ROBOT_VARIABLES="-v POLICY_PAP_IP:localhost:${PAP_PORT} -v POLICY_API_IP:localhost:${API_PORT} -v PROMETHEUS_IP:localhost:${PROMETHEUS_PORT} -v DATA:${DATA} -v NODETEMPLATES:${NODETEMPLATES} --v APEX_IP:localhost:${APEX_PORT} -v DMAAP_IP:${DMAAP_IP} +-v APEX_IP:localhost:${APEX_PORT} -v KAFKA_IP:${KAFKA_IP} -v APEX_EVENTS_IP:localhost:${APEX_EVENTS_PORT}" diff --git a/csit/resources/scripts/setup-apex-pdp.sh b/csit/resources/scripts/setup-apex-pdp.sh index b9b1a78a..198a6017a 100755 --- a/csit/resources/scripts/setup-apex-pdp.sh +++ b/csit/resources/scripts/setup-apex-pdp.sh @@ -2,7 +2,7 @@ # ============LICENSE_START======================================================= # Copyright (C) 2018 Ericsson. All rights reserved. # -# Modifications Copyright (c) 2019-2023 Nordix Foundation. +# Modifications Copyright (c) 2019-2024 Nordix Foundation. # Modifications Copyright (C) 2020-2021 AT&T Intellectual Property. # Modifications Copyright (C) 2021 Bell Canada. All rights reserved. # ================================================================================ @@ -26,9 +26,9 @@ source "${SCRIPTS}"/setup-pap.sh # wait for the app to start up bash "${SCRIPTS}"/wait_for_rest.sh localhost ${APEX_PORT} -export DMAAP_IP="localhost:${DMAAP_PORT}" +export KAFKA_IP="kafka:${KAFKA_PORT}" export SUITES="apex-pdp-test.robot apex-slas.robot" -ROBOT_VARIABLES="${ROBOT_VARIABLES} -v APEX_IP:localhost:${APEX_PORT} -v DMAAP_IP:${DMAAP_IP} +ROBOT_VARIABLES="${ROBOT_VARIABLES} -v APEX_IP:localhost:${APEX_PORT} -v KAFKA_IP:${KAFKA_IP} -v APEX_EVENTS_IP:localhost:${APEX_EVENTS_PORT}" diff --git a/csit/resources/scripts/setup-drools-applications.sh b/csit/resources/scripts/setup-drools-applications.sh index d8542bd1..369874b6 100755 --- a/csit/resources/scripts/setup-drools-applications.sh +++ b/csit/resources/scripts/setup-drools-applications.sh @@ -2,7 +2,7 @@ # # ===========LICENSE_START==================================================== # Copyright (C) 2019-2021 AT&T Intellectual Property. All rights reserved. -# Modifications Copyright 2021-2023 Nordix Foundation. +# Modifications Copyright 2021-2024 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -35,4 +35,4 @@ sleep 15 ROBOT_VARIABLES="-v DATA:${DATA} -v DROOLS_IP:localhost:${DROOLS_APPS_PORT} -v DROOLS_IP_2:localhost:${DROOLS_APPS_TELEMETRY_PORT} -v POLICY_API_IP:localhost:${API_PORT} --v POLICY_PAP_IP:localhost:${PAP_PORT} -v DMAAP_IP:localhost:${DMAAP_PORT}" +-v POLICY_PAP_IP:localhost:${PAP_PORT} -v KAFKA_IP:localhost:${KAFKA_PORT}" diff --git a/csit/resources/scripts/setup-xacml-pdp.sh b/csit/resources/scripts/setup-xacml-pdp.sh index 251cb29c..4511d91e 100755 --- a/csit/resources/scripts/setup-xacml-pdp.sh +++ b/csit/resources/scripts/setup-xacml-pdp.sh @@ -1,7 +1,7 @@ #!/bin/bash # ============LICENSE_START======================================================= # Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved. -# Modifications Copyright 2021-2023 Nordix Foundation. +# Modifications Copyright 2021-2024 Nordix Foundation. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -32,4 +32,4 @@ export SUITES="xacml-pdp-test.robot" ROBOT_VARIABLES="-v DATA:${DATA} -v POLICY_PDPX_IP:localhost:${XACML_PORT} -v POLICY_API_IP:localhost:${API_PORT} -v POLICY_PAP_IP:localhost:${PAP_PORT} --v DMAAP_IP:localhost:${DMAAP_PORT}" +-v KAFKA_IP:localhost:${KAFKA_PORT}" diff --git a/csit/resources/tests/apex-pdp-common.robot b/csit/resources/tests/apex-pdp-common.robot index 8ae63af6..e6458318 100644 --- a/csit/resources/tests/apex-pdp-common.robot +++ b/csit/resources/tests/apex-pdp-common.robot @@ -27,8 +27,8 @@ RunEventOnApexEngine CheckLogMessage [Documentation] Read log messages received and check for expected content. - [Arguments] ${status} ${expectedMsg} - ${result}= CheckTopic APEX-CL-MGT ${status} + [Arguments] ${topic} ${status} ${expectedMsg} + ${result}= CheckKafkaTopic ${topic} ${status} Should Contain ${result} ${expectedMsg} ValidatePolicyExecution diff --git a/csit/resources/tests/apex-pdp-test.robot b/csit/resources/tests/apex-pdp-test.robot index b0232263..5e4ea34b 100644 --- a/csit/resources/tests/apex-pdp-test.robot +++ b/csit/resources/tests/apex-pdp-test.robot @@ -31,17 +31,17 @@ ExecuteApexTestPnfPolicy CreatePolicy /policy/api/v1/policytypes/onap.policies.native.Apex/versions/1.0.0/policies 200 ${postjson} ${policyName} 1.0.0 DeployPolicy Wait Until Keyword Succeeds 2 min 5 sec QueryPolicyStatus ${policyName} defaultGroup apex ${pdpName} onap.policies.native.Apex - GetTopic APEX-CL-MGT - Wait Until Keyword Succeeds 2 min 5 sec TriggerAndVerifyTestPnfPolicy + GetKafkaTopic apex-cl-mgt + Wait Until Keyword Succeeds 2 min 5 sec TriggerAndVerifyTestPnfPolicy apex-cl-mgt -ExecuteApexTestVnfPolicy - Set Test Variable ${policyName} onap.policies.apex.vnf.Test - ${postjson}= Get File ${CURDIR}/data/${policyName}.json - CreatePolicy /policy/api/v1/policytypes/onap.policies.native.Apex/versions/1.0.0/policies 200 ${postjson} ${policyName} 1.0.0 - DeployPolicy - Wait Until Keyword Succeeds 2 min 5 sec QueryPolicyStatus ${policyName} defaultGroup apex ${pdpName} onap.policies.native.Apex - GetTopic APEX-CL-MGT - Wait Until Keyword Succeeds 2 min 5 sec TriggerAndVerifyTestVnfPolicy +#ExecuteApexTestVnfPolicy +# Set Test Variable ${policyName} onap.policies.apex.vnf.Test +# ${postjson}= Get File ${CURDIR}/data/${policyName}.json +# CreatePolicy /policy/api/v1/policytypes/onap.policies.native.Apex/versions/1.0.0/policies 200 ${postjson} ${policyName} 1.0.0 +# DeployPolicy +# Wait Until Keyword Succeeds 2 min 5 sec QueryPolicyStatus ${policyName} defaultGroup apex ${pdpName} onap.policies.native.Apex +# GetTopic apex-cl-mgt +# Wait Until Keyword Succeeds 2 min 5 sec TriggerAndVerifyTestVnfPolicy ExecuteApexTestPnfPolicyWithMetadataSet Set Test Variable ${policyName} onap.policies.apex.pnf.metadataSet.Test @@ -51,17 +51,17 @@ ExecuteApexTestPnfPolicyWithMetadataSet CreateNodeTemplate /policy/api/v1/nodetemplates 200 ${postjson} 1 DeployPolicy Wait Until Keyword Succeeds 2 min 5 sec QueryPolicyStatus ${policyName} defaultGroup apex ${pdpName} onap.policies.native.Apex - GetTopic APEX-CL-MGT2 - Wait Until Keyword Succeeds 2 min 5 sec TriggerAndVerifyTestPnfPolicy + GetKafkaTopic apex-cl-mgt2 + Wait Until Keyword Succeeds 2 min 5 sec TriggerAndVerifyTestPnfPolicy apex-cl-mgt2 Metrics [Documentation] Verify policy-apex-pdp is exporting prometheus metrics ${auth}= PolicyAdminAuth ${resp}= PerformGetRequest ${APEX_IP} /metrics 200 null ${auth} - Should Contain ${resp.text} pdpa_policy_deployments_total{operation="deploy",status="TOTAL",} 4.0 - Should Contain ${resp.text} pdpa_policy_deployments_total{operation="deploy",status="SUCCESS",} 4.0 - Should Contain ${resp.text} pdpa_policy_executions_total{status="SUCCESS",} 3.0 - Should Contain ${resp.text} pdpa_policy_executions_total{status="TOTAL",} 3.0 + Should Contain ${resp.text} pdpa_policy_deployments_total{operation="deploy",status="TOTAL",} 3.0 + Should Contain ${resp.text} pdpa_policy_deployments_total{operation="deploy",status="SUCCESS",} 3.0 + Should Contain ${resp.text} pdpa_policy_executions_total{status="SUCCESS",} 6.0 + Should Contain ${resp.text} pdpa_policy_executions_total{status="TOTAL",} 6.0 Should Match ${resp.text} *pdpa_engine_event_executions{engine_instance_id="NSOApexEngine-*:0.0.1",}* Should Match ${resp.text} *pdpa_engine_event_executions{engine_instance_id="MyApexEngine-*:0.0.1",}* Should Match ${resp.text} *pdpa_engine_state{engine_instance_id=*,} 2.0* @@ -78,18 +78,16 @@ Metrics TriggerAndVerifyTestPnfPolicy [Documentation] Send TestPnf policy trigger event to DMaaP and read notifications to verify policy execution - Create Session apexSession http://${DMAAP_IP} max_retries=1 + [Arguments] ${topic} ${data}= Get Binary File ${CURDIR}/data/VesEventForPnfPolicy.json - &{headers}= Create Dictionary Content-Type=application/json Accept=application/json - ${resp}= POST On Session apexSession /events/unauthenticated.DCAE_CL_OUTPUT data=${data} headers=${headers} - Should Be Equal As Strings ${resp.status_code} 200 - Run Keyword CheckLogMessage ACTIVE VES event has been received. Going to fetch details from AAI. - Run Keyword CheckLogMessage SUCCESS Received response from AAI successfully. Hostname in AAI matches with the one in Ves event. Going to make the update-config request to CDS. - Run Keyword CheckLogMessage FINAL_SUCCESS Successfully processed the VES event. Hostname is updated. + ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_cl_output ${data} + Run Keyword CheckLogMessage ${topic} ACTIVE VES event has been received. Going to fetch details from AAI. + Run Keyword CheckLogMessage ${topic} SUCCESS Received response from AAI successfully. Hostname in AAI matches with the one in Ves event. Going to make the update-config request to CDS. + Run Keyword CheckLogMessage ${topic} FINAL_SUCCESS Successfully processed the VES event. Hostname is updated. TriggerAndVerifyTestVnfPolicy [Documentation] Send TestVnf policy trigger event to DMaaP and read notifications to verify policy execution - Create Session apexSession http://${DMAAP_IP} max_retries=1 + Create Session apexSession http://${KAFKA_IP} max_retries=1 ${data}= Get Binary File ${CURDIR}/data/VesEventForVnfPolicy.json &{headers}= Create Dictionary Content-Type=application/json Accept=application/json ${resp}= POST On Session apexSession /events/unauthenticated.DCAE_POLICY_EXAMPLE_OUTPUT data=${data} headers=${headers} diff --git a/csit/resources/tests/apex-slas.robot b/csit/resources/tests/apex-slas.robot index 408b0add..4191bb2a 100644 --- a/csit/resources/tests/apex-slas.robot +++ b/csit/resources/tests/apex-slas.robot @@ -23,13 +23,10 @@ ValidatePolicyExecutionAndEventRateLowComplexity CreatePolicy /policy/api/v1/policytypes/onap.policies.native.Apex/versions/1.0.0/policies 200 ${postjson} ${policyName} 1.0.0 DeployPolicy Wait Until Keyword Succeeds 2 min 5 sec QueryPolicyStatus ${policyName} defaultGroup apex ${pdpName} onap.policies.native.Apex - GetTopic APEX-CL-MGT - Create Session apexSession http://${DMAAP_IP} max_retries=1 + GetKafkaTopic apex-cl-mgt ${data}= Get Binary File ${CURDIR}/data/VesEventForPnfPolicy.json - &{headers}= Create Dictionary Content-Type=application/json Accept=application/json ${eventStartTime}= Get Current Date - ${resp}= POST On Session apexSession /events/unauthenticated.DCAE_CL_OUTPUT data=${data} headers=${headers} - Should Be Equal As Strings ${resp.status_code} 200 + ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_cl_output ${data} ${eventEndTime}= Get Current Date ValidateEventExecution ${eventStartTime} ${eventEndTime} 10 @@ -53,13 +50,10 @@ ValidatePolicyExecutionAndEventRateHighComplexity CreateNodeTemplate /policy/api/v1/nodetemplates 200 ${postjson} 1 DeployPolicy Wait Until Keyword Succeeds 2 min 5 sec QueryPolicyStatus ${policyName} defaultGroup apex ${pdpName} onap.policies.native.Apex - GetTopic APEX-CL-MGT2 - Create Session apexSession http://${DMAAP_IP} max_retries=1 + GetKafkaTopic apex-cl-mgt2 ${data}= Get Binary File ${CURDIR}/data/VesEventForVnfPolicy.json - &{headers}= Create Dictionary Content-Type=application/json Accept=application/json ${eventStartTime}= Get Current Date - ${resp}= POST On Session apexSession /events/unauthenticated.DCAE_POLICY_EXAMPLE_OUTPUT data=${data} headers=${headers} - Should Be Equal As Strings ${resp.status_code} 200 + ${resp}= Run Process ${CURDIR}/kafka_producer.py unauthenticated.dcae_policy_example_output ${data} ${eventEndTime}= Get Current Date ValidateEventExecution ${eventStartTime} ${eventEndTime} 0.2 diff --git a/csit/resources/tests/common-library.robot b/csit/resources/tests/common-library.robot index f5db8e0e..531572d2 100644 --- a/csit/resources/tests/common-library.robot +++ b/csit/resources/tests/common-library.robot @@ -150,9 +150,9 @@ CheckTopic CheckKafkaTopic [Arguments] ${topic} ${expected_status} - ${resp}= Run Process ${CURDIR}/kafka_consumer.py ${topic} 30 ${expected_status} - Log to console Received response from kafka ${resp.stdout} - Should Contain ${resp.text} ${expected_status} + ${resp}= Run Process ${CURDIR}/kafka_consumer.py ${topic} 60 ${expected_status} + Should Contain ${resp.stdout} ${expected_status} + [Return] ${resp.stdout} GetKafkaTopic [Arguments] ${topic} diff --git a/csit/resources/tests/data/onap.policies.apex.pnf.Test.json b/csit/resources/tests/data/onap.policies.apex.pnf.Test.json index 05522936..fc9b521c 100644 --- a/csit/resources/tests/data/onap.policies.apex.pnf.Test.json +++ b/csit/resources/tests/data/onap.policies.apex.pnf.Test.json @@ -32,7 +32,7 @@ "taskParameters": [ { "key": "logUrl", - "value": "http://message-router:3904/events/APEX-CL-MGT" + "value": "http://localhost:8082/topics/apex-cl-mgt" } ] }, @@ -2775,10 +2775,23 @@ "eventInputParameters": { "SimpleCL_DCAEConsumer": { "carrierTechnologyParameters": { - "carrierTechnology": "RESTCLIENT", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restclient.RestClientCarrierTechnologyParameters", + "carrierTechnology": "KAFKA", + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.kafka.KafkaCarrierTechnologyParameters", "parameters": { - "url": "http://message-router:3904/events/unauthenticated.DCAE_CL_OUTPUT/cl/apex?timeout=30000" + "bootstrapServers": "kafka:9092", + "groupId": "apex-grp", + "enableAutoCommit": true, + "autoCommitTime": 1000, + "sessionTimeout": 30000, + "consumerPollTime": 100, + "consumerTopicList": [ + "unauthenticated.dcae_cl_output" + ], + "keyDeserializer": "org.apache.kafka.common.serialization.StringDeserializer", + "valueDeserializer": "org.apache.kafka.common.serialization.StringDeserializer", + "kafkaProperties": [ + + ] } }, "eventProtocolParameters": { @@ -2876,10 +2889,21 @@ }, "SimpleCL_logOutputter": { "carrierTechnologyParameters": { - "carrierTechnology": "RESTCLIENT", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restclient.RestClientCarrierTechnologyParameters", + "carrierTechnology": "KAFKA", + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.kafka.KafkaCarrierTechnologyParameters", "parameters": { - "url": "http://message-router:3904/events/APEX-CL-MGT" + "bootstrapServers": "kafka:9092", + "acks": "all", + "retries": 0, + "batchSize": 16384, + "lingerTime": 1, + "bufferMemory": 33554432, + "producerTopic": "apex-cl-mgt", + "keySerializer": "org.apache.kafka.common.serialization.StringSerializer", + "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer", + "kafkaProperties": [ + + ] } }, "eventProtocolParameters": { diff --git a/csit/resources/tests/data/onap.policies.apex.pnf.metadataSet.Test.json b/csit/resources/tests/data/onap.policies.apex.pnf.metadataSet.Test.json index 20338fff..d9cd87e2 100644 --- a/csit/resources/tests/data/onap.policies.apex.pnf.metadataSet.Test.json +++ b/csit/resources/tests/data/onap.policies.apex.pnf.metadataSet.Test.json @@ -36,7 +36,7 @@ "taskParameters": [ { "key": "logUrl", - "value": "http://message-router:3904/events/APEX-CL-MGT2" + "value": "http://kafka:9092/topics/apex-cl-mgt2" } ] } @@ -44,10 +44,23 @@ "eventInputParameters": { "SimpleCL_DCAEConsumer2": { "carrierTechnologyParameters": { - "carrierTechnology": "RESTCLIENT", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restclient.RestClientCarrierTechnologyParameters", + "carrierTechnology": "KAFKA", + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.kafka.KafkaCarrierTechnologyParameters", "parameters": { - "url": "http://message-router:3904/events/unauthenticated.DCAE_CL_OUTPUT/cl/apex?timeout=30000" + "bootstrapServers": "kafka:9092", + "groupId": "apex-grp2", + "enableAutoCommit": true, + "autoCommitTime": 1000, + "sessionTimeout": 30000, + "consumerPollTime": 100, + "consumerTopicList": [ + "unauthenticated.dcae_cl_output" + ], + "keyDeserializer": "org.apache.kafka.common.serialization.StringDeserializer", + "valueDeserializer": "org.apache.kafka.common.serialization.StringDeserializer", + "kafkaProperties": [ + + ] } }, "eventProtocolParameters": { @@ -145,10 +158,21 @@ }, "SimpleCL_logOutputter2": { "carrierTechnologyParameters": { - "carrierTechnology": "RESTCLIENT", - "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restclient.RestClientCarrierTechnologyParameters", + "carrierTechnology": "KAFKA", + "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.kafka.KafkaCarrierTechnologyParameters", "parameters": { - "url": "http://message-router:3904/events/APEX-CL-MGT2" + "bootstrapServers": "kafka:9092", + "acks": "all", + "retries": 0, + "batchSize": 16384, + "lingerTime": 1, + "bufferMemory": 33554432, + "producerTopic": "apex-cl-mgt2", + "keySerializer": "org.apache.kafka.common.serialization.StringSerializer", + "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer", + "kafkaProperties": [ + + ] } }, "eventProtocolParameters": { diff --git a/csit/resources/tests/distribution-test.robot b/csit/resources/tests/distribution-test.robot index b8b49248..c85be4fd 100644 --- a/csit/resources/tests/distribution-test.robot +++ b/csit/resources/tests/distribution-test.robot @@ -20,11 +20,11 @@ MetricsAfterExecution [Documentation] Verify policy-distribution is exporting prometheus metrics after execution ${hcauth}= PolicyAdminAuth ${resp}= PerformGetRequest ${DISTRIBUTION_IP} /metrics 200 null ${hcauth} - Should Contain ${resp.text} total_distribution_received_count_total 1.0 - Should Contain ${resp.text} distribution_success_count_total 1.0 + Should Contain ${resp.text} total_distribution_received_count_total 2.0 + Should Contain ${resp.text} distribution_success_count_total 2.0 Should Contain ${resp.text} distribution_failure_count_total 0.0 - Should Contain ${resp.text} total_download_received_count_total 1.0 - Should Contain ${resp.text} download_success_count_total 1.0 + Should Contain ${resp.text} total_download_received_count_total 2.0 + Should Contain ${resp.text} download_success_count_total 2.0 Should Contain ${resp.text} download_failure_count_total 0.0 *** Keywords *** diff --git a/csit/resources/tests/drools-applications-test.robot b/csit/resources/tests/drools-applications-test.robot index 81ec65e0..5c6a1b5c 100644 --- a/csit/resources/tests/drools-applications-test.robot +++ b/csit/resources/tests/drools-applications-test.robot @@ -29,8 +29,8 @@ Healthcheck MakeTopics [Documentation] Creates the Policy topics - GetTopic POLICY-PDP-PAP - GetTopic POLICY-CL-MGT + GetKafkaTopic policy-pdp-pap + GetKafkaTopic policy-cl-mgt CreateVcpeXacmlPolicy [Documentation] Create VCPE Policy for Xacml @@ -59,27 +59,20 @@ CreateVfwDroolsPolicy DeployXacmlPolicies [Documentation] Deploys the Policies to Xacml PerformPostRequest /policy/pap/v1/pdps/deployments/batch null ${POLICY_PAP_IP} deploy.xacml.policies.json ${CURDIR}/data json 202 - ${result}= CheckTopic POLICY-PDP-PAP PDP_UPDATE - Sleep 5s - ${result}= CheckTopic POLICY-PDP-PAP ACTIVE - Should Contain ${result} responseTo - Should Contain ${result} xacml - Should Contain ${result} restart - Should Contain ${result} onap.restart.tca + Sleep 5s + ${result}= CheckKafkaTopic policy-notification onap.vfirewall.tca + Should Contain ${result} deployed-policies Should Contain ${result} onap.scaleout.tca - Should Contain ${result} onap.vfirewall.tca + Should Contain ${result} onap.restart.tca DeployDroolsPolicies [Documentation] Deploys the Policies to Drools PerformPostRequest /policy/pap/v1/pdps/deployments/batch null ${POLICY_PAP_IP} deploy.drools.policies.json ${CURDIR}/data json 202 - ${result}= CheckTopic POLICY-PDP-PAP PDP_UPDATE - Sleep 5s - ${result}= CheckTopic POLICY-PDP-PAP ACTIVE - Should Contain ${result} responseTo - Should Contain ${result} drools - Should Contain ${result} operational.restart + Sleep 5s + ${result}= CheckKafkaTopic policy-notification operational.modifyconfig + Should Contain ${result} deployed-policies Should Contain ${result} operational.scaleout - Should Contain ${result} operational.modifyconfig + Should Contain ${result} operational.restart #VcpeExecute # [Documentation] Executes VCPE Policy @@ -179,7 +172,7 @@ OnSet ${data}= Get File ${file} Create Session session http://${DMAAP_IP} max_retries=1 ${headers}= Create Dictionary Content-Type=application/json - ${resp}= POST On Session session /events/unauthenticated.DCAE_CL_OUTPUT headers=${headers} data=${data} + ${resp}= POST On Session session /events/unauthenticateddcae_cl_output headers=${headers} data=${data} Log Response from dmaap ${resp.text} Status Should Be OK [Return] ${resp.text} diff --git a/csit/resources/tests/kafka_consumer.py b/csit/resources/tests/kafka_consumer.py new file mode 100755 index 00000000..595e3db7 --- /dev/null +++ b/csit/resources/tests/kafka_consumer.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +# +# ============LICENSE_START==================================================== +# Copyright (C) 2023-2024 Nordix Foundation. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END====================================================== + +# Python utility to fetch kafka topic and look for required messages. +# Accepts the arguments {topic_name} and {list of expected values} and {timeout} to verify the kafka topic. + + +from confluent_kafka import Consumer, KafkaException +import sys +import time + + +def consume_kafka_topic(topic, expected_values, timeout): + config = { + 'bootstrap.servers': 'localhost:29092', + 'group.id': 'testgrp', + 'auto.offset.reset': 'earliest' + } + consumer = Consumer(config) + consumer.subscribe([topic]) + try: + start_time = time.time() + while time.time() - start_time < timeout: + msg = consumer.poll(1.0) + if msg is None: + continue + if msg.error(): + if msg.error().code() == KafkaException._PARTITION_EOF: + sys.stderr.write(f"Reached end of topic {msg.topic()} / partition {msg.partition()}\n") + print('ERROR') + sys.exit(404) + else: + # Error + raise KafkaException(msg.error()) + else: + # Message received + message = msg.value().decode('utf-8') + if expected_values in message: + print(message) + sys.exit(200) + finally: + consumer.close() + + +if __name__ == '__main__': + topic_name = sys.argv[1] + timeout = int(sys.argv[2]) # timeout in seconds for verifying the kafka topic + expected_values = sys.argv[3] + consume_kafka_topic(topic_name, expected_values, timeout) diff --git a/csit/resources/tests/kafka_producer.py b/csit/resources/tests/kafka_producer.py new file mode 100755 index 00000000..e6f01c21 --- /dev/null +++ b/csit/resources/tests/kafka_producer.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +# +# ============LICENSE_START==================================================== +# Copyright (C) 2023-2024 Nordix Foundation. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END====================================================== + +# Python utility to produce a message on a kafka topic +# Accepts the arguments {topic_name} and {message} + +from confluent_kafka import Producer +import sys + +def post_to_kafka(topic, message): + conf = {'bootstrap.servers': 'localhost:29092'} + + producer = Producer(conf) + try: + producer.produce(topic, value=message.encode('utf-8')) + producer.flush() + print('Message posted to Kafka topic: {}'.format(topic)) + except Exception as e: + print('Failed to post message: {}'.format(str(e))) + finally: + producer.flush() + +if __name__ == '__main__': + post_to_kafka(sys.argv[1], sys.argv[2]) diff --git a/csit/resources/tests/make_topics.py b/csit/resources/tests/make_topics.py new file mode 100755 index 00000000..64a230eb --- /dev/null +++ b/csit/resources/tests/make_topics.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +# +# ============LICENSE_START==================================================== +# Copyright (C) 2023-2024 Nordix Foundation. +# ============================================================================= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END====================================================== + +# Python utility to create a new kafka topic +# Accepts the argument {topic_name} + +from confluent_kafka.admin import AdminClient, NewTopic +import sys + +def create_topic(bootstrap_servers, topic_name, num_partitions=2, replication_factor=2): + admin_client = AdminClient({'bootstrap.servers': bootstrap_servers}) + + # Define the topic configuration + topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor) + + # Create the topic + admin_client.create_topics([topic]) + + +if __name__ == '__main__': + topic_name = sys.argv[1] + bootstrap_servers = 'localhost:29092' + + create_topic(bootstrap_servers, topic_name) diff --git a/csit/resources/tests/pap-test.robot b/csit/resources/tests/pap-test.robot index 82328437..74d299a5 100644 --- a/csit/resources/tests/pap-test.robot +++ b/csit/resources/tests/pap-test.robot @@ -47,6 +47,7 @@ Healthcheck Consolidated Healthcheck [Documentation] Verify policy consolidated health check + sleep 20 ${resp}= GetReq /policy/pap/v1/components/healthcheck Should Be Equal As Strings ${resp.json()['healthy']} True diff --git a/csit/resources/tests/policy-clamp-test.robot b/csit/resources/tests/policy-clamp-test.robot index d5fec492..10f93893 100644 --- a/csit/resources/tests/policy-clamp-test.robot +++ b/csit/resources/tests/policy-clamp-test.robot @@ -97,6 +97,7 @@ QueryPolicies QueryPolicyTypes [Documentation] Verify the new policy types created ${auth}= Create List policyadmin zb!XztG34 + sleep 10 Log Creating session http://${POLICY_API_IP}}:6969 ${session}= Create Session policy http://${POLICY_API_IP} auth=${auth} ${headers}= Create Dictionary Accept=application/json Content-Type=application/json diff --git a/csit/resources/tests/xacml-pdp-test.robot b/csit/resources/tests/xacml-pdp-test.robot index 331a7fa5..e500a04c 100644 --- a/csit/resources/tests/xacml-pdp-test.robot +++ b/csit/resources/tests/xacml-pdp-test.robot @@ -19,7 +19,7 @@ Metrics MakeTopics [Documentation] Creates the Policy topics - GetTopic POLICY-PDP-PAP + GetKafkaTopic policy-pdp-pap ExecuteXacmlPolicy CreateMonitorPolicy @@ -56,12 +56,9 @@ DeployPolicies ${postjson}= Get file ${CURDIR}/data/vCPE.policy.input.tosca.deploy.json ${policyadmin}= PolicyAdminAuth PerformPostRequest ${POLICY_PAP_IP} /policy/pap/v1/pdps/policies 202 ${postjson} null ${policyadmin} - ${result}= CheckTopic POLICY-PDP-PAP PDP_UPDATE - Sleep 5s - ${result}= CheckTopic POLICY-PDP-PAP ACTIVE - Should Contain ${result} responseTo - Should Contain ${result} xacml - Should Contain ${result} onap.restart.tca + sleep 20s + ${result}= CheckKafkaTopic policy-notification onap.restart.tca + Should Contain ${result} deployed-policies GetAbbreviatedDecisionResult [Documentation] Get Decision with abbreviated results from Policy Xacml PDP diff --git a/csit/run-k8s-csit.sh b/csit/run-k8s-csit.sh index 9fd6159c..5a3ac3bb 100755 --- a/csit/run-k8s-csit.sh +++ b/csit/run-k8s-csit.sh @@ -1,7 +1,7 @@ #!/bin/bash # # ============LICENSE_START==================================================== -# Copyright (C) 2022-2023 Nordix Foundation. +# Copyright (C) 2022-2024 Nordix Foundation. # ============================================================================= # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -253,7 +253,7 @@ function set_project_config() { $POLICY_CLAMP_CONTAINER,$POLICY_PF_PPNT_CONTAINER,$POLICY_K8S_PPNT_CONTAINER, $POLICY_HTTP_PPNT_CONTAINER) export SET_VALUES="--set $POLICY_APEX_CONTAINER.enabled=true --set $POLICY_XACML_CONTAINER.enabled=true - --set $POLICY_DISTRIBUTION_CONTAINER.enabled=true --set $POLICY_POLICY_DROOLS_CONTAINER.enabled=true + --set $POLICY_DISTRIBUTION_CONTAINER.enabled=true --set $POLICY_DROOLS_CONTAINER.enabled=true --set $POLICY_CLAMP_CONTAINER.enabled=true --set $POLICY_PF_PPNT_CONTAINER.enabled=true --set $POLICY_K8S_PPNT_CONTAINER.enabled=true --set $POLICY_HTTP_PPNT_CONTAINER.enabled=true" ;; -- cgit 1.2.3-korg