summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/tests
diff options
context:
space:
mode:
authorefiacor <fiachra.corcoran@est.tech>2020-08-05 10:12:04 +0100
committerefiacor <fiachra.corcoran@est.tech>2020-08-13 11:22:39 +0100
commit38ccb471732faaad6a25defee0753c1c5ac60cf0 (patch)
tree6b8fd360895ec5cc621d89ab35af7c14a4c05caf /components/pm-subscription-handler/tests
parent66f25c47daae80fe0ebc57bfec32596608d0be5d (diff)
Refactor and bug fixes
Signed-off-by: efiacor <fiachra.corcoran@est.tech> Change-Id: I8fe91bfdd2f1a2c8a6ca914e52d82dce04bffc0e Issue-ID: DCAEGEN2-2146
Diffstat (limited to 'components/pm-subscription-handler/tests')
-rwxr-xr-xcomponents/pm-subscription-handler/tests/test_aai_event_handler.py33
-rwxr-xr-xcomponents/pm-subscription-handler/tests/test_controller.py9
-rwxr-xr-xcomponents/pm-subscription-handler/tests/test_exit_handler.py77
-rwxr-xr-xcomponents/pm-subscription-handler/tests/test_network_function.py11
-rwxr-xr-xcomponents/pm-subscription-handler/tests/test_subscription.py73
-rw-r--r--components/pm-subscription-handler/tests/test_subscription_handler.py95
6 files changed, 163 insertions, 135 deletions
diff --git a/components/pm-subscription-handler/tests/test_aai_event_handler.py b/components/pm-subscription-handler/tests/test_aai_event_handler.py
index d366dac5..9ac76477 100755
--- a/components/pm-subscription-handler/tests/test_aai_event_handler.py
+++ b/components/pm-subscription-handler/tests/test_aai_event_handler.py
@@ -16,42 +16,59 @@
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
import json
+import os
from os import path
+from test.support import EnvironmentVarGuard
from unittest import TestCase
from unittest.mock import patch, Mock
+from mod import create_app, db
from mod.aai_event_handler import process_aai_events
-from mod.network_function import NetworkFunction, OrchestrationStatus
+from mod.network_function import NetworkFunction
from mod.pmsh_utils import AppConfig
class AAIEventHandlerTest(TestCase):
+ @patch('mod.get_db_connection_url')
+ @patch('mod.update_logging_config')
@patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
- def setUp(self, mock_get_pmsh_config):
+ def setUp(self, mock_get_pmsh_config, mock_update_config, mock_get_db_url):
+ mock_get_db_url.return_value = 'sqlite://'
with open(path.join(path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
self.cbs_data = json.load(data)
mock_get_pmsh_config.return_value = self.cbs_data
self.app_conf = AppConfig()
with open(path.join(path.dirname(__file__), 'data/mr_aai_events.json'), 'r') as data:
self.mr_aai_events = json.load(data)["mr_response"]
+ self.env = EnvironmentVarGuard()
+ self.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
self.mock_mr_sub = Mock(get_from_topic=Mock(return_value=self.mr_aai_events))
self.mock_mr_pub = Mock()
self.mock_app = Mock()
+ self.app = create_app()
+ self.app_context = self.app.app_context()
+ self.app_context.push()
+ db.create_all()
- @patch('mod.subscription.Subscription.process_subscription')
+ def tearDown(self):
+ db.session.remove()
+ db.drop_all()
+ self.app_context.pop()
+
+ @patch('mod.subscription.Subscription.activate_subscription')
@patch('mod.aai_event_handler.NetworkFunction.delete')
@patch('mod.aai_event_handler.NetworkFunction.get')
def test_process_aai_update_and_delete_events(self, mock_nf_get, mock_nf_delete,
- mock_process_sub):
+ mock_activate_sub):
pnf_already_active = NetworkFunction(nf_name='pnf_already_active',
- orchestration_status=OrchestrationStatus.ACTIVE.value)
+ orchestration_status='Active')
mock_nf_get.side_effect = [None, pnf_already_active]
expected_nf_for_processing = NetworkFunction(
- nf_name='pnf_newly_discovered', orchestration_status=OrchestrationStatus.ACTIVE.value)
+ nf_name='pnf_newly_discovered', orchestration_status='Active')
process_aai_events(self.mock_mr_sub, self.mock_mr_pub, self.mock_app, self.app_conf)
- mock_process_sub.assert_called_once_with([expected_nf_for_processing],
- self.mock_mr_pub, self.app_conf)
+ mock_activate_sub.assert_called_once_with([expected_nf_for_processing],
+ self.mock_mr_pub, self.app_conf)
mock_nf_delete.assert_called_once_with(nf_name='pnf_to_be_deleted')
diff --git a/components/pm-subscription-handler/tests/test_controller.py b/components/pm-subscription-handler/tests/test_controller.py
index d324a07d..4fcecc37 100755
--- a/components/pm-subscription-handler/tests/test_controller.py
+++ b/components/pm-subscription-handler/tests/test_controller.py
@@ -47,27 +47,26 @@ class ControllerTestCase(unittest.TestCase):
with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
self.cbs_data = json.load(data)
mock_get_pmsh_config.return_value = self.cbs_data
- self.app_conf = AppConfig()
- self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
self.nf_1 = NetworkFunction(nf_name='pnf_1', orchestration_status='Inventoried')
self.nf_2 = NetworkFunction(nf_name='pnf_2', orchestration_status='Active')
self.app = create_app()
self.app_context = self.app.app_context()
self.app_context.push()
db.create_all()
+ self.app_conf = AppConfig()
+ self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
def tearDown(self):
db.session.remove()
db.drop_all()
- self.app_context.pop()
def test_status_response_healthy(self):
self.assertEqual(status()['status'], 'healthy')
def test_get_all_sub_to_nf_relations(self):
- self.app_conf.subscription.create()
+ sub_model = self.app_conf.subscription.get()
for nf in [self.nf_1, self.nf_2]:
- self.app_conf.subscription.add_network_function_to_subscription(nf)
+ self.app_conf.subscription.add_network_function_to_subscription(nf, sub_model)
all_subs = get_all_sub_to_nf_relations()
self.assertEqual(len(all_subs[0]['network_functions']), 2)
self.assertEqual(all_subs[0]['subscription_name'], 'ExtraPM-All-gNB-R2B')
diff --git a/components/pm-subscription-handler/tests/test_exit_handler.py b/components/pm-subscription-handler/tests/test_exit_handler.py
index ac1e15c6..d41bd03b 100755
--- a/components/pm-subscription-handler/tests/test_exit_handler.py
+++ b/components/pm-subscription-handler/tests/test_exit_handler.py
@@ -15,56 +15,43 @@
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
+import json
import os
-import signal
-import threading
-import time
+from signal import SIGTERM, signal
+from test.support import EnvironmentVarGuard
from unittest import TestCase
-from unittest.mock import patch, Mock, MagicMock
+from unittest.mock import patch, Mock
-import pmsh_service_main
+from mod.api.db_models import NetworkFunctionModel
from mod.exit_handler import ExitHandler
-from mod.pmsh_utils import PeriodicTask
-from mod.subscription import AdministrativeState
+from mod.pmsh_utils import AppConfig
+from mod.subscription import Subscription
class ExitHandlerTests(TestCase):
-
- @patch('pmsh_service_main.create_app')
- @patch('pmsh_service_main.db')
- @patch('pmsh_service_main.AppConfig')
- @patch('pmsh_service_main.Subscription')
- @patch('pmsh_service_main.launch_api_server')
- @patch('pmsh_service_main.SubscriptionHandler')
- @patch.object(PeriodicTask, 'start')
- @patch.object(PeriodicTask, 'cancel')
- def test_terminate_signal_success(self, mock_task_cancel, mock_task_start, mock_sub_handler,
- mock_launch_api_server, mock_sub, mock_app_conf,
- mock_db, mock_app):
- pid = os.getpid()
- mock_db.get_app.return_value = Mock()
-
- mock_sub.administrativeState = AdministrativeState.UNLOCKED.value
- mock_sub.process_subscription = Mock()
- mock_sub_handler_instance = MagicMock(execute=Mock(), current_sub=mock_sub)
- mock_sub_handler.side_effect = [mock_sub_handler_instance]
-
- def mock_api_server_run(param):
- while mock_sub.administrativeState == AdministrativeState.UNLOCKED.value:
- time.sleep(1)
-
- mock_launch_api_server.side_effect = mock_api_server_run
-
- def trigger_signal():
- time.sleep(1)
- os.kill(pid, signal.SIGTERM)
-
- thread = threading.Thread(target=trigger_signal)
- thread.start()
-
- pmsh_service_main.main()
-
- self.assertEqual(4, mock_task_cancel.call_count)
+ @patch('mod.subscription.Subscription.create')
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
+ @patch('mod.pmsh_utils.PeriodicTask')
+ def setUp(self, mock_periodic_task, mock_get_pmsh_config, mock_sub_create):
+ self.env = EnvironmentVarGuard()
+ self.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
+ with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
+ self.cbs_data = json.load(data)
+ mock_get_pmsh_config.return_value = self.cbs_data
+ self.mock_aai_event_thread = mock_periodic_task
+ self.app_conf = AppConfig()
+ self.sub = self.app_conf.subscription
+
+ @patch('mod.logger.debug')
+ @patch.object(Subscription, 'update_sub_nf_status')
+ @patch.object(Subscription, 'update_subscription_status')
+ @patch.object(Subscription, '_get_nf_models',
+ return_value=[NetworkFunctionModel('pnf1', 'ACTIVE')])
+ def test_terminate_signal_successful(self, mock_sub_get_nf_models, mock_upd_sub_status,
+ mock_upd_subnf_status, mock_logger):
+ handler = ExitHandler(periodic_tasks=[self.mock_aai_event_thread],
+ app_conf=self.app_conf,
+ subscription_handler=Mock())
+ signal(SIGTERM, handler)
+ os.kill(os.getpid(), SIGTERM)
self.assertTrue(ExitHandler.shutdown_signal_received)
- self.assertEqual(1, mock_sub.process_subscription.call_count)
- self.assertEqual(mock_sub.administrativeState, AdministrativeState.LOCKED.value)
diff --git a/components/pm-subscription-handler/tests/test_network_function.py b/components/pm-subscription-handler/tests/test_network_function.py
index 86baef83..cdfb1eb5 100755
--- a/components/pm-subscription-handler/tests/test_network_function.py
+++ b/components/pm-subscription-handler/tests/test_network_function.py
@@ -15,6 +15,7 @@
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
+import json
import os
from test.support import EnvironmentVarGuard
from unittest import TestCase
@@ -22,23 +23,29 @@ from unittest.mock import patch
from mod import db, create_app
from mod.network_function import NetworkFunction
+from mod.pmsh_utils import AppConfig
from mod.subscription import Subscription
class NetworkFunctionTests(TestCase):
+ @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
@patch('mod.update_logging_config')
@patch('mod.get_db_connection_url')
- def setUp(self, mock_get_db_url, mock_update_config):
+ def setUp(self, mock_get_db_url, mock_update_config, mock_get_pmsh_config):
mock_get_db_url.return_value = 'sqlite://'
self.nf_1 = NetworkFunction(nf_name='pnf_1', orchestration_status='Inventoried')
self.nf_2 = NetworkFunction(nf_name='pnf_2', orchestration_status='Active')
+ with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
+ self.cbs_data = json.load(data)
+ mock_get_pmsh_config.return_value = self.cbs_data
self.env = EnvironmentVarGuard()
self.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
self.app = create_app()
self.app_context = self.app.app_context()
self.app_context.push()
db.create_all()
+ self.app_conf = AppConfig()
def tearDown(self):
db.session.remove()
@@ -74,7 +81,7 @@ class NetworkFunctionTests(TestCase):
self.nf_2.create()
sub = Subscription(**{"subscriptionName": "sub"})
for nf in [self.nf_1, self.nf_2]:
- sub.add_network_function_to_subscription(nf)
+ sub.add_network_function_to_subscription(nf, self.app_conf.subscription.get())
NetworkFunction.delete(nf_name=self.nf_1.nf_name)
diff --git a/components/pm-subscription-handler/tests/test_subscription.py b/components/pm-subscription-handler/tests/test_subscription.py
index 74593a42..27a189c2 100755
--- a/components/pm-subscription-handler/tests/test_subscription.py
+++ b/components/pm-subscription-handler/tests/test_subscription.py
@@ -26,7 +26,7 @@ from requests import Session
import mod.aai_client as aai_client
from mod import db, create_app
from mod.api.db_models import NetworkFunctionModel
-from mod.network_function import NetworkFunction, OrchestrationStatus
+from mod.network_function import NetworkFunction
from mod.pmsh_utils import AppConfig
from mod.subscription import Subscription
@@ -51,27 +51,28 @@ class SubscriptionTest(TestCase):
with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
self.cbs_data = json.load(data)
mock_get_pmsh_config.return_value = self.cbs_data
- self.app_conf = AppConfig()
- self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
self.mock_mr_sub = mock_mr_sub
self.mock_mr_pub = mock_mr_pub
self.app = create_app()
self.app_context = self.app.app_context()
self.app_context.push()
db.create_all()
+ self.app_conf = AppConfig()
+ self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
+ self.sub_model = self.app_conf.subscription.get()
def tearDown(self):
- db.session.remove()
db.drop_all()
+ db.session.remove()
self.app_context.pop()
def test_xnf_filter_true(self):
self.assertTrue(self.app_conf.nf_filter.is_nf_in_filter('pnf1',
- OrchestrationStatus.ACTIVE.value))
+ 'Active'))
def test_xnf_filter_false(self):
self.assertFalse(self.app_conf.nf_filter.is_nf_in_filter('PNF-33',
- OrchestrationStatus.ACTIVE.value))
+ 'Active'))
def test_sub_measurement_group(self):
self.assertEqual(len(self.app_conf.subscription.measurementGroups), 2)
@@ -82,18 +83,15 @@ class SubscriptionTest(TestCase):
def test_get_subscription(self):
sub_name = 'ExtraPM-All-gNB-R2B'
self.app_conf.subscription.create()
- new_sub = Subscription.get(sub_name)
+ new_sub = self.app_conf.subscription.get()
self.assertEqual(sub_name, new_sub.subscription_name)
- def test_get_subscription_no_match(self):
- sub_name = 'sub2_does_not_exist'
- sub = Subscription.get(sub_name)
- self.assertEqual(sub, None)
-
def test_get_nf_names_per_sub(self):
self.app_conf.subscription.create()
- self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0])
- self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[1])
+ self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0],
+ self.sub_model)
+ self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[1],
+ self.sub_model)
nfs = Subscription.get_nf_names_per_sub(self.app_conf.subscription.subscriptionName)
self.assertEqual(2, len(nfs))
@@ -105,37 +103,38 @@ class SubscriptionTest(TestCase):
def test_add_network_functions_per_subscription(self):
for nf in self.xnfs:
- self.app_conf.subscription.add_network_function_to_subscription(nf)
+ self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
nfs_for_sub_1 = Subscription.get_all_nfs_subscription_relations()
self.assertEqual(3, len(nfs_for_sub_1))
new_nf = NetworkFunction(nf_name='vnf_3', orchestration_status='Active')
- self.app_conf.subscription.add_network_function_to_subscription(new_nf)
+ self.app_conf.subscription.add_network_function_to_subscription(new_nf, self.sub_model)
nf_subs = Subscription.get_all_nfs_subscription_relations()
self.assertEqual(4, len(nf_subs))
def test_add_duplicate_network_functions_per_subscription(self):
- self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0])
+ self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0],
+ self.sub_model)
nf_subs = Subscription.get_all_nfs_subscription_relations()
self.assertEqual(1, len(nf_subs))
- self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0])
+ self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0],
+ self.sub_model)
nf_subs = Subscription.get_all_nfs_subscription_relations()
self.assertEqual(1, len(nf_subs))
def test_update_subscription_status(self):
- sub_name = 'ExtraPM-All-gNB-R2B'
self.app_conf.subscription.create()
self.app_conf.subscription.administrativeState = 'new_status'
self.app_conf.subscription.update_subscription_status()
- sub = Subscription.get(sub_name)
+ sub = self.app_conf.subscription.get()
self.assertEqual('new_status', sub.status)
def test_delete_subscription(self):
for nf in self.xnfs:
- self.app_conf.subscription.add_network_function_to_subscription(nf)
+ self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
self.app_conf.subscription.delete_subscription()
self.assertEqual(0, len(Subscription.get_all()))
- self.assertEqual(None, Subscription.get(self.app_conf.subscription.subscriptionName))
+ self.assertEqual(None, self.app_conf.subscription.get())
self.assertEqual(0, len(Subscription.get_all_nfs_subscription_relations()))
self.assertEqual(0, len(NetworkFunction.get_all()))
self.assertEqual(None, NetworkFunction.get(nf_name=list(self.xnfs)[0].nf_name))
@@ -143,7 +142,7 @@ class SubscriptionTest(TestCase):
def test_update_sub_nf_status(self):
sub_name = 'ExtraPM-All-gNB-R2B'
for nf in self.xnfs:
- self.app_conf.subscription.add_network_function_to_subscription(nf)
+ self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
sub_nfs = Subscription.get_all_nfs_subscription_relations()
self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status)
@@ -154,33 +153,27 @@ class SubscriptionTest(TestCase):
@patch('mod.subscription.Subscription.add_network_function_to_subscription')
@patch('mod.subscription.Subscription.update_sub_nf_status')
- @patch('mod.subscription.Subscription.update_subscription_status')
- def test_process_activate_subscription(self, mock_update_sub_status,
- mock_update_sub_nf, mock_add_nfs):
- self.app_conf.subscription.process_subscription([list(self.xnfs)[0]], self.mock_mr_pub,
- self.app_conf)
+ def test_process_activate_subscription(self, mock_update_sub_nf, mock_add_nfs):
+ self.app_conf.subscription.activate_subscription([list(self.xnfs)[0]], self.mock_mr_pub,
+ self.app_conf)
- mock_update_sub_status.assert_called()
mock_add_nfs.assert_called()
self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
mock_update_sub_nf.assert_called_with(self.app_conf.subscription.subscriptionName,
'PENDING_CREATE', list(self.xnfs)[0].nf_name)
+ @patch('mod.subscription.Subscription.get_network_functions')
@patch('mod.subscription.Subscription.update_sub_nf_status')
- @patch('mod.subscription.Subscription.update_subscription_status')
- def test_process_deactivate_subscription(self, mock_update_sub_status,
- mock_update_sub_nf):
+ def test_process_deactivate_subscription(self, mock_update_sub_nf, mock_get_nfs):
self.app_conf.subscription.administrativeState = 'LOCKED'
- self.app_conf.subscription.process_subscription([list(self.xnfs)[0]], self.mock_mr_pub,
- self.app_conf)
-
+ mock_get_nfs.return_value = [list(self.xnfs)[0]]
+ self.app_conf.subscription.deactivate_subscription(self.mock_mr_pub, self.app_conf)
self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
mock_update_sub_nf.assert_called_with(self.app_conf.subscription.subscriptionName,
'PENDING_DELETE', list(self.xnfs)[0].nf_name)
- mock_update_sub_status.assert_called()
- def test_process_subscription_exception(self):
- self.assertRaises(Exception, self.app_conf.subscription.process_subscription,
+ def test_activate_subscription_exception(self):
+ self.assertRaises(Exception, self.app_conf.subscription.activate_subscription,
[list(self.xnfs)[0]], 'not_mr_pub', 'app_config')
def test_prepare_subscription_event(self):
@@ -193,7 +186,7 @@ class SubscriptionTest(TestCase):
def test_get_nf_models(self):
for nf in self.xnfs:
- self.app_conf.subscription.add_network_function_to_subscription(nf)
+ self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
nf_models = self.app_conf.subscription._get_nf_models()
self.assertEqual(3, len(nf_models))
@@ -201,7 +194,7 @@ class SubscriptionTest(TestCase):
def test_get_network_functions(self):
for nf in self.xnfs:
- self.app_conf.subscription.add_network_function_to_subscription(nf)
+ self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
nfs = self.app_conf.subscription.get_network_functions()
self.assertEqual(3, len(nfs))
diff --git a/components/pm-subscription-handler/tests/test_subscription_handler.py b/components/pm-subscription-handler/tests/test_subscription_handler.py
index 65a7c2c0..a6611666 100644
--- a/components/pm-subscription-handler/tests/test_subscription_handler.py
+++ b/components/pm-subscription-handler/tests/test_subscription_handler.py
@@ -17,9 +17,11 @@
# ============LICENSE_END=====================================================
import json
import os
+from test.support import EnvironmentVarGuard
from unittest import TestCase
from unittest.mock import patch
+from mod import create_app, db
from mod.network_function import NetworkFunction
from mod.pmsh_utils import AppConfig
from mod.subscription import AdministrativeState
@@ -28,72 +30,95 @@ from mod.subscription_handler import SubscriptionHandler
class SubscriptionHandlerTest(TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.env = EnvironmentVarGuard()
+ cls.env.set('AAI_SERVICE_PORT', '8443')
+ cls.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
+ cls.nfs = [NetworkFunction(nf_name='pnf_1'), NetworkFunction(nf_name='pnf_2')]
+
+ @patch('mod.get_db_connection_url')
+ @patch('mod.update_logging_config')
@patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
- @patch('mod.create_app')
@patch('mod.pmsh_utils._MrPub')
@patch('mod.pmsh_utils.PeriodicTask')
- def setUp(self, mock_aai_event_thread, mock_mr_pub, mock_app, mock_get_pmsh_config):
+ def setUp(self, mock_periodic_task, mock_mr_pub, mock_get_pmsh_config, mock_update_config,
+ mock_get_db_url):
+ mock_get_db_url.return_value = 'sqlite://'
with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
self.cbs_data = json.load(data)
mock_get_pmsh_config.return_value = self.cbs_data
- self.app_conf = AppConfig()
- self.mock_app = mock_app
self.mock_mr_pub = mock_mr_pub
- self.mock_aai_event_thread = mock_aai_event_thread
- self.nf_1 = NetworkFunction(nf_name='pnf_1')
- self.nf_2 = NetworkFunction(nf_name='pnf_2')
- self.nfs = [self.nf_1, self.nf_2]
+ self.mock_aai_event_thread = mock_periodic_task
+ self.mock_policy_event_thread = mock_periodic_task
+ self.app = create_app()
+ self.app.app_context().push()
+ db.create_all()
+ self.app_conf = AppConfig()
def tearDown(self):
- pass
+ db.drop_all()
+ db.session.remove()
+ @patch('mod.subscription.Subscription.get_local_sub_admin_state')
@patch('mod.logger.info')
@patch('mod.aai_client.get_pmsh_nfs_from_aai')
- def test_execute_no_change_of_state(self, mock_get_aai, mock_logger):
+ def test_execute_no_change_of_state(self, mock_get_aai, mock_logger, mock_get_sub_status):
+ mock_get_sub_status.return_value = AdministrativeState.UNLOCKED.value
mock_get_aai.return_value = self.nfs
- sub_handler = SubscriptionHandler(AdministrativeState.UNLOCKED.value, self.mock_mr_pub,
- self.mock_app, self.app_conf,
- self.mock_aai_event_thread)
+ sub_handler = SubscriptionHandler(self.mock_mr_pub,
+ self.app, self.app_conf,
+ self.mock_aai_event_thread,
+ self.mock_policy_event_thread)
sub_handler.execute()
mock_logger.assert_called_with('Administrative State did not change in the Config')
- @patch('mod.subscription.Subscription.process_subscription')
+ @patch('mod.subscription.Subscription.get_local_sub_admin_state')
+ @patch('mod.subscription.Subscription.activate_subscription')
@patch('mod.aai_client.get_pmsh_nfs_from_aai')
- def test_execute_change_of_state_unlocked(self, mock_get_aai, mock_process_sub):
+ def test_execute_change_of_state_to_unlocked(self, mock_get_aai, mock_activate_sub,
+ mock_get_sub_status):
mock_get_aai.return_value = self.nfs
+ mock_get_sub_status.return_value = AdministrativeState.LOCKED.value
self.mock_aai_event_thread.return_value.start.return_value = 'start_method'
- sub_handler = SubscriptionHandler(AdministrativeState.LOCKED.value, self.mock_mr_pub,
- self.mock_app, self.app_conf,
- self.mock_aai_event_thread.return_value)
+ sub_handler = SubscriptionHandler(self.mock_mr_pub,
+ self.app, self.app_conf,
+ self.mock_aai_event_thread.return_value,
+ self.mock_policy_event_thread)
sub_handler.execute()
- self.assertEqual(AdministrativeState.UNLOCKED.value, sub_handler.administrative_state)
- mock_process_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf)
+ self.assertEqual(AdministrativeState.UNLOCKED.value,
+ self.app_conf.subscription.administrativeState)
+ mock_activate_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf)
self.mock_aai_event_thread.return_value.start.assert_called()
- @patch('mod.subscription.Subscription.process_subscription')
+ @patch('mod.subscription.Subscription.get_local_sub_admin_state')
+ @patch('mod.subscription.Subscription.deactivate_subscription')
@patch('mod.aai_client.get_pmsh_nfs_from_aai')
- def test_execute_change_of_state_locked(self, mock_get_aai, mock_process_sub):
+ def test_execute_change_of_state_to_locked(self, mock_get_aai, mock_deactivate_sub,
+ mock_get_sub_status):
+ mock_get_sub_status.return_value = AdministrativeState.UNLOCKED.value
+ self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value
+ self.app_conf.subscription.update_subscription_status()
mock_get_aai.return_value = self.nfs
self.mock_aai_event_thread.return_value.cancel.return_value = 'cancel_method'
- self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value
- sub_handler = SubscriptionHandler(AdministrativeState.UNLOCKED.value, self.mock_mr_pub,
- self.mock_app, self.app_conf,
- self.mock_aai_event_thread.return_value)
+ sub_handler = SubscriptionHandler(self.mock_mr_pub,
+ self.app, self.app_conf,
+ self.mock_aai_event_thread.return_value,
+ self.mock_policy_event_thread)
sub_handler.execute()
- self.assertEqual(AdministrativeState.LOCKED.value, sub_handler.administrative_state)
- mock_process_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf)
+ mock_deactivate_sub.assert_called_with(self.mock_mr_pub, self.app_conf)
self.mock_aai_event_thread.return_value.cancel.assert_called()
- self.app_conf.subscription.administrativeState = AdministrativeState.UNLOCKED.value
- @patch('mod.subscription.Subscription.process_subscription')
+ @patch('mod.subscription.Subscription.activate_subscription')
@patch('mod.logger.error')
@patch('mod.aai_client.get_pmsh_nfs_from_aai')
- def test_execute_exception(self, mock_get_aai, mock_logger, mock_process_sub):
+ def test_execute_exception(self, mock_get_aai, mock_logger, mock_activate_sub):
mock_get_aai.return_value = self.nfs
- mock_process_sub.side_effect = Exception
- sub_handler = SubscriptionHandler(AdministrativeState.LOCKED.value, self.mock_mr_pub,
- self.mock_app, self.app_conf,
- self.mock_aai_event_thread)
+ mock_activate_sub.side_effect = Exception
+ sub_handler = SubscriptionHandler(self.mock_mr_pub,
+ self.app, self.app_conf,
+ self.mock_aai_event_thread,
+ self.mock_policy_event_thread)
sub_handler.execute()
mock_logger.assert_called_with('Error occurred during the activation/deactivation process ',
exc_info=True)