summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/tests/test_subscription.py
diff options
context:
space:
mode:
Diffstat (limited to 'components/pm-subscription-handler/tests/test_subscription.py')
-rwxr-xr-xcomponents/pm-subscription-handler/tests/test_subscription.py86
1 files changed, 73 insertions, 13 deletions
diff --git a/components/pm-subscription-handler/tests/test_subscription.py b/components/pm-subscription-handler/tests/test_subscription.py
index 97c1d6a1..8fe233e4 100755
--- a/components/pm-subscription-handler/tests/test_subscription.py
+++ b/components/pm-subscription-handler/tests/test_subscription.py
@@ -17,11 +17,12 @@
# ============LICENSE_END=====================================================
import json
import os
-import unittest
from test.support import EnvironmentVarGuard
-from unittest import mock
+from unittest import TestCase
+from unittest.mock import patch
from requests import Session
+from tenacity import stop_after_attempt
import mod.aai_client as aai_client
from mod import db, create_app
@@ -29,11 +30,12 @@ from mod.network_function import NetworkFunction
from mod.subscription import Subscription, NetworkFunctionFilter
-class SubscriptionTest(unittest.TestCase):
-
- @mock.patch('mod.get_db_connection_url')
- @mock.patch.object(Session, 'put')
- def setUp(self, mock_session, mock_get_db_url):
+class SubscriptionTest(TestCase):
+ @patch('mod.pmsh_utils._MrPub')
+ @patch('mod.pmsh_utils._MrSub')
+ @patch('mod.get_db_connection_url')
+ @patch.object(Session, 'put')
+ def setUp(self, mock_session, mock_get_db_url, mock_mr_sub, mock_mr_pub):
mock_get_db_url.return_value = 'sqlite://'
with open(os.path.join(os.path.dirname(__file__), 'data/aai_xnfs.json'), 'r') as data:
self.aai_response_data = data.read()
@@ -54,6 +56,8 @@ class SubscriptionTest(unittest.TestCase):
self.nf_1 = NetworkFunction(nf_name='pnf_1', orchestration_status='Inventoried')
self.nf_2 = NetworkFunction(nf_name='pnf_2', orchestration_status='Active')
self.xnf_filter = NetworkFunctionFilter(**self.sub_1.nfFilter)
+ self.mock_mr_sub = mock_mr_sub
+ self.mock_mr_pub = mock_mr_pub
self.app = create_app()
self.app_context = self.app.app_context()
self.app_context.push()
@@ -100,12 +104,6 @@ class SubscriptionTest(unittest.TestCase):
self.assertEqual(sub1, same_sub1)
self.assertEqual(1, len(self.sub_1.get_all()))
- def test_get_nfs_per_subscription(self):
- nf_array = [self.nf_1, self.nf_2]
- self.sub_1.add_network_functions_to_subscription(nf_array)
- nfs_for_sub_1 = Subscription.get_all_nfs_subscription_relations()
- self.assertEqual(2, len(nfs_for_sub_1))
-
def test_add_network_functions_per_subscription(self):
nf_array = [self.nf_1, self.nf_2]
self.sub_1.add_network_functions_to_subscription(nf_array)
@@ -125,3 +123,65 @@ class SubscriptionTest(unittest.TestCase):
self.sub_1.add_network_functions_to_subscription(nf_array)
nf_subs = Subscription.get_all_nfs_subscription_relations()
self.assertEqual(1, len(nf_subs))
+
+ def test_update_subscription_status(self):
+ sub_name = 'ExtraPM-All-gNB-R2B'
+ self.sub_1.create()
+ self.sub_1.administrativeState = 'new_status'
+ self.sub_1.update_subscription_status()
+ sub = Subscription.get(sub_name)
+
+ self.assertEqual('new_status', sub.status)
+
+ def test_delete_subscription(self):
+ self.sub_1.create()
+ subs = self.sub_1.get_all()
+ self.assertEqual(1, len(subs))
+
+ self.sub_1.delete_subscription()
+ new_subs = self.sub_1.get_all()
+ self.assertEqual(0, len(new_subs))
+
+ def test_update_sub_nf_status(self):
+ sub_name = 'ExtraPM-All-gNB-R2B'
+ nf_array = [self.nf_1, self.nf_2]
+ self.sub_1.add_network_functions_to_subscription(nf_array)
+ sub_nfs = Subscription.get_all_nfs_subscription_relations()
+ self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status)
+
+ Subscription.update_sub_nf_status(sub_name, 'Active', 'pnf_1')
+ sub_nfs = Subscription.get_all_nfs_subscription_relations()
+ self.assertEqual('Active', sub_nfs[0].nf_sub_status)
+ self.assertEqual('PENDING_CREATE', sub_nfs[1].nf_sub_status)
+
+ @patch('mod.subscription.Subscription.add_network_functions_to_subscription')
+ @patch('mod.subscription.Subscription.update_sub_nf_status')
+ @patch('mod.subscription.Subscription.update_subscription_status')
+ def test_process_activate_subscription(self, mock_update_sub_status,
+ mock_update_sub_nf, mock_add_nfs):
+ self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
+ self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub)
+
+ mock_update_sub_status.assert_called()
+ mock_add_nfs.assert_called()
+ self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
+ mock_update_sub_nf.assert_called_with(self.sub_1.subscriptionName,
+ 'PENDING_CREATE', self.nf_1.nf_name)
+
+ @patch('mod.subscription.Subscription.update_sub_nf_status')
+ @patch('mod.subscription.Subscription.update_subscription_status')
+ def test_process_deactivate_subscription(self, mock_update_sub_status,
+ mock_update_sub_nf):
+ self.sub_1.administrativeState = 'LOCKED'
+ self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
+ self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub)
+
+ self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
+ mock_update_sub_nf.assert_called_with(self.sub_1.subscriptionName,
+ 'PENDING_DELETE', self.nf_1.nf_name)
+ mock_update_sub_status.assert_called()
+
+ def test_process_subscription_exception(self):
+ self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
+ self.assertRaises(Exception, self.sub_1.process_subscription,
+ [self.nf_1], 'not_mr_pub')