diff options
Diffstat (limited to 'components/pm-subscription-handler/tests/test_subscription.py')
-rwxr-xr-x | components/pm-subscription-handler/tests/test_subscription.py | 86 |
1 files changed, 73 insertions, 13 deletions
diff --git a/components/pm-subscription-handler/tests/test_subscription.py b/components/pm-subscription-handler/tests/test_subscription.py index 97c1d6a1..8fe233e4 100755 --- a/components/pm-subscription-handler/tests/test_subscription.py +++ b/components/pm-subscription-handler/tests/test_subscription.py @@ -17,11 +17,12 @@ # ============LICENSE_END===================================================== import json import os -import unittest from test.support import EnvironmentVarGuard -from unittest import mock +from unittest import TestCase +from unittest.mock import patch from requests import Session +from tenacity import stop_after_attempt import mod.aai_client as aai_client from mod import db, create_app @@ -29,11 +30,12 @@ from mod.network_function import NetworkFunction from mod.subscription import Subscription, NetworkFunctionFilter -class SubscriptionTest(unittest.TestCase): - - @mock.patch('mod.get_db_connection_url') - @mock.patch.object(Session, 'put') - def setUp(self, mock_session, mock_get_db_url): +class SubscriptionTest(TestCase): + @patch('mod.pmsh_utils._MrPub') + @patch('mod.pmsh_utils._MrSub') + @patch('mod.get_db_connection_url') + @patch.object(Session, 'put') + def setUp(self, mock_session, mock_get_db_url, mock_mr_sub, mock_mr_pub): mock_get_db_url.return_value = 'sqlite://' with open(os.path.join(os.path.dirname(__file__), 'data/aai_xnfs.json'), 'r') as data: self.aai_response_data = data.read() @@ -54,6 +56,8 @@ class SubscriptionTest(unittest.TestCase): self.nf_1 = NetworkFunction(nf_name='pnf_1', orchestration_status='Inventoried') self.nf_2 = NetworkFunction(nf_name='pnf_2', orchestration_status='Active') self.xnf_filter = NetworkFunctionFilter(**self.sub_1.nfFilter) + self.mock_mr_sub = mock_mr_sub + self.mock_mr_pub = mock_mr_pub self.app = create_app() self.app_context = self.app.app_context() self.app_context.push() @@ -100,12 +104,6 @@ class SubscriptionTest(unittest.TestCase): self.assertEqual(sub1, same_sub1) self.assertEqual(1, len(self.sub_1.get_all())) - def test_get_nfs_per_subscription(self): - nf_array = [self.nf_1, self.nf_2] - self.sub_1.add_network_functions_to_subscription(nf_array) - nfs_for_sub_1 = Subscription.get_all_nfs_subscription_relations() - self.assertEqual(2, len(nfs_for_sub_1)) - def test_add_network_functions_per_subscription(self): nf_array = [self.nf_1, self.nf_2] self.sub_1.add_network_functions_to_subscription(nf_array) @@ -125,3 +123,65 @@ class SubscriptionTest(unittest.TestCase): self.sub_1.add_network_functions_to_subscription(nf_array) nf_subs = Subscription.get_all_nfs_subscription_relations() self.assertEqual(1, len(nf_subs)) + + def test_update_subscription_status(self): + sub_name = 'ExtraPM-All-gNB-R2B' + self.sub_1.create() + self.sub_1.administrativeState = 'new_status' + self.sub_1.update_subscription_status() + sub = Subscription.get(sub_name) + + self.assertEqual('new_status', sub.status) + + def test_delete_subscription(self): + self.sub_1.create() + subs = self.sub_1.get_all() + self.assertEqual(1, len(subs)) + + self.sub_1.delete_subscription() + new_subs = self.sub_1.get_all() + self.assertEqual(0, len(new_subs)) + + def test_update_sub_nf_status(self): + sub_name = 'ExtraPM-All-gNB-R2B' + nf_array = [self.nf_1, self.nf_2] + self.sub_1.add_network_functions_to_subscription(nf_array) + sub_nfs = Subscription.get_all_nfs_subscription_relations() + self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status) + + Subscription.update_sub_nf_status(sub_name, 'Active', 'pnf_1') + sub_nfs = Subscription.get_all_nfs_subscription_relations() + self.assertEqual('Active', sub_nfs[0].nf_sub_status) + self.assertEqual('PENDING_CREATE', sub_nfs[1].nf_sub_status) + + @patch('mod.subscription.Subscription.add_network_functions_to_subscription') + @patch('mod.subscription.Subscription.update_sub_nf_status') + @patch('mod.subscription.Subscription.update_subscription_status') + def test_process_activate_subscription(self, mock_update_sub_status, + mock_update_sub_nf, mock_add_nfs): + self.sub_1.process_subscription.retry.stop = stop_after_attempt(1) + self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub) + + mock_update_sub_status.assert_called() + mock_add_nfs.assert_called() + self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called) + mock_update_sub_nf.assert_called_with(self.sub_1.subscriptionName, + 'PENDING_CREATE', self.nf_1.nf_name) + + @patch('mod.subscription.Subscription.update_sub_nf_status') + @patch('mod.subscription.Subscription.update_subscription_status') + def test_process_deactivate_subscription(self, mock_update_sub_status, + mock_update_sub_nf): + self.sub_1.administrativeState = 'LOCKED' + self.sub_1.process_subscription.retry.stop = stop_after_attempt(1) + self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub) + + self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called) + mock_update_sub_nf.assert_called_with(self.sub_1.subscriptionName, + 'PENDING_DELETE', self.nf_1.nf_name) + mock_update_sub_status.assert_called() + + def test_process_subscription_exception(self): + self.sub_1.process_subscription.retry.stop = stop_after_attempt(1) + self.assertRaises(Exception, self.sub_1.process_subscription, + [self.nf_1], 'not_mr_pub') |