diff options
Diffstat (limited to 'components/pm-subscription-handler/tests/services/test_measurement_group_service.py')
-rw-r--r-- | components/pm-subscription-handler/tests/services/test_measurement_group_service.py | 177 |
1 files changed, 172 insertions, 5 deletions
diff --git a/components/pm-subscription-handler/tests/services/test_measurement_group_service.py b/components/pm-subscription-handler/tests/services/test_measurement_group_service.py index 97353afe..f656513e 100644 --- a/components/pm-subscription-handler/tests/services/test_measurement_group_service.py +++ b/components/pm-subscription-handler/tests/services/test_measurement_group_service.py @@ -1,5 +1,5 @@ # ============LICENSE_START=================================================== -# Copyright (C) 2021 Nordix Foundation. +# Copyright (C) 2021-2022 Nordix Foundation. # ============================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,10 +19,13 @@ import json import os from unittest.mock import patch -from mod.network_function import NetworkFunction + +from mod.api.custom_exception import InvalidDataException, DataConflictException +from mod.network_function import NetworkFunction, NetworkFunctionFilter from mod.pmsh_config import AppConfig -from mod import db -from tests.base_setup import BaseClassSetup +from mod import db, aai_client +from tests.base_setup import BaseClassSetup, create_subscription_data, \ + create_multiple_network_function_data from mod.api.services import measurement_group_service, nf_service from mod.api.db_models import MeasurementGroupModel, NfMeasureGroupRelationalModel, \ SubscriptionModel, NetworkFunctionModel @@ -69,7 +72,7 @@ class MeasurementGroupServiceTestCase(BaseClassSetup): sub_model = SubscriptionModel('sub_publish', 'pmsh-operational-policy', 'pmsh-control-loop', 'LOCKED') measurement_group_service.publish_measurement_group( - sub_model, measurement_grp, nf_1) + sub_model, measurement_grp, nf_1, 'CREATE') mock_mr.assert_called_once_with('policy_pm_publisher', {'nfName': 'pnf_1', 'ipAddress': '2001:db8:3333:4444:5555:6666:7777:8888', @@ -191,3 +194,167 @@ class MeasurementGroupServiceTestCase(BaseClassSetup): subscription = self.subscription_request.replace('ExtraPM-All-gNB-R2B', new_sub_name) subscription = subscription.replace('msrmt_grp_name', new_msrmt_grp_name) return subscription + + @patch.object(AppConfig, 'publish_to_topic') + def test_update_admin_status_to_locking(self, mock_mr): + super().setUpAppConf() + sub = create_subscription_data('sub') + nf_list = create_multiple_network_function_data(['pnf_101', 'pnf_102']) + db.session.add(sub) + for nf in nf_list: + nf_service.save_nf(nf) + measurement_group_service. \ + apply_nf_status_to_measurement_group(nf.nf_name, sub.measurement_groups[0]. + measurement_group_name, + SubNfState.CREATED.value) + db.session.commit() + measurement_group_service.update_admin_status(sub.measurement_groups[0], 'LOCKED') + meas_grp = measurement_group_service.query_meas_group_by_name('sub', 'MG1') + self.assertEqual(meas_grp.subscription_name, 'sub') + self.assertEqual(meas_grp.measurement_group_name, 'MG1') + self.assertEqual(meas_grp.administrative_state, 'LOCKING') + meas_group_nfs = db.session.query(NfMeasureGroupRelationalModel).filter( + NfMeasureGroupRelationalModel.measurement_grp_name == meas_grp.measurement_group_name)\ + .all() + for nf in meas_group_nfs: + self.assertEqual(nf.nf_measure_grp_status, SubNfState.PENDING_DELETE.value) + + @patch.object(AppConfig, 'publish_to_topic') + def test_update_admin_status_to_locked(self, mock_mr): + super().setUpAppConf() + sub = create_subscription_data('sub') + db.session.add(sub) + measurement_group_service.update_admin_status(sub.measurement_groups[0], 'LOCKED') + meas_grp = measurement_group_service.query_meas_group_by_name('sub', 'MG1') + self.assertEqual(meas_grp.subscription_name, 'sub') + self.assertEqual(meas_grp.measurement_group_name, 'MG1') + self.assertEqual(meas_grp.administrative_state, 'LOCKED') + + @patch.object(AppConfig, 'publish_to_topic') + @patch.object(aai_client, '_get_all_aai_nf_data') + @patch.object(aai_client, 'get_aai_model_data') + @patch.object(NetworkFunctionFilter, 'get_network_function_filter') + def test_update_admin_status_to_unlocked_with_no_nfs(self, mock_filter_call, + mock_model_aai, mock_aai, mock_mr): + mock_aai.return_value = json.loads(self.aai_response_data) + mock_model_aai.return_value = json.loads(self.good_model_info) + super().setUpAppConf() + sub = create_subscription_data('sub') + sub.nfs = [] + db.session.add(sub) + db.session.commit() + mock_filter_call.return_value = NetworkFunctionFilter.get_network_function_filter('sub') + measurement_group_service.update_admin_status(sub.measurement_groups[1], 'UNLOCKED') + meas_grp = measurement_group_service.query_meas_group_by_name('sub', 'MG2') + self.assertEqual(meas_grp.subscription_name, 'sub') + self.assertEqual(meas_grp.measurement_group_name, 'MG2') + self.assertEqual(meas_grp.administrative_state, 'UNLOCKED') + meas_group_nfs = db.session.query(NfMeasureGroupRelationalModel).filter( + NfMeasureGroupRelationalModel.measurement_grp_name == meas_grp.measurement_group_name)\ + .all() + for nf in meas_group_nfs: + self.assertEqual(nf.nf_measure_grp_status, SubNfState.PENDING_CREATE.value) + + @patch.object(AppConfig, 'publish_to_topic') + @patch.object(aai_client, '_get_all_aai_nf_data') + @patch.object(aai_client, 'get_aai_model_data') + @patch.object(NetworkFunctionFilter, 'get_network_function_filter') + def test_update_admin_status_to_unlocking(self, mock_filter_call, + mock_model_aai, mock_aai, mock_mr): + mock_aai.return_value = json.loads(self.aai_response_data) + mock_model_aai.return_value = json.loads(self.good_model_info) + super().setUpAppConf() + sub = create_subscription_data('sub') + db.session.add(sub) + db.session.commit() + mock_filter_call.return_value = NetworkFunctionFilter.get_network_function_filter('sub') + measurement_group_service.update_admin_status(sub.measurement_groups[1], 'UNLOCKED') + meas_grp = measurement_group_service.query_meas_group_by_name('sub', 'MG2') + self.assertEqual(meas_grp.subscription_name, 'sub') + self.assertEqual(meas_grp.measurement_group_name, 'MG2') + self.assertEqual(meas_grp.administrative_state, 'UNLOCKED') + meas_group_nfs = db.session.query(NfMeasureGroupRelationalModel).filter( + NfMeasureGroupRelationalModel.measurement_grp_name == meas_grp.measurement_group_name)\ + .all() + for nf in meas_group_nfs: + self.assertEqual(nf.nf_measure_grp_status, SubNfState.PENDING_CREATE.value) + + def test_update_admin_status_for_missing_measurement_group(self): + try: + measurement_group_service.update_admin_status(None, 'UNLOCKED') + except InvalidDataException as e: + self.assertEqual(e.args[0], 'Requested measurement group not available ' + 'for admin status update') + + def test_update_admin_status_for_data_conflict(self): + super().setUpAppConf() + sub = create_subscription_data('sub1') + sub.measurement_groups[0].administrative_state = 'LOCKING' + try: + measurement_group_service.update_admin_status(sub.measurement_groups[0], 'LOCKED') + except DataConflictException as e: + self.assertEqual(e.args[0], 'Cannot update admin status as Locked request' + ' is in progress for sub name: sub1 and ' + 'meas group name: MG1') + + def test_update_admin_status_for_same_state(self): + super().setUpAppConf() + sub = create_subscription_data('sub1') + try: + measurement_group_service.update_admin_status(sub.measurement_groups[0], 'UNLOCKED') + except InvalidDataException as e: + self.assertEqual(e.args[0], 'Measurement group is already in UNLOCKED ' + 'state for sub name: sub1 and meas group ' + 'name: MG1') + + def test_lock_nf_to_meas_grp_for_locking_with_LOCKED_update(self): + sub = create_subscription_data('sub') + sub.measurement_groups[1].administrative_state = 'LOCKING' + nf_list = create_multiple_network_function_data(['pnf_101']) + db.session.add(sub) + for nf in nf_list: + nf_service.save_nf(nf) + measurement_group_service. \ + apply_nf_status_to_measurement_group(nf.nf_name, sub.measurement_groups[1]. + measurement_group_name, + SubNfState.PENDING_DELETE.value) + db.session.commit() + measurement_group_service.lock_nf_to_meas_grp( + "pnf_101", "MG2", SubNfState.DELETED.value) + measurement_grp_rel = (NfMeasureGroupRelationalModel.query.filter( + NfMeasureGroupRelationalModel.measurement_grp_name == 'MG2', + NfMeasureGroupRelationalModel.nf_name == 'pnf_101').one_or_none()) + self.assertIsNone(measurement_grp_rel) + network_function = (NetworkFunctionModel.query.filter( + NetworkFunctionModel.nf_name == 'pnf_101').one_or_none()) + self.assertIsNone(network_function) + meas_grp = measurement_group_service.query_meas_group_by_name('sub', 'MG2') + self.assertEqual(meas_grp.subscription_name, 'sub') + self.assertEqual(meas_grp.measurement_group_name, 'MG2') + self.assertEqual(meas_grp.administrative_state, 'LOCKED') + + def test_lock_nf_to_meas_grp_with_no_LOCKED_update(self): + sub = create_subscription_data('sub') + sub.measurement_groups[1].administrative_state = 'LOCKING' + nf_list = create_multiple_network_function_data(['pnf_101', 'pnf_102']) + db.session.add(sub) + for nf in nf_list: + nf_service.save_nf(nf) + measurement_group_service. \ + apply_nf_status_to_measurement_group(nf.nf_name, sub.measurement_groups[1]. + measurement_group_name, + SubNfState.PENDING_DELETE.value) + db.session.commit() + measurement_group_service.lock_nf_to_meas_grp( + "pnf_101", "MG2", SubNfState.DELETED.value) + measurement_grp_rel = (NfMeasureGroupRelationalModel.query.filter( + NfMeasureGroupRelationalModel.measurement_grp_name == 'MG2', + NfMeasureGroupRelationalModel.nf_name == 'pnf_101').one_or_none()) + self.assertIsNone(measurement_grp_rel) + network_function = (NetworkFunctionModel.query.filter( + NetworkFunctionModel.nf_name == 'pnf_101').one_or_none()) + self.assertIsNone(network_function) + meas_grp = measurement_group_service.query_meas_group_by_name('sub', 'MG2') + self.assertEqual(meas_grp.subscription_name, 'sub') + self.assertEqual(meas_grp.measurement_group_name, 'MG2') + self.assertEqual(meas_grp.administrative_state, 'LOCKING') |