summaryrefslogtreecommitdiffstats
path: root/components/pm-subscription-handler/tests/test_subscription_handler.py
diff options
context:
space:
mode:
Diffstat (limited to 'components/pm-subscription-handler/tests/test_subscription_handler.py')
-rw-r--r--components/pm-subscription-handler/tests/test_subscription_handler.py95
1 files changed, 60 insertions, 35 deletions
diff --git a/components/pm-subscription-handler/tests/test_subscription_handler.py b/components/pm-subscription-handler/tests/test_subscription_handler.py
index 65a7c2c0..a6611666 100644
--- a/components/pm-subscription-handler/tests/test_subscription_handler.py
+++ b/components/pm-subscription-handler/tests/test_subscription_handler.py
@@ -17,9 +17,11 @@
# ============LICENSE_END=====================================================
import json
import os
+from test.support import EnvironmentVarGuard
from unittest import TestCase
from unittest.mock import patch
+from mod import create_app, db
from mod.network_function import NetworkFunction
from mod.pmsh_utils import AppConfig
from mod.subscription import AdministrativeState
@@ -28,72 +30,95 @@ from mod.subscription_handler import SubscriptionHandler
class SubscriptionHandlerTest(TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.env = EnvironmentVarGuard()
+ cls.env.set('AAI_SERVICE_PORT', '8443')
+ cls.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
+ cls.nfs = [NetworkFunction(nf_name='pnf_1'), NetworkFunction(nf_name='pnf_2')]
+
+ @patch('mod.get_db_connection_url')
+ @patch('mod.update_logging_config')
@patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
- @patch('mod.create_app')
@patch('mod.pmsh_utils._MrPub')
@patch('mod.pmsh_utils.PeriodicTask')
- def setUp(self, mock_aai_event_thread, mock_mr_pub, mock_app, mock_get_pmsh_config):
+ def setUp(self, mock_periodic_task, mock_mr_pub, mock_get_pmsh_config, mock_update_config,
+ mock_get_db_url):
+ mock_get_db_url.return_value = 'sqlite://'
with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
self.cbs_data = json.load(data)
mock_get_pmsh_config.return_value = self.cbs_data
- self.app_conf = AppConfig()
- self.mock_app = mock_app
self.mock_mr_pub = mock_mr_pub
- self.mock_aai_event_thread = mock_aai_event_thread
- self.nf_1 = NetworkFunction(nf_name='pnf_1')
- self.nf_2 = NetworkFunction(nf_name='pnf_2')
- self.nfs = [self.nf_1, self.nf_2]
+ self.mock_aai_event_thread = mock_periodic_task
+ self.mock_policy_event_thread = mock_periodic_task
+ self.app = create_app()
+ self.app.app_context().push()
+ db.create_all()
+ self.app_conf = AppConfig()
def tearDown(self):
- pass
+ db.drop_all()
+ db.session.remove()
+ @patch('mod.subscription.Subscription.get_local_sub_admin_state')
@patch('mod.logger.info')
@patch('mod.aai_client.get_pmsh_nfs_from_aai')
- def test_execute_no_change_of_state(self, mock_get_aai, mock_logger):
+ def test_execute_no_change_of_state(self, mock_get_aai, mock_logger, mock_get_sub_status):
+ mock_get_sub_status.return_value = AdministrativeState.UNLOCKED.value
mock_get_aai.return_value = self.nfs
- sub_handler = SubscriptionHandler(AdministrativeState.UNLOCKED.value, self.mock_mr_pub,
- self.mock_app, self.app_conf,
- self.mock_aai_event_thread)
+ sub_handler = SubscriptionHandler(self.mock_mr_pub,
+ self.app, self.app_conf,
+ self.mock_aai_event_thread,
+ self.mock_policy_event_thread)
sub_handler.execute()
mock_logger.assert_called_with('Administrative State did not change in the Config')
- @patch('mod.subscription.Subscription.process_subscription')
+ @patch('mod.subscription.Subscription.get_local_sub_admin_state')
+ @patch('mod.subscription.Subscription.activate_subscription')
@patch('mod.aai_client.get_pmsh_nfs_from_aai')
- def test_execute_change_of_state_unlocked(self, mock_get_aai, mock_process_sub):
+ def test_execute_change_of_state_to_unlocked(self, mock_get_aai, mock_activate_sub,
+ mock_get_sub_status):
mock_get_aai.return_value = self.nfs
+ mock_get_sub_status.return_value = AdministrativeState.LOCKED.value
self.mock_aai_event_thread.return_value.start.return_value = 'start_method'
- sub_handler = SubscriptionHandler(AdministrativeState.LOCKED.value, self.mock_mr_pub,
- self.mock_app, self.app_conf,
- self.mock_aai_event_thread.return_value)
+ sub_handler = SubscriptionHandler(self.mock_mr_pub,
+ self.app, self.app_conf,
+ self.mock_aai_event_thread.return_value,
+ self.mock_policy_event_thread)
sub_handler.execute()
- self.assertEqual(AdministrativeState.UNLOCKED.value, sub_handler.administrative_state)
- mock_process_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf)
+ self.assertEqual(AdministrativeState.UNLOCKED.value,
+ self.app_conf.subscription.administrativeState)
+ mock_activate_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf)
self.mock_aai_event_thread.return_value.start.assert_called()
- @patch('mod.subscription.Subscription.process_subscription')
+ @patch('mod.subscription.Subscription.get_local_sub_admin_state')
+ @patch('mod.subscription.Subscription.deactivate_subscription')
@patch('mod.aai_client.get_pmsh_nfs_from_aai')
- def test_execute_change_of_state_locked(self, mock_get_aai, mock_process_sub):
+ def test_execute_change_of_state_to_locked(self, mock_get_aai, mock_deactivate_sub,
+ mock_get_sub_status):
+ mock_get_sub_status.return_value = AdministrativeState.UNLOCKED.value
+ self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value
+ self.app_conf.subscription.update_subscription_status()
mock_get_aai.return_value = self.nfs
self.mock_aai_event_thread.return_value.cancel.return_value = 'cancel_method'
- self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value
- sub_handler = SubscriptionHandler(AdministrativeState.UNLOCKED.value, self.mock_mr_pub,
- self.mock_app, self.app_conf,
- self.mock_aai_event_thread.return_value)
+ sub_handler = SubscriptionHandler(self.mock_mr_pub,
+ self.app, self.app_conf,
+ self.mock_aai_event_thread.return_value,
+ self.mock_policy_event_thread)
sub_handler.execute()
- self.assertEqual(AdministrativeState.LOCKED.value, sub_handler.administrative_state)
- mock_process_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf)
+ mock_deactivate_sub.assert_called_with(self.mock_mr_pub, self.app_conf)
self.mock_aai_event_thread.return_value.cancel.assert_called()
- self.app_conf.subscription.administrativeState = AdministrativeState.UNLOCKED.value
- @patch('mod.subscription.Subscription.process_subscription')
+ @patch('mod.subscription.Subscription.activate_subscription')
@patch('mod.logger.error')
@patch('mod.aai_client.get_pmsh_nfs_from_aai')
- def test_execute_exception(self, mock_get_aai, mock_logger, mock_process_sub):
+ def test_execute_exception(self, mock_get_aai, mock_logger, mock_activate_sub):
mock_get_aai.return_value = self.nfs
- mock_process_sub.side_effect = Exception
- sub_handler = SubscriptionHandler(AdministrativeState.LOCKED.value, self.mock_mr_pub,
- self.mock_app, self.app_conf,
- self.mock_aai_event_thread)
+ mock_activate_sub.side_effect = Exception
+ sub_handler = SubscriptionHandler(self.mock_mr_pub,
+ self.app, self.app_conf,
+ self.mock_aai_event_thread,
+ self.mock_policy_event_thread)
sub_handler.execute()
mock_logger.assert_called_with('Error occurred during the activation/deactivation process ',
exc_info=True)