summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/tests/services/test_measurement_group_service.py
diff options
context:
space:
mode:
Diffstat (limited to 'components/pm-subscription-handler/tests/services/test_measurement_group_service.py')
-rw-r--r--components/pm-subscription-handler/tests/services/test_measurement_group_service.py177
1 files changed, 172 insertions, 5 deletions
diff --git a/components/pm-subscription-handler/tests/services/test_measurement_group_service.py b/components/pm-subscription-handler/tests/services/test_measurement_group_service.py
index 97353afe..f656513e 100644
--- a/components/pm-subscription-handler/tests/services/test_measurement_group_service.py
+++ b/components/pm-subscription-handler/tests/services/test_measurement_group_service.py
@@ -1,5 +1,5 @@
# ============LICENSE_START===================================================
-# Copyright (C) 2021 Nordix Foundation.
+# Copyright (C) 2021-2022 Nordix Foundation.
# ============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,10 +19,13 @@
import json
import os
from unittest.mock import patch
-from mod.network_function import NetworkFunction
+
+from mod.api.custom_exception import InvalidDataException, DataConflictException
+from mod.network_function import NetworkFunction, NetworkFunctionFilter
from mod.pmsh_config import AppConfig
-from mod import db
-from tests.base_setup import BaseClassSetup
+from mod import db, aai_client
+from tests.base_setup import BaseClassSetup, create_subscription_data, \
+ create_multiple_network_function_data
from mod.api.services import measurement_group_service, nf_service
from mod.api.db_models import MeasurementGroupModel, NfMeasureGroupRelationalModel, \
SubscriptionModel, NetworkFunctionModel
@@ -69,7 +72,7 @@ class MeasurementGroupServiceTestCase(BaseClassSetup):
sub_model = SubscriptionModel('sub_publish', 'pmsh-operational-policy',
'pmsh-control-loop', 'LOCKED')
measurement_group_service.publish_measurement_group(
- sub_model, measurement_grp, nf_1)
+ sub_model, measurement_grp, nf_1, 'CREATE')
mock_mr.assert_called_once_with('policy_pm_publisher',
{'nfName': 'pnf_1',
'ipAddress': '2001:db8:3333:4444:5555:6666:7777:8888',
@@ -191,3 +194,167 @@ class MeasurementGroupServiceTestCase(BaseClassSetup):
subscription = self.subscription_request.replace('ExtraPM-All-gNB-R2B', new_sub_name)
subscription = subscription.replace('msrmt_grp_name', new_msrmt_grp_name)
return subscription
+
+ @patch.object(AppConfig, 'publish_to_topic')
+ def test_update_admin_status_to_locking(self, mock_mr):
+ super().setUpAppConf()
+ sub = create_subscription_data('sub')
+ nf_list = create_multiple_network_function_data(['pnf_101', 'pnf_102'])
+ db.session.add(sub)
+ for nf in nf_list:
+ nf_service.save_nf(nf)
+ measurement_group_service. \
+ apply_nf_status_to_measurement_group(nf.nf_name, sub.measurement_groups[0].
+ measurement_group_name,
+ SubNfState.CREATED.value)
+ db.session.commit()
+ measurement_group_service.update_admin_status(sub.measurement_groups[0], 'LOCKED')
+ meas_grp = measurement_group_service.query_meas_group_by_name('sub', 'MG1')
+ self.assertEqual(meas_grp.subscription_name, 'sub')
+ self.assertEqual(meas_grp.measurement_group_name, 'MG1')
+ self.assertEqual(meas_grp.administrative_state, 'LOCKING')
+ meas_group_nfs = db.session.query(NfMeasureGroupRelationalModel).filter(
+ NfMeasureGroupRelationalModel.measurement_grp_name == meas_grp.measurement_group_name)\
+ .all()
+ for nf in meas_group_nfs:
+ self.assertEqual(nf.nf_measure_grp_status, SubNfState.PENDING_DELETE.value)
+
+ @patch.object(AppConfig, 'publish_to_topic')
+ def test_update_admin_status_to_locked(self, mock_mr):
+ super().setUpAppConf()
+ sub = create_subscription_data('sub')
+ db.session.add(sub)
+ measurement_group_service.update_admin_status(sub.measurement_groups[0], 'LOCKED')
+ meas_grp = measurement_group_service.query_meas_group_by_name('sub', 'MG1')
+ self.assertEqual(meas_grp.subscription_name, 'sub')
+ self.assertEqual(meas_grp.measurement_group_name, 'MG1')
+ self.assertEqual(meas_grp.administrative_state, 'LOCKED')
+
+ @patch.object(AppConfig, 'publish_to_topic')
+ @patch.object(aai_client, '_get_all_aai_nf_data')
+ @patch.object(aai_client, 'get_aai_model_data')
+ @patch.object(NetworkFunctionFilter, 'get_network_function_filter')
+ def test_update_admin_status_to_unlocked_with_no_nfs(self, mock_filter_call,
+ mock_model_aai, mock_aai, mock_mr):
+ mock_aai.return_value = json.loads(self.aai_response_data)
+ mock_model_aai.return_value = json.loads(self.good_model_info)
+ super().setUpAppConf()
+ sub = create_subscription_data('sub')
+ sub.nfs = []
+ db.session.add(sub)
+ db.session.commit()
+ mock_filter_call.return_value = NetworkFunctionFilter.get_network_function_filter('sub')
+ measurement_group_service.update_admin_status(sub.measurement_groups[1], 'UNLOCKED')
+ meas_grp = measurement_group_service.query_meas_group_by_name('sub', 'MG2')
+ self.assertEqual(meas_grp.subscription_name, 'sub')
+ self.assertEqual(meas_grp.measurement_group_name, 'MG2')
+ self.assertEqual(meas_grp.administrative_state, 'UNLOCKED')
+ meas_group_nfs = db.session.query(NfMeasureGroupRelationalModel).filter(
+ NfMeasureGroupRelationalModel.measurement_grp_name == meas_grp.measurement_group_name)\
+ .all()
+ for nf in meas_group_nfs:
+ self.assertEqual(nf.nf_measure_grp_status, SubNfState.PENDING_CREATE.value)
+
+ @patch.object(AppConfig, 'publish_to_topic')
+ @patch.object(aai_client, '_get_all_aai_nf_data')
+ @patch.object(aai_client, 'get_aai_model_data')
+ @patch.object(NetworkFunctionFilter, 'get_network_function_filter')
+ def test_update_admin_status_to_unlocking(self, mock_filter_call,
+ mock_model_aai, mock_aai, mock_mr):
+ mock_aai.return_value = json.loads(self.aai_response_data)
+ mock_model_aai.return_value = json.loads(self.good_model_info)
+ super().setUpAppConf()
+ sub = create_subscription_data('sub')
+ db.session.add(sub)
+ db.session.commit()
+ mock_filter_call.return_value = NetworkFunctionFilter.get_network_function_filter('sub')
+ measurement_group_service.update_admin_status(sub.measurement_groups[1], 'UNLOCKED')
+ meas_grp = measurement_group_service.query_meas_group_by_name('sub', 'MG2')
+ self.assertEqual(meas_grp.subscription_name, 'sub')
+ self.assertEqual(meas_grp.measurement_group_name, 'MG2')
+ self.assertEqual(meas_grp.administrative_state, 'UNLOCKED')
+ meas_group_nfs = db.session.query(NfMeasureGroupRelationalModel).filter(
+ NfMeasureGroupRelationalModel.measurement_grp_name == meas_grp.measurement_group_name)\
+ .all()
+ for nf in meas_group_nfs:
+ self.assertEqual(nf.nf_measure_grp_status, SubNfState.PENDING_CREATE.value)
+
+ def test_update_admin_status_for_missing_measurement_group(self):
+ try:
+ measurement_group_service.update_admin_status(None, 'UNLOCKED')
+ except InvalidDataException as e:
+ self.assertEqual(e.args[0], 'Requested measurement group not available '
+ 'for admin status update')
+
+ def test_update_admin_status_for_data_conflict(self):
+ super().setUpAppConf()
+ sub = create_subscription_data('sub1')
+ sub.measurement_groups[0].administrative_state = 'LOCKING'
+ try:
+ measurement_group_service.update_admin_status(sub.measurement_groups[0], 'LOCKED')
+ except DataConflictException as e:
+ self.assertEqual(e.args[0], 'Cannot update admin status as Locked request'
+ ' is in progress for sub name: sub1 and '
+ 'meas group name: MG1')
+
+ def test_update_admin_status_for_same_state(self):
+ super().setUpAppConf()
+ sub = create_subscription_data('sub1')
+ try:
+ measurement_group_service.update_admin_status(sub.measurement_groups[0], 'UNLOCKED')
+ except InvalidDataException as e:
+ self.assertEqual(e.args[0], 'Measurement group is already in UNLOCKED '
+ 'state for sub name: sub1 and meas group '
+ 'name: MG1')
+
+ def test_lock_nf_to_meas_grp_for_locking_with_LOCKED_update(self):
+ sub = create_subscription_data('sub')
+ sub.measurement_groups[1].administrative_state = 'LOCKING'
+ nf_list = create_multiple_network_function_data(['pnf_101'])
+ db.session.add(sub)
+ for nf in nf_list:
+ nf_service.save_nf(nf)
+ measurement_group_service. \
+ apply_nf_status_to_measurement_group(nf.nf_name, sub.measurement_groups[1].
+ measurement_group_name,
+ SubNfState.PENDING_DELETE.value)
+ db.session.commit()
+ measurement_group_service.lock_nf_to_meas_grp(
+ "pnf_101", "MG2", SubNfState.DELETED.value)
+ measurement_grp_rel = (NfMeasureGroupRelationalModel.query.filter(
+ NfMeasureGroupRelationalModel.measurement_grp_name == 'MG2',
+ NfMeasureGroupRelationalModel.nf_name == 'pnf_101').one_or_none())
+ self.assertIsNone(measurement_grp_rel)
+ network_function = (NetworkFunctionModel.query.filter(
+ NetworkFunctionModel.nf_name == 'pnf_101').one_or_none())
+ self.assertIsNone(network_function)
+ meas_grp = measurement_group_service.query_meas_group_by_name('sub', 'MG2')
+ self.assertEqual(meas_grp.subscription_name, 'sub')
+ self.assertEqual(meas_grp.measurement_group_name, 'MG2')
+ self.assertEqual(meas_grp.administrative_state, 'LOCKED')
+
+ def test_lock_nf_to_meas_grp_with_no_LOCKED_update(self):
+ sub = create_subscription_data('sub')
+ sub.measurement_groups[1].administrative_state = 'LOCKING'
+ nf_list = create_multiple_network_function_data(['pnf_101', 'pnf_102'])
+ db.session.add(sub)
+ for nf in nf_list:
+ nf_service.save_nf(nf)
+ measurement_group_service. \
+ apply_nf_status_to_measurement_group(nf.nf_name, sub.measurement_groups[1].
+ measurement_group_name,
+ SubNfState.PENDING_DELETE.value)
+ db.session.commit()
+ measurement_group_service.lock_nf_to_meas_grp(
+ "pnf_101", "MG2", SubNfState.DELETED.value)
+ measurement_grp_rel = (NfMeasureGroupRelationalModel.query.filter(
+ NfMeasureGroupRelationalModel.measurement_grp_name == 'MG2',
+ NfMeasureGroupRelationalModel.nf_name == 'pnf_101').one_or_none())
+ self.assertIsNone(measurement_grp_rel)
+ network_function = (NetworkFunctionModel.query.filter(
+ NetworkFunctionModel.nf_name == 'pnf_101').one_or_none())
+ self.assertIsNone(network_function)
+ meas_grp = measurement_group_service.query_meas_group_by_name('sub', 'MG2')
+ self.assertEqual(meas_grp.subscription_name, 'sub')
+ self.assertEqual(meas_grp.measurement_group_name, 'MG2')
+ self.assertEqual(meas_grp.administrative_state, 'LOCKING')